תוכן העניינים
מה זה Airflow?
- הרצת משימות במקביל
- הרצת משימות כתלות בתנאי
- תזמון ריצות לפי זמן או לפי טריגר חיצוני
- הרצת workflow ב-cluster
- קונפיגורציה עשירה
- UI טוב ונוח
- אפשרויות alerting שונות
- מנגנון העברת מידע בין tasks
- דיווחים במייל או בערוצים אחרים כמו Slack
קצת היסטוריה
יש כלים מתחרים?
אז למה דווקא Airflow?
מה זה DAG?
כפי שכתבתי לעיל, DAG זה ראשי תיבות של Directed Acyclic Graph. אז כמו שזה נשמע מדובר על workflow שהוא directed. זאת אומרת שיש כיוון זרימה מוגדר בין ה-tasks. ושהוא acyclic, שאין מעגליות בזרימה של הגרף.Broken DAG:
[/opt/airflow/dags/simple_dag.py] Cycle detected in DAG. Faulty task:
upload_compressed_images
הארכיטקטורה של Airflow
הבסיס של Airflow: ה-Operators וה-Sensors
התחלת עבודה
התקנה
- יוצרים ספרייה בשם airflow ויוצרים בתוכה שלוש ספריות dags, logs, plugins
- מורידים לתוך airflow את קובץ docker-compose.yaml של airflow (כפי שמופיע באתר הרישמי)
- נותנים הרשאות ל-container בדומה להרשאות של המכונה שלנו
- מריצים docker-compose up airflow-init ומחכים שהוא יסיים
- אפשר להריץ את airflow ע"י: docker-compose up
הרצה ועצירה של airflow
כדי להריץ את airflow נכתוב:
$ docker-compose up
כדי לעצור אותו נכתוב:
$ docker-compose down
נתחיל בדוגמה
כדי להבין נושא חדש תמיד טוב להתחיל בדוגמה פשוטה, ואז הרבה דברים מתבהרים.
נבנה DAG שיש בו 3 משימות פשוטות. ה-DAG וכל המשימות יהיו מוגדרים בקובץ אחד כיון שזה הדרך הפשוטה ביותר. בהמשך נראה איך ניתן להגדיר את ה-DAG בקובץ אחד והמשימות בקבצים אחרים, שזו הדרך שבה בפועל עובדים במקרים אמיתיים.
לשם הדוגמה, המשימות יהיו מאוד פשוטות ורק ידפיסו משהו על המסך.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
with DAG(
dag_id='simple_dag',
start_date=days_ago(2),
) as dag:
task1 = BashOperator(
task_id='task1_name',
bash_command='echo this is task1'
)
task2 = BashOperator(
task_id='task2_name',
bash_command='echo
this is task2'
)
task3 = BashOperator(
task_id='task3_name',
bash_command='echo
this is task3'
)
task1 >> task2 >> task3
task1 >> task2 >> task3
task1 >> task3 << task2
אגב, לא חייב להשתמש בכתיבה המקוצרת. אפשר במקום זה להשתמש בפונקציות set_downstream ו-set_upstream. למשל כדי ליצור את ה-DAG הראשון נכתוב:
task1.set_downstream(task2)
task2.set_downstream(task3)
task1.set_downstream(task3)
task2.set_downstream(task3)
task1.set_downstream(task3)
task3.set_upstream(task2)
הרצת משימות במקביל
task1.set_downstream(task2)
task1.set_downstream(task3)
task1 >> task2
task1 >> task3
task1.set_downstream([task2, task3])
task1 >> [task2, task3]
task1.set_downstream([task2, task3])
task2.set_downstream(task4)
task3.set_downstream(task4)
task1 >> [task2, task3] >> task4
ממשק המשתמש - UI
- אם אתה מנסה להריץ DAG מסוים ונראה שהוא תקוע ולא מתקדם, לך לעמוד הראשי שבו יש את רשימת כל ה-DAGs ותרענן אותו. בדרך כלל תמצא שם ERROR ב-DAG עצמו שגורם ל-DAG לא לרוץ.
- אם בעמוד הראשי מופיע ERROR לא ברור עם המילים המוזרות bad magic number, לדוגמה:
זה בד"כ אומר שעשיתם שינוי וה-Airflow עוד לא הספיק להתעדכן. תנו לו עוד 5 שניות ותרעננו את הדף והשגיאה תעלם.
תזמון משימות
start_date=datetime(2021, 4, 1)
from airflow.utils.dates import days_ago
start_date=days_ago(2)
schedule_interval="0 30 14 * * *",
schedule_interval='@daily'
קונפיגורציה
ל-Airflow יש קובץ קונפיגורציה שנקרא airflow.cfg. הוא נוצר בהרצה הראשונית של airflow והוא נמצא בספרייה הראשית של airflow (בד"כ airflow/~).
הקובץ הזה מחולק לנושאים שונים. כל נושא מוקף בסוגריים מרובועות למשל [core] או [logging] או [webserver], ותחתיו כל ההגדרות שקשורות לאותו נושא. כל אפשרויות הקונפיגורציה נמצאות כאן עם הסברים.
דרך נוספת לקנפג את airflow היא באמצעות משתני סביבה - environment varibles. לכל הגדרה ב-airflow.cfg יש משתנה סביבה מתאים. יש שיטה פשוטה לדעת מה שם משתנה הסביבה המתאים לכל הגדרת קונפיגורציה. כל משתנה מתחיל במילה AIRFLOW ואז קו תחתון פעמיים. לאחר מכן שם הנושא (core/webserver/etc) אליו הוא שייך ושוב קו תחתון פעמיים, ואז השם המופיע בקובץ airflow.cfg באותיות גדולות.
דוגמה תבהיר את זה בקלות. ניקח למשל את הגדרת הקונפיגורציה שנקראת dags_folder ששם מגדירים באיזה ספריה ה-DAGs שלנו נמצאים. היא נמצאת תחת הנושא [core]. משתנה הסביבה של הגדרה זו הוא:
AIRFLOW__CORE__DAGS_FOLDER
תעברו רגע על השם הזה ועל ההסבר לעיל ותיראו שזה מתאים.
כשמריצים את airflow ע"י docker לא תהיה השפעה של airflow.cfg החיצוני (אני מעריך שיש דרך לגרום לקובץ הקונפיגורציה להשפיע אבל אני לא מצאתי). לכן צריך להשתמש במשתני הסביבה בתוך ה-docker-compose.yaml. הקובץ הזה נמצא בד"כ בתוך ספריית airflow ואפשר גם לראות אותו כאן. הוא מגיע כבר עם כמה הגדרות מראש למשל:
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
באותה צורה נוכל להוסיף משתנים שאנו רוצים לשנות מהדיפולט שלהם. קודם נעצור את המערכת עם
$ docker-compose down
נעשה את השינויים ב-docker-compose.yaml ונרים את המערכת:
$ docker-compose up
אפשר לראות את הקונפיגורציה ב-UI תחת Admin->Configurations.
בהתחלה לא תיראו שם את הקונפיגורציה כי בדיפולט זה סגור מטעמי אבטחה. כדי לאפשר את זה צריך להגדיר את משתנה הסביבה הבא::
AIRFLOW__WEBSERVER__EXPOSE_CONFIG: 'true'
החלק העליון של העמוד הזה מראה את קובץ ה-airflow.cfg והחלק התחתון (צריך לגלול הרבה למטה) מראה את הקונפיגורציה בפועל, ז"א לאחר שמתחשבים גם במשתני הסביבה.
מבחינת סדרי העדיפויות, airflow קודם כל מתחשב במשתני הסביבה, לאחר מכן בקובץ airflow.cfg ובסוף בערכי הדיפולט.
ניתן גם לבדוק ערכי קונפיגורציה ע"י פקודות CLI של Airflow. למשל, כדי לקבל את הערך של load_examplesמהחלק של core נכתוב:
airflow config get-value core load_examples
ואם airflow רץ בתוך docker אז נריץ את הפקודה בתוך ה-container בצורה הבאה:
docker exec -it airflow_airflow-worker_1 airflow config get-value core load_examples
עוד לגבי CLI בפיסקה הבאה.
שימוש ב-CLI
ל-Airflow יש סט פקודות CLI עשיר. אפשר לראות את כולם פה.
כשמריצים את airflow לוקאלית (לא בתוך container) אפשר להשתמש ישירות ב-CLI. כמה דוגמאות לפקודות בסיסיות שגם די ברור מה הן עושות:
airflow info
airflow dags list
airflow dags trigger dag_id
אבל כאשר airflow רת בתוך docker container צריך להריץ את הפקודות הללו בתוך ה-worker container. יש שתי אפשרויות איך לעשות את זה.
1. להיכנס לתוך ה-container:
docker exec -it airflow_airflow-worker_1 bash
ואז להריץ את הפקודות, למשל כדי להריץ DAG מסוים:
default@78bed324a0cc:/opt/airflow$ airflow dags trigger my_simple_dag
2. להריץ את הפקודות ישירות בתוך ה-container בפקודה אחת, למשל כדי להריץ DAG מסוים:
docker exec -it airflow_airflow-worker_1 airflow dags trigger my_simple_dag
שימוש ב-REST API
התיעוד המלא ל-REAST API של Airflow נמצא כאן.
נראה כמה דוגמאות.
כדי לקבל רשימה של כל ה-DAGs נרשום:
curl GET 'http://localhost:8080/api/v1/dags' -H 'Content-Type: application/json' --user "airflow:airflow"
בחלק האחרון של ה user-- צריך לתת username:password שלנו ב-airflow. כמו שהיזכרנו, הדיפולט הוא airflow:airflow.
הדוגמה הזו טובה כשמריצים את הפקודה על אותה מכונה שבה נמצא ה-airflow. אם מדובר על מכונה אחרת צריך לשנות localhost ל-ip של המכונה.
כדי לקבל פרטים על DAG ספציפי (בדוגמה הזו my_dag_name):
curl GET 'http://localhost:8080/api/v1/dags/my_dag_name' -H 'Content-Type: application/json' --user "airflow:airflow"
כדי להריץ DAG (בדוגמה הזו my_dag_name):
curl POST 'http://localhost:8080/api/v1/dags/my_dag_name/dagRuns' -H 'Content-Type: application/json' --user "airflow:airflow" -d '{}'
לוגים
הלוגים ב-airflow נמצאים תחת airflow/logs ומסודרים באופן הבא:
airflow/logs/dag_id/task_id/date/incremental_number.log
אם למשל שם ה-DAG הוא simple_dag ושם המשימה הראשונה הוא task1 אז הלוג שלה יהיה ב:
airflow/logs/simple_dag/task1/2021-08-23T07:00:00+00:00/1.log
דרך הקונפיגורציה, בחלק של [logging] אפשר לשנות הרבה דברים וביניהם את ה-path של הלוגים ואת הפורמט של שם הלוג.
דרך פשוטה יותר לגשת ללוגים זה בעזרת ה-UI. פשוט לוחצים על הריבוע שמייצג את המשימה:
ואז נפתח חלון שבחלק העליון שלו יש כפתור של Log.
לחיצה עליו תציג לנו את הלוג.
ב-Airflow יש גם תמיכה בכתיבת לוגים לענן (AWS, google cloud, Azure).
לגבי לוגים של פייתון, אם נשתמש בפקודה print אנחנו נראה שהטקסט הגיע מקובץ בשם logging_mixin.py.
למשל:
[2021-08-22 11:48:37,829] {logging_mixin.py:104} INFO - my print text
אבל אם נשתמש בספריית logging של פייתון הלוג יציין את שם הקובץ שכתב ללוג:
import logging
LOGGER = logging.getLogger("airflow.task")
LOGGER.info("example of log line")
העברת פרמטרים בין משימות - XCOM
בעיקרון, airflow תוכנן כך שכל משימה אמורה להיות בפני עצמה, ולכן אם אפשר עדיף להימנע מהעברת מידע בין משימות. עם זאת, יש מנגנון שנקרא xcom שנועד לאפשר העברת מידע בין משימות. שימו לב ש-xcom מיועד להעברת כמויות מידע קטנות, ולא גדולות, אז לא מומלץ לשים בו אובייקטים גדולים.
מדובר על רשימה של ערכים שלכל אחד יש שישה שדות:
key, value, timestamp, execution date, task id, Dag id
אפשר לראות את הרשימה ב-UI, אם נלחץ על Admin->Xcoms.
בד"כ רוב השימוש ב-xcom יהיה ב-PythonOperator כיון שבו אפשר להשתמש בפונקציה xcom_push כדי להכניס נתונים ל-xcom ובפונקציה xcom_pull כדי לקבל נתונים מ-xcom.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator,
PythonVirtualenvOperator
from airflow.utils.dates import days_ago
args = {
'owner': 'airflow',
}
with DAG(
dag_id='simple_python_dag',
default_args=args,
schedule_interval=None,
start_date=days_ago(2),
tags=['example'],
) as dag:
def task1_function():
print('This
print is from task 1')
return 'return
from task1'
def task2_function():
print('This
print is from task 2')
task1 = PythonOperator(
task_id='task_1',
python_callable=task1_function,
)
task2 = PythonOperator(
task_id='task_2',
python_callable=task2_function,
)
task3 = BashOperator(
task_id='task3',
bash_command='echo
This print is from task3'
)
task1 >> task2
>> task3
המשימה task_1 היא PythonOperator והיא מחזירה ערך, ולכן הערך הזה יהיה ב-xcom.
המשימה task_2 היא PythonOperator והיא לא מחזירה ערך, ולכן לא נראה return_value מהמשימה הזו ב-xcom.
המשימה task_3 היא BashOperator והיא מדפיסה משהו למסך, ולכן מה שמודפס גם יהיה ב-xcom.
שימוש ב-xcom מתוך קובץ חיצוני (לא מתוך קובץ ה-DAG)
task1 = PythonOperator(
task_id='task_1',
python_callable=python_file_function,
provide_context=True
)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from python_xcom_file import python_file_function
with DAG(
dag_id='xcom_exteranl_python_dag',
schedule_interval=None,
start_date=days_ago(2),
) as dag:
def simple_function(ti):
print('task1:
print from task 1')
ti.xcom_push(key='task1_string', value='this
string from task1')
return 'return
from task 1'
def print_xcom_value(ti):
print('task3:
value from xcom pull: ' + ti.xcom_pull(task_ids='task2', key='task2_string'))
task1 = PythonOperator(
task_id='task1',
python_callable=simple_function,
)
task2 = PythonOperator(
task_id='task2',
python_callable=python_file_function,
provide_context=True
)
task3 = PythonOperator(
task_id='task3',
python_callable=print_xcom_value,
provide_context=True
)
task1 >> task2 >> task3
def python_file_function(**kwargs):
kwargs['ti'].xcom_push(key='task2_string', value='this string from
task2')
print('task2: value from xcom
pull task1_string: ' +
kwargs['ti'].xcom_pull(task_ids='task1', key='task1_string'))
return 0
רואים את הערכים ש-task1 ו-task2 הכניסו ל-xcom. כמו כן, רואים בתור return_value את הערכים ששתי המשימות החזירו.
קביעת תנאים בהרצת משימות - conditional tasks
ישנן מספר דרכים להרצת משימות כתלות בתנאי מסוים. בדיפולט, התנאי להרצת משימה הוא שכל המשימות הקודמת אליה יסתיימו בהצלחה.
שימוש ב-trigger_rule
לכל משימה יש שדה אופציונאלי שנקרא trigger_rule. בשדה הזה ניתן להגדיר שמשימה תרוץ גם אם לא כל המשימות שלפניה הצליחו. האפשרויות הן:
all_success
(default): All upstream tasks have succeededall_failed
: All upstream tasks are in afailed
orupstream_failed
stateall_done
: All upstream tasks are done with their executionone_failed
: At least one upstream task has failed (does not wait for all upstream tasks to be done)one_success
: At least one upstream task has succeeded (does not wait for all upstream tasks to be done)none_failed
: All upstream tasks have notfailed
orupstream_failed
- that is, all upstream tasks have succeeded or been skippednone_failed_or_skipped
: All upstream tasks have notfailed
orupstream_failed
, and at least one upstream task has succeeded.none_skipped
: No upstream task is in askipped
state - that is, all upstream tasks are in asuccess
,failed
, orupstream_failed
statedummy
: No dependencies at all, run this task at any time
דוגמה לשימוש בשדה הזה:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime,
timedelta
import logging
with DAG(
dag_id='trigger_rule_dag',
start_date=datetime(2021, 4, 1),
) as dag:
task1 = BashOperator(
task_id='task1_name',
bash_command='echo
this is task1'
)
task2 = BashOperator(
task_id='task2_name',
trigger_rule='all_failed',
bash_command='echo
this is task2'
)
task3 = BashOperator(
task_id='task3_name',
bash_command='echo
this is task3'
)
task4 = BashOperator(
task_id='task4_name',
trigger_rule='none_failed',
bash_command='echo
this is task4'
)
task1 >> [task2, task3] >> task4
כפי שניתן לראות, task2 ו-task3 הן משימות מקביליות לאחר task1, כאשר task2 מותנת בכך ש-task1 נכשלה ואילו task3 מותנת בכך ש-task1 הצליחה. זו דוגמה למקרה נפוץ, שבו אנו צריכים לעשות משימות שונות כתלות בהצלחת המשימה הקודמת.
אבל task4 צריכה לרוץ במקרה ש-task2 או task3 הצליחו ולכן ה-trigger_rule שלה הוא none_failed. שזה תנאי שמוודא שהמשימות הקודמות הצליחו או דולגו, אבל לא נכשלו.
הגרף של ה-DAG הזה יראה כך:
השדה trigger_rule הוא פשוט לשימוש ויכול להועיל במקרים מסויימים, אבל כשיש לנו צורך בתנאי מורכב יותר trigger_rule לא תמיד מספיק, ולכן יש לנו האופרטור BranchPythonOperator.
שימוש ב-BranchPythonOperator
האופרטור BranchPythonOperator מאפשר לכתוב בפונקציית פייתון תנאי שיקבע מה המשימות הבאות שירוצו. האופרטור הזה דומה ל-PythonOperator רק שבשדה python_callable הוא מקבל פונקציה של פייתון שמחזירה את שמות המשימות הבאות שירוצו, כך שלאחר האופרטור הזה המשימות שחזרו מהפונקציה שמוגדרת ב-python_callable ירוצו ושאר הפונקציות ידולגו. אפשר להשתמש ב-xcom בתוך התנאי וזה מאפשר ליצור תנאים שתלויים במשימות אחרות.
כרגיל, דוגמה תבהיר את העניין בצורה הכי טובה:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime,
timedelta
import logging
def branch_function(ti):
xcom_value = int(ti.xcom_pull(task_ids='task1_name'))
if xcom_value == 1:
return 'task3_name'
else:
return 'task4_name'
with DAG(
dag_id='branching_dag',
schedule_interval=None,
start_date=datetime(2021, 4, 1),
) as dag:
task1 = BashOperator(
task_id='task1_name',
bash_command='echo
1'
)
task2 = BranchPythonOperator(
task_id='task2_name',
python_callable=branch_function
)
task3 = BashOperator(
task_id='task3_name',
bash_command='echo
this is task3'
)
task4 = BashOperator(
task_id='task4_name',
trigger_rule='none_failed',
bash_command='echo
this is task4'
)
task1 >> task2 >> [task3, task4]
בהתחלה יש לנו את task1 שהוא מאוד פשוט, וכמו שהסברנו לעיל לגבי xcom הערך האחרון ש-BashOperator מדפיס נכנס ל-xcom כ-return_value של אותה משימה. לכן בדוגמה הזו יכנס הערך 1.
ב-task2 יש לנו BranchPythonOperator שמשתמש בפונקציה שנקראת branch_function שמוגדרת בתחילת הקוד. מה שהיא עושה זה לקרוא את הערך של ה-return_value של task1 ולפי זה להחליט האם המשימה הבאה תהיה task3 או task4. בדומה הזו קל לראות שהמשימה הבאה תהיה task3. ולכן נקבל גרף כזה:
אבל אם נשנה ב-task1 את הפקודה ל-
bash_command='echo 2'
נראה ש-task4 היא זו שתרוץ ונקבל גרף כזה:
לסיכום, האופרטור BranchPythonOperator מאפשר גמישות גדולה לקביעת כל תנאי שאנו צריכים ב-DAG שלנו.
שימוש ב-ShortCircuitOperator
מספר המשימות שניתן להריץ במקביל - Airflow Pools
כדי להגביל את כמות המשימות שרצות במקביל משתמשים ב-pools. ניתן לגשת ל-Pools דרך ה-UI אם נלחץ על Admin->Pools. שם נראה משהו כזה:
רואים שיש לנו default_pool שיש לו 128 slots. בדיפולט כל המשימות חולקות את ה-pool הזה. כל פעם שרצה משימה היא מקבלת סלוט אחד. וכאשר המשימה מסתיימת הסלוט משתחרר. ערך הדיפולט הוא 128 וזה אומר שאפשר להריץ 128 משימות במקביל כל עוד לא נשנה את ערך ה-default_pool.
אם לא נשארו סלוטים פנויים, ויש משימות שצריכות לרוץ, המשימות הללו יכנסו לתור ויוצאו לפועל כל אחת בתורה ברגע שמתשחרר סלוט.
זה נכון לגבי כל המשימות שרצות במערכת ה-airflow, אבל יש מגבלה נוספת על כל DAG. אם לא נשנה את ערכי הדיפולט אז כל DAG יכול להריץ עד 16 משימות במקביל.
ניתן לשנות ערך זה בשתי דרכים:
- שימוש בשדה dag_concurrency בקונפיגורציה של מערכת ה-airflow. ואז ערך זה ישפיע על כל ה-DAGs במערכת
- שימוש בשדה concurrency ביצירת ה-DAG, כדי להשפיע רק על DAG מסוים.
ב-UI דרך Admin->Pools ניתן לשנות את ערך ה-default_pool. אפשר גם להוסיף pools אחרים, לתת להם שם ולהשתמש בהם עבור משימות. כדי להגדיר שמשימה מסויימת משתמשת ב-pool מסוים שהוא לא ה-default_pool נשתמש בשדה שנקרא pool ביצירת המשימה. אם למשל יצרנו pool עבור שאילתות SQL וקראנו לו sql_pool נוכל להשתמש בו במשימת SQL בצורה הזו:
mysql_task = MySqlOperator(
task_id='mysql_task',
mysql_conn_id='mysql_default_conn'
sql='./sample_sql.sql',
pool='sql_pool'
)
וכך המשימה הזו תשתמש ב-sql_pool ולא ב-default_pool. בצורה הזו נוכל לסדר את המערכת שלנו כך שלא יהיה מצב שיותר מדיי משימות רצות במקביל ועלולות לגרום למערכת לקרוס.
Airflow Sensor
כמו שכבר הזכרנו, סנסור זה סוג של אופרטור שנועד לחכות לאיזשהו אירוע חיצוני. למשל לקובץ או תיקייה שיגיעו למיקום מסוים, או לקרוא לאיזשהו שאילתת SQL עד שתנאי מסוים מתקיים. הסנסור בודק כל אינטרוול מסוים שאנו מגדירים האם התנאי התקיים ואם כן אז הסנסור הזה מסומן כ-succeed ומשימות שבאות אחריו ב-DAG ממשיכות לרוץ. בנוסף, אנחנו גם צריכים להגדיר עד מתי הסנסור הזה ממשיך לבדוק את מה שהגדרנו לו. נראה את זה עוד מעט ע"י שימוש בשדה timeout.
סנסורים פופולאריים לדוגמה:
- FileSensor - מחכה לקובץ או תיקייה שיגיעו למיקום מסוים
- SqlSensor - מריץ שאילתת SQL עד שתנאי מסוים מתקיים
- ExternalTaskSensor - מחכה ש-DAG מסוים או משימה מסויימת יסתיימו בתאריך מסוים
- DateTimeSensor - מחכה לזמן מסוים (תאריך ושעה)
- TimeDeltaSensor - מחכה שזמן מסוים יעבור לאחר סיום המשימה הקודמת
- S3KeySensor - מחכה ל-key מסוים שיופיע ב-bucket מסוים ב-S3
wait_for_file = FileSensor(
task_id='wait_for_file',
fs_conn_id='fs_default',
filepath='my_file_name.txt',
poke_interval=20
)
הדיפולט של poke_interval הוא 60 שניות.
בדוגמה הזו השתמשנו ב-FileSensor. השדה filepath צריך להכיל את שם הקובץ או התיקייה שלהם אנו מחכים. שדה חשוב נוסף בסנסור הזה הוא fs_conn_id. השדה הזה צריך להכיל את ה-connection_id למערכת שבה אנו רוצים לבדוק אם הקובץ קיים. ניתן להגדיר connection דרך ה-UI. לוחצים על Admin->Connections ושם מוסיפים הגדרות. לדוגמה, אם אנו רוצים לבדוק שהקובץ my_file_name.txt קיים בספריית ה-dags אז נגדיר את ה-connection שנקרא fd_default בצורה הבאה:
דוגמה מלאה לשימוש ב-FileSensor:
task1 = BashOperator(
task_id='task1_name',
bash_command='echo this is task1'
)
wait_for_file = FileSensor(
task_id='wait_for_file',
fs_conn_id='fs_default',
filepath='my_file_name.txt',
poke_interval=5
)
task2 = BashOperator(
task_id='task2_name',
bash_command='echo this is task2'
)
task1 >> wait_for_file >> task2
הגרף יראה כך:
רק ברגע ש-wait_for_file יגלה שהקובץ my_file_name.txt קיים בספריית dags הוא יסומן כ-success וה-DAG ימשיך להתקדם ל-task2.
כדי להגדיר עד מתי ירוץ הסנסור, נשתמש בשדה timeout. הערך שלו נמדד בשניות. ערך הדיפולט שלו הוא 7 ימים (604,800 בשניות). צריך לתת את הדעת לגבי קביעת ה-tiemout כי הוא יכול להציל אותנו ממה שנקרא sensor deadlock. מדובר על מצב שבו יש בעיה במערכת וסנסורים מסויימים מחכים המון זמן. כל סנסור תופס סלוט אחד כמו שלמדנו בפיסקה על airflow pool עד שמגיעים למצב שכל הסלוטים של ה-DAG תפוסים ואין לו יכולת להמשיך לרוץ.
לדוגמה, אם יש לנו DAG שלא שינינו בו את ה-timeout ולכן הוא 7 ימים. וגם לא שינינו בו את ה-concurrency ולכן הוא 16. ה-DAG הזה רץ כל יום והוא מתחיל ב-8 סנסורים שמחכים למשהו מסוים שיקרה. אבל כמו שאמרנו יש בבעיה במערכת והדבר שהם מחכים לו לא קורה. אז ביום הראשון נתפסים 8 סלוטים, ולא משוחררים. ביום השני נתפסים עוד 8 סלוטים, כך שכבר 16 סלוטים תפוסים. ביום השלישי כבר אין יותר סלוטים פנויים ל-DAG הזה והוא לא יכול לרוץ. רק בעוד 5 ימים יסתיים הזמן של ה-timeout והסנסורים יסגרו והסלוטים ישתחררו. עד אז אנחנו ב-sensor deadlock.
אופטימיזציה נוספת אפשרית לסנסור היא קביעת ה-mode שלו. יש שתי אפשרויות - poke או reschedule. הדיפולט הוא poke. ההסבר הוא פשוט, הרי סנסור ברוב הזמן לא עושה כלום. והשאלה היא האם בזמן הזה הוא משחרר את הסלוט או לא. ב-mode של poke הסנסור לא משחרר את הסלוט, ואילו ב-reschedule הוא משחרר את הסלוט וכל פעם שהוא צריך לבדוק את מה שהוא אמור לבדוק הוא תופס שוב סלוט.
אם נשתמש ב-reschedule נוכל להימנע מ-sensor deadlock שהזכרנו לעיל. מצד שני, אם יש הרבה משימות עלול להיות מצב שכשהסנסור שלנו צריך לפעול לא יהיה לא סלוט פנוי ואז הוא יצטרך להמתין.
כדי להחליט איזה mode לבחור צריך לבחון את כל מערכת ה-airflow שלכם ואת ה-DAG הספציפי שהסנסור הזה שייך אליו. אבל אפשר להגיד כלל אצבע, שאם ה-poke_interval של הסנסור קצר יחסית, הגיוני להשאיר אותו ב-mode של poke.
wait_for_file = FileSensor(
task_id='wait_for_file',
fs_conn_id='fs_default',
poke_interval=200,
timeout=60*60,
mode='reschedule',
)
ב-UI ניתן לראות את הסנסורים שמוגדרים כ-reschedule תחת Browse->Task Reschedules.
ניתן להגדיר פונקציה שתיקרא במקרה שה-timeout של הסנסור הגיע. נשתמש בשדה on_failure_callback. לדוגמה:
def failure_callback(context):
if isinstance(context['exception'], AirflowSensorTimeout):
print(context)
print("Sensor timed out")
with DAG(
dag_id='simple_dag',
start_date=days_ago(2),
) as dag:
wait_for_file = FileSensor(
task_id='wait_for_file',
poke_interval=200,
timeout=60*60,
mode='reschedule',
on_failure_callback=failure_callback,
)
בדוגמה הזו, במקרה שמגיע ה-timeout של הסנסור הפונקציה failure_callback תיקרא. הפונקציה מקבלת אובייקט שנקרא context שמכיל את ה-task_instance וממנו אפשר לקבל הרבה מידע על ה-task וכן על ה-DAG. במקרה הזה בדקנו למה שווה השדה exception שבתוך ה-context ואם הוא AirflowSensorTimeout זה אומר שהוא הגיע מאיזשהו סנסור. אם יש לנו כמה נצטרך לחשוב איך להבדיל ביניהם. אפשר פשוט שכל אחד יקרא לפונקציה אחרת.
דבר נוסף שאפשר להגדיר למקרה של timeout הוא soft_fail. השדה הזה קובע האם במקרה של timeout הסנסור יחשב כ-failed, או כ-skipped. אם הוא True אז במקרה של timeout הסנסור יחשב כ-skipped. בדיפולט הוא False ולכן הסנסור יחשב כ-failed. רק צריך לזכור שגם במקרה שמשימה היא skipped כל המשימות שאחריה לא ירוצו, אלא אם כן נשנה את ה-trigger_rule של המשימות הבאות.
עוד שדה שיכול לשפר את הסנסור שלנו הוא ה-exponential_backoff. אם השדה הזה שווה ל-True הוא גורם לכך שהזמן שעובר בין פעולות של הסנסור הולך וגדל. במקום שהוא יהיה קבוע, למשל בדוגמא לעיל קבענו שזה יהיה כל 200 שניות, הוא ילך ויגדל בכל פעם. לפעמים זה עוזר ולפעמים לא, כל מקרה צריך לבחון לגופו.
קצת על Security
באתר הרישמי ישנו עמוד שמדבר על security, והוא מחלק את הנושא הזה מספר נושאים. כאן נעסוק בקצרה בחלק מהנושאים הללו.
Access Control
ב-Airflow לכל משתמש יש role מסוים שקובע את ההרשאות שלו ב-webserver UI. ה-roles הדיפולטיביים לפי רמת ההרשאות שלהם הם:
- Admin - all possible permissions
- Op - User permissions + additional op permissions
- User - Viewer permissions + additional user permissions
- Viewer - limited viewer permissions
- Public - no permissions
ניתן לראות ב-UI את ה-role של כל משתמש Security->List Users ואת כל ה-roles הקיימים וכל אחד מה ההרשאות שלו ב-Security->List Roles.
יש שני סוגים של permissions:
1. Resource-Based permissions - ההרשאות הם פר resource. וכל הרשאה מוגדרת ע"י resource+action.
דוגמאות ל-resources הם - Dag, Task, Connection.
ה-actions הם - can_create, can_read, can_edit, can_delete.
דוגמאות להרשאות:
DAGs.can_read
Connections.can_create
Pool.can_delete
Variables.can_edit
Custome Roles
אפשר ליצור roles שיהיו מותאמים אישית לצרכים שלנו. כשיוצרים role אפשר להגביל אותו ל-DAGs מסוימים. כדי ליצור role אפשר להשתמש בעמוד Security->List Roles וזו הדרך הפשוטה. אפשר גם לעשות זאת מה-CLI.
הסתרת מידע סודי
[2021-08-17 15:37:30,821] {logging_mixin.py:104} INFO - dir_path is: /opt/***/dags/my_code/pythonOperator_simple_dag/pythonFileName
התקנת תוכנות בתוך ה-container של airflow
לפעמים יש לנו צורך להתקין בתוך ה-container של airflow תוכנות נוספות. למשל, אם אנחנו רוצים להריץ פקודות של AWS CLI נצטרך להתקין awscli ואם אנו צריכים להריץ פקודות של kubectl נצטרך להתקין kubectl.
כדי לעשות את זה, נריץ את airflow וניכנס לתוך ה-container של ה-worker. בתוך ה-container נתקין את התוכנות שאנו צריכים. ואז נצא מה-container. כדי ליצור image חדש שכולל את airflow יחד עם התוכנות שהתקנו בתוכו נשתמש בפקודה docker commit.
תיאור מפורט של התהליך הזה, סיכמתי פה.
התקנת Airflow בתוך kubernetes cluster ע"י Helm Chart
Helm הוא ה-package manager של Kubernetes שמאפשר יחסית בקלות להכניס תוכנות ל-Kubernetes cluster ע"י שימוש ב-helm chart.
במערכת שלי (ubuntu 18.04) ההתקנה של helm היתה ע"י הפקודה:
sudo snap install helm3
כרגיל בתוכנה, הרבה פעמים יש בעיות בלתי צפויות, שתמיד קורות דווקא לי... אז אם יש בעיות נסו את הפקודה הזו:
kubectl -n airflow get events --sort-by='{.lastTimestamp}'
היא אמורה לתת לכם רשימה של מה שקרה מאחורי הקלעים, ואולי תמצאו שם רמז למה גורם לבעיה שלכם.
באתר הרישמי ניתן לראות הסבר לגבי השימוש ב-helm עבור airflow. בגדול כל מה שצריך לעשות כדי להתקין את airflow ב-kubernetes cluster ע"י helm זה את הפקודות הבאות:
kubectl create namespace airflow
helm repo add apache-airflow https://airflow.apache.org
helm install airflow apache-airflow/airflow --namespace airflow
אישית לא יצא לי להנסות את זה ולכן אני רק מציין את האפשרות הזו, כיון שהיא חשובה ומאוד נוחה, אבל לצערי אין לי עדיין טיפים של ניסיון.
עד כאן להפעם, אולי בהמשך אמצא זמן להוסיף כאן נושאים נוספים. בהצלחה!
ו... אם אהבתם, תכתבו משהו למטה...