מדריך מקיף על Airflow




תוכן העניינים


מה זה Airflow?

Airflow הוא כלי שמאפשר ליצור, לתזמן ולנטר סידרה של משימות (tasks). בשפה המקצועית, סידרה של משימות נקראת workflow. 
יצירת workflow נעשית ע"י קוד פייתון. זה קצת מורכב בהתחלה אבל אחרי שלומדים את היסודות זה יחסית פשוט ומאפשר גמישות גדולה ליצירת workflows מורכבים.

לדוגמה, אם אנחנו מפעילים אתר של איחסון תמונות, וכדי לחסוך בשטח איחסון אנחנו רוצים לדחוס את התמונות שהמשתמשים מעלים. אז אנחנו נירצה ליצור workflow שפעם ביום עושה את הפעולות הבאות:


דוגמה לא הכי מוצלחת אבל הרעיון מובן. כל ריבוע זה task. כל task צריך לרוץ לאחר שה-task הקודם סיים. 
במבט ראשון נראה שזה דבר פשוט, ולא מובן למה צריך כלי עבור זה. אבל במציאות יש workflows הרבה יותר מורכבים וארוכים, שכוללים גם tasks שרצים לפי תנאי מסוים. 
ויש כמובן עוד הרבה צרכים ש-Airflow יעזור לנו בהם כמו למשל:
  • הרצת משימות במקביל
  • הרצת משימות כתלות בתנאי
  • תזמון ריצות לפי זמן או לפי טריגר חיצוני
  • הרצת workflow ב-cluster
  • קונפיגורציה עשירה
  • UI טוב ונוח
  • אפשרויות alerting שונות
  • מנגנון העברת מידע בין tasks
  • דיווחים במייל או בערוצים אחרים כמו Slack
ועוד הרבה ...
ב-Airflow ל-workflow קוראים DAG. זה ראשי תיבות של Directed Acyclic Graph.
זה שם מקובל בעולם הזה של כלים עבור workflows כיון שהוא מתאר בצורה יותר טובה למה הכוונה במילה workflow. נסביר עליו בפיסקה בנפרד.

קצת היסטוריה

הכלי פותח בחברת Airbnb החל משנת 2014. כבר מההתחלה הוא פותח כפרוייקט קוד פתוח. בשנת 2016 הפרוייקט הצטרף ל-Apache.

יש כלים מתחרים?

כמובן! ויש הרבה. 
המתחרים הראשיים נכון להיום הם Luigi ו-argo. ויש גם את prefect שהוא לא קוד פתוח לגמרי אלא open core, זה אומר שה-core הוא קוד פתוח, אבל יש תוספות בתשלום.
כאן, אפשר לראות רשימה של יותר מ-100! כלים נוספים.
כאן, כאן וכאן, אפשר לראות השוואות בין הכלים.

אז למה דווקא Airflow?

כשבאים להשוות בין הכלים השונים, צריך קודם כל לבדוק מה הצרכים שלנו ולראות איזה כלי יכול לספק לנו את הצרכים האלו. זה תהליך לא קל, כי זה אומר שצריך ללמוד כל כלי כדי להבין אם הוא מסוגל לעשות מה שאנו צריכים.
לכן לדעתי, צריך לעשות רשימה קצרה של מה שאנחנו חייבים שהכלי יעשה. ובלי זה הוא לא שימושי לנו. נבחר 3 כלים שנראים לנו מועמדים טובים ונבדוק אם הם תומכים ברשימת צרכי החובה שלנו. אם זו רשימה קצרה שמוגדרת טוב, הרבה פעמים אפשר לבדוק באינטרנט אם הכלים תומכים במה שאנו צריכים, עוד לפני שהתחלנו ללמוד את הכלי לעומק.
ברוב המקרים Airflow יהיה ברשימת המועמדים שלכם כיון שהוא ללא ספק הוא הכלי הפופולארי ביותר נכון להיום. 
יכולים להיות הרבה סיבות לבחור ב-Airflow אבל אחת הסיבות החשובות ביותר היא הפופולאריות שלו. פופולאריות היא פקטור חשוב ביותר כיון שתמיד ניתקל באינספור בעיות בשימוש בכלים כאלו. וככל שהכלי יותר פופולארי, יש יותר סיכוי לקבל עזרה באינטרנט. יש המון שאלות ותשובות ב-StackOverflow, המון מאמרים וקהילה עם עשרות אלפי אנשים בערוץ של Airflow ב-Slack שכל אחד יכול להיצטרף דרך פה. מניסיון יש שם המון אנשים טובים ששמחים לעזור.

מה זה DAG?

כפי שכתבתי לעיל, DAG זה ראשי תיבות של Directed Acyclic Graph. אז כמו שזה נשמע מדובר על workflow שהוא directed. זאת אומרת שיש כיוון זרימה מוגדר בין ה-tasks. ושהוא acyclic, שאין מעגליות בזרימה של הגרף. 
דוגמה לגרף כזה מויקיפדיה 
כפי שרואים, החצים מסמנים את כיוון הזרימה בין המשימות. ורואים שאין פה שום נתיב שהוא מעגלי.
אם נגדיר ב-Airflow, איזשהו DAG מעגלי, נקבל שגיאה כזו:

Broken DAG: [/opt/airflow/dags/simple_dag.py] Cycle detected in DAG. Faulty task: upload_compressed_images


הארכיטקטורה של Airflow

באתר הרישמי נוכל לראות את הארכיטקטורה:



Airflow בנוי ממספר רכיבים, להלן הסבר קצר עליהם:
scheduler - אחראי להפעיל workflows לפי התיזמון שלהם, ואחראי להכניס משימות ל-executor שמריץ אותן.
executor - אחראי על הרצת המשימות. בהתקנה רגילה של Airflow ה-executor מריץ הכל בתוך ה-scheduler. אבל במערכות אמיתיות שנועדו ל-production מוציאים את ה-executor החוצה למכונות אחרות שנקראות workers.
webserver - זה ה-UI שלנו. הוא מאפשר לנו לקבל הרבה מידע על ה-DAG, להריץ אותו בצורה יזומה (ולא דווקא מתוזמנת מראש) ולראות את ההתקדמות של הריצה ואת הלוגים.
DAG directory - ספריה שמכילה את קבצי ה-DAG שלנו. היא נקראת ע"י ה-scheduer וה-executor (וכל worker ששייך ל-executor)
metadata database - זהו ה-DB שמחזיק את כל ה-metadata שה-Airflow צריך כדי לשמור על המצב הנוכחי שלו. ה-scheduer, executor וה-webserver משתמשים ב-DB הזה.

הבסיס של Airflow: ה-Operators וה-Sensors 

ב-Airflow יש מושג שנקרא operator. זה בעצם שם נרדף למשימה.
כיון שיש הרבה מאוד סוגים של משימות, יש הרבה מאוד סוגי אופרטורים. 
דוגמאות לאופרטורים מאוד שימושיים:
PythonOperator - נועד להריץ פונקציות או קבצי פייתון
BashOperator - נועד להריץ פקודות או קבצי bash (קבצים עם סיומת sh)
DummyOperator - אופרטור שלא עושה כלום. אז למה צריך אותו? כדי לבנות את המבנה של ה-DAG בהתחלה ואחר כך מחליפים אותו לאופרטור הרצוי. הוא גם משמש הרבה לדוגמאות.
BranchPythonOperator - נועד לאפשר הכנסת תנאים ב-DAG. כשאנחנו צריכים לפי תנאי מסוים להחליט איזה משימה תהיה הבאה בתור.

כאן ניתן לראות את כל האופרטורים של Airflow. בנוסף, ישנם אופרטורים נוספים שיצרו חברות כמו Amazon ו-google, לטובת שימוש בשירותים שלהם. כדי להשתמש בהם צריך להתקין אותם על גבי airflow. כאן למשל יש הוראות איך להתקין את האופרטורים של Amazon.

בנוסף לאופרטורים יש ב-Airflow גם sensors. הם תת מחלקה של האופרטורים, והתפקיד שלהם הוא לחכות לאיזשהו אירוע חיצוני. למשל לחכות שיגיע קובץ מסיום, או שיגיע זמן מסוים. ברגע שהאירוע אליו הסנסור מחכה קרה, הסנסור הזה מסומן כ-succeed ומשימות שבאות אחריו ב-DAG ממשיכות לרוץ.

גם האופרטורים וגם הסנסורים יורשים מהמחלקה הבסיסית BaseOperator. בכל זאת נהוג לחלק אותם ל-2 סוגים כיון שהתפקידים שלהם שונים.

התחלת עבודה

אני עובד עם Airflow גירסה 2.1.2 על מכונת Linux (ubuntu18.04). אבל כיון שרוב העבודה תהיה בפייתון, אז רוב הדברים פה רלוונטים גם למערכות הפעלה אחרות.

התקנה

התקנה של Airlow יכולה להיות לוקאלית על המכונה שעליה עובדים, או ב-docker. את שתי האפשרויות אפשר לראות כאן. מניסיון, ההתקנה הלוקאלית די מורכבת ויש לפעמים בעיות בדרך. לכן אני משתמש בהתקנה ב-docker (מי שצריך הרחבה על docker ובמיוחד על docker-compose מוזמן לקרוא במדריך המקיף על docker). 
לא אפרט כאן את השלבים בצורה מדוייקת כיון שהם יכולים להשתנות במשך הזמן, לכן תמיד כדאי להשתמש באתר הרישמי.
בכל זאת נסביר בקצרה את השלבים:
  1. יוצרים ספרייה בשם airflow ויוצרים בתוכה שלוש ספריות dags, logs, plugins
  2. מורידים לתוך airflow את קובץ docker-compose.yaml של airflow (כפי שמופיע באתר הרישמי)
  3. נותנים הרשאות ל-container בדומה להרשאות של המכונה שלנו
  4. מריצים docker-compose up airflow-init ומחכים שהוא יסיים
  5. אפשר להריץ את airflow ע"י: docker-compose up

הרצה ועצירה של airflow

כדי להריץ את airflow נכתוב:

$ docker-compose up

כדי לעצור אותו נכתוב:

$ docker-compose down


נתחיל בדוגמה

כדי להבין נושא חדש תמיד טוב להתחיל בדוגמה פשוטה, ואז הרבה דברים מתבהרים.

נבנה DAG שיש בו 3 משימות פשוטות. ה-DAG וכל המשימות יהיו מוגדרים בקובץ אחד כיון שזה הדרך הפשוטה ביותר. בהמשך נראה איך ניתן להגדיר את ה-DAG בקובץ אחד והמשימות בקבצים אחרים, שזו הדרך שבה בפועל עובדים במקרים אמיתיים.

לשם הדוגמה, המשימות יהיו מאוד פשוטות ורק ידפיסו משהו על המסך. 

from airflow import DAG

from airflow.operators.bash import BashOperator

from airflow.utils.dates import days_ago

 

with DAG(

    dag_id='simple_dag',

    start_date=days_ago(2),

) as dag:

 

    task1 = BashOperator(

        task_id='task1_name',

        bash_command='echo this is task1'

    )

 

    task2 = BashOperator(

        task_id='task2_name',

        bash_command='echo this is task2'

    )

 

    task3 = BashOperator(

        task_id='task3_name',

        bash_command='echo this is task3'

    )

 

    task1 >> task2 >> task3 


ככה נראה ה-DAG הכי פשוט, פחות או יותר.
הסבר:
בהתחלה אנחנו מייבאים ספריות של airflow שאנו צריכים.
לאחר מכן מגדירים את ה-DAG. בסוגריים אחרי המילה DAG אנחנו נותנים פרמטרים ל-constructor של ה-DAG. בדוגמה הזו נתנו 2 פרמטרים שהם המינימום ההכרחי. בלעדיהם תהיה שגיאה. אפשר לראות את כל הפרמטרים האפשריים כאן.
ה-dag_id הוא השם של ה-DAG. את השם הזה נראה בהמשך מופיע ב-UI.
ה-start_date הוא פרמטר שקובע החל ממתי ה-DAG הזה מתחיל לרוץ. בהמשך נראה שנוכל לתזמן שה-DAG ירוץ כל זמן מסוים ע"י הפרמטר schedule_interval, למשל כל יום. אבל אם ה-start_date עדיין לא הגיע אז ה-DAG עדיין לא ירוץ.
לאחר מכן מגדירים את המשימות. בדוגמה הזו השתמשנו לשלושת המשימות ב-BashOperator. הסבר על אופרטורים יגיע בהמשך בפיסקה נפרדת. 
כדי לבנות משימה אנחנו צריכים לתת ל-constructor של כל אופרטור task_id שהוא יהיה שם המשימה וכך היא תיקרא ב-UI. בנוסף לפי סוג האופרטור צריך לתת פרמטרים נוספים. במקרה הנוכחי ה-BashOperator צריך לקבל bash_command שהיא הפקודה שהמשימה תריץ ב-bash. בדוגמה הזו המשימות פשוט עושות echo שזו פקודה שמדפיסה טקסט.
בסיום יש סינטקס מוזר: 

task1 >> task2 >> task3 


זוהי כתיבה מקוצרת ב-airflow שבה מגדירים את כיוון הזרימה של ה-DAG. איזו משימה קוראת לאיזו משימה. בדוגמה הזו task1 קוראת ל-task2 שקוראת ל-task3.
ב-UI נוכל לראות את זה בצורה יותר יפה. אבל כמו שאמרנו ב-UI נראה את שמות המשימות (task1_name) ולא את שם האובייקט (task1).


אפשר גם להשתמש בחיצים האלו בכיוון ההפוך. למשל אם נשנה את כיוון הזרימה לדבר הבא
: 

task1 >> task3 << task2 


נקבל את ה-DAG הבא:

אגב, לא חייב להשתמש בכתיבה המקוצרת. אפשר במקום זה להשתמש בפונקציות set_downstream ו-set_upstream. למשל כדי ליצור את ה-DAG הראשון נכתוב
:

task1.set_downstream(task2)

task2.set_downstream(task3)


וכדי ליצור את ה-DAG השני נכתוב:

task1.set_downstream(task3)

task2.set_downstream(task3)


או:

task1.set_downstream(task3)

task3.set_upstream(task2)


תשתמשו במה שנראה לכם הכי קריא.

רואים שזה ממש פשוט להגדיר DAG, נכון? אז בואו נתחיל לסבך קצת :-)

הרצת משימות במקביל

בדוגמאות לעיל כבר ראינו הרצה של משימות במקביל. נסביר שוב ונוסיף עוד דרך.
כדי שמשימות ירוצו במקביל צריך פשוט שהם יהיו באותו מקום ב-DAG. למשל, אם משימה אחת קוראת לשתי משימות אז הן תרוצנה במקביל. למשל אם נכתוב:

task1.set_downstream(task2)

task1.set_downstream(task3)


 או בשיטה השניה:

task1 >> task2
task1 >> task3


נקבל:


ואפשר גם להשתמש ב-list כדי לקצר את הכתיבה:

task1.set_downstream([task2, task3])


או:

task1 >> [task2, task3]


מהצד השני, כדי שכמה משימות יתאחדו למשימה אחת נכתוב:

task1.set_downstream([task2, task3])
task2.set_downstream(task4)
task3.set_downstream(task4)


או:

task1 >> [task2, task3] >> task4





ממשק המשתמש - UI

נעשה עצירה קטנה בהסברים על DAG, ונסביר על ה-UI כדי שנדע איך להריץ ולבדוק את מה שאנחנו כותבים.
ה-UI מגיע מה-process של Airflow שנקרא webserver. אם אתם רוצים לגשת ל-UI שרץ על אותה מכונה שבה אתם עובדיםף פיתחו את הדפדפן וגשו לכתובת:
http://localhost:8080
אם מריצים את ה-webserver על מכונה אחרת מהמכונה שאתם עובדים עליה, תשתמשו ב-IP של המכונה במקום localhost.
שם המשתמש והסיסמה הדיפולטיביים הם airflow.
השימוש ב-UI מאוד קל ואינטואיטיבי ואני חושב שלא צריך להסביר עליו, פשוט תנסו. בכל זאת כמה הערות חשובות.
אם יש בעיה ב-DAG, יופיע ERROR בצבע אדום בחלק העליון בעמוד הראשי (העמוד שבו רואים את רשימת ה-DAGs). בלחיצה על שורת ה-ERROR נקבל עוד פרטים על הבעיה.
ניתן להריץ DAG מהעמוד הראשי או ללחוץ על DAG מסוים ואז לעבור לעמוד של ה-DAG וגם משם ניתן להריץ אותו ע"י לחיצה על כפתור play ואז Trigger DAG.
כדי שיהיה אפשר להריץ DAG צריך להעביר אותו למצב פעיל במסך הראשי.
אם ננסה להריץ את ה-DAG כשהוא לא פעיל אלא במצב של pause הוא לא יתחיל לרוץ. אבל ברגע שנעביר אותו למצב פעיל, כיון שכבר לחצנו על play הוא יתחיל לרוץ באותו רגע. 
כשנמצאים בעמוד של DAG מסוים תחת הלשונית Tree View נראה משהו כזה:

כל עיגול מסמל DAG וכל ריבוע מסמל task. בלחיצה עליהם נפתח חלון עם הרבה אפשרויות, והכי חשוב יש שם כפתור Log שמאפשר לראות את הלוג שלא אותה משימה.
ה-UI מאפשר גישה להרבה דברים נוספים כמו configuration, pools, variables ועוד. בפיסקה שעוסקת בכל נושא נתייחס גם ל-UI הרלוונטי.

טיפים נוספים: 
  • אם אתה מנסה להריץ DAG מסוים ונראה שהוא תקוע ולא מתקדם, לך לעמוד הראשי שבו יש את רשימת כל ה-DAGs ותרענן אותו. בדרך כלל תמצא שם ERROR ב-DAG עצמו שגורם ל-DAG לא לרוץ.
  • אם בעמוד הראשי מופיע ERROR לא ברור עם המילים המוזרות bad magic number, לדוגמה:
Broken DAG: [/opt/airflow/dags/dag_file_name.py] Traceback (most recent call last): File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed File "/opt/airflow/dags/dag_file_name.py", line 12, in <module> from someFile import someClass ImportError: bad magic number in 'someFile.someClass': b'\x03\xf3\r\n'

זה בד"כ אומר שעשיתם שינוי וה-Airflow עוד לא הספיק להתעדכן. תנו לו עוד 5 שניות ותרעננו את הדף והשגיאה תעלם.

תזמון משימות

על תזמון המשימות אחראי הרכיב שנקרא scheduler. הוא זה שבודק מה הגדרנו ב-DAG ומריץ את המשימות על ה-worker בזמן הנכון.
השדות שמשפיעים על התזמון נמצאים גם בהגדרת ה-DAG וגם בהגדרת ה-task.

שדות של DAG:

start_date - שדה זה קובע מאיזה תאריך ה-DAG הזה רשאי להתחיל לרוץ. למשל: כדי לקבוע שהוא יתחיל לרוץ החל מה-01/04/2021 נכתוב:

start_date=datetime(2021, 4, 1)


נעשה פה שימוש בספריית datetime של פייתון. אפשר גם להשתמש בספריית airflow.utils.dates. בהתחלה נייבא אותה:

from airflow.utils.dates import days_ago


ואז נוכל להשתמש כך:

start_date=days_ago(2)


end_date - עד איזה תאריך ה-DAG הזה צריך לרוץ. אם לא משתמשים בו הוא לא יפסיק וימשיך לרוץ כל סבב שמגיע הזמן שלו.

schedule_interval - מגדיר כל כמה זמן ה-DAG ירוץ. הוא יכול לקבל cron expression בצורה של string או datetime.timedelta. איך משתמשים ב-timedelta אפשר לראות כאן. נראה דוגמה איך להשתמש ב-cron expression. הביטוי מורכב משש או שבע יחידות בסדר הבא (משמאל לימין):
seconds minutes hours day_of_mounth month day_of_week year
אם למשל נכתוב ביטוי כזה:
0 30 14 ? * * *
זה אומר שה-DAG צריך לרוץ בשעה 14:30 כל יום כל הזמן.
הסימן כוכבית מייצג - כל ערך אפשרי, והסימן שאלה מייצג - "לא ערך ספציפי" והוא מותר לשימוש רק ב-day_of_mounth ו-day_of_week. אסור ששני השדות האלו יהיו שניהם באותו ביטוי כוכבית או שניהם סימן שאלה.
כדי לראות הסבר שלם לעניין הזה מומלץ לקרוא כאן וכדי לבנות ביטוי נכון ולבדוק אותו מומלץ להשתמש באתר הזה. דוגמה לשימוש:

schedule_interval="0 30 14 * * *",


דרך מקובלת נוספת היא להשתמש בביטויים שמוגדרים ב-airflow מראש כמו למשל:
@daily, @hourly, @weekly
ניתן לראות תיאור מדוייק שלהם כאן. דוגמה לשימוש:

schedule_interval='@daily'


dagrun_timeout כמה זמן ריצה מוקצה ל-DAG. אם זמן הריצה יהיה מעבר לזמן שהוגדר הוא ייכשל.

שדות של task

execution_timeout - כמה זמן ריצה מוקצה למשימה. אם זמן הריצה שלה יהיה מעבר לזמן שהוגדר היא תיכשל.

retries - מספר הפעמים שניתן להריץ שוב את ה-task לפני שהוא יחשב כ-failed.

retry_delay - כמה זמן לחכות בין ניסיונות ריצה של task.

retry_exponential_backoff - גורם לזמן בין נסיונות להתארך בכל פעם.

start_date - שדה זה קובע מאיזה תאריך המשימה תרוץ בפעם הראשונה. בד"כ משתמשים בשדה start_date רק ב-DAG אבל אם יש צורך, יש גם למשימות שדה כזה.

end_date - עד איזה תאריך מותר למשימה הזו לרוץ.


קונפיגורציה

ל-Airflow יש קובץ קונפיגורציה שנקרא airflow.cfg. הוא נוצר בהרצה הראשונית של airflow והוא נמצא בספרייה הראשית של airflow (בד"כ airflow/~). 

הקובץ הזה מחולק לנושאים שונים. כל נושא מוקף בסוגריים מרובועות למשל [core] או [logging] או [webserver], ותחתיו כל ההגדרות שקשורות לאותו נושא. כל אפשרויות הקונפיגורציה נמצאות כאן עם הסברים.

דרך נוספת לקנפג את airflow היא באמצעות משתני סביבה - environment varibles. לכל הגדרה ב-airflow.cfg יש משתנה סביבה מתאים. יש שיטה פשוטה לדעת מה שם משתנה הסביבה המתאים לכל הגדרת קונפיגורציה. כל משתנה מתחיל במילה AIRFLOW ואז קו תחתון פעמיים. לאחר מכן שם הנושא (core/webserver/etc) אליו הוא שייך ושוב קו תחתון פעמיים, ואז השם המופיע בקובץ airflow.cfg באותיות גדולות.

דוגמה תבהיר את זה בקלות. ניקח למשל את הגדרת הקונפיגורציה שנקראת dags_folder ששם מגדירים באיזה ספריה ה-DAGs שלנו נמצאים. היא נמצאת תחת הנושא [core]. משתנה הסביבה של הגדרה זו הוא:

AIRFLOW__CORE__DAGS_FOLDER

תעברו רגע על השם הזה ועל ההסבר לעיל ותיראו שזה מתאים.

כשמריצים את airflow ע"י docker לא תהיה השפעה של airflow.cfg החיצוני (אני מעריך שיש דרך לגרום לקובץ הקונפיגורציה להשפיע אבל אני לא מצאתי). לכן צריך להשתמש במשתני הסביבה בתוך ה-docker-compose.yaml. הקובץ הזה נמצא בד"כ בתוך ספריית airflow ואפשר גם לראות אותו כאן. הוא מגיע כבר עם כמה הגדרות מראש למשל:

AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'

AIRFLOW__CORE__LOAD_EXAMPLES: 'true'

באותה צורה נוכל להוסיף משתנים שאנו רוצים לשנות מהדיפולט שלהם. קודם נעצור את המערכת עם
$ docker-compose down
נעשה את השינויים ב-docker-compose.yaml ונרים את המערכת:
$ docker-compose up
אפשר לראות את הקונפיגורציה ב-UI תחת Admin->Configurations. 
בהתחלה לא תיראו שם את הקונפיגורציה כי בדיפולט זה סגור מטעמי אבטחה. כדי לאפשר את זה צריך להגדיר את משתנה הסביבה הבא::

AIRFLOW__WEBSERVER__EXPOSE_CONFIG: 'true'

החלק העליון של העמוד הזה מראה את קובץ ה-airflow.cfg והחלק התחתון (צריך לגלול הרבה למטה) מראה את הקונפיגורציה בפועל, ז"א לאחר שמתחשבים גם במשתני הסביבה.
מבחינת סדרי העדיפויות, airflow קודם כל מתחשב במשתני הסביבה, לאחר מכן בקובץ airflow.cfg ובסוף בערכי הדיפולט.
ניתן גם לבדוק ערכי קונפיגורציה ע"י פקודות CLI של Airflow. למשל, כדי לקבל את הערך של load_examplesמהחלק של core נכתוב:

airflow config get-value core load_examples

ואם airflow רץ בתוך docker אז נריץ את הפקודה בתוך ה-container בצורה הבאה:

docker exec -it airflow_airflow-worker_1 airflow config get-value core load_examples

עוד לגבי CLI בפיסקה הבאה.

שימוש ב-CLI

ל-Airflow יש סט פקודות CLI עשיר. אפשר לראות את כולם פה.

כשמריצים את airflow לוקאלית (לא בתוך container) אפשר להשתמש ישירות ב-CLI. כמה דוגמאות לפקודות בסיסיות שגם די ברור מה הן עושות:

airflow info

airflow dags list

airflow dags trigger dag_id

אבל כאשר airflow רת בתוך docker container צריך להריץ את הפקודות הללו בתוך ה-worker container. יש שתי אפשרויות איך לעשות את זה.

1. להיכנס לתוך ה-container:

docker exec -it airflow_airflow-worker_1 bash


ואז להריץ את הפקודות, למשל כדי להריץ DAG מסוים:

default@78bed324a0cc:/opt/airflow$ airflow dags trigger my_simple_dag


2. להריץ את הפקודות ישירות בתוך ה-container בפקודה אחת, למשל כדי להריץ DAG מסוים:

docker exec -it airflow_airflow-worker_1 airflow dags trigger my_simple_dag


שימוש ב-REST API

התיעוד המלא ל-REAST API של Airflow נמצא כאן.

נראה כמה דוגמאות.

כדי לקבל רשימה של כל ה-DAGs נרשום:

curl GET 'http://localhost:8080/api/v1/dags' -H 'Content-Type: application/json' --user "airflow:airflow"

בחלק האחרון של ה user-- צריך לתת username:password שלנו ב-airflow. כמו שהיזכרנו, הדיפולט הוא airflow:airflow. 

הדוגמה הזו טובה כשמריצים את הפקודה על אותה מכונה שבה נמצא ה-airflow. אם מדובר על מכונה אחרת צריך לשנות localhost ל-ip של המכונה.

כדי לקבל פרטים על DAG ספציפי (בדוגמה הזו my_dag_name):

curl GET 'http://localhost:8080/api/v1/dags/my_dag_name' -H 'Content-Type: application/json' --user "airflow:airflow"

כדי להריץ DAG  (בדוגמה הזו my_dag_name):

curl POST 'http://localhost:8080/api/v1/dags/my_dag_name/dagRuns' -H 'Content-Type: application/json' --user "airflow:airflow" -d '{}'


לוגים

הלוגים ב-airflow נמצאים תחת airflow/logs ומסודרים באופן הבא:

airflow/logs/dag_id/task_id/date/incremental_number.log 

אם למשל שם ה-DAG הוא simple_dag ושם המשימה הראשונה הוא task1 אז הלוג שלה יהיה ב:

airflow/logs/simple_dag/task1/2021-08-23T07:00:00+00:00/1.log

דרך הקונפיגורציה, בחלק של [logging] אפשר לשנות הרבה דברים וביניהם את ה-path של הלוגים ואת הפורמט של שם הלוג. 

דרך פשוטה יותר לגשת ללוגים זה בעזרת ה-UI. פשוט לוחצים על הריבוע שמייצג את המשימה:



ואז נפתח חלון שבחלק העליון שלו יש כפתור של Log. 



לחיצה עליו תציג לנו את הלוג.

ב-Airflow יש גם תמיכה בכתיבת לוגים לענן (AWS, google cloud, Azure).

לגבי לוגים של פייתון, אם נשתמש בפקודה print אנחנו נראה שהטקסט הגיע מקובץ בשם logging_mixin.py.

למשל:

[2021-08-22 11:48:37,829] {logging_mixin.py:104} INFO - my print text

אבל אם נשתמש בספריית logging של פייתון הלוג יציין את שם הקובץ שכתב ללוג:

import logging


LOGGER = logging.getLogger("airflow.task")

LOGGER.info("example of log line")


העברת פרמטרים בין משימות - XCOM

בעיקרון, airflow תוכנן כך שכל משימה אמורה להיות בפני עצמה, ולכן אם אפשר עדיף להימנע מהעברת מידע בין משימות. עם זאת, יש מנגנון שנקרא xcom שנועד לאפשר העברת מידע בין משימות. שימו לב ש-xcom מיועד להעברת כמויות מידע קטנות, ולא גדולות, אז לא מומלץ לשים בו אובייקטים גדולים.

מדובר על רשימה של ערכים שלכל אחד יש שישה שדות:

key, value, timestamp, execution date, task id, Dag id

אפשר לראות את הרשימה ב-UI, אם נלחץ על Admin->Xcoms.


בד"כ רוב השימוש ב-xcom יהיה ב-PythonOperator כיון שבו אפשר להשתמש בפונקציה xcom_push כדי להכניס נתונים ל-xcom ובפונקציה xcom_pull כדי לקבל נתונים מ-xcom. 
בנוסף, ה-return value של כל משימה גם נשמר ב-xcom. אם מדובר על PythonOperator אז ישמר הערך שמוחזר מהפונקציה שאליה קראנו. ואם מדובר על BashOperator ישמר הערך האחרון שהודפס.
הדוגמה הבאה מדגימה את ה-return_value:

from airflow import DAG

from airflow.operators.bash import BashOperator

from airflow.operators.python import PythonOperator, PythonVirtualenvOperator

from airflow.utils.dates import days_ago

 

args = {

    'owner': 'airflow',

}

 

with DAG(

    dag_id='simple_python_dag',

    default_args=args,

    schedule_interval=None,

    start_date=days_ago(2),

    tags=['example'],

) as dag:

 

    def task1_function():

        print('This print is from task 1')

        return 'return from task1'

 

    def task2_function():

        print('This print is from task 2')

 

    task1 = PythonOperator(

        task_id='task_1',

        python_callable=task1_function,

    )

 

    task2 = PythonOperator(

        task_id='task_2',

        python_callable=task2_function,

    )

   

    task3 = BashOperator(

        task_id='task3',

        bash_command='echo This print is from task3'

    )

 

task1 >> task2 >> task3

המשימה task_1 היא PythonOperator והיא מחזירה ערך, ולכן הערך הזה יהיה ב-xcom.

המשימה task_2 היא PythonOperator והיא לא מחזירה ערך, ולכן לא נראה return_value מהמשימה הזו ב-xcom.

המשימה task_3 היא BashOperator והיא מדפיסה משהו למסך, ולכן מה שמודפס גם יהיה ב-xcom.

כך נראת טבלת ה-xcom לאחר הרצת ה-DAG הזה:

אז אם משתמשים ב-PythonOperator ניתן לשמור כמה ערכים ב-xcom. אבל אם משתמשים ב-BashOperator יש רק ערך אחד שנשמר והוא יהיה ה-return_value.

עד כאן זה פשוט כיון שהשתמשנו בפונקציית פייתון שמוגדרת בתוך קובץ ה-DAG. עכשיו נראה איך משתמשים ב-xcom כשה-PythonOperator קורא לפונקציה שנמצאת בקובץ אחר ולא בקובץ ה-DAG.

שימוש ב-xcom מתוך קובץ חיצוני (לא מתוך קובץ ה-DAG)

דבר ראשון, צריך להוסיף למשימה שמשתמשת בקובץ חיצוני את השדה provide_context=True, בצורה הבאה:

task1 = PythonOperator(

    task_id='task_1',

    python_callable=python_file_function,

    provide_context=True

)


השדה הזה גורם לכך ש-airflow מעביר סט של keyword arguments שאפשר להשתמש בהם בקובץ החיצוני. וכדי להשתמש בסט הזה צריך להוסיף בפונקציה שבקובץ החיצוני פרמטר kwargs** כקלט.

ניראה דוגמה והדברים יתבהרו. קובץ ה-DAG שלנו יראה כך:
 

from airflow import DAG

from airflow.operators.python import PythonOperator

from airflow.utils.dates import days_ago

from python_xcom_file import python_file_function

 

with DAG(

    dag_id='xcom_exteranl_python_dag',

    schedule_interval=None,

    start_date=days_ago(2),

) as dag:

 

    def simple_function(ti):

        print('task1: print from task 1')

        ti.xcom_push(key='task1_string', value='this string from task1')

        return 'return from task 1'

 

    def print_xcom_value(ti):

        print('task3: value from xcom pull: ' + ti.xcom_pull(task_ids='task2', key='task2_string'))

 

 

    task1 = PythonOperator(

        task_id='task1',

        python_callable=simple_function,

    )

 

    task2 = PythonOperator(

        task_id='task2',

        python_callable=python_file_function,

        provide_context=True

    )

 

    task3 = PythonOperator(

        task_id='task3',

        python_callable=print_xcom_value,

        provide_context=True

    )

 

 

task1 >> task2 >> task3 


והקובץ פייתון החיצוני, python_xcom_file.py, יראה כך:

def python_file_function(**kwargs):
    kwargs['ti'].xcom_push(key='task2_string', value='this string from task2')
    print('task2: value from xcom pull task1_string: '
           kwargs[
'ti'].xcom_pull(task_ids='task1', key='task1_string'))
    return 0
 


הסבר:
task1 - משתמשת בפונקציה simple_function שמוגדרת בקובץ ה-DAG. הפונקציה שומרת ערך ב-xcom.
task2 - משתמשת בפונקציה חיצונית שמוגדרת בקובץ python_xcom_file.py ונקראת python_file_function. הפונקציה הזו לוקחת מ-xcom את הערך שנשמר ב-task1 ע"י xcom_pull וגם שומרת ערך ע"י xcom_push.
task3 - משתמשת בפונקציה print_xcom_value שמוגדרת גם היא בקובץ ה-DAG. הפונקציה לוקחת את הערך ש-task2 שמרה ב-xcom ומדפיסה אותו.

בטבלת ה-xcom נראה את כל הערכים:

רואים את הערכים ש-task1 ו-task2 הכניסו ל-xcom. כמו כן, רואים בתור return_value את הערכים ששתי המשימות החזירו.
אז ראינו איך להכניס ולקבל נתונים מ-xcom גם בפונקציות שמוגדרות בתוך קובץ ה-DAG וגם בפונקציות שמוגדרות בקבצים חיצוניים.

קביעת תנאים בהרצת משימות - conditional tasks

ישנן מספר דרכים להרצת משימות כתלות בתנאי מסוים. בדיפולט, התנאי להרצת משימה הוא שכל המשימות הקודמת אליה יסתיימו בהצלחה.

שימוש ב-trigger_rule

לכל משימה יש שדה אופציונאלי שנקרא trigger_rule. בשדה הזה ניתן להגדיר שמשימה תרוץ גם אם לא כל המשימות שלפניה הצליחו. האפשרויות הן:

  • all_success (default): All upstream tasks have succeeded

  • all_failed: All upstream tasks are in a failed or upstream_failed state

  • all_done: All upstream tasks are done with their execution

  • one_failed: At least one upstream task has failed (does not wait for all upstream tasks to be done)

  • one_success: At least one upstream task has succeeded (does not wait for all upstream tasks to be done)

  • none_failed: All upstream tasks have not failed or upstream_failed - that is, all upstream tasks have succeeded or been skipped

  • none_failed_or_skipped: All upstream tasks have not failed or upstream_failed, and at least one upstream task has succeeded.

  • none_skipped: No upstream task is in a skipped state - that is, all upstream tasks are in a successfailed, or upstream_failed state

  • dummy: No dependencies at all, run this task at any time

דוגמה לשימוש בשדה הזה: 

from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta

import logging

 

with DAG(

    dag_id='trigger_rule_dag',

    start_date=datetime(2021, 4, 1),

) as dag:

 

    task1 = BashOperator(

        task_id='task1_name',

        bash_command='echo this is task1'

    )

 

    task2 = BashOperator(

        task_id='task2_name',

        trigger_rule='all_failed',

        bash_command='echo this is task2'

    )

 

    task3 = BashOperator(

        task_id='task3_name',

        bash_command='echo this is task3'

    )

 

    task4 = BashOperator(

          task_id='task4_name',

          trigger_rule='none_failed',

          bash_command='echo this is task4'

    )

    task1 >> [task2, task3] >> task4 

כפי שניתן לראות, task2 ו-task3 הן משימות מקביליות לאחר task1, כאשר task2 מותנת בכך ש-task1 נכשלה ואילו task3 מותנת בכך ש-task1 הצליחה. זו דוגמה למקרה נפוץ, שבו אנו צריכים לעשות משימות שונות כתלות בהצלחת המשימה הקודמת. 

אבל task4 צריכה לרוץ במקרה ש-task2 או task3 הצליחו ולכן ה-trigger_rule שלה הוא none_failed. שזה תנאי שמוודא שהמשימות הקודמות הצליחו או דולגו, אבל לא נכשלו.

הגרף של ה-DAG הזה יראה כך:

השדה trigger_rule הוא פשוט לשימוש ויכול להועיל במקרים מסויימים, אבל כשיש לנו צורך בתנאי מורכב יותר trigger_rule לא תמיד מספיק, ולכן יש לנו האופרטור BranchPythonOperator.

שימוש ב-BranchPythonOperator

האופרטור BranchPythonOperator מאפשר לכתוב בפונקציית פייתון תנאי שיקבע מה המשימות הבאות שירוצו. האופרטור הזה דומה ל-PythonOperator רק שבשדה python_callable הוא מקבל פונקציה של פייתון שמחזירה את שמות המשימות הבאות שירוצו, כך שלאחר האופרטור הזה המשימות שחזרו מהפונקציה שמוגדרת ב-python_callable ירוצו ושאר הפונקציות ידולגו. אפשר להשתמש ב-xcom בתוך התנאי וזה מאפשר ליצור תנאים שתלויים במשימות אחרות.

כרגיל, דוגמה תבהיר את העניין בצורה הכי טובה: 

from airflow import DAG

from airflow.operators.bash import BashOperator

from airflow.operators.python import BranchPythonOperator

from datetime import datetime, timedelta

import logging

 

def branch_function(ti):

    xcom_value = int(ti.xcom_pull(task_ids='task1_name'))

    if xcom_value == 1:

        return 'task3_name'

    else:

        return 'task4_name'

 

with DAG(

    dag_id='branching_dag',

    schedule_interval=None,

    start_date=datetime(2021, 4, 1),

) as dag:

 

    task1 = BashOperator(

        task_id='task1_name',

        bash_command='echo 1'

    )

 

    task2 = BranchPythonOperator(

        task_id='task2_name',

        python_callable=branch_function

    )

 

    task3 = BashOperator(

        task_id='task3_name',

        bash_command='echo this is task3'

    )

 

    task4 = BashOperator(

          task_id='task4_name',

          trigger_rule='none_failed',

          bash_command='echo this is task4'

    )

    task1 >> task2 >> [task3, task4] 

בהתחלה יש לנו את task1 שהוא מאוד פשוט, וכמו שהסברנו לעיל לגבי xcom הערך האחרון ש-BashOperator מדפיס נכנס ל-xcom כ-return_value של אותה משימה. לכן בדוגמה הזו יכנס הערך 1. 



ב-task2 יש לנו BranchPythonOperator שמשתמש בפונקציה שנקראת branch_function שמוגדרת בתחילת הקוד. מה שהיא עושה זה לקרוא את הערך של ה-return_value של task1 ולפי זה להחליט האם המשימה הבאה תהיה task3 או task4. בדומה הזו קל לראות שהמשימה הבאה תהיה task3. ולכן נקבל גרף כזה:



אבל אם נשנה ב-task1 את הפקודה ל-

    bash_command='echo 2'

נראה ש-task4 היא זו שתרוץ ונקבל גרף כזה:


לסיכום, האופרטור BranchPythonOperator מאפשר גמישות גדולה לקביעת כל תנאי שאנו צריכים ב-DAG שלנו.


שימוש ב-ShortCircuitOperator 

אופרטור נוסף שיכול לעזור בקביעת תנאים הוא האופרטור ShortCircuitOperator. בדומה ל-BranchPythonOperator  גם באופרטור הזה בשדה python_callable הוא מקבל פונקציה. אם הפונקציה הזו מחזירה False ה-DAG ידלג על כל שאר המשימות שיש לאחר ה-ShortCircuitOperator ויסמן אותן כ-skipped. ואם הוא מחזיר True הוא לא ידלג אלא ימשיך במשימות הבאות כרגיל.

מספר המשימות שניתן להריץ במקביל - Airflow Pools

כדי להגביל את כמות המשימות שרצות במקביל משתמשים ב-pools. ניתן לגשת ל-Pools דרך ה-UI אם נלחץ על Admin->Pools. שם נראה משהו כזה:

רואים שיש לנו default_pool שיש לו 128 slots. בדיפולט כל המשימות חולקות את ה-pool הזה. כל פעם שרצה משימה היא מקבלת סלוט אחד. וכאשר המשימה מסתיימת הסלוט משתחרר. ערך הדיפולט הוא 128 וזה אומר שאפשר להריץ 128 משימות במקביל כל עוד לא נשנה את ערך ה-default_pool.

אם לא נשארו סלוטים פנויים, ויש משימות שצריכות לרוץ, המשימות הללו יכנסו לתור ויוצאו לפועל כל אחת בתורה ברגע שמתשחרר סלוט.

זה נכון לגבי כל המשימות שרצות במערכת ה-airflow, אבל יש מגבלה נוספת על כל DAG. אם לא נשנה את ערכי הדיפולט אז כל DAG יכול להריץ עד 16 משימות במקביל.

ניתן לשנות ערך זה בשתי דרכים: 

  • שימוש בשדה dag_concurrency בקונפיגורציה של מערכת ה-airflow. ואז ערך זה ישפיע על כל ה-DAGs במערכת
  • שימוש בשדה concurrency ביצירת ה-DAG, כדי להשפיע רק על DAG מסוים.

ב-UI דרך Admin->Pools ניתן לשנות את ערך ה-default_pool. אפשר גם להוסיף pools אחרים, לתת להם שם ולהשתמש בהם עבור משימות. כדי להגדיר שמשימה מסויימת משתמשת ב-pool מסוים שהוא לא ה-default_pool נשתמש בשדה שנקרא pool ביצירת המשימה. אם למשל יצרנו pool עבור שאילתות SQL וקראנו לו sql_pool נוכל להשתמש בו במשימת SQL בצורה הזו:

mysql_task = MySqlOperator(

    task_id='mysql_task',

    mysql_conn_id='mysql_default_conn',

    sql='./sample_sql.sql',

    pool='sql_pool'

) 

וכך המשימה הזו תשתמש ב-sql_pool ולא ב-default_pool. בצורה הזו נוכל לסדר את המערכת שלנו כך שלא יהיה מצב שיותר מדיי משימות רצות במקביל ועלולות לגרום למערכת לקרוס.


Airflow Sensor

כמו שכבר הזכרנו, סנסור זה סוג של אופרטור שנועד לחכות לאיזשהו אירוע חיצוני. למשל לקובץ או תיקייה שיגיעו למיקום מסוים, או לקרוא לאיזשהו שאילתת SQL עד שתנאי מסוים מתקיים. הסנסור בודק כל אינטרוול מסוים שאנו מגדירים האם התנאי התקיים ואם כן אז הסנסור הזה מסומן כ-succeed ומשימות שבאות אחריו ב-DAG ממשיכות לרוץ. בנוסף, אנחנו גם צריכים להגדיר עד מתי הסנסור הזה ממשיך לבדוק את מה שהגדרנו לו. נראה את זה עוד מעט ע"י שימוש בשדה timeout.

סנסורים פופולאריים לדוגמה:

  • FileSensor - מחכה לקובץ או תיקייה שיגיעו למיקום מסוים
  • SqlSensor - מריץ שאילתת SQL עד שתנאי מסוים מתקיים
  • ExternalTaskSensor - מחכה ש-DAG מסוים או משימה מסויימת יסתיימו בתאריך מסוים
  • DateTimeSensor - מחכה לזמן מסוים (תאריך ושעה)
  • TimeDeltaSensor - מחכה שזמן מסוים יעבור לאחר סיום המשימה הקודמת
  • S3KeySensor - מחכה ל-key מסוים שיופיע ב-bucket מסוים ב-S3
כל הסנסורים יורשים ממחלקת הבסיס BaseSensorOperator.
כדי לקבוע כל כמה זמן הסנסור יבדוק את מה שהוא אמור לבדוק אנו משתמשים בשדה poke_interval בהגדרת הסנסור. הוא מקבל ערך מסוג float שנמדד בשניות. למשל כדי שהסנסור יבדוק כל 20 שניות נכתוב:

wait_for_file = FileSensor(

    task_id='wait_for_file',

    fs_conn_id='fs_default',

    filepath='my_file_name.txt',

    poke_interval=20

) 

הדיפולט של poke_interval הוא 60 שניות.

בדוגמה הזו השתמשנו ב-FileSensor. השדה filepath צריך להכיל את שם הקובץ או התיקייה שלהם אנו מחכים. שדה חשוב נוסף בסנסור הזה הוא fs_conn_id. השדה הזה צריך להכיל את ה-connection_id למערכת שבה אנו רוצים לבדוק אם הקובץ קיים. ניתן להגדיר connection דרך ה-UI. לוחצים על Admin->Connections ושם מוסיפים הגדרות. לדוגמה, אם אנו רוצים לבדוק שהקובץ my_file_name.txt קיים בספריית ה-dags אז נגדיר את ה-connection שנקרא fd_default בצורה הבאה:


דוגמה מלאה לשימוש ב-FileSensor:


task1 = BashOperator(

task_id='task1_name',

bash_command='echo this is task1'

)


wait_for_file = FileSensor(

task_id='wait_for_file',

fs_conn_id='fs_default',

filepath='my_file_name.txt',

poke_interval=5

)


task2 = BashOperator(

task_id='task2_name',

bash_command='echo this is task2'

)


task1 >> wait_for_file >> task2

הגרף יראה כך:


רק ברגע ש-wait_for_file יגלה שהקובץ my_file_name.txt קיים בספריית dags הוא יסומן כ-success וה-DAG ימשיך להתקדם ל-task2.

כדי להגדיר עד מתי ירוץ הסנסור, נשתמש בשדה timeout. הערך שלו נמדד בשניות. ערך הדיפולט שלו הוא 7 ימים (604,800 בשניות). צריך לתת את הדעת לגבי קביעת ה-tiemout כי הוא יכול להציל אותנו ממה שנקרא sensor deadlock. מדובר על מצב שבו יש בעיה במערכת וסנסורים מסויימים מחכים המון זמן. כל סנסור תופס סלוט אחד כמו שלמדנו בפיסקה על airflow pool עד שמגיעים למצב שכל הסלוטים של ה-DAG תפוסים ואין לו יכולת להמשיך לרוץ.

לדוגמה, אם יש לנו DAG שלא שינינו בו את ה-timeout ולכן הוא 7 ימים. וגם לא שינינו בו את ה-concurrency ולכן הוא 16. ה-DAG הזה רץ כל יום והוא מתחיל ב-8 סנסורים שמחכים למשהו מסוים שיקרה. אבל כמו שאמרנו יש בבעיה במערכת והדבר שהם מחכים לו לא קורה. אז ביום הראשון נתפסים 8 סלוטים, ולא משוחררים. ביום השני נתפסים עוד 8 סלוטים, כך שכבר 16 סלוטים תפוסים. ביום השלישי כבר אין יותר סלוטים פנויים ל-DAG הזה והוא לא יכול לרוץ. רק בעוד 5 ימים יסתיים הזמן של ה-timeout והסנסורים יסגרו והסלוטים ישתחררו. עד אז אנחנו ב-sensor deadlock.

אופטימיזציה נוספת אפשרית לסנסור היא קביעת ה-mode שלו. יש שתי אפשרויות - poke או reschedule. הדיפולט הוא poke. ההסבר הוא פשוט, הרי סנסור ברוב הזמן לא עושה כלום. והשאלה היא האם בזמן הזה הוא משחרר את הסלוט או לא. ב-mode של poke הסנסור לא משחרר את הסלוט, ואילו ב-reschedule הוא משחרר את הסלוט וכל פעם שהוא צריך לבדוק את מה שהוא אמור לבדוק הוא תופס שוב סלוט. 

אם נשתמש ב-reschedule נוכל להימנע מ-sensor deadlock שהזכרנו לעיל. מצד שני, אם יש הרבה משימות עלול להיות מצב שכשהסנסור שלנו צריך לפעול לא יהיה לא סלוט פנוי ואז הוא יצטרך להמתין.

כדי להחליט איזה mode לבחור צריך לבחון את כל מערכת ה-airflow שלכם ואת ה-DAG הספציפי שהסנסור הזה שייך אליו. אבל אפשר להגיד כלל אצבע, שאם ה-poke_interval של הסנסור קצר יחסית, הגיוני להשאיר אותו ב-mode של poke.

דוגמה לשימוש ב-mode שהוא reschedule וב-timeout של שעה:

wait_for_file = FileSensor(

    task_id='wait_for_file',

    fs_conn_id='fs_default',

    poke_interval=200,

    timeout=60*60,

    mode='reschedule',

)


ב-UI ניתן לראות את הסנסורים שמוגדרים כ-reschedule תחת Browse->Task Reschedules.

ניתן להגדיר פונקציה שתיקרא במקרה שה-timeout של הסנסור הגיע. נשתמש בשדה on_failure_callback. לדוגמה:

def failure_callback(context):

    if isinstance(context['exception'], AirflowSensorTimeout):

        print(context)

        print("Sensor timed out")


with
 DAG(

    dag_id='simple_dag',

    start_date=days_ago(2),

as dag:


    wait_for_file = FileSensor(

        task_id='wait_for_file',

        poke_interval=200,

        timeout=60*60,

        mode='reschedule',

        on_failure_callback=failure_callback,

    )

בדוגמה הזו, במקרה שמגיע ה-timeout של הסנסור הפונקציה failure_callback תיקרא. הפונקציה מקבלת אובייקט שנקרא context שמכיל את ה-task_instance וממנו אפשר לקבל הרבה מידע על ה-task וכן על ה-DAG. במקרה הזה בדקנו למה שווה השדה exception שבתוך ה-context ואם הוא AirflowSensorTimeout זה אומר שהוא הגיע מאיזשהו סנסור. אם יש לנו כמה נצטרך לחשוב איך להבדיל ביניהם. אפשר פשוט שכל אחד יקרא לפונקציה אחרת.

דבר נוסף שאפשר להגדיר למקרה של timeout הוא soft_fail. השדה הזה קובע האם במקרה של timeout הסנסור יחשב כ-failed, או כ-skipped. אם הוא True אז במקרה של timeout הסנסור יחשב כ-skipped. בדיפולט הוא False ולכן הסנסור יחשב כ-failed. רק צריך לזכור שגם במקרה שמשימה היא skipped כל המשימות שאחריה לא ירוצו, אלא אם כן נשנה את ה-trigger_rule של המשימות הבאות.

עוד שדה שיכול לשפר את הסנסור שלנו הוא ה-exponential_backoff. אם השדה הזה שווה ל-True הוא גורם לכך שהזמן שעובר בין פעולות של הסנסור הולך וגדל. במקום שהוא יהיה קבוע, למשל בדוגמא לעיל קבענו שזה יהיה כל 200 שניות, הוא ילך ויגדל בכל פעם. לפעמים זה עוזר ולפעמים לא, כל מקרה צריך לבחון לגופו.


קצת על Security

באתר הרישמי ישנו עמוד שמדבר על security, והוא מחלק את הנושא הזה מספר נושאים. כאן נעסוק בקצרה בחלק מהנושאים הללו.

Access Control

ב-Airflow לכל משתמש יש role מסוים שקובע את ההרשאות שלו ב-webserver UI. ה-roles הדיפולטיביים לפי רמת ההרשאות שלהם הם:

  • Admin - all possible permissions
  • Op - User permissions + additional op permissions
  • User - Viewer permissions + additional user permissions
  • Viewer - limited viewer permissions
  • Public - no permissions

ניתן לראות ב-UI את ה-role של כל משתמש Security->List Users ואת כל ה-roles הקיימים וכל אחד מה ההרשאות שלו ב-Security->List Roles.

יש שני סוגים של permissions:

1. Resource-Based permissions - ההרשאות הם פר resource. וכל הרשאה מוגדרת ע"י resource+action. 

דוגמאות ל-resources הם - Dag, Task, Connection. 

ה-actions הם - can_create, can_read, can_edit, can_delete.

דוגמאות להרשאות:

DAGs.can_read
Connections.can_create
Pool.can_delete
Variables.can_edit


2. DAG-level permissions - לפעמים יש לנו DAGs מסוימים שאנחנו רוצים שלא יהיו נגישים לכולם. למשל כי הם מכילים מידע רגיש. או בגלל שההרצות שלהם ארוכות ואנחנו לא רוצים שמישהו בטעות יפריע להם באמצע ריצה. לצורך כך יש אפשרות לשלוט על ההרשאות פר DAG. 
קודם כל, מי שיש לו הרשאה כללית לכל ה-DAGs כמו DAGs.can_edit תהיה לו הרשאה לכל DAG.
כדי לתת הרשאה פר DAG, ה-resource שנשתמש כשניתן הרשאות יהיה DAG:dag_id. למשל, אם אנחנו רוצים לתת הרשאה עבור DAG שנקרא my_dag ההרשאה תיראה כך: DAG:my_dag.can_edit.

 Custome Roles

אפשר ליצור roles שיהיו מותאמים אישית לצרכים שלנו. כשיוצרים role אפשר להגביל אותו ל-DAGs מסוימים. כדי ליצור role אפשר להשתמש בעמוד Security->List Roles וזו הדרך הפשוטה. אפשר גם לעשות זאת מה-CLI.

הסתרת מידע סודי

Airflow מסתיר מידע סודי כמו למשל סיסמאות ע"י כך שהוא מחליף את המידע הסודי ב-***. למשל אם לא שינית את הסיסמה הדיפולטיבית אז הסיסמה שלך היא airflow, ולכן כל מקום שיהיה כתוב airflow בלוג יוחלף ב- ***. לדוגמה:

[2021-08-17 15:37:30,821] {logging_mixin.py:104} INFO - dir_path is: /opt/***/dags/my_code/pythonOperator_simple_dag/pythonFileName

אפשר לבטל את ההסתרה הזו ע"י קביעת הערך של hide_sensitive_var_conn_fields בקונפיגורציה ל-false.
Airflow גם יסתיר כל מידע שנמצא ב-Variables או בשדה Extra של Connections שהשם שלו מכיל את אחת המילים האלה:
'password', 'secret', 'passwd', 'authorization', 'api_key', 'apikey', 'access_token'


התקנת תוכנות בתוך ה-container של airflow

לפעמים יש לנו צורך להתקין בתוך ה-container של airflow תוכנות נוספות. למשל, אם אנחנו רוצים להריץ פקודות של AWS CLI נצטרך להתקין awscli ואם אנו צריכים להריץ פקודות של kubectl נצטרך להתקין kubectl

כדי לעשות את זה, נריץ את airflow וניכנס לתוך ה-container של ה-worker. בתוך ה-container נתקין את התוכנות שאנו צריכים. ואז נצא מה-container. כדי ליצור image חדש שכולל את airflow יחד עם התוכנות שהתקנו בתוכו נשתמש בפקודה docker commit.

תיאור מפורט של התהליך הזה, סיכמתי פה.


התקנת Airflow בתוך kubernetes cluster ע"י Helm Chart

Helm הוא ה-package manager של Kubernetes שמאפשר יחסית בקלות להכניס תוכנות ל-Kubernetes cluster ע"י שימוש ב-helm chart.

במערכת שלי (ubuntu 18.04) ההתקנה של helm היתה ע"י הפקודה:

sudo snap install helm3

כרגיל בתוכנה, הרבה פעמים יש בעיות בלתי צפויות, שתמיד קורות דווקא לי... אז אם יש בעיות נסו את הפקודה הזו:

kubectl -n airflow get events --sort-by='{.lastTimestamp}'

היא אמורה לתת לכם רשימה של מה שקרה מאחורי הקלעים, ואולי תמצאו שם רמז למה גורם לבעיה שלכם.

באתר הרישמי ניתן לראות הסבר לגבי השימוש ב-helm עבור airflow. בגדול כל מה שצריך לעשות כדי להתקין את airflow ב-kubernetes cluster ע"י helm זה את הפקודות הבאות:

kubectl create namespace airflow

helm repo add apache-airflow https://airflow.apache.org

helm install airflow apache-airflow/airflow --namespace airflow

אישית לא יצא לי להנסות את זה ולכן אני רק מציין את האפשרות הזו, כיון שהיא חשובה ומאוד נוחה, אבל לצערי אין לי עדיין טיפים של ניסיון.


עד כאן להפעם, אולי בהמשך אמצא זמן להוסיף כאן נושאים נוספים. בהצלחה!

ו... אם אהבתם, תכתבו משהו למטה...