Logstash
הכלי הזה מיועד לעשות parsing למידע שהוא מקבל ולשים אותו
ב-elasticsearch. ניתן בעזרתו גם להעביר
מידע מ-DB שהוא SQL ל-elasticsearch אבל על זה נכתוב בפעם אחרת (אי"ה).
קובץ הקונפיגורציה נמצא בתיקייה /etc/logstash וקוראים לו logstash.yml. אבל במקרה שלנו אין לנו צורך לעשות שום שינוי מהגדרות ה-default. במקום זה אנחנו נגדיר ל-Logstash מה שנקרא pipeline שזה כמו תוכנית עבודה שאומרת מאיפה לקבל את המידע, מה לעשות איתו ואיפה לשים אותו.
אז ניצור קובץ
חדש בתיקייה /etc/logstash ונקרא לו first-pipeline.conf ונכניס לו את הטקסט
הבא:
input { beats { port => "5044" } } #filter { #} output { # output to the console stdout { codec => rubydebug } }
הקובץ בנוי משלושה חלקים:
·
input – בחלק הזה מגדירים מאיפה מגיע המידע. בתור התחלה הגדרנו שהמידע מגיע
ב-filebeat מ-port 5044 שכמו שאתם זוכרים זה
ה-port שהגדרנו ל-filebeat לשלוח אליו את המידע (מוסבר בחלק הראשון).
·
filter – בחלק הזה מגדירים פעולות שאנחנו מבצעים על המידע. זה לא שדה חובה ולכן
בתור התחלה הוא ב-comment, אבל נשתמש בו עוד
מעט כי בו אנחנו מגדירים את ה-parsing
·
output – בחלק הזה מגדירים לאן ישלח המידע לאחר כל הפעולות שבוצעו ב-filter. בתור התחלה הגדרנו שכל
המידע ישלח פשוט ל-console וכך נוכל לראות בקלות מה
בדיוק בוצע במידע. בהמשך נשנה את זה כדי לשלוח את המידע ל-elasticsearch.
לצורך התרגול ניצור קובץ לוג בתיקייה /my_logs בשם myproject.log ונכניס לו את הטקסט הבא:
2020/02/24 22:25:17.337
INFO : Transaction Name: GET_MESSAGE from device id 112233 completed
successfully. Request time: 3 ms, DB time: 4 ms.
2020/02/24 22:25:20.990 ERROR: Transaction Name: SEND_MESSAGE from device
id 445566 ended with error HOST_NOT_AVAILABLE Request time: 4 ms, DB time:
4 ms.
נריץ את logstash ע"י הפקודה:
/usr/share/logstash/bin/logstash -f
/etc/logstash/first-pipeline.conf --config.reload.automatic
הפרמטר config.reload.automatic מגדיר ל-logstash להתעדכן בצורה אוטומטית כל פעם ש- first-pipeline.conf משתנה כך שלא נצטרך כל פעם
לעצור ולהריץ מחדש את logstash.
לאחר מכן נריץ את filebeat כדי שיתחיל לשלוח את המידע
ל-Logstash ע"י הפקודה:
/usr/share/filebeat/bin/filebeat -e -c
/etc/filebeat/filebeat.yml -d "publish"
אם הכל עובד כשורה אנחנו נראה ש-logstash ידפיס את המידע הבא (הורדתי כמה שדות שמיוחדים למכונה שלי וכנראה שגם אצלכם יהיו כמו שדות ייחודיים לכם. השארתי את השדות החשובים):
{ "message" => "2020/02/24 22:25:17.337 INFO : Transaction Name: GET_MESSAGE from device id 112233 completed successfully. Request time: 3 ms, DB time: 4 ms.", "@timestamp" => 2020-05-21T10:07:18.082Z, "input" => { "type" => "log" }, "agent" => { "type" => "filebeat", "version" => "7.6.2" }, "log" => { "offset" => 0, "file" => { "path" => "/my_logs/myproject.log" } }, "@version" => "1", "tags" => [ [0] "beats_input_codec_plain_applied" ], "ecs" => { "version" => "1.4.0" } } { "message" => "2020/02/24 22:25:20.990 ERROR: Transaction Name: SEND_MESSAGE from device id 445566 ended with error HOST_NOT_AVAILABLE Request time: 4 ms, DB time: 4 ms.", "@timestamp" => 2020-05-21T10:07:18.082Z, "input" => { "type" => "log" }, "agent" => { "type" => "filebeat", "version" => "7.6.2" }, "log" => { "offset" => 142, "file" => { "path" => "/my_logs/myproject.log" } }, "@version" => "1", "tags" => [ [0] "beats_input_codec_plain_applied" ], "ecs" => { "version" => "1.4.0" } }
קיבלנו שני אובייקטים של json אחד עבור כל שורה מהלוג. כל השורה מהלוג נמצאת בתוך השדה message. בנוסף יש שדה של timestamp שכרגע מייצג את הזמן שבו ה-logstash פעל (נשנה זאת בהמשך). ויש עוד כמה נתונים כלליים.
מה שאנחנו רוצים שיהיה בסופו של דבר, זה שכל מידע שאנחנו רוצים לחלץ מהשורה
של הלוג יקבל שדה בפני עצמו. לצורך כך נתחיל להשתמש בחלק ה-filter.
ישנם הרבה סוגי פילטרים שניתן להשתמש בהם, אך ללא ספק השימושי מכולם הוא grok. הוא נועד לחלץ נתונים מתוך
מידע שהוא Unstructured.
אז בתור התחלה נסביר את ה-syntax של grok. כל שדה שאנחנו רוצים לחלץ
נגדיר בצורה הבאה:
%{SYNTAX:SEMANTIC}
כאשר SYNTAX זוהי אחת התבניות המוגדרות של grok (התבניות מוגדרות כאן). ו-SEMANTIC זה שם השדה שאנחנו רוצים ליצור.
נתחיל עם הסבר על שורת לוג פשוטה. נניח שיש לנו שורת לוג כזו:
Transaction Name: GET_MESSAGE
ואנחנו רוצים לחלץ את GET_MESSAGE לתוך שדה שנקרא transaction. נכתוב את הדבר הבא ב-filter:
filter { grok { match => { "message" => ".*: %{USERNAME:transaction}.*"} } }
הסבר: התוים ".*" פירושם – כל תו שהוא אפס פעמים ויותר (בדומה ל-Regex). התו ":" פשוט אומר ל-grok למצוא נקודותיים. עד כאן בעצם מצאנו את הנקודותיים הראשונים בשורה. לאחר מכן יש רווח " " ואז מגיע חילוץ המידע.
התבנית USERNAME כפי שמוגדרת ב-grok כאן היא:
USERNAME [a-zA-Z0-9._-]+
וזו תבנית טובה לחילוץ המידע שלנו שכולל אותיות וקו תחתון בלבד.
כדי לשחק עם grok עד שמגיעים לפילטור המדוייק
כדאי להשתמש עם grok debugger. יש אחד online כאן ויש גם אחד מובנה
ב-Kibana. אני מזהיר מראש שהוא
לפעמים לא עובד כמו שצריך, וכשאתה בטוח בעצמך והוא מזייף כדאי מאוד לנסות
במערכת Elastic אמיתית. יצא לי כבר כמה
פעמים שהוא נתן תוצאות לא נכונות ובמערכת אמיתית הדברים עבדו אחרת. אבל בשביל
דברים פשוטים הוא די טוב.
במקרה הפשוט שלנו, זה מה שיצא (לחצו על התמונה כדי לראות אותה גדולה יותר):
זה בדיוק מה שרצינו.
זו היתה דוגמה פשוטה כדי שניכנס לעניינים. עכשיו נלמד איך לחלץ מידע משורה מורכבת יותר. ניקח את השורה הראשונה שהכנסנו לקובץ הלוג שיצרנו לעיל
2020/02/24 22:25:17.337 INFO : Transaction Name: GET_MESSAGE from device id 112233 completed successfully. Request time: 3 ms, DB time: 4 ms.
השורה הזו מורכבת מהרבה שדות ולכן כדאי לפרק אותה לחלקים. בהתחלה ניקח את תחילת הלוג שמכילה את התאריך ואת ה-transaction. לאחר מכן כשנמצא ביטוי נכון לחילוץ המידע ניקח עוד חלק מהשורה וכן הלאה עד שנסיים את כל השורה. החלק שעליו נעבוד עכשיו הוא:
2020/02/24 22:25:17.337 INFO : Transaction Name: GET_MESSAGE
יש מקרים שבהם שום תבנית של grok לא מתאימה לנו ולכן יש צורך להעזר בספריה שנקראת Oniguruma שהיא כבר מובנת ב-grok. היא מאפשרת לנו לכתוב ביטויים ב-regex. לצורך כך אנחנו משתמשים ב-syntax שונה שנראה כך:
(?<field_name>the
pattern here)
נשתמש ביכולת הזו כדי לחלץ את התאריך והשעה. לוקח קצת זמן לשחק עם זה עד
שמגיעים לביטוי הנכון. במקרה שלנו בסופו של דבר זה הביטוי שנצטרך:
(?<datetime>%{YEAR}\/%{MONTHNUM}\/%{MONTHDAY}
%{HOUR}\:%{MINUTE}\:%{SECOND}).*: %{USERNAME:transaction}
שימו לב שהסוגריים העגולים שסימנתי באדום מכילים את כל הביטוי שמחלץ את התאריך והשעה. השם datetime הוא השם של השדה החדש שאנחנו רוצים שיווצר. לאחר מכן השתמשנו בתבניות של grok כדי לחלץ כל חלק מהתאריך והשעה. בנוסף יווצרו לנו גם שדות נוספים עבור כל חלק מהתאריך והשעה. נכניס הכל ל-grok debugger:
ונקבל:
מצוין!
עכשיו נמשיך לחלץ את שאר השדות של השורה הראשונה בלוג שלנו:
2020/02/24
22:25:17.337 INFO : Transaction Name: GET_MESSAGE from device id 112233
completed successfully. Request time: 3 ms, DB time: 4 ms.
ממליץ מאוד לנסות לעשות את זה לבד, כדי ללמוד את כל המשחקים עם grok.
בסופו של דבר הביטוי השלם שבניתי כדי לחלץ את כל השדות הוא:
(?<datetime>%{YEAR}\/%{MONTHNUM}\/%{MONTHDAY}
%{HOUR}\:%{MINUTE}\:%{SECOND}).*: %{USERNAME:transaction}.* completed
%{WORD:status}.* Request time: %{BASE10NUM:request_time}.* DB time:
%{BASE10NUM:db_time}.*
נראה מה ייצא לנו מה-grok debugger:
שימוש בפילטר date
כמו שראינו בהתחלה, logstash יוצר לנו אוטומטית שדה שנקרא timestamp וב-default הוא מכניס לשם את התאריך של
זמן יצירת אובייקט ה-json. מה שאנחנו רוצים זה שהשדה הזה יכיל את התאריך שחילצנו מתוך שורת
הלוג. זה מאוד יעזור לנו אח"כ בשביל ליצור גרפים ושאילתות לפי זמן. לצורך כך
נשתמש בפילטר שנקרא date. זה פילטר שמיועד בשביל חילוץ של תאריכים וזמנים.
צריך לזכור ש-logstash מבצע את הפילטרים שאנחנו מגדירים לו אחד אחרי השני, ולכן הפילטר date יגיע אחרי הפילטר grok ויעבוד על שדה ש-grok יצר. במקרה שלנו נשתמש בשדה datetime. וכך נכתוב
בשדה של ה-filter ב- first-pipeline.conf:
filter {
grok {
match => { "message" =>
"(?<datetime>%{YEAR}\/%{MONTHNUM}\/%{MONTHDAY} %{HOUR}\:%{MINUTE}\:%{SECOND}).*: %{USERNAME:transaction}.* completed %{WORD:status}.* Request time: %{BASE10NUM:request_time}.* DB time: %{BASE10NUM:db_time}.*"
}
}
date { match => [ "datetime", "yyyy/MM/dd HH:mm:ss.SSS" ] }
}
כדי להבין את זה טוב כדאי מאוד לקרוא קצת על הפילטר date פה. הפילטר
מאפשר לבנות את ה-Parsing כך שיתאים בדיוק עם הצורה שבה התאריך והשעה כתובים בלוג.
נבדוק את זה על מערכת אמיתית
אני מזכיר שאין צורך לעצור ולהפעיל מחדש את logstash, כיון שהשתמשנו בפרמטר config.reload.automatic שדואג לכך ש-Logstash מתעדכן
אוטומטית כל פעם שהקובץ first-pipeline.conf משתנה.
נעצור את filebeat, ונמחוק את ה-registry שלו כמו שמוסבר בחלק הראשון כדי שישלח שוב את קובץ הלוג שלנו:
rm -rf
/usr/share/filebeat/bin/data/registry
נריץ שוב את filebeat:
/usr/share/filebeat/bin/filebeat
-e -c /etc/filebeat/filebeat.yml -d "publish"
ונראה מה עכשיו מוציא לנו logstash עבור השורה הראשונה בלוג:
{ "ecs" => { "version" => "1.4.0" }, "tags" => [ [0] "beats_input_codec_plain_applied" ], "input" => { "type" => "log" }, "message" => "2020/02/24 22:25:17.337 INFO : Transaction Name: GET_MESSAGE from
device id 112233 completed successfully. Request time: 3 ms, DB time: 4 ms.", "status" => "successfully", "request_time" => "3", "agent" => { "version" => "7.6.2", "type" => "filebeat" }, "datetime" => "2020/02/24 22:25:17.337", "@version" => "1", "@timestamp" => 2020-02-24T20:25:17.337Z, "log" => { "offset" => 0, "file" => { "path" => "/my_logs/myproject.log" } }, "db_time" => "4", "transaction" => "GET_MESSAGE", }
אפשר לראות עכשיו שהשדה timestamp מכיל את התאריך שחולץ מתוך השורה הראשונה בלוג.
שימוש בפילטר mutate
אם שמתם לב, יש לנו עכשיו שדה מיותר. השדה datetime כבר
לא נצרך כיון שהתאריך והשעה עכשיו נמצאים בשדה timestamp. לכן כדאי למחוק את השדה datetime.
לצורך כך נשתמש בפילטר mutate.
השימוש פשוט מאוד:
mutate {
remove_field => [ "datetime" ] }
בצורה הזו אנחנו מבקשים למחוק את שדה ה-datetime.
עכשיו חלק ה-filter שלנו יראה עכשיו כך:
filter {
grok {
match => { "message" =>
"(?<datetime>%{YEAR}\/%{MONTHNUM}\/%{MONTHDAY}
%{HOUR}\:%{MINUTE}\:%{SECOND}).*: %{USERNAME:transaction}.* completed
%{WORD:status}.* Request time: %{BASE10NUM:request_time}.* DB time:
%{BASE10NUM:db_time}.*"}
}
date {
match => [ "datetime", "yyyy/MM/dd HH:mm:ss.SSS" ] }
mutate
{ remove_field => [ "datetime" ] }
}
נריץ את המערכת מחדש כמו שהסברנו
לעיל (מחיקת ה-registery והרצת filebeat) ונראה מה
נקבל עכשיו מ-logstash:
{ "@timestamp" => 2020-02-24T20:25:17.337Z, "message" => "2020/02/24 22:25:17.337 INFO : Transaction Name: GET_MESSAGE from device id 112233 completed successfully. Request time: 3 ms, DB time: 4 ms.", "request_time" => "3", "transaction" => "GET_MESSAGE", "ecs" => { "version" => "1.4.0" }, "agent" => { "type" => "filebeat", "version" => "7.6.2" }, "tags" => [ [0] "beats_input_codec_plain_applied" ], "log" => { "offset" => 0, "file" => { "path" => "/orabi_logs/elastic_test/myproject.log" } }, "db_time" => "4", "input" => { "type" => "log" }, "status" => "successfully", "@version" => "1" }
כפי שניתן לראות, השדה datetime נמחק.
שורות שלא מתאימות לפילטר
הפילטר של grok שכתבנו
מתאים למבנה מסויים של שורת הלוג שלנו. מכל השורות שלא מתאימות לפילטר הזה יווצרו json-ים בסיסיים כמו שראינו בתחילת
הפוסט כשלא השתמשנו בכלל ב-filter.
במציאות, הרבה פעמים אנחנו נירצה ששורות שלא עברו את הפילטר בכלל לא יצאו ל-output. לצורך כך נשתמש בשדה tags שנוצר באופן אוטומטי ב-json. שורות שלא עוברות את הפילטר
יקבלו tags של "_grokparsefailure".
למשל השורה השניה בלוג
שלנו נראית כך:
2020/02/24
22:25:20.990 ERROR: Transaction Name: SEND_MESSAGE from device id 445566 ended
with error HOST_NOT_AVAILABLE Request time: 4 ms, DB time: 4 ms.
שורה זו לא עברה את הפילטר שבנינו ולכן ה-json שלה מכיל בשדה ה-tags את הערך _grokparsefailure. כמו שניתן לראות להלן:
{ "@timestamp" => 2020-05-21T13:59:21.306Z, "tags" => [ [0] "beats_input_codec_plain_applied", [1] "_grokparsefailure" ], "message" => "2020/02/24 22:25:20.990 ERROR: Transaction Name: SEND_MESSAGE from device id 445566 ended with error HOST_NOT_AVAILABLE Request time: 4 ms, DB time: 4 ms.", "input" => { "type" => "log" }, "ecs" => { "version" => "1.4.0" }, "agent" => { "type" => "filebeat", "version" => "7.6.2" }, "@version" => "1", "log" => { "file" => { "path" => "/orabi_logs/elastic_test/myproject.log" }, "offset" => 142 } }
כדי להיפטר מכל השורות שמכילות את ה-tag הזה נכניס בסיום חלק ה-filter שלנו את השורה:
if
"_grokparsefailure" in [tags] { drop {} }
וכך יראה הפילטר שלנו עכשיו:
filter {
grok {
match => { "message"
=> "(?<datetime>%{YEAR}\/%{MONTHNUM}\/%{MONTHDAY}
%{HOUR}\:%{MINUTE}\:%{SECOND}).*: %{USERNAME:transaction}.* completed
%{WORD:status}.* Request time: %{BASE10NUM:request_time}.* DB time: %{BASE10NUM:db_time}.*"}
}
date { match => [ "datetime",
"yyyy/MM/dd HH:mm:ss.SSS" ] }
mutate { remove_field => [
"datetime" ] }
if "_grokparsefailure" in [tags]
{ drop {} }
}
נריץ את המערכת מחדש כמו שהסברנו לעיל (מחיקת ה-registery והרצת filebeat) ונראה מה
נקבל עכשיו מ-logstash. תריצו
בעצמכם ותיראו שעכשיו יוצא רק json אחד עבור השורה הראשונה ולא יוצא כלום עבור השניה.
בעיות נפוצות
קורה לפעמים שכשמנסים להריץ את logstash מקבלים שגיאה בסגנון הזה:
Logstash
could not be started because there is already another instance using the configured
data directory…
זה אומר שיש logstash שעדיין רץ. לא ניתן להריץ שני יחידות של logstash ביחד. אם מוצאים את ה-logstash שרץ ואפשר
בקלות לסגור אותו – מצוין. הבעיה היא שלפעמים לא קל למצוא אותו ולסגור אותו
בקלות ולכן נעשה חיפוש קצר. נחפש מי משתמש ב-Port של logstash:
netstat
-tulpn | grep 5044
נקבל משהו כזה:
tcp 0
0 :::5044 :::* LISTEN 25197/java
ואז נסגור את התוכנה שרצה לפי ה-ID שלה:
kill -9
25197
עכשיו ניתן להריץ את logstash מחדש.
אם כל זה לא עזר כדאי למחוק את הקובץ .lock בצורה הבאה:
rm -f
/usr/share/logstash/data/.lock
מה בהמשך
עד כאן לחלק הזה. בחלקים הבאים נעסוק ב-elasticsearch ו-kibana. אחרי שנבין טוב יותר את שני הכלים הבאים נכניס דברים נוספים שקשורים ל-logstash.
החלק הזה היה ארוך יותר, אבל אם הגעתם עד לפה אתם משקיענים ואתם בדרך הנכונה.
עד כאן להפעם.
אם אהבתם, תכתבו משהו למטה...
תודה על ההסבר וההשקעה.
השבמחקבשמחה רבה
מחקתודה רבה , ההסבר ממש נותן מושג במה מדובר ויותר מזה
השבמחקתודה רבה ! עובד טוב ! ישר כח על העוונה והצניעות
השבמחק