להבין את git reset לעומק

לעילוי נשמת הלל מנחם ויגל יעקב למשפחת יניב, מהר-ברכה, שנירצחו על היותם יהודים (ה אדר התשפ"ג).

רוב הפקודות בגיט בטוחות לשימוש וגם אם טועים יש דרך לתקן. ישנן כמה פקודות מסוכנות שצריך להכיר לעומק כי לא תמיד יש אפשרות תיקון. git reset היא אחת מהן.

יש לה המון אפשרויות שכל אחת פועלת אחרת וכדאי להכיר מה בדיוק כל פקודה עושה לפני שמשתמשים בה.

המאמר הזה גם יעזור להבין טוב יותר איך git עובד ויכיר לנו מספר פקודות נוספות.

שלושת העצים

גיט עובד במקביל על שלוש עצים (Git's three trees). זה לא בדיוק מבנה הנתונים עץ שאנחנו מכירים, אלא סוג של רשימה מקושרת. העצים הם working directory, staging index, commit history. ונסביר על כל אחד בנפרד.
כדי להבין את הדברים בצורה טובה נשתנש בדוגמה. נכין את הסביבה שלנו ע"י הפקודות הבאות:

rafael@DELL-RAFAELJ MINGW64 /c/learn/git
$ mkdir git_reset

rafael@DELL-RAFAELJ MINGW64 /c/learn/git
$ cd git_reset/

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset
$ git init .
Initialized empty Git repository in C:/learn/git/git_reset/.git/

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ touch a.txt

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git add a.txt

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git commit -m "initial commit"
[master (root-commit) a358dfe] initial commit
 1 file changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 a.txt
בפקודות לעיל יצרנו ספריה בשם git_reset, נכנסו אליה ואיתחלנו אותה שתהיה תחת פיקוח של git. בתוכה יצרנו קובץ בשם a.txt והכנסנו אותו פנימה ע"י commit.

working directory

העץ הזה מייצג את השינויים הלוקאלים במחשב שלנו. למשל אם נעשה שינוי בקובץ a.txt, העץ הזה יראה לנו את השינוי הזה:


rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master) $ echo 'hello' > a.txt rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master) $ git status On branch master Changes not staged for commit: (use "git add <file>..." to update what will be committed) (use "git restore <file>..." to discard changes in working directory) modified: a.txt no changes added to commit (use "git add" and/or "git commit -a")
השינויים מסומנים באדום לאחר טקסט modified.

staging index

העץ הזה מייצג את השינויים הלוקאלים שהכנסנו אותם ע"י פקודת git add כמיועדים ל-commit הבא. היישום של העץ הזה מתבצע ע"י שימוש במנגנון caching פנימי שגיט בד"כ מנסה להסתיר מאיתנו.
כדי לראות יותר לעומק את המצב של העץ הזה נשתמש בפקודה git ls-files עם הדגל s- או stage--.

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master) $ git ls-files -s 100644 e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 0 a.txt
הנתונים שהפקודה הזו נותנת לנו הם (לפי הסדר): staged contents' mode bits, object name, stage number. ה-object name זה החלק הגדול באמצע. זה בעצם SHA-1 hash רגיל של גיט. בעצם זה חישוב ה-hash של של תוכן הקבצים שגיט משתמש בו כדי לעקוב אחרי שינויים.
עכשיו נקדם את השינוי שעשינו בקובץ a.txt ע"י git add לתוך עץ ה-staging index.

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master) $ git add a.txt warning: in the working copy of 'a.txt', LF will be replaced by CRLF the next time Git touches it rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master) $ git status On branch master Changes to be committed: (use "git restore --staged <file>..." to unstage) modified: a.txt
השינויים מסומנים בירוק ויש לפניהם את הטקסט changes to be committed.
חשוב לציין ש-git status לא מראה לנו את המצב של ה-staging index במדויק. מה שהוא מראה לנו זה את השינויים בין ה-commit history, שזה העץ הבא שנדבר עליו, לבין עץ ה-staging index. כדי לבדוק את מצב ה-staging index בדיוק, נשתמש שוב בפקודה הבאה:

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master) $ git ls-files -s 100644 ce013625030ba8dba906f756967f9e9ca394464a 0 a.txt
ניתן לראות שה-SHA-1 של הקובץ a.txt השתנה ממה שהיה לפני פקודת git add.

commit history

פקודת git commit מכניסה שינויים שישמרו כ-snapshot בעץ ה-commit history. ה-snapshot כולל גם את המצב של ה-staging index בזמן ה-commit. נדגים זאת:

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git commit -m "insert some text into a.txt"
[master 043d64c] insert some text into a.txt
 1 file changed, 1 insertion(+)

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git status
On branch master
nothing to commit, working tree clean

ניתן לראות בעזרת git status שלאחר ה-commit אין שינויים באף אחד מהעצים.


אז מה בדיוק עושה פקודת git reset

האמת שהיא דומה לפקודת git checkout. פקודת checkout משנה את הפוינטר HEAD. ולעומתה, פקודת reset פועלת על הפוינטר HEAD וגם על הפוינטר של ה-branch הנוכחי. 

דוגמה תסביר את זה הכי טוב:

בדוגמה זו יש רצף של commits ב-branch שנקרא main. כאשר גם ה-HEAD וגם המצביע של ה-branch מצביעים ל-commit שנקרא d. עכשיו נשווה את הפעולה של checkout מול reset.

אם נבצע git checkout b נקבל את המצב הבא:

רואים שרק המצביע HEAD השתנה. (המצב הזה נקרא deatached HEAD כיון שבמצב הזה HEAD לא מחובר לשום branch). 

ואם נבצע את הפקודה git reset b נקבל את המצב הבא:

כאן רואים שפקודת reset משפיעה גם על HEAD וגם על המצביע של ה-branch.

ה-HEAD והפוינטר של ה-branch נקראים commit ref pointers. ופעולות עליהם הן פעולות בעץ ה-commit history. 

ל-git reset יש 3 אפשרויות, hard, mixed, soft. והן קובעות איך ישתנו שני העצים האחרים. 

בקצרה:

hard-- זו האפשרות הכי מסוכנת. הפעולה הזו גם משנה את הפוינטר ל-commit המבוקש. אבל היא גם מוחקת את השינויים שהיו על ה-staged tree ועל ה-working directory tree.

mixed-- זו אפשרות הדיפולט. היא משנה את הפוינטר ל-commit המבוקש. והיא מעבירה שינויים שהיו על ה-staged tree ל working directory tree. כאילו שלא עשינו להם git add.

soft-- זו האפשרות העדינה ביותר. היא משנה את הפוינטר ל-commit המבוקש. ולא נוגעת ב-staged tree וב-working directory tree.



הפקודה הדיפולטיבית

אם נשתמש בפקודה git reset בלי לציין שום דבר נוסף, זה יהיה זהה לפקודה git reset --mixed HEAD.

הפקודה הזו בעצם תגרום למצביע HEAD ולמצביע של ה-branch להצביע על HEAD. ותעביר שינויים שהיו על ה-staged tree ל working directory tree.

שימוש ב git reset --hard commit_sha1

האפשרות המסוכנת ביותר ועם זאת היא האפשרות שככל הנראה הכי הרבה משתמשים בה. דבר ראשון, כמו בכל שימוש של git reset הפוינטרים של HEAD ושל ה-branch הנוכחי עוברים להצביע על ה-commit_sha1 שאותו בחרנו בפקודה. בנוסף לזה, העצים staging index, working directory מאותחלים כך שגם הם יהיו על אותו commit_sha1 שבחרנו. בפקודה הזו אנחנו מקבלים סביבת עבודה נקיה משינויים. זו הסיבה שזו הפקודה שהכי בשימוש.

כל שינוי שנעשה על ה-working directory וכל שינוי שמחכה על ה-staging index ימחקו כדי להביא את העצים הללו למצב זהה לאותו commit_sha1 שבחרנו.

נדגים את זה בצורה הבאה, נכניס שינוי לעץ ה-working directory ולעץ ה-staging:

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master) $ echo 'some text' > b.txt rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master) $ git add b.txt
rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master) $ echo 'adding some text' >> a.txt rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master) $ git status On branch master Changes to be committed: (use "git restore --staged <file>..." to unstage) new file: b.txt Changes not staged for commit: (use "git add <file>..." to update what will be committed) (use "git restore <file>..." to discard changes in working directory) modified: a.txt
יצרנו קובץ חדש שנקרא b.txt וכתבנו בו 'some text' והכנסנו אותו ל-staging. בנוסף הוספנו טקסט לקובץ a.txt. עכשיו יש לנו שינויים בשני העצים.
עכשיו נשתמש ב- git reset --hard ונראה את התוצאה:

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git reset --hard
HEAD is now at 043d64c insert some text into a.txt

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git status
On branch master
nothing to commit, working tree clean
כפי שרואים נמחקו השינויים שהיו ב-staging וב-working directory. כדאי לזכור שאין פקודת git שמסוגלת להחזיר את המצב לקדמותו, ולכן מומלץ מאוד להיזהר.
לא ציינתי לאן לעשות reset ולכן ה-default היה בשימוש שהוא בעצם ה-HEAD. זה אומר שהעצים חוזרים להיות באותו commit שעליו מצביע HEAD.

שימוש ב mixed--

נתחיל עם אותה דוגמה כמו בפיסקה הקודמת, ואז נשתמש בפקודה git reset --mixed ונראה את התוצאה:

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ echo 'some text' > b.txt

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git add b.txt

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ echo 'adding some text' >> a.txt

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git status
On branch master
Changes to be committed:
  (use "git restore --staged <file>..." to unstage)
        new file:   b.txt

Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git restore <file>..." to discard changes in working directory)
        modified:   a.txt

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git reset --mixed
Unstaged changes after reset:
M       a.txt

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git status
On branch master
Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git restore <file>..." to discard changes in working directory)
        modified:   a.txt

Untracked files:
  (use "git add <file>..." to include in what will be committed)
        b.txt

no changes added to commit (use "git add" and/or "git commit -a")

במקרה הזה ניתן לראות שהשינויים שהיו ב-staging (הקובץ b.txt) ירדו עכשיו להיות ב-working directory. ואילו השינויים שהיו ב-working directory (שינוי הקובץ a.txt) נשארו עדיין.

שימוש ב soft--

האפשרות של soft משנה רק את המצביעים של ה-HEAD ושל ה-branch הנוכחי, ולא משנה בכלל את עץ ה-staging וה-working directory. 

כדי להבין איך בדיוק soft פועל נשתמש בדוגמה אחרת. נתחיל עם קובץ a.txt ונכניס לתוכו שלוש שורות, כל שורה תהיה ב-commit נפרד:


rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ echo 'one' >> a.txt

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git commit -am "one"
warning: in the working copy of 'a.txt', LF will be replaced by CRLF the next time Git touches it
[master 3faafaf] one
 1 file changed, 1 insertion(+)

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ echo 'two' >> a.txt

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git commit -am "two"
warning: in the working copy of 'a.txt', LF will be replaced by CRLF the next time Git touches it
[master 5dc4b70] two
 1 file changed, 1 insertion(+)

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ echo 'three' >> a.txt

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git commit -am "three"
warning: in the working copy of 'a.txt', LF will be replaced by CRLF the next time Git touches it
[master d3308f2] three
 1 file changed, 1 insertion(+)

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ cat a.txt
one
two
three
נשתמש ב-git log כדי לראות את שלושת ה-commits:

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git log
commit d3308f222fed7811c93d868a3aed9aa5c4fe6736 (HEAD -> master)
Author: Rafael Jan <rafael.jan@inx.co>
Date:   Fri Mar 3 00:55:23 2023 +0200

    three

commit 5dc4b707348e31a12a934120b97e8d684277eee0
Author: Rafael Jan <rafael.jan@inx.co>
Date:   Fri Mar 3 00:55:07 2023 +0200

    two

commit 3faafaffec7c7078db091ac2e87ddf44f6279a4c
Author: Rafael Jan <rafael.jan@inx.co>
Date:   Fri Mar 3 00:54:26 2023 +0200

    one

commit a358dfe0f11dcf512be74a8fe83bf3efb0bb25dd
Author: Rafael Jan <rafael.jan@inx.co>
Date:   Thu Feb 23 10:00:41 2023 +0200

    initial commit
נכניס לקובץ שורה רביעית כדי שיהיה שינוי בעץ ה-working directory:

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ echo 'four' >> a.txt

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git status
On branch master
Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git restore <file>..." to discard changes in working directory)
        modified:   a.txt

no changes added to commit (use "git add" and/or "git commit -a")
עכשיו נשתמש ב-git reset --soft כדי לחזור ל-commit שבו הכנסנו את השורה שבה כתוב one:

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git reset --soft 3faafaffec7c7078db091ac2e87ddf44f6279a4c
ונבדוק מה הסטטוס:

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git status
On branch master
Changes to be committed:
  (use "git restore --staged <file>..." to unstage)
        modified:   a.txt

Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git restore <file>..." to discard changes in working directory)
        modified:   a.txt


rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ cat a.txt
one
two
three
four
במבט ראשון זה נראה מוזר. פתאום נהיו לנו שינויים ב-staging. וכשבודקים את התוכן של הקובץ a.txt נראה שהוא לא השתנה למרות שחזרנו ל-commit שהיה בו רק את שורה one.

אז מה בעצם קרה פה?
הפקודה reset החזירה אותנו ל-commit שבו יש רק את שורה one. אבל בגלל שהשתמשנו ב-soft היא לא שינתה את העץ של ה-working directory שבו כבר היה את הקובץ a.txt עם ארבעת השורות. ולכן המצב כרגע הוא שהשורה one היא כבר בעץ ה-commit history. השורה four היא בעץ ה-working directory. ואילו שתי השורות האמצעיות, two, three, הן נמצאות בעץ ה-staging.
אפשר להוכיח את זה ע"י פעולת restore שמבטלת שינויים ב-working directory וע"י פעול restore --staged שמבטלת שינויים ב-staging.

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git restore a.txt

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ cat a.txt
one
two
three
ביטלנו את השינויים ב-working directory ואפשר לראות ששורה four נעלמה.
עכשיו נבטל את השינויים ב-staging:

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git restore --staged a.txt

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git status
On branch master
Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git restore <file>..." to discard changes in working directory)
        modified:   a.txt

no changes added to commit (use "git add" and/or "git commit -a")

rafae@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ cat a.txt
one
two
three
רואים שבקובץ a.txt יש רק שינויים ב-working directory כי הפקודה restore --staged ביטלה את השינויים ב-staging והעבירה אותם ל-working directory. ואם עכשיו שוב נבטל את השינויים ב-working directory נישאר רק עם שורת one:

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git restore a.txt

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git status
On branch master
nothing to commit, working tree clean

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ cat a.txt
one
קצת מורכב, אבל אני חושב שההיגיון ברור.
כמובן שהלוג יראה לנו שה-commits האחרים נעלמו, מה שקורה בכל פעול reset בכל שלושת הסוגים:

rafael@DELL-RAFAELJ MINGW64 /c/learn/git/git_reset (master)
$ git log
commit 3faafaffec7c7078db091ac2e87ddf44f6279a4c (HEAD -> master)
Author: Rafael Jan <rafael.jan@inx.co>
Date:   Fri Mar 3 00:54:26 2023 +0200

    one

commit a358dfe0f11dcf512be74a8fe83bf3efb0bb25dd
Author: Rafael Jan <rafael.jan@inx.co>
Date:   Thu Feb 23 10:00:41 2023 +0200

    initial commit

פעולת reset לעומת פעול revert

פעולת git revert מאפשרת לבטל commit ע"י commit חדש שמחזיר את השינויים שהיו ב-commit אותו אנו רוצים לבטל. בצורה הזו, כל השינויים נשמרים בגיט ויש אפשרות להחזיר אותם אם רוצים.
אם פעולת revert נחשבת לפעולה הבטוחה כשרוצים להחזיר שינויים, אז פעול reset נחשבת לפעולה המסוכנת. אמנם reset לא מוחקת שום commit אבל היא גורמת לכך ש-commits יהפכו להיות מיותמים (orphaned). במילה מיותמים הכוונה היא שה-commits האלו התנתקו משרשרת ה-commits והם אמנם קיימים אבל לא נראה אותם בלוג של שום branch. 
בכל זאת יש פקודה שמאפשרת לראות ולהחזיר commits שהתנתקו והיא פקודת ה-git reflog.
רק שכדאי לדעת שלגיט יש garbage collector שרץ בדיפולט כל 30 יום ומוחק commits שהתנתקו. ואז באמת מאבדים את ה-commits המנותקים. ולכן צריך להכיר טוב את פעולת reset משום שהיא אחת הפעולות היחידות שמאפשרת לאבד לחלוטין commits ללא יכולת חזרה.

המסקנה מכל זה היא ש-revert היא הפעולה הנכונה לשימוש כאשר רוצים לעשות undo ל-commit ואילו פעולת reset היא הפעולה הנכונה לשימוש כשרוצים לבטל שינויים ב-staging וב-working directory.

מאוד לא מומלץ לעשות שימוש ב-reset על commit שכבר פורסם בפרוייקט שמשתמשים בו עוד אנשים, כיון שזה יכול לגרום לכך שמי שנמצא על אותו commit ינותק משאר העץ, ולא יוכל להסתנכרן עם העץ בהמשך. לכן כדאי תמיד להשתמש ב-revert במקרה כזה.

מצד שני, זה בסדר להשתמש ב-reset על commits שעדיין לא פורסמו אלא הם עדיין רק לוקאלים. למשל, אם עשיתי שינוי בקובץ ועשיתי לו commit. ואז עשיתי שוב שינוי באותו קובץ ושוב commit. ועכשיו אני רוצה לבטל את 2 ה-commits, כיון שעדיין לא פירמתי אותם עם push אני יכול בביטחה להשתמש ב-reset. כדי למחוק שתי commits אפשר פשוט להשתמש בכתיבה הנוחה הזו:

$ git reset --hard HEAD~2

סיכום

פעולת git reset היא אחת הפעולות המסוכנות שיש בגיט ואחת היחידות שיכולה לגרום לאיבוד מידע לחלוטין. השימוש בה הוא בעיקר לביטול שינויים בעץ ה-staging וה-working directory, או לביטול commits לוקאלים שעדיין לא פורסמו. 

עבור commits שכבר פורסמו נשתמש תמיד בפקודת revert, שמבטלת את השינויים, תוך כדי יצירת commit נוסף וכך שומרת את כל ההיסטוריה.


יגל יעקב - הלל מנחם



אם אהבתם, תכתבו משהו למטה...


מקורות:

מבוסס בעיקר על המאמר הזה: https://www.atlassian.com/git/tutorials/undoing-changes/git-reset

מדריך מקיף על Airflow




תוכן העניינים


מה זה Airflow?

Airflow הוא כלי שמאפשר ליצור, לתזמן ולנטר סידרה של משימות (tasks). בשפה המקצועית, סידרה של משימות נקראת workflow. 
יצירת workflow נעשית ע"י קוד פייתון. זה קצת מורכב בהתחלה אבל אחרי שלומדים את היסודות זה יחסית פשוט ומאפשר גמישות גדולה ליצירת workflows מורכבים.

לדוגמה, אם אנחנו מפעילים אתר של איחסון תמונות, וכדי לחסוך בשטח איחסון אנחנו רוצים לדחוס את התמונות שהמשתמשים מעלים. אז אנחנו נירצה ליצור workflow שפעם ביום עושה את הפעולות הבאות:


דוגמה לא הכי מוצלחת אבל הרעיון מובן. כל ריבוע זה task. כל task צריך לרוץ לאחר שה-task הקודם סיים. 
במבט ראשון נראה שזה דבר פשוט, ולא מובן למה צריך כלי עבור זה. אבל במציאות יש workflows הרבה יותר מורכבים וארוכים, שכוללים גם tasks שרצים לפי תנאי מסוים. 
ויש כמובן עוד הרבה צרכים ש-Airflow יעזור לנו בהם כמו למשל:
  • הרצת משימות במקביל
  • הרצת משימות כתלות בתנאי
  • תזמון ריצות לפי זמן או לפי טריגר חיצוני
  • הרצת workflow ב-cluster
  • קונפיגורציה עשירה
  • UI טוב ונוח
  • אפשרויות alerting שונות
  • מנגנון העברת מידע בין tasks
  • דיווחים במייל או בערוצים אחרים כמו Slack
ועוד הרבה ...
ב-Airflow ל-workflow קוראים DAG. זה ראשי תיבות של Directed Acyclic Graph.
זה שם מקובל בעולם הזה של כלים עבור workflows כיון שהוא מתאר בצורה יותר טובה למה הכוונה במילה workflow. נסביר עליו בפיסקה בנפרד.

קצת היסטוריה

הכלי פותח בחברת Airbnb החל משנת 2014. כבר מההתחלה הוא פותח כפרוייקט קוד פתוח. בשנת 2016 הפרוייקט הצטרף ל-Apache.

יש כלים מתחרים?

כמובן! ויש הרבה. 
המתחרים הראשיים נכון להיום הם Luigi ו-argo. ויש גם את prefect שהוא לא קוד פתוח לגמרי אלא open core, זה אומר שה-core הוא קוד פתוח, אבל יש תוספות בתשלום.
כאן, אפשר לראות רשימה של יותר מ-100! כלים נוספים.
כאן, כאן וכאן, אפשר לראות השוואות בין הכלים.

אז למה דווקא Airflow?

כשבאים להשוות בין הכלים השונים, צריך קודם כל לבדוק מה הצרכים שלנו ולראות איזה כלי יכול לספק לנו את הצרכים האלו. זה תהליך לא קל, כי זה אומר שצריך ללמוד כל כלי כדי להבין אם הוא מסוגל לעשות מה שאנו צריכים.
לכן לדעתי, צריך לעשות רשימה קצרה של מה שאנחנו חייבים שהכלי יעשה. ובלי זה הוא לא שימושי לנו. נבחר 3 כלים שנראים לנו מועמדים טובים ונבדוק אם הם תומכים ברשימת צרכי החובה שלנו. אם זו רשימה קצרה שמוגדרת טוב, הרבה פעמים אפשר לבדוק באינטרנט אם הכלים תומכים במה שאנו צריכים, עוד לפני שהתחלנו ללמוד את הכלי לעומק.
ברוב המקרים Airflow יהיה ברשימת המועמדים שלכם כיון שהוא ללא ספק הוא הכלי הפופולארי ביותר נכון להיום. 
יכולים להיות הרבה סיבות לבחור ב-Airflow אבל אחת הסיבות החשובות ביותר היא הפופולאריות שלו. פופולאריות היא פקטור חשוב ביותר כיון שתמיד ניתקל באינספור בעיות בשימוש בכלים כאלו. וככל שהכלי יותר פופולארי, יש יותר סיכוי לקבל עזרה באינטרנט. יש המון שאלות ותשובות ב-StackOverflow, המון מאמרים וקהילה עם עשרות אלפי אנשים בערוץ של Airflow ב-Slack שכל אחד יכול להיצטרף דרך פה. מניסיון יש שם המון אנשים טובים ששמחים לעזור.

מה זה DAG?

כפי שכתבתי לעיל, DAG זה ראשי תיבות של Directed Acyclic Graph. אז כמו שזה נשמע מדובר על workflow שהוא directed. זאת אומרת שיש כיוון זרימה מוגדר בין ה-tasks. ושהוא acyclic, שאין מעגליות בזרימה של הגרף. 
דוגמה לגרף כזה מויקיפדיה 
כפי שרואים, החצים מסמנים את כיוון הזרימה בין המשימות. ורואים שאין פה שום נתיב שהוא מעגלי.
אם נגדיר ב-Airflow, איזשהו DAG מעגלי, נקבל שגיאה כזו:

Broken DAG: [/opt/airflow/dags/simple_dag.py] Cycle detected in DAG. Faulty task: upload_compressed_images


הארכיטקטורה של Airflow

באתר הרישמי נוכל לראות את הארכיטקטורה:



Airflow בנוי ממספר רכיבים, להלן הסבר קצר עליהם:
scheduler - אחראי להפעיל workflows לפי התיזמון שלהם, ואחראי להכניס משימות ל-executor שמריץ אותן.
executor - אחראי על הרצת המשימות. בהתקנה רגילה של Airflow ה-executor מריץ הכל בתוך ה-scheduler. אבל במערכות אמיתיות שנועדו ל-production מוציאים את ה-executor החוצה למכונות אחרות שנקראות workers.
webserver - זה ה-UI שלנו. הוא מאפשר לנו לקבל הרבה מידע על ה-DAG, להריץ אותו בצורה יזומה (ולא דווקא מתוזמנת מראש) ולראות את ההתקדמות של הריצה ואת הלוגים.
DAG directory - ספריה שמכילה את קבצי ה-DAG שלנו. היא נקראת ע"י ה-scheduer וה-executor (וכל worker ששייך ל-executor)
metadata database - זהו ה-DB שמחזיק את כל ה-metadata שה-Airflow צריך כדי לשמור על המצב הנוכחי שלו. ה-scheduer, executor וה-webserver משתמשים ב-DB הזה.

הבסיס של Airflow: ה-Operators וה-Sensors 

ב-Airflow יש מושג שנקרא operator. זה בעצם שם נרדף למשימה.
כיון שיש הרבה מאוד סוגים של משימות, יש הרבה מאוד סוגי אופרטורים. 
דוגמאות לאופרטורים מאוד שימושיים:
PythonOperator - נועד להריץ פונקציות או קבצי פייתון
BashOperator - נועד להריץ פקודות או קבצי bash (קבצים עם סיומת sh)
DummyOperator - אופרטור שלא עושה כלום. אז למה צריך אותו? כדי לבנות את המבנה של ה-DAG בהתחלה ואחר כך מחליפים אותו לאופרטור הרצוי. הוא גם משמש הרבה לדוגמאות.
BranchPythonOperator - נועד לאפשר הכנסת תנאים ב-DAG. כשאנחנו צריכים לפי תנאי מסוים להחליט איזה משימה תהיה הבאה בתור.

כאן ניתן לראות את כל האופרטורים של Airflow. בנוסף, ישנם אופרטורים נוספים שיצרו חברות כמו Amazon ו-google, לטובת שימוש בשירותים שלהם. כדי להשתמש בהם צריך להתקין אותם על גבי airflow. כאן למשל יש הוראות איך להתקין את האופרטורים של Amazon.

בנוסף לאופרטורים יש ב-Airflow גם sensors. הם תת מחלקה של האופרטורים, והתפקיד שלהם הוא לחכות לאיזשהו אירוע חיצוני. למשל לחכות שיגיע קובץ מסיום, או שיגיע זמן מסוים. ברגע שהאירוע אליו הסנסור מחכה קרה, הסנסור הזה מסומן כ-succeed ומשימות שבאות אחריו ב-DAG ממשיכות לרוץ.

גם האופרטורים וגם הסנסורים יורשים מהמחלקה הבסיסית BaseOperator. בכל זאת נהוג לחלק אותם ל-2 סוגים כיון שהתפקידים שלהם שונים.

התחלת עבודה

אני עובד עם Airflow גירסה 2.1.2 על מכונת Linux (ubuntu18.04). אבל כיון שרוב העבודה תהיה בפייתון, אז רוב הדברים פה רלוונטים גם למערכות הפעלה אחרות.

התקנה

התקנה של Airlow יכולה להיות לוקאלית על המכונה שעליה עובדים, או ב-docker. את שתי האפשרויות אפשר לראות כאן. מניסיון, ההתקנה הלוקאלית די מורכבת ויש לפעמים בעיות בדרך. לכן אני משתמש בהתקנה ב-docker (מי שצריך הרחבה על docker ובמיוחד על docker-compose מוזמן לקרוא במדריך המקיף על docker). 
לא אפרט כאן את השלבים בצורה מדוייקת כיון שהם יכולים להשתנות במשך הזמן, לכן תמיד כדאי להשתמש באתר הרישמי.
בכל זאת נסביר בקצרה את השלבים:
  1. יוצרים ספרייה בשם airflow ויוצרים בתוכה שלוש ספריות dags, logs, plugins
  2. מורידים לתוך airflow את קובץ docker-compose.yaml של airflow (כפי שמופיע באתר הרישמי)
  3. נותנים הרשאות ל-container בדומה להרשאות של המכונה שלנו
  4. מריצים docker-compose up airflow-init ומחכים שהוא יסיים
  5. אפשר להריץ את airflow ע"י: docker-compose up

הרצה ועצירה של airflow

כדי להריץ את airflow נכתוב:

$ docker-compose up

כדי לעצור אותו נכתוב:

$ docker-compose down


נתחיל בדוגמה

כדי להבין נושא חדש תמיד טוב להתחיל בדוגמה פשוטה, ואז הרבה דברים מתבהרים.

נבנה DAG שיש בו 3 משימות פשוטות. ה-DAG וכל המשימות יהיו מוגדרים בקובץ אחד כיון שזה הדרך הפשוטה ביותר. בהמשך נראה איך ניתן להגדיר את ה-DAG בקובץ אחד והמשימות בקבצים אחרים, שזו הדרך שבה בפועל עובדים במקרים אמיתיים.

לשם הדוגמה, המשימות יהיו מאוד פשוטות ורק ידפיסו משהו על המסך. 

from airflow import DAG

from airflow.operators.bash import BashOperator

from airflow.utils.dates import days_ago

 

with DAG(

    dag_id='simple_dag',

    start_date=days_ago(2),

) as dag:

 

    task1 = BashOperator(

        task_id='task1_name',

        bash_command='echo this is task1'

    )

 

    task2 = BashOperator(

        task_id='task2_name',

        bash_command='echo this is task2'

    )

 

    task3 = BashOperator(

        task_id='task3_name',

        bash_command='echo this is task3'

    )

 

    task1 >> task2 >> task3 


ככה נראה ה-DAG הכי פשוט, פחות או יותר.
הסבר:
בהתחלה אנחנו מייבאים ספריות של airflow שאנו צריכים.
לאחר מכן מגדירים את ה-DAG. בסוגריים אחרי המילה DAG אנחנו נותנים פרמטרים ל-constructor של ה-DAG. בדוגמה הזו נתנו 2 פרמטרים שהם המינימום ההכרחי. בלעדיהם תהיה שגיאה. אפשר לראות את כל הפרמטרים האפשריים כאן.
ה-dag_id הוא השם של ה-DAG. את השם הזה נראה בהמשך מופיע ב-UI.
ה-start_date הוא פרמטר שקובע החל ממתי ה-DAG הזה מתחיל לרוץ. בהמשך נראה שנוכל לתזמן שה-DAG ירוץ כל זמן מסוים ע"י הפרמטר schedule_interval, למשל כל יום. אבל אם ה-start_date עדיין לא הגיע אז ה-DAG עדיין לא ירוץ.
לאחר מכן מגדירים את המשימות. בדוגמה הזו השתמשנו לשלושת המשימות ב-BashOperator. הסבר על אופרטורים יגיע בהמשך בפיסקה נפרדת. 
כדי לבנות משימה אנחנו צריכים לתת ל-constructor של כל אופרטור task_id שהוא יהיה שם המשימה וכך היא תיקרא ב-UI. בנוסף לפי סוג האופרטור צריך לתת פרמטרים נוספים. במקרה הנוכחי ה-BashOperator צריך לקבל bash_command שהיא הפקודה שהמשימה תריץ ב-bash. בדוגמה הזו המשימות פשוט עושות echo שזו פקודה שמדפיסה טקסט.
בסיום יש סינטקס מוזר: 

task1 >> task2 >> task3 


זוהי כתיבה מקוצרת ב-airflow שבה מגדירים את כיוון הזרימה של ה-DAG. איזו משימה קוראת לאיזו משימה. בדוגמה הזו task1 קוראת ל-task2 שקוראת ל-task3.
ב-UI נוכל לראות את זה בצורה יותר יפה. אבל כמו שאמרנו ב-UI נראה את שמות המשימות (task1_name) ולא את שם האובייקט (task1).


אפשר גם להשתמש בחיצים האלו בכיוון ההפוך. למשל אם נשנה את כיוון הזרימה לדבר הבא
: 

task1 >> task3 << task2 


נקבל את ה-DAG הבא:

אגב, לא חייב להשתמש בכתיבה המקוצרת. אפשר במקום זה להשתמש בפונקציות set_downstream ו-set_upstream. למשל כדי ליצור את ה-DAG הראשון נכתוב
:

task1.set_downstream(task2)

task2.set_downstream(task3)


וכדי ליצור את ה-DAG השני נכתוב:

task1.set_downstream(task3)

task2.set_downstream(task3)


או:

task1.set_downstream(task3)

task3.set_upstream(task2)


תשתמשו במה שנראה לכם הכי קריא.

רואים שזה ממש פשוט להגדיר DAG, נכון? אז בואו נתחיל לסבך קצת :-)

הרצת משימות במקביל

בדוגמאות לעיל כבר ראינו הרצה של משימות במקביל. נסביר שוב ונוסיף עוד דרך.
כדי שמשימות ירוצו במקביל צריך פשוט שהם יהיו באותו מקום ב-DAG. למשל, אם משימה אחת קוראת לשתי משימות אז הן תרוצנה במקביל. למשל אם נכתוב:

task1.set_downstream(task2)

task1.set_downstream(task3)


 או בשיטה השניה:

task1 >> task2
task1 >> task3


נקבל:


ואפשר גם להשתמש ב-list כדי לקצר את הכתיבה:

task1.set_downstream([task2, task3])


או:

task1 >> [task2, task3]


מהצד השני, כדי שכמה משימות יתאחדו למשימה אחת נכתוב:

task1.set_downstream([task2, task3])
task2.set_downstream(task4)
task3.set_downstream(task4)


או:

task1 >> [task2, task3] >> task4





ממשק המשתמש - UI

נעשה עצירה קטנה בהסברים על DAG, ונסביר על ה-UI כדי שנדע איך להריץ ולבדוק את מה שאנחנו כותבים.
ה-UI מגיע מה-process של Airflow שנקרא webserver. אם אתם רוצים לגשת ל-UI שרץ על אותה מכונה שבה אתם עובדיםף פיתחו את הדפדפן וגשו לכתובת:
http://localhost:8080
אם מריצים את ה-webserver על מכונה אחרת מהמכונה שאתם עובדים עליה, תשתמשו ב-IP של המכונה במקום localhost.
שם המשתמש והסיסמה הדיפולטיביים הם airflow.
השימוש ב-UI מאוד קל ואינטואיטיבי ואני חושב שלא צריך להסביר עליו, פשוט תנסו. בכל זאת כמה הערות חשובות.
אם יש בעיה ב-DAG, יופיע ERROR בצבע אדום בחלק העליון בעמוד הראשי (העמוד שבו רואים את רשימת ה-DAGs). בלחיצה על שורת ה-ERROR נקבל עוד פרטים על הבעיה.
ניתן להריץ DAG מהעמוד הראשי או ללחוץ על DAG מסוים ואז לעבור לעמוד של ה-DAG וגם משם ניתן להריץ אותו ע"י לחיצה על כפתור play ואז Trigger DAG.
כדי שיהיה אפשר להריץ DAG צריך להעביר אותו למצב פעיל במסך הראשי.
אם ננסה להריץ את ה-DAG כשהוא לא פעיל אלא במצב של pause הוא לא יתחיל לרוץ. אבל ברגע שנעביר אותו למצב פעיל, כיון שכבר לחצנו על play הוא יתחיל לרוץ באותו רגע. 
כשנמצאים בעמוד של DAG מסוים תחת הלשונית Tree View נראה משהו כזה:

כל עיגול מסמל DAG וכל ריבוע מסמל task. בלחיצה עליהם נפתח חלון עם הרבה אפשרויות, והכי חשוב יש שם כפתור Log שמאפשר לראות את הלוג שלא אותה משימה.
ה-UI מאפשר גישה להרבה דברים נוספים כמו configuration, pools, variables ועוד. בפיסקה שעוסקת בכל נושא נתייחס גם ל-UI הרלוונטי.

טיפים נוספים: 
  • אם אתה מנסה להריץ DAG מסוים ונראה שהוא תקוע ולא מתקדם, לך לעמוד הראשי שבו יש את רשימת כל ה-DAGs ותרענן אותו. בדרך כלל תמצא שם ERROR ב-DAG עצמו שגורם ל-DAG לא לרוץ.
  • אם בעמוד הראשי מופיע ERROR לא ברור עם המילים המוזרות bad magic number, לדוגמה:
Broken DAG: [/opt/airflow/dags/dag_file_name.py] Traceback (most recent call last): File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed File "/opt/airflow/dags/dag_file_name.py", line 12, in <module> from someFile import someClass ImportError: bad magic number in 'someFile.someClass': b'\x03\xf3\r\n'

זה בד"כ אומר שעשיתם שינוי וה-Airflow עוד לא הספיק להתעדכן. תנו לו עוד 5 שניות ותרעננו את הדף והשגיאה תעלם.

תזמון משימות

על תזמון המשימות אחראי הרכיב שנקרא scheduler. הוא זה שבודק מה הגדרנו ב-DAG ומריץ את המשימות על ה-worker בזמן הנכון.
השדות שמשפיעים על התזמון נמצאים גם בהגדרת ה-DAG וגם בהגדרת ה-task.

שדות של DAG:

start_date - שדה זה קובע מאיזה תאריך ה-DAG הזה רשאי להתחיל לרוץ. למשל: כדי לקבוע שהוא יתחיל לרוץ החל מה-01/04/2021 נכתוב:

start_date=datetime(2021, 4, 1)


נעשה פה שימוש בספריית datetime של פייתון. אפשר גם להשתמש בספריית airflow.utils.dates. בהתחלה נייבא אותה:

from airflow.utils.dates import days_ago


ואז נוכל להשתמש כך:

start_date=days_ago(2)


end_date - עד איזה תאריך ה-DAG הזה צריך לרוץ. אם לא משתמשים בו הוא לא יפסיק וימשיך לרוץ כל סבב שמגיע הזמן שלו.

schedule_interval - מגדיר כל כמה זמן ה-DAG ירוץ. הוא יכול לקבל cron expression בצורה של string או datetime.timedelta. איך משתמשים ב-timedelta אפשר לראות כאן. נראה דוגמה איך להשתמש ב-cron expression. הביטוי מורכב משש או שבע יחידות בסדר הבא (משמאל לימין):
seconds minutes hours day_of_mounth month day_of_week year
אם למשל נכתוב ביטוי כזה:
0 30 14 ? * * *
זה אומר שה-DAG צריך לרוץ בשעה 14:30 כל יום כל הזמן.
הסימן כוכבית מייצג - כל ערך אפשרי, והסימן שאלה מייצג - "לא ערך ספציפי" והוא מותר לשימוש רק ב-day_of_mounth ו-day_of_week. אסור ששני השדות האלו יהיו שניהם באותו ביטוי כוכבית או שניהם סימן שאלה.
כדי לראות הסבר שלם לעניין הזה מומלץ לקרוא כאן וכדי לבנות ביטוי נכון ולבדוק אותו מומלץ להשתמש באתר הזה. דוגמה לשימוש:

schedule_interval="0 30 14 * * *",


דרך מקובלת נוספת היא להשתמש בביטויים שמוגדרים ב-airflow מראש כמו למשל:
@daily, @hourly, @weekly
ניתן לראות תיאור מדוייק שלהם כאן. דוגמה לשימוש:

schedule_interval='@daily'


dagrun_timeout כמה זמן ריצה מוקצה ל-DAG. אם זמן הריצה יהיה מעבר לזמן שהוגדר הוא ייכשל.

שדות של task

execution_timeout - כמה זמן ריצה מוקצה למשימה. אם זמן הריצה שלה יהיה מעבר לזמן שהוגדר היא תיכשל.

retries - מספר הפעמים שניתן להריץ שוב את ה-task לפני שהוא יחשב כ-failed.

retry_delay - כמה זמן לחכות בין ניסיונות ריצה של task.

retry_exponential_backoff - גורם לזמן בין נסיונות להתארך בכל פעם.

start_date - שדה זה קובע מאיזה תאריך המשימה תרוץ בפעם הראשונה. בד"כ משתמשים בשדה start_date רק ב-DAG אבל אם יש צורך, יש גם למשימות שדה כזה.

end_date - עד איזה תאריך מותר למשימה הזו לרוץ.


קונפיגורציה

ל-Airflow יש קובץ קונפיגורציה שנקרא airflow.cfg. הוא נוצר בהרצה הראשונית של airflow והוא נמצא בספרייה הראשית של airflow (בד"כ airflow/~). 

הקובץ הזה מחולק לנושאים שונים. כל נושא מוקף בסוגריים מרובועות למשל [core] או [logging] או [webserver], ותחתיו כל ההגדרות שקשורות לאותו נושא. כל אפשרויות הקונפיגורציה נמצאות כאן עם הסברים.

דרך נוספת לקנפג את airflow היא באמצעות משתני סביבה - environment varibles. לכל הגדרה ב-airflow.cfg יש משתנה סביבה מתאים. יש שיטה פשוטה לדעת מה שם משתנה הסביבה המתאים לכל הגדרת קונפיגורציה. כל משתנה מתחיל במילה AIRFLOW ואז קו תחתון פעמיים. לאחר מכן שם הנושא (core/webserver/etc) אליו הוא שייך ושוב קו תחתון פעמיים, ואז השם המופיע בקובץ airflow.cfg באותיות גדולות.

דוגמה תבהיר את זה בקלות. ניקח למשל את הגדרת הקונפיגורציה שנקראת dags_folder ששם מגדירים באיזה ספריה ה-DAGs שלנו נמצאים. היא נמצאת תחת הנושא [core]. משתנה הסביבה של הגדרה זו הוא:

AIRFLOW__CORE__DAGS_FOLDER

תעברו רגע על השם הזה ועל ההסבר לעיל ותיראו שזה מתאים.

כשמריצים את airflow ע"י docker לא תהיה השפעה של airflow.cfg החיצוני (אני מעריך שיש דרך לגרום לקובץ הקונפיגורציה להשפיע אבל אני לא מצאתי). לכן צריך להשתמש במשתני הסביבה בתוך ה-docker-compose.yaml. הקובץ הזה נמצא בד"כ בתוך ספריית airflow ואפשר גם לראות אותו כאן. הוא מגיע כבר עם כמה הגדרות מראש למשל:

AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'

AIRFLOW__CORE__LOAD_EXAMPLES: 'true'

באותה צורה נוכל להוסיף משתנים שאנו רוצים לשנות מהדיפולט שלהם. קודם נעצור את המערכת עם
$ docker-compose down
נעשה את השינויים ב-docker-compose.yaml ונרים את המערכת:
$ docker-compose up
אפשר לראות את הקונפיגורציה ב-UI תחת Admin->Configurations. 
בהתחלה לא תיראו שם את הקונפיגורציה כי בדיפולט זה סגור מטעמי אבטחה. כדי לאפשר את זה צריך להגדיר את משתנה הסביבה הבא::

AIRFLOW__WEBSERVER__EXPOSE_CONFIG: 'true'

החלק העליון של העמוד הזה מראה את קובץ ה-airflow.cfg והחלק התחתון (צריך לגלול הרבה למטה) מראה את הקונפיגורציה בפועל, ז"א לאחר שמתחשבים גם במשתני הסביבה.
מבחינת סדרי העדיפויות, airflow קודם כל מתחשב במשתני הסביבה, לאחר מכן בקובץ airflow.cfg ובסוף בערכי הדיפולט.
ניתן גם לבדוק ערכי קונפיגורציה ע"י פקודות CLI של Airflow. למשל, כדי לקבל את הערך של load_examplesמהחלק של core נכתוב:

airflow config get-value core load_examples

ואם airflow רץ בתוך docker אז נריץ את הפקודה בתוך ה-container בצורה הבאה:

docker exec -it airflow_airflow-worker_1 airflow config get-value core load_examples

עוד לגבי CLI בפיסקה הבאה.

שימוש ב-CLI

ל-Airflow יש סט פקודות CLI עשיר. אפשר לראות את כולם פה.

כשמריצים את airflow לוקאלית (לא בתוך container) אפשר להשתמש ישירות ב-CLI. כמה דוגמאות לפקודות בסיסיות שגם די ברור מה הן עושות:

airflow info

airflow dags list

airflow dags trigger dag_id

אבל כאשר airflow רת בתוך docker container צריך להריץ את הפקודות הללו בתוך ה-worker container. יש שתי אפשרויות איך לעשות את זה.

1. להיכנס לתוך ה-container:

docker exec -it airflow_airflow-worker_1 bash


ואז להריץ את הפקודות, למשל כדי להריץ DAG מסוים:

default@78bed324a0cc:/opt/airflow$ airflow dags trigger my_simple_dag


2. להריץ את הפקודות ישירות בתוך ה-container בפקודה אחת, למשל כדי להריץ DAG מסוים:

docker exec -it airflow_airflow-worker_1 airflow dags trigger my_simple_dag


שימוש ב-REST API

התיעוד המלא ל-REAST API של Airflow נמצא כאן.

נראה כמה דוגמאות.

כדי לקבל רשימה של כל ה-DAGs נרשום:

curl GET 'http://localhost:8080/api/v1/dags' -H 'Content-Type: application/json' --user "airflow:airflow"

בחלק האחרון של ה user-- צריך לתת username:password שלנו ב-airflow. כמו שהיזכרנו, הדיפולט הוא airflow:airflow. 

הדוגמה הזו טובה כשמריצים את הפקודה על אותה מכונה שבה נמצא ה-airflow. אם מדובר על מכונה אחרת צריך לשנות localhost ל-ip של המכונה.

כדי לקבל פרטים על DAG ספציפי (בדוגמה הזו my_dag_name):

curl GET 'http://localhost:8080/api/v1/dags/my_dag_name' -H 'Content-Type: application/json' --user "airflow:airflow"

כדי להריץ DAG  (בדוגמה הזו my_dag_name):

curl POST 'http://localhost:8080/api/v1/dags/my_dag_name/dagRuns' -H 'Content-Type: application/json' --user "airflow:airflow" -d '{}'


לוגים

הלוגים ב-airflow נמצאים תחת airflow/logs ומסודרים באופן הבא:

airflow/logs/dag_id/task_id/date/incremental_number.log 

אם למשל שם ה-DAG הוא simple_dag ושם המשימה הראשונה הוא task1 אז הלוג שלה יהיה ב:

airflow/logs/simple_dag/task1/2021-08-23T07:00:00+00:00/1.log

דרך הקונפיגורציה, בחלק של [logging] אפשר לשנות הרבה דברים וביניהם את ה-path של הלוגים ואת הפורמט של שם הלוג. 

דרך פשוטה יותר לגשת ללוגים זה בעזרת ה-UI. פשוט לוחצים על הריבוע שמייצג את המשימה:



ואז נפתח חלון שבחלק העליון שלו יש כפתור של Log. 



לחיצה עליו תציג לנו את הלוג.

ב-Airflow יש גם תמיכה בכתיבת לוגים לענן (AWS, google cloud, Azure).

לגבי לוגים של פייתון, אם נשתמש בפקודה print אנחנו נראה שהטקסט הגיע מקובץ בשם logging_mixin.py.

למשל:

[2021-08-22 11:48:37,829] {logging_mixin.py:104} INFO - my print text

אבל אם נשתמש בספריית logging של פייתון הלוג יציין את שם הקובץ שכתב ללוג:

import logging


LOGGER = logging.getLogger("airflow.task")

LOGGER.info("example of log line")


העברת פרמטרים בין משימות - XCOM

בעיקרון, airflow תוכנן כך שכל משימה אמורה להיות בפני עצמה, ולכן אם אפשר עדיף להימנע מהעברת מידע בין משימות. עם זאת, יש מנגנון שנקרא xcom שנועד לאפשר העברת מידע בין משימות. שימו לב ש-xcom מיועד להעברת כמויות מידע קטנות, ולא גדולות, אז לא מומלץ לשים בו אובייקטים גדולים.

מדובר על רשימה של ערכים שלכל אחד יש שישה שדות:

key, value, timestamp, execution date, task id, Dag id

אפשר לראות את הרשימה ב-UI, אם נלחץ על Admin->Xcoms.


בד"כ רוב השימוש ב-xcom יהיה ב-PythonOperator כיון שבו אפשר להשתמש בפונקציה xcom_push כדי להכניס נתונים ל-xcom ובפונקציה xcom_pull כדי לקבל נתונים מ-xcom. 
בנוסף, ה-return value של כל משימה גם נשמר ב-xcom. אם מדובר על PythonOperator אז ישמר הערך שמוחזר מהפונקציה שאליה קראנו. ואם מדובר על BashOperator ישמר הערך האחרון שהודפס.
הדוגמה הבאה מדגימה את ה-return_value:

from airflow import DAG

from airflow.operators.bash import BashOperator

from airflow.operators.python import PythonOperator, PythonVirtualenvOperator

from airflow.utils.dates import days_ago

 

args = {

    'owner': 'airflow',

}

 

with DAG(

    dag_id='simple_python_dag',

    default_args=args,

    schedule_interval=None,

    start_date=days_ago(2),

    tags=['example'],

) as dag:

 

    def task1_function():

        print('This print is from task 1')

        return 'return from task1'

 

    def task2_function():

        print('This print is from task 2')

 

    task1 = PythonOperator(

        task_id='task_1',

        python_callable=task1_function,

    )

 

    task2 = PythonOperator(

        task_id='task_2',

        python_callable=task2_function,

    )

   

    task3 = BashOperator(

        task_id='task3',

        bash_command='echo This print is from task3'

    )

 

task1 >> task2 >> task3

המשימה task_1 היא PythonOperator והיא מחזירה ערך, ולכן הערך הזה יהיה ב-xcom.

המשימה task_2 היא PythonOperator והיא לא מחזירה ערך, ולכן לא נראה return_value מהמשימה הזו ב-xcom.

המשימה task_3 היא BashOperator והיא מדפיסה משהו למסך, ולכן מה שמודפס גם יהיה ב-xcom.

כך נראת טבלת ה-xcom לאחר הרצת ה-DAG הזה:

אז אם משתמשים ב-PythonOperator ניתן לשמור כמה ערכים ב-xcom. אבל אם משתמשים ב-BashOperator יש רק ערך אחד שנשמר והוא יהיה ה-return_value.

עד כאן זה פשוט כיון שהשתמשנו בפונקציית פייתון שמוגדרת בתוך קובץ ה-DAG. עכשיו נראה איך משתמשים ב-xcom כשה-PythonOperator קורא לפונקציה שנמצאת בקובץ אחר ולא בקובץ ה-DAG.

שימוש ב-xcom מתוך קובץ חיצוני (לא מתוך קובץ ה-DAG)

דבר ראשון, צריך להוסיף למשימה שמשתמשת בקובץ חיצוני את השדה provide_context=True, בצורה הבאה:

task1 = PythonOperator(

    task_id='task_1',

    python_callable=python_file_function,

    provide_context=True

)


השדה הזה גורם לכך ש-airflow מעביר סט של keyword arguments שאפשר להשתמש בהם בקובץ החיצוני. וכדי להשתמש בסט הזה צריך להוסיף בפונקציה שבקובץ החיצוני פרמטר kwargs** כקלט.

ניראה דוגמה והדברים יתבהרו. קובץ ה-DAG שלנו יראה כך:
 

from airflow import DAG

from airflow.operators.python import PythonOperator

from airflow.utils.dates import days_ago

from python_xcom_file import python_file_function

 

with DAG(

    dag_id='xcom_exteranl_python_dag',

    schedule_interval=None,

    start_date=days_ago(2),

) as dag:

 

    def simple_function(ti):

        print('task1: print from task 1')

        ti.xcom_push(key='task1_string', value='this string from task1')

        return 'return from task 1'

 

    def print_xcom_value(ti):

        print('task3: value from xcom pull: ' + ti.xcom_pull(task_ids='task2', key='task2_string'))

 

 

    task1 = PythonOperator(

        task_id='task1',

        python_callable=simple_function,

    )

 

    task2 = PythonOperator(

        task_id='task2',

        python_callable=python_file_function,

        provide_context=True

    )

 

    task3 = PythonOperator(

        task_id='task3',

        python_callable=print_xcom_value,

        provide_context=True

    )

 

 

task1 >> task2 >> task3 


והקובץ פייתון החיצוני, python_xcom_file.py, יראה כך:

def python_file_function(**kwargs):
    kwargs['ti'].xcom_push(key='task2_string', value='this string from task2')
    print('task2: value from xcom pull task1_string: '
           kwargs[
'ti'].xcom_pull(task_ids='task1', key='task1_string'))
    return 0
 


הסבר:
task1 - משתמשת בפונקציה simple_function שמוגדרת בקובץ ה-DAG. הפונקציה שומרת ערך ב-xcom.
task2 - משתמשת בפונקציה חיצונית שמוגדרת בקובץ python_xcom_file.py ונקראת python_file_function. הפונקציה הזו לוקחת מ-xcom את הערך שנשמר ב-task1 ע"י xcom_pull וגם שומרת ערך ע"י xcom_push.
task3 - משתמשת בפונקציה print_xcom_value שמוגדרת גם היא בקובץ ה-DAG. הפונקציה לוקחת את הערך ש-task2 שמרה ב-xcom ומדפיסה אותו.

בטבלת ה-xcom נראה את כל הערכים:

רואים את הערכים ש-task1 ו-task2 הכניסו ל-xcom. כמו כן, רואים בתור return_value את הערכים ששתי המשימות החזירו.
אז ראינו איך להכניס ולקבל נתונים מ-xcom גם בפונקציות שמוגדרות בתוך קובץ ה-DAG וגם בפונקציות שמוגדרות בקבצים חיצוניים.

קביעת תנאים בהרצת משימות - conditional tasks

ישנן מספר דרכים להרצת משימות כתלות בתנאי מסוים. בדיפולט, התנאי להרצת משימה הוא שכל המשימות הקודמת אליה יסתיימו בהצלחה.

שימוש ב-trigger_rule

לכל משימה יש שדה אופציונאלי שנקרא trigger_rule. בשדה הזה ניתן להגדיר שמשימה תרוץ גם אם לא כל המשימות שלפניה הצליחו. האפשרויות הן:

  • all_success (default): All upstream tasks have succeeded

  • all_failed: All upstream tasks are in a failed or upstream_failed state

  • all_done: All upstream tasks are done with their execution

  • one_failed: At least one upstream task has failed (does not wait for all upstream tasks to be done)

  • one_success: At least one upstream task has succeeded (does not wait for all upstream tasks to be done)

  • none_failed: All upstream tasks have not failed or upstream_failed - that is, all upstream tasks have succeeded or been skipped

  • none_failed_or_skipped: All upstream tasks have not failed or upstream_failed, and at least one upstream task has succeeded.

  • none_skipped: No upstream task is in a skipped state - that is, all upstream tasks are in a successfailed, or upstream_failed state

  • dummy: No dependencies at all, run this task at any time

דוגמה לשימוש בשדה הזה: 

from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta

import logging

 

with DAG(

    dag_id='trigger_rule_dag',

    start_date=datetime(2021, 4, 1),

) as dag:

 

    task1 = BashOperator(

        task_id='task1_name',

        bash_command='echo this is task1'

    )

 

    task2 = BashOperator(

        task_id='task2_name',

        trigger_rule='all_failed',

        bash_command='echo this is task2'

    )

 

    task3 = BashOperator(

        task_id='task3_name',

        bash_command='echo this is task3'

    )

 

    task4 = BashOperator(

          task_id='task4_name',

          trigger_rule='none_failed',

          bash_command='echo this is task4'

    )

    task1 >> [task2, task3] >> task4 

כפי שניתן לראות, task2 ו-task3 הן משימות מקביליות לאחר task1, כאשר task2 מותנת בכך ש-task1 נכשלה ואילו task3 מותנת בכך ש-task1 הצליחה. זו דוגמה למקרה נפוץ, שבו אנו צריכים לעשות משימות שונות כתלות בהצלחת המשימה הקודמת. 

אבל task4 צריכה לרוץ במקרה ש-task2 או task3 הצליחו ולכן ה-trigger_rule שלה הוא none_failed. שזה תנאי שמוודא שהמשימות הקודמות הצליחו או דולגו, אבל לא נכשלו.

הגרף של ה-DAG הזה יראה כך:

השדה trigger_rule הוא פשוט לשימוש ויכול להועיל במקרים מסויימים, אבל כשיש לנו צורך בתנאי מורכב יותר trigger_rule לא תמיד מספיק, ולכן יש לנו האופרטור BranchPythonOperator.

שימוש ב-BranchPythonOperator

האופרטור BranchPythonOperator מאפשר לכתוב בפונקציית פייתון תנאי שיקבע מה המשימות הבאות שירוצו. האופרטור הזה דומה ל-PythonOperator רק שבשדה python_callable הוא מקבל פונקציה של פייתון שמחזירה את שמות המשימות הבאות שירוצו, כך שלאחר האופרטור הזה המשימות שחזרו מהפונקציה שמוגדרת ב-python_callable ירוצו ושאר הפונקציות ידולגו. אפשר להשתמש ב-xcom בתוך התנאי וזה מאפשר ליצור תנאים שתלויים במשימות אחרות.

כרגיל, דוגמה תבהיר את העניין בצורה הכי טובה: 

from airflow import DAG

from airflow.operators.bash import BashOperator

from airflow.operators.python import BranchPythonOperator

from datetime import datetime, timedelta

import logging

 

def branch_function(ti):

    xcom_value = int(ti.xcom_pull(task_ids='task1_name'))

    if xcom_value == 1:

        return 'task3_name'

    else:

        return 'task4_name'

 

with DAG(

    dag_id='branching_dag',

    schedule_interval=None,

    start_date=datetime(2021, 4, 1),

) as dag:

 

    task1 = BashOperator(

        task_id='task1_name',

        bash_command='echo 1'

    )

 

    task2 = BranchPythonOperator(

        task_id='task2_name',

        python_callable=branch_function

    )

 

    task3 = BashOperator(

        task_id='task3_name',

        bash_command='echo this is task3'

    )

 

    task4 = BashOperator(

          task_id='task4_name',

          trigger_rule='none_failed',

          bash_command='echo this is task4'

    )

    task1 >> task2 >> [task3, task4] 

בהתחלה יש לנו את task1 שהוא מאוד פשוט, וכמו שהסברנו לעיל לגבי xcom הערך האחרון ש-BashOperator מדפיס נכנס ל-xcom כ-return_value של אותה משימה. לכן בדוגמה הזו יכנס הערך 1. 



ב-task2 יש לנו BranchPythonOperator שמשתמש בפונקציה שנקראת branch_function שמוגדרת בתחילת הקוד. מה שהיא עושה זה לקרוא את הערך של ה-return_value של task1 ולפי זה להחליט האם המשימה הבאה תהיה task3 או task4. בדומה הזו קל לראות שהמשימה הבאה תהיה task3. ולכן נקבל גרף כזה:



אבל אם נשנה ב-task1 את הפקודה ל-

    bash_command='echo 2'

נראה ש-task4 היא זו שתרוץ ונקבל גרף כזה:


לסיכום, האופרטור BranchPythonOperator מאפשר גמישות גדולה לקביעת כל תנאי שאנו צריכים ב-DAG שלנו.


שימוש ב-ShortCircuitOperator 

אופרטור נוסף שיכול לעזור בקביעת תנאים הוא האופרטור ShortCircuitOperator. בדומה ל-BranchPythonOperator  גם באופרטור הזה בשדה python_callable הוא מקבל פונקציה. אם הפונקציה הזו מחזירה False ה-DAG ידלג על כל שאר המשימות שיש לאחר ה-ShortCircuitOperator ויסמן אותן כ-skipped. ואם הוא מחזיר True הוא לא ידלג אלא ימשיך במשימות הבאות כרגיל.

מספר המשימות שניתן להריץ במקביל - Airflow Pools

כדי להגביל את כמות המשימות שרצות במקביל משתמשים ב-pools. ניתן לגשת ל-Pools דרך ה-UI אם נלחץ על Admin->Pools. שם נראה משהו כזה:

רואים שיש לנו default_pool שיש לו 128 slots. בדיפולט כל המשימות חולקות את ה-pool הזה. כל פעם שרצה משימה היא מקבלת סלוט אחד. וכאשר המשימה מסתיימת הסלוט משתחרר. ערך הדיפולט הוא 128 וזה אומר שאפשר להריץ 128 משימות במקביל כל עוד לא נשנה את ערך ה-default_pool.

אם לא נשארו סלוטים פנויים, ויש משימות שצריכות לרוץ, המשימות הללו יכנסו לתור ויוצאו לפועל כל אחת בתורה ברגע שמתשחרר סלוט.

זה נכון לגבי כל המשימות שרצות במערכת ה-airflow, אבל יש מגבלה נוספת על כל DAG. אם לא נשנה את ערכי הדיפולט אז כל DAG יכול להריץ עד 16 משימות במקביל.

ניתן לשנות ערך זה בשתי דרכים: 

  • שימוש בשדה dag_concurrency בקונפיגורציה של מערכת ה-airflow. ואז ערך זה ישפיע על כל ה-DAGs במערכת
  • שימוש בשדה concurrency ביצירת ה-DAG, כדי להשפיע רק על DAG מסוים.

ב-UI דרך Admin->Pools ניתן לשנות את ערך ה-default_pool. אפשר גם להוסיף pools אחרים, לתת להם שם ולהשתמש בהם עבור משימות. כדי להגדיר שמשימה מסויימת משתמשת ב-pool מסוים שהוא לא ה-default_pool נשתמש בשדה שנקרא pool ביצירת המשימה. אם למשל יצרנו pool עבור שאילתות SQL וקראנו לו sql_pool נוכל להשתמש בו במשימת SQL בצורה הזו:

mysql_task = MySqlOperator(

    task_id='mysql_task',

    mysql_conn_id='mysql_default_conn',

    sql='./sample_sql.sql',

    pool='sql_pool'

) 

וכך המשימה הזו תשתמש ב-sql_pool ולא ב-default_pool. בצורה הזו נוכל לסדר את המערכת שלנו כך שלא יהיה מצב שיותר מדיי משימות רצות במקביל ועלולות לגרום למערכת לקרוס.


Airflow Sensor

כמו שכבר הזכרנו, סנסור זה סוג של אופרטור שנועד לחכות לאיזשהו אירוע חיצוני. למשל לקובץ או תיקייה שיגיעו למיקום מסוים, או לקרוא לאיזשהו שאילתת SQL עד שתנאי מסוים מתקיים. הסנסור בודק כל אינטרוול מסוים שאנו מגדירים האם התנאי התקיים ואם כן אז הסנסור הזה מסומן כ-succeed ומשימות שבאות אחריו ב-DAG ממשיכות לרוץ. בנוסף, אנחנו גם צריכים להגדיר עד מתי הסנסור הזה ממשיך לבדוק את מה שהגדרנו לו. נראה את זה עוד מעט ע"י שימוש בשדה timeout.

סנסורים פופולאריים לדוגמה:

  • FileSensor - מחכה לקובץ או תיקייה שיגיעו למיקום מסוים
  • SqlSensor - מריץ שאילתת SQL עד שתנאי מסוים מתקיים
  • ExternalTaskSensor - מחכה ש-DAG מסוים או משימה מסויימת יסתיימו בתאריך מסוים
  • DateTimeSensor - מחכה לזמן מסוים (תאריך ושעה)
  • TimeDeltaSensor - מחכה שזמן מסוים יעבור לאחר סיום המשימה הקודמת
  • S3KeySensor - מחכה ל-key מסוים שיופיע ב-bucket מסוים ב-S3
כל הסנסורים יורשים ממחלקת הבסיס BaseSensorOperator.
כדי לקבוע כל כמה זמן הסנסור יבדוק את מה שהוא אמור לבדוק אנו משתמשים בשדה poke_interval בהגדרת הסנסור. הוא מקבל ערך מסוג float שנמדד בשניות. למשל כדי שהסנסור יבדוק כל 20 שניות נכתוב:

wait_for_file = FileSensor(

    task_id='wait_for_file',

    fs_conn_id='fs_default',

    filepath='my_file_name.txt',

    poke_interval=20

) 

הדיפולט של poke_interval הוא 60 שניות.

בדוגמה הזו השתמשנו ב-FileSensor. השדה filepath צריך להכיל את שם הקובץ או התיקייה שלהם אנו מחכים. שדה חשוב נוסף בסנסור הזה הוא fs_conn_id. השדה הזה צריך להכיל את ה-connection_id למערכת שבה אנו רוצים לבדוק אם הקובץ קיים. ניתן להגדיר connection דרך ה-UI. לוחצים על Admin->Connections ושם מוסיפים הגדרות. לדוגמה, אם אנו רוצים לבדוק שהקובץ my_file_name.txt קיים בספריית ה-dags אז נגדיר את ה-connection שנקרא fd_default בצורה הבאה:


דוגמה מלאה לשימוש ב-FileSensor:


task1 = BashOperator(

task_id='task1_name',

bash_command='echo this is task1'

)


wait_for_file = FileSensor(

task_id='wait_for_file',

fs_conn_id='fs_default',

filepath='my_file_name.txt',

poke_interval=5

)


task2 = BashOperator(

task_id='task2_name',

bash_command='echo this is task2'

)


task1 >> wait_for_file >> task2

הגרף יראה כך:


רק ברגע ש-wait_for_file יגלה שהקובץ my_file_name.txt קיים בספריית dags הוא יסומן כ-success וה-DAG ימשיך להתקדם ל-task2.

כדי להגדיר עד מתי ירוץ הסנסור, נשתמש בשדה timeout. הערך שלו נמדד בשניות. ערך הדיפולט שלו הוא 7 ימים (604,800 בשניות). צריך לתת את הדעת לגבי קביעת ה-tiemout כי הוא יכול להציל אותנו ממה שנקרא sensor deadlock. מדובר על מצב שבו יש בעיה במערכת וסנסורים מסויימים מחכים המון זמן. כל סנסור תופס סלוט אחד כמו שלמדנו בפיסקה על airflow pool עד שמגיעים למצב שכל הסלוטים של ה-DAG תפוסים ואין לו יכולת להמשיך לרוץ.

לדוגמה, אם יש לנו DAG שלא שינינו בו את ה-timeout ולכן הוא 7 ימים. וגם לא שינינו בו את ה-concurrency ולכן הוא 16. ה-DAG הזה רץ כל יום והוא מתחיל ב-8 סנסורים שמחכים למשהו מסוים שיקרה. אבל כמו שאמרנו יש בבעיה במערכת והדבר שהם מחכים לו לא קורה. אז ביום הראשון נתפסים 8 סלוטים, ולא משוחררים. ביום השני נתפסים עוד 8 סלוטים, כך שכבר 16 סלוטים תפוסים. ביום השלישי כבר אין יותר סלוטים פנויים ל-DAG הזה והוא לא יכול לרוץ. רק בעוד 5 ימים יסתיים הזמן של ה-timeout והסנסורים יסגרו והסלוטים ישתחררו. עד אז אנחנו ב-sensor deadlock.

אופטימיזציה נוספת אפשרית לסנסור היא קביעת ה-mode שלו. יש שתי אפשרויות - poke או reschedule. הדיפולט הוא poke. ההסבר הוא פשוט, הרי סנסור ברוב הזמן לא עושה כלום. והשאלה היא האם בזמן הזה הוא משחרר את הסלוט או לא. ב-mode של poke הסנסור לא משחרר את הסלוט, ואילו ב-reschedule הוא משחרר את הסלוט וכל פעם שהוא צריך לבדוק את מה שהוא אמור לבדוק הוא תופס שוב סלוט. 

אם נשתמש ב-reschedule נוכל להימנע מ-sensor deadlock שהזכרנו לעיל. מצד שני, אם יש הרבה משימות עלול להיות מצב שכשהסנסור שלנו צריך לפעול לא יהיה לא סלוט פנוי ואז הוא יצטרך להמתין.

כדי להחליט איזה mode לבחור צריך לבחון את כל מערכת ה-airflow שלכם ואת ה-DAG הספציפי שהסנסור הזה שייך אליו. אבל אפשר להגיד כלל אצבע, שאם ה-poke_interval של הסנסור קצר יחסית, הגיוני להשאיר אותו ב-mode של poke.

דוגמה לשימוש ב-mode שהוא reschedule וב-timeout של שעה:

wait_for_file = FileSensor(

    task_id='wait_for_file',

    fs_conn_id='fs_default',

    poke_interval=200,

    timeout=60*60,

    mode='reschedule',

)


ב-UI ניתן לראות את הסנסורים שמוגדרים כ-reschedule תחת Browse->Task Reschedules.

ניתן להגדיר פונקציה שתיקרא במקרה שה-timeout של הסנסור הגיע. נשתמש בשדה on_failure_callback. לדוגמה:

def failure_callback(context):

    if isinstance(context['exception'], AirflowSensorTimeout):

        print(context)

        print("Sensor timed out")


with
 DAG(

    dag_id='simple_dag',

    start_date=days_ago(2),

as dag:


    wait_for_file = FileSensor(

        task_id='wait_for_file',

        poke_interval=200,

        timeout=60*60,

        mode='reschedule',

        on_failure_callback=failure_callback,

    )

בדוגמה הזו, במקרה שמגיע ה-timeout של הסנסור הפונקציה failure_callback תיקרא. הפונקציה מקבלת אובייקט שנקרא context שמכיל את ה-task_instance וממנו אפשר לקבל הרבה מידע על ה-task וכן על ה-DAG. במקרה הזה בדקנו למה שווה השדה exception שבתוך ה-context ואם הוא AirflowSensorTimeout זה אומר שהוא הגיע מאיזשהו סנסור. אם יש לנו כמה נצטרך לחשוב איך להבדיל ביניהם. אפשר פשוט שכל אחד יקרא לפונקציה אחרת.

דבר נוסף שאפשר להגדיר למקרה של timeout הוא soft_fail. השדה הזה קובע האם במקרה של timeout הסנסור יחשב כ-failed, או כ-skipped. אם הוא True אז במקרה של timeout הסנסור יחשב כ-skipped. בדיפולט הוא False ולכן הסנסור יחשב כ-failed. רק צריך לזכור שגם במקרה שמשימה היא skipped כל המשימות שאחריה לא ירוצו, אלא אם כן נשנה את ה-trigger_rule של המשימות הבאות.

עוד שדה שיכול לשפר את הסנסור שלנו הוא ה-exponential_backoff. אם השדה הזה שווה ל-True הוא גורם לכך שהזמן שעובר בין פעולות של הסנסור הולך וגדל. במקום שהוא יהיה קבוע, למשל בדוגמא לעיל קבענו שזה יהיה כל 200 שניות, הוא ילך ויגדל בכל פעם. לפעמים זה עוזר ולפעמים לא, כל מקרה צריך לבחון לגופו.


קצת על Security

באתר הרישמי ישנו עמוד שמדבר על security, והוא מחלק את הנושא הזה מספר נושאים. כאן נעסוק בקצרה בחלק מהנושאים הללו.

Access Control

ב-Airflow לכל משתמש יש role מסוים שקובע את ההרשאות שלו ב-webserver UI. ה-roles הדיפולטיביים לפי רמת ההרשאות שלהם הם:

  • Admin - all possible permissions
  • Op - User permissions + additional op permissions
  • User - Viewer permissions + additional user permissions
  • Viewer - limited viewer permissions
  • Public - no permissions

ניתן לראות ב-UI את ה-role של כל משתמש Security->List Users ואת כל ה-roles הקיימים וכל אחד מה ההרשאות שלו ב-Security->List Roles.

יש שני סוגים של permissions:

1. Resource-Based permissions - ההרשאות הם פר resource. וכל הרשאה מוגדרת ע"י resource+action. 

דוגמאות ל-resources הם - Dag, Task, Connection. 

ה-actions הם - can_create, can_read, can_edit, can_delete.

דוגמאות להרשאות:

DAGs.can_read
Connections.can_create
Pool.can_delete
Variables.can_edit


2. DAG-level permissions - לפעמים יש לנו DAGs מסוימים שאנחנו רוצים שלא יהיו נגישים לכולם. למשל כי הם מכילים מידע רגיש. או בגלל שההרצות שלהם ארוכות ואנחנו לא רוצים שמישהו בטעות יפריע להם באמצע ריצה. לצורך כך יש אפשרות לשלוט על ההרשאות פר DAG. 
קודם כל, מי שיש לו הרשאה כללית לכל ה-DAGs כמו DAGs.can_edit תהיה לו הרשאה לכל DAG.
כדי לתת הרשאה פר DAG, ה-resource שנשתמש כשניתן הרשאות יהיה DAG:dag_id. למשל, אם אנחנו רוצים לתת הרשאה עבור DAG שנקרא my_dag ההרשאה תיראה כך: DAG:my_dag.can_edit.

 Custome Roles

אפשר ליצור roles שיהיו מותאמים אישית לצרכים שלנו. כשיוצרים role אפשר להגביל אותו ל-DAGs מסוימים. כדי ליצור role אפשר להשתמש בעמוד Security->List Roles וזו הדרך הפשוטה. אפשר גם לעשות זאת מה-CLI.

הסתרת מידע סודי

Airflow מסתיר מידע סודי כמו למשל סיסמאות ע"י כך שהוא מחליף את המידע הסודי ב-***. למשל אם לא שינית את הסיסמה הדיפולטיבית אז הסיסמה שלך היא airflow, ולכן כל מקום שיהיה כתוב airflow בלוג יוחלף ב- ***. לדוגמה:

[2021-08-17 15:37:30,821] {logging_mixin.py:104} INFO - dir_path is: /opt/***/dags/my_code/pythonOperator_simple_dag/pythonFileName

אפשר לבטל את ההסתרה הזו ע"י קביעת הערך של hide_sensitive_var_conn_fields בקונפיגורציה ל-false.
Airflow גם יסתיר כל מידע שנמצא ב-Variables או בשדה Extra של Connections שהשם שלו מכיל את אחת המילים האלה:
'password', 'secret', 'passwd', 'authorization', 'api_key', 'apikey', 'access_token'


התקנת תוכנות בתוך ה-container של airflow

לפעמים יש לנו צורך להתקין בתוך ה-container של airflow תוכנות נוספות. למשל, אם אנחנו רוצים להריץ פקודות של AWS CLI נצטרך להתקין awscli ואם אנו צריכים להריץ פקודות של kubectl נצטרך להתקין kubectl

כדי לעשות את זה, נריץ את airflow וניכנס לתוך ה-container של ה-worker. בתוך ה-container נתקין את התוכנות שאנו צריכים. ואז נצא מה-container. כדי ליצור image חדש שכולל את airflow יחד עם התוכנות שהתקנו בתוכו נשתמש בפקודה docker commit.

תיאור מפורט של התהליך הזה, סיכמתי פה.


התקנת Airflow בתוך kubernetes cluster ע"י Helm Chart

Helm הוא ה-package manager של Kubernetes שמאפשר יחסית בקלות להכניס תוכנות ל-Kubernetes cluster ע"י שימוש ב-helm chart.

במערכת שלי (ubuntu 18.04) ההתקנה של helm היתה ע"י הפקודה:

sudo snap install helm3

כרגיל בתוכנה, הרבה פעמים יש בעיות בלתי צפויות, שתמיד קורות דווקא לי... אז אם יש בעיות נסו את הפקודה הזו:

kubectl -n airflow get events --sort-by='{.lastTimestamp}'

היא אמורה לתת לכם רשימה של מה שקרה מאחורי הקלעים, ואולי תמצאו שם רמז למה גורם לבעיה שלכם.

באתר הרישמי ניתן לראות הסבר לגבי השימוש ב-helm עבור airflow. בגדול כל מה שצריך לעשות כדי להתקין את airflow ב-kubernetes cluster ע"י helm זה את הפקודות הבאות:

kubectl create namespace airflow

helm repo add apache-airflow https://airflow.apache.org

helm install airflow apache-airflow/airflow --namespace airflow

אישית לא יצא לי להנסות את זה ולכן אני רק מציין את האפשרות הזו, כיון שהיא חשובה ומאוד נוחה, אבל לצערי אין לי עדיין טיפים של ניסיון.


עד כאן להפעם, אולי בהמשך אמצא זמן להוסיף כאן נושאים נוספים. בהצלחה!

ו... אם אהבתם, תכתבו משהו למטה...