מדריך מקיף על Elastic Stack - חלק 2 - logstash

זה החלק השני במדריך על Elastic Stack. בחלק הראשון עסקנו ב-filebeat. בחלק הזה נעסוק בחלק השני בשרשרת הכלים - logstash:


Logstash

הכלי הזה מיועד לעשות parsing למידע שהוא מקבל ולשים אותו ב-elasticsearch. ניתן בעזרתו גם להעביר מידע מ-DB שהוא SQL ל-elasticsearch אבל על זה נכתוב בפעם אחרת (אי"ה).

קובץ הקונפיגורציה נמצא בתיקייה /etc/logstash וקוראים לו logstash.yml. אבל במקרה שלנו אין לנו צורך לעשות שום שינוי מהגדרות ה-default. במקום זה אנחנו נגדיר ל-Logstash מה שנקרא pipeline שזה כמו תוכנית עבודה שאומרת מאיפה לקבל את המידע, מה לעשות איתו ואיפה לשים אותו. 

אז ניצור קובץ חדש בתיקייה /etc/logstash ונקרא לו first-pipeline.conf ונכניס לו את הטקסט הבא:

input {

    beats {

        port => "5044"

    }

}

#filter {

#}

output {

# output to the console

    stdout { codec => rubydebug }

}

הקובץ בנוי משלושה חלקים:

·         input – בחלק הזה מגדירים מאיפה מגיע המידע. בתור התחלה הגדרנו שהמידע מגיע ב-filebeat מ-port 5044 שכמו שאתם זוכרים זה ה-port שהגדרנו ל-filebeat לשלוח אליו את המידע (מוסבר בחלק הראשון).

·         filter – בחלק הזה מגדירים פעולות שאנחנו מבצעים על המידע. זה לא שדה חובה ולכן בתור התחלה הוא ב-comment, אבל נשתמש בו עוד מעט כי בו אנחנו מגדירים את ה-parsing

·         output – בחלק הזה מגדירים לאן ישלח המידע לאחר כל הפעולות שבוצעו ב-filter. בתור התחלה הגדרנו שכל המידע ישלח פשוט ל-console וכך נוכל לראות בקלות מה בדיוק בוצע במידע. בהמשך נשנה את זה כדי לשלוח את המידע ל-elasticsearch.

לצורך התרגול ניצור קובץ לוג בתיקייה /my_logs בשם myproject.log ונכניס לו את הטקסט הבא:

2020/02/24 22:25:17.337 INFO : Transaction Name: GET_MESSAGE from device id 112233 completed successfully. Request time: 3 ms, DB time: 4 ms.

2020/02/24 22:25:20.990 ERROR: Transaction Name: SEND_MESSAGE from device id 445566 ended with error HOST_NOT_AVAILABLE Request time: 4 ms, DB time: 4 ms.

נריץ את logstash ע"י הפקודה:

/usr/share/logstash/bin/logstash -f /etc/logstash/first-pipeline.conf --config.reload.automatic

הפרמטר config.reload.automatic מגדיר ל-logstash להתעדכן בצורה אוטומטית כל פעם ש- first-pipeline.conf משתנה כך שלא נצטרך כל פעם לעצור ולהריץ מחדש את logstash.

לאחר מכן נריץ את filebeat כדי שיתחיל לשלוח את המידע ל-Logstash ע"י הפקודה:

/usr/share/filebeat/bin/filebeat -e -c /etc/filebeat/filebeat.yml -d "publish"

אם הכל עובד כשורה אנחנו נראה ש-logstash ידפיס את המידע הבא (הורדתי כמה שדות שמיוחדים למכונה שלי וכנראה שגם אצלכם יהיו כמו שדות ייחודיים לכם. השארתי את השדות החשובים):

{
       "message" => "2020/02/24 22:25:17.337 INFO : Transaction Name: GET_MESSAGE from device id 112233 completed successfully. Request time: 3 ms, DB time: 4 ms.",
    "@timestamp" => 2020-05-21T10:07:18.082Z,
         "input" => {
        "type" => "log"
    },
         "agent" => {
                "type" => "filebeat",
             "version" => "7.6.2"
    },
           "log" => {
        "offset" => 0,
          "file" => {
            "path" => "/my_logs/myproject.log"
        }
    },
      "@version" => "1",
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
           "ecs" => {
        "version" => "1.4.0"
    }
}
{
       "message" => "2020/02/24 22:25:20.990 ERROR: Transaction Name: SEND_MESSAGE from device id 445566 ended with error HOST_NOT_AVAILABLE Request time: 4 ms, DB time: 4 ms.",
    "@timestamp" => 2020-05-21T10:07:18.082Z,
         "input" => {
        "type" => "log"
    },
         "agent" => {
                "type" => "filebeat",
             "version" => "7.6.2"
    },
           "log" => {
        "offset" => 142,
          "file" => {
            "path" => "/my_logs/myproject.log"
        }
    },
      "@version" => "1",
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
           "ecs" => {
        "version" => "1.4.0"
    }
}


קיבלנו שני אובייקטים של json אחד עבור כל שורה מהלוג. כל השורה מהלוג נמצאת בתוך השדה message. בנוסף יש שדה של timestamp שכרגע מייצג את הזמן שבו ה-logstash פעל (נשנה זאת בהמשך). ויש עוד כמה נתונים כלליים.

מה שאנחנו רוצים שיהיה בסופו של דבר, זה שכל מידע שאנחנו רוצים לחלץ מהשורה של הלוג יקבל שדה בפני עצמו. לצורך כך נתחיל להשתמש בחלק ה-filter.

ישנם הרבה סוגי פילטרים שניתן להשתמש בהם, אך ללא ספק השימושי מכולם הוא grok. הוא נועד לחלץ נתונים מתוך מידע שהוא Unstructured.

אז בתור התחלה נסביר את ה-syntax של grok. כל שדה שאנחנו רוצים לחלץ נגדיר בצורה הבאה:

%{SYNTAX:SEMANTIC}

כאשר SYNTAX זוהי אחת התבניות המוגדרות של grok (התבניות מוגדרות כאן). ו-SEMANTIC זה שם השדה שאנחנו רוצים ליצור.

נתחיל עם הסבר על שורת לוג פשוטה. נניח שיש לנו שורת לוג כזו:

Transaction Name: GET_MESSAGE

ואנחנו רוצים לחלץ את GET_MESSAGE לתוך שדה שנקרא transaction. נכתוב את הדבר הבא ב-filter:

filter {
    grok {
            match => { "message" => ".*: %{USERNAME:transaction}.*"}
    }
}

הסבר: התוים ".*" פירושם – כל תו שהוא אפס פעמים ויותר (בדומה ל-Regex). התו ":" פשוט אומר ל-grok למצוא נקודותיים. עד כאן בעצם מצאנו את הנקודותיים הראשונים בשורה. לאחר מכן יש רווח " " ואז מגיע חילוץ המידע. 

התבנית USERNAME כפי שמוגדרת ב-grok כאן היא:

USERNAME [a-zA-Z0-9._-]+

וזו תבנית טובה לחילוץ המידע שלנו שכולל אותיות וקו תחתון בלבד.

כדי לשחק עם grok עד שמגיעים לפילטור המדוייק כדאי להשתמש עם grok debugger. יש אחד online כאן ויש גם אחד מובנה ב-Kibana. אני מזהיר מראש שהוא לפעמים לא עובד כמו שצריך, וכשאתה בטוח בעצמך והוא מזייף כדאי מאוד לנסות במערכת Elastic אמיתית. יצא לי כבר כמה פעמים שהוא נתן תוצאות לא נכונות ובמערכת אמיתית הדברים עבדו אחרת. אבל בשביל דברים פשוטים הוא די טוב.

במקרה הפשוט שלנו, זה מה שיצא (לחצו על התמונה כדי לראות אותה גדולה יותר):


זה בדיוק מה שרצינו.

זו היתה דוגמה פשוטה כדי שניכנס לעניינים. עכשיו נלמד איך לחלץ מידע משורה מורכבת יותר. ניקח את השורה הראשונה שהכנסנו לקובץ הלוג שיצרנו לעיל

2020/02/24 22:25:17.337 INFO : Transaction Name: GET_MESSAGE from device id 112233 completed successfully. Request time: 3 ms, DB time: 4 ms.

השורה הזו מורכבת מהרבה שדות ולכן כדאי לפרק אותה לחלקים. בהתחלה ניקח את תחילת הלוג שמכילה את התאריך ואת ה-transaction. לאחר מכן כשנמצא ביטוי נכון לחילוץ המידע ניקח עוד חלק מהשורה וכן הלאה עד שנסיים את כל השורה. החלק שעליו נעבוד עכשיו הוא:

2020/02/24 22:25:17.337 INFO : Transaction Name: GET_MESSAGE

יש מקרים שבהם שום תבנית של grok לא מתאימה לנו ולכן יש צורך להעזר בספריה שנקראת Oniguruma שהיא כבר מובנת ב-grok. היא מאפשרת לנו לכתוב ביטויים ב-regex. לצורך כך אנחנו משתמשים ב-syntax שונה שנראה כך:

(?<field_name>the pattern here)

נשתמש ביכולת הזו כדי לחלץ את התאריך והשעה. לוקח קצת זמן לשחק עם זה עד שמגיעים לביטוי הנכון. במקרה שלנו בסופו של דבר זה הביטוי שנצטרך:

(?<datetime>%{YEAR}\/%{MONTHNUM}\/%{MONTHDAY} %{HOUR}\:%{MINUTE}\:%{SECOND}).*: %{USERNAME:transaction}

שימו לב שהסוגריים העגולים שסימנתי באדום מכילים את כל הביטוי שמחלץ את התאריך והשעה. השם datetime הוא השם של השדה החדש שאנחנו רוצים שיווצר. לאחר מכן השתמשנו בתבניות של grok כדי לחלץ כל חלק מהתאריך והשעה. בנוסף יווצרו לנו גם שדות נוספים עבור כל חלק מהתאריך והשעה. נכניס הכל ל-grok debugger:


ונקבל:




מצוין!

עכשיו נמשיך לחלץ את שאר השדות של השורה הראשונה בלוג שלנו:

2020/02/24 22:25:17.337 INFO : Transaction Name: GET_MESSAGE from device id 112233 completed successfully. Request time: 3 ms, DB time: 4 ms.

ממליץ מאוד לנסות לעשות את זה לבד, כדי ללמוד את כל המשחקים עם grok.

בסופו של דבר הביטוי השלם שבניתי כדי לחלץ את כל השדות הוא:

(?<datetime>%{YEAR}\/%{MONTHNUM}\/%{MONTHDAY} %{HOUR}\:%{MINUTE}\:%{SECOND}).*: %{USERNAME:transaction}.* completed %{WORD:status}.* Request time: %{BASE10NUM:request_time}.* DB time: %{BASE10NUM:db_time}.*

נראה מה ייצא לנו מה-grok debugger:


שימוש בפילטר date

כמו שראינו בהתחלה, logstash יוצר לנו אוטומטית שדה שנקרא timestamp וב-default הוא מכניס לשם את התאריך של זמן יצירת אובייקט ה-json. מה שאנחנו רוצים זה שהשדה הזה יכיל את התאריך שחילצנו מתוך שורת הלוג. זה מאוד יעזור לנו אח"כ בשביל ליצור גרפים ושאילתות לפי זמן. לצורך כך נשתמש בפילטר שנקרא date. זה פילטר שמיועד בשביל חילוץ של תאריכים וזמנים.

צריך לזכור ש-logstash מבצע את הפילטרים שאנחנו מגדירים לו אחד אחרי השני, ולכן הפילטר date יגיע אחרי הפילטר grok ויעבוד על שדה ש-grok יצר. במקרה שלנו נשתמש בשדה datetime. וכך נכתוב בשדה של ה-filter ב- first-pipeline.conf:

filter {

    grok {

            match => { "message" => 

            "(?<datetime>%{YEAR}\/%{MONTHNUM}\/%{MONTHDAY} %{HOUR}\:%{MINUTE}\:%{SECOND}).*: %{USERNAME:transaction}.* completed %{WORD:status}.* Request time: %{BASE10NUM:request_time}.* DB time: %{BASE10NUM:db_time}.*"

            }

    }

    date { match => [ "datetime", "yyyy/MM/dd HH:mm:ss.SSS" ] }

}


כדי להבין את זה טוב כדאי מאוד לקרוא קצת על הפילטר date פה. הפילטר מאפשר לבנות את ה-Parsing כך שיתאים בדיוק עם הצורה שבה התאריך והשעה כתובים בלוג.

נבדוק את זה על מערכת אמיתית

אני מזכיר שאין צורך לעצור ולהפעיל מחדש את logstash, כיון שהשתמשנו בפרמטר config.reload.automatic שדואג לכך ש-Logstash מתעדכן אוטומטית כל פעם שהקובץ first-pipeline.conf משתנה.

נעצור את filebeat, ונמחוק את ה-registry שלו כמו שמוסבר בחלק הראשון כדי שישלח שוב את קובץ הלוג שלנו:

rm -rf /usr/share/filebeat/bin/data/registry

נריץ שוב את filebeat:

/usr/share/filebeat/bin/filebeat -e -c /etc/filebeat/filebeat.yml -d "publish"

ונראה מה עכשיו מוציא לנו logstash עבור השורה הראשונה בלוג:

{
             "ecs" => {
        "version" => "1.4.0"
    },
            "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
           "input" => {
        "type" => "log"
    },
         "message" => "2020/02/24 22:25:17.337 INFO : Transaction Name: GET_MESSAGE from 
device id 112233 completed successfully. Request time: 3 ms, DB time: 4 ms.",
          "status" => "successfully",
    "request_time" => "3",
           "agent" => {
             "version" => "7.6.2",
                "type" => "filebeat"
    },
        "datetime" => "2020/02/24 22:25:17.337",
        "@version" => "1",
      "@timestamp" => 2020-02-24T20:25:17.337Z,
             "log" => {
        "offset" => 0,
          "file" => {
            "path" => "/my_logs/myproject.log"
        }
    },
         "db_time" => "4",
     "transaction" => "GET_MESSAGE",           
}

אפשר לראות עכשיו שהשדה timestamp מכיל את התאריך שחולץ מתוך השורה הראשונה בלוג.

שימוש בפילטר mutate

אם שמתם לב, יש לנו עכשיו שדה מיותר. השדה datetime כבר לא נצרך כיון שהתאריך והשעה עכשיו נמצאים בשדה timestamp. לכן כדאי למחוק את השדה datetime. לצורך כך נשתמש בפילטר mutate.

השימוש פשוט מאוד:

mutate { remove_field => [ "datetime" ] }

בצורה הזו אנחנו מבקשים למחוק את שדה ה-datetime.

עכשיו חלק ה-filter שלנו יראה עכשיו כך:

filter {

    grok {

            match => { "message" => "(?<datetime>%{YEAR}\/%{MONTHNUM}\/%{MONTHDAY} %{HOUR}\:%{MINUTE}\:%{SECOND}).*: %{USERNAME:transaction}.* completed %{WORD:status}.* Request time: %{BASE10NUM:request_time}.* DB time: %{BASE10NUM:db_time}.*"}

    }

    date { match => [ "datetime", "yyyy/MM/dd HH:mm:ss.SSS" ] }

    mutate { remove_field => [ "datetime" ] }

}

 נריץ את המערכת מחדש כמו שהסברנו לעיל (מחיקת ה-registery והרצת filebeat) ונראה מה נקבל עכשיו מ-logstash:

{
      "@timestamp" => 2020-02-24T20:25:17.337Z,
         "message" => "2020/02/24 22:25:17.337 INFO : Transaction Name: GET_MESSAGE from device id 112233 completed successfully. Request time: 3 ms, DB time: 4 ms.",
    "request_time" => "3",
     "transaction" => "GET_MESSAGE",
             "ecs" => {
        "version" => "1.4.0"
    },
           "agent" => {
                "type" => "filebeat",    
             "version" => "7.6.2"
    },
            "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
             "log" => {
        "offset" => 0,
          "file" => {
            "path" => "/orabi_logs/elastic_test/myproject.log"
        }
    },
         "db_time" => "4",
           "input" => {
        "type" => "log"
    },
          "status" => "successfully",
        "@version" => "1"
}

כפי שניתן לראות, השדה datetime נמחק.

שורות שלא מתאימות לפילטר

הפילטר של grok שכתבנו מתאים למבנה מסויים של שורת הלוג שלנו. מכל השורות שלא מתאימות לפילטר הזה יווצרו json-ים בסיסיים כמו שראינו בתחילת הפוסט כשלא השתמשנו בכלל ב-filter.

במציאות, הרבה פעמים אנחנו נירצה ששורות שלא עברו את הפילטר בכלל לא יצאו ל-output. לצורך כך נשתמש בשדה tags שנוצר באופן אוטומטי ב-json. שורות שלא עוברות את הפילטר יקבלו tags של "_grokparsefailure".

למשל השורה השניה בלוג שלנו נראית כך:

2020/02/24 22:25:20.990 ERROR: Transaction Name: SEND_MESSAGE from device id 445566 ended with error HOST_NOT_AVAILABLE Request time: 4 ms, DB time: 4 ms.

שורה זו לא עברה את הפילטר שבנינו ולכן ה-json שלה מכיל בשדה ה-tags את הערך _grokparsefailure. כמו שניתן לראות להלן:

{
    "@timestamp" => 2020-05-21T13:59:21.306Z,
          "tags" => [
        [0] "beats_input_codec_plain_applied",
        [1] "_grokparsefailure"
    ],
       "message" => "2020/02/24 22:25:20.990 ERROR: Transaction Name: SEND_MESSAGE from device id 445566 ended with error HOST_NOT_AVAILABLE Request time: 4 ms, DB time: 4 ms.",
         "input" => {
        "type" => "log"
    },
           "ecs" => {
        "version" => "1.4.0"
    },
         "agent" => {
                "type" => "filebeat",
             "version" => "7.6.2"
    },
      "@version" => "1",
           "log" => {
          "file" => {
            "path" => "/orabi_logs/elastic_test/myproject.log"
        },
        "offset" => 142
    }
}

כדי להיפטר מכל השורות שמכילות את ה-tag הזה נכניס בסיום חלק ה-filter שלנו את השורה:

if "_grokparsefailure" in [tags] { drop {} }

וכך יראה הפילטר שלנו עכשיו:

 

filter {

    grok {

            match => { "message" => "(?<datetime>%{YEAR}\/%{MONTHNUM}\/%{MONTHDAY} %{HOUR}\:%{MINUTE}\:%{SECOND}).*: %{USERNAME:transaction}.* completed %{WORD:status}.* Request time: %{BASE10NUM:request_time}.* DB time: %{BASE10NUM:db_time}.*"}

    }

    date { match => [ "datetime", "yyyy/MM/dd HH:mm:ss.SSS" ] }

    mutate { remove_field => [ "datetime" ] }

    if "_grokparsefailure" in [tags] { drop {} }

}

נריץ את המערכת מחדש כמו שהסברנו לעיל (מחיקת ה-registery והרצת filebeat) ונראה מה נקבל עכשיו מ-logstash. תריצו בעצמכם ותיראו שעכשיו יוצא רק json אחד עבור השורה הראשונה ולא יוצא כלום עבור השניה.

בעיות נפוצות

קורה לפעמים שכשמנסים להריץ את logstash מקבלים שגיאה בסגנון הזה:

Logstash could not be started because there is already another instance using the configured data directory…

זה אומר שיש logstash שעדיין רץ. לא ניתן להריץ שני יחידות של logstash ביחד. אם מוצאים את ה-logstash שרץ ואפשר בקלות לסגור אותו – מצוין. הבעיה היא שלפעמים לא קל למצוא אותו ולסגור אותו בקלות ולכן נעשה חיפוש קצר. נחפש מי משתמש ב-Port של logstash:

netstat -tulpn | grep 5044

נקבל משהו כזה:

tcp        0      0 :::5044      :::*     LISTEN      25197/java

ואז נסגור את התוכנה שרצה לפי ה-ID שלה:

kill -9 25197

עכשיו ניתן להריץ את logstash מחדש.

אם כל זה לא עזר כדאי למחוק את הקובץ .lock בצורה הבאה:

rm -f /usr/share/logstash/data/.lock

מה בהמשך

עד כאן לחלק הזה. בחלקים הבאים נעסוק ב-elasticsearch ו-kibanaאחרי שנבין טוב יותר את שני הכלים הבאים נכניס דברים נוספים שקשורים ל-logstash.


החלק הזה היה ארוך יותר, אבל אם הגעתם עד לפה אתם משקיענים ואתם בדרך הנכונה.

עד כאן להפעם.

אם אהבתם, תכתבו משהו למטה...

4 תגובות:

  1. אנונימי1/5/21 18:40

    תודה על ההסבר וההשקעה.

    השבמחק
  2. אנונימי8/6/23 18:24

    תודה רבה , ההסבר ממש נותן מושג במה מדובר ויותר מזה

    השבמחק
  3. אנונימי12/1/25 11:06

    תודה רבה ! עובד טוב ! ישר כח על העוונה והצניעות

    השבמחק