כמה מילים על - Hadoop, HDFS, MapReduce, YARN


Hadoop

פרויקט קוד פתוח של Java עבור איחסון מבוזר ועיבוד מבוזר. הוא נועד לרוץ על cluster של מאות ואפילו אלפי מחשבים בשביל כמויות נתונים גדולות – big data. הפרויקט התחיל ע"י Doug Cutting שעבד עבור yahoo (בהמשך הוא עבר להיות ה-Chief Architect ב-Cloudera, והיום – אני לא יודע מתי אתם קוראים את זה... אז חפשו בגוגל אם זה מעניין אתכם). הפרויקט הוא תחת Apache license 2.0 והוא קיבל השראה מ-MapReduce ו-Google File System.
Apache Hadoop framework מורכב מהחלקים הבאים:
  • Hadoop common – מכיל ספריות ופונקציות עבור שימוש שאר המודלים של Hadoop
  • Hadoop Distributed File System (HDFS) – מערכת קבצים מבוזרת, בעלת ביצועים מהירים בתוך ה-cluster.
  • Hadoop MapReduce – המודל שאחראי על large-scale data processing
  • Hadoop YARN – המודל שאחראי על ניהול המשאבים בתוך ה-cluster.

HDFS

מערכת קבצים (file system) שפותחה ב-Java ודומה למערכת הקבצים של Linux, אבל מאחורי הקלעים הקבצים מבוזרים. המערכת בנויה כך שתהיה עמידה בפני תקלות (fault-tolerant). כך שאם יש תקלה בשרת מסוים המערכת יודעת לזהות את התקלה ולהתגבר עליה בצורה מהירה ואוטומטית.
המערכת בנויה לתמיכה במאות מליוני קבצים ע"י scaling של המערכת לעוד ועוד מכונות (נקראות גם Nodes) ב-cluster. המערכת משתמשת בשיטה של write once read multiple times כדי לשמור על קוהרנטיות המידע וכדי להשיג גישה מהירה למידע.
המערכת בנויה לתת יותר תפוקה מאשר תגובה מהירה (optimized for throughput rather than latency). משמעות הדבר היא שהמערכת יותר טובה בהרצה של עיבוד ארוך על הרבה מידע מאשר לעשות אנליזות אינטרקטיביות על מידע שזורם ומגיע עכשיו.
עיבוד הנתונים "זז" כדי להיות קרוב לנתונים. משמעות הדבר היא שהמערכת תנסה להריץ את העיבוד על אותה מכונה ששם נמצאים הנתונים כדי להקטין את השפעת מהירות הרשת.

ארכיטקטורה של HDFS


הארכיטקטורה בנויה בצורה של Master-slaves. כאשר ה-master נקרא Namenode וה-slaves נקראים Datanodes (הריבועים הצהובים). בד"כ יש רק master אחד ב-cluster

הקבצים מחולקים ל-blocks (הריבועים הירוקים) שמאוחסנים על Datanodes. בכל מכונה (node) יש בד"כ Datanode אחד. ה- Datanode אחראי על ניהול האיחסון באותו node שבו הוא מותקן, וה- Namenode אחראי על ניהול כל ה-metadata, שכולל איזה blocks נמצאים על כל Datanode, ואיזה קבצים יש על כל block.

ברגע שיש פקודה ל-file system כמו פתיחת קובץ, סגירת קובץ, או שינוי שם של קובץ או ספריה, ה- Namenode שולח את הפקודה אל ה- Datanode הרלוונטי, ואותו Datanode מבצע את הפקודה. ה- Datanode אחראי גם על יצירת blocks, מחיקת blocks והכפלה של blocks (שנועדה בשביל fault-tolerance ו-resilience) לפי הפקודות שהוא מקבל מה-Namenode.

כל קובץ נשמר על מספר blocks לצורך fault-tolerance. גודל כל block וכמה פעמים יוכפל כל block (replication factor) ניתן לקינפוג במערכת.
המילה Rack בד"כ מייצגת קבוצה של Datanodes שנמצאים במיקום גיאוגרפי מסוים.

ה- Namenodeמקבל כל כמה זמן heartbeat ו-BlockReport מכל Datanode ב-cluster כדי לדעת מה המצב של כל Datanode והאם הוא פועל כראוי.

Hadoop MapReduce

תוכנה שמאפשרת עיבוד כמויות גדולות של מידע בסדרי גודל של multi-terabyte. התוכנה מאפשרת עיבוד מבוזר על cluster שמכיל אלפי nodes (שרתים). התוכנה יכולה לרוץ על מערכות רגילות יחסית (commodity hardware) שידוע מראש שמדי פעם יהיו Nodes שייפלו, ועדיין לספק מערכת שהיא מאוד אמינה ועמידה בפני תקלות (reliable and fault tolerant).

המערכת נכתבה ב-Java ובנויה משני מרכיבים:
  • API שמאפשר כתיבה של MapReduce workflows ב-Java
  • סט של services לצורך ניהול ה-workflows כמו למשל scheduling, distribution and parallelizing.

שלבים ב-Hadoop MapReduce job

1.     מחלקים את ה-data לחלקים בלתי תלויים
2.     כל חלק מעובד בצורה מקבילית ע"י map task אחר ב-node אחר
3.     לאחר מכן מערכת ה-MapReduce ממיינת את הפלט של ה-map tasks ושולחת את הפלט ל-reduce task
4.     הקלט והפלט של ה-map tasks ושל ה-reduce tasks נשמרים ב-file system
5.     מערכת ה-MapReduce אחראית על תזמון המשימות, מנטרת אותן ומריצה מחדש משימות שניכשלו
6.     ה-MapReduce שאחראי על העיבוד וה-HDFS שאחראי על ה-file system רצים על אותם Nodes. וכך העיבוד רץ על אותו Node שבו נמצא ה-data ורוחב הפס של התקשורת בין ה-nodes לא משפיע ולא מתבזבז
ה-MapReduce מחלק את ה-data לזוגות של key, value. לאחר מכן בתהליך ה-map ה-data מעורבב, ממויין, מסודר בקבוצות ומועבר לתהליכי ה-reduce. פעולת ה-reduce מתבצעת לפי key, value כך שכל reduce מבוצע על זוגות בעלות אותו key.

חשוב לדעת, שבגלל אופן הפעילות הזאת לא ניתן להשתמש ב-MapReduce על נתונים שנתון אחד תלוי בנתון הקודם, כמו סידרת פיבונאצ'י לדוגמה.

קונפיגורציה של job כוללת מיקומי קלט ופלט ב-HDFS, פונקציית map  ופונקציית reduce ע"י הממשק של MapReduce או בדרכים אחרות שקיימות ופרמטרים נוספים על ה-job. לאחר מכן ה-hadoop job client שולח את ה-job (כ-jar או executable אחר) יחד עם הקונפיגורציה ל-ResourceManager ב-YARN שאחראי להפיץ אותם ל-nodes ולנהל את המשימה ע"י תיזמון, ניטור, ודיאגנוסטיקה של הריצה.

איור מויקיפדיה שמדגים את פעולת MapReduce עבור משימה של ספירת מילים זהות בטקסט:

YARN

מנהל משאבים גנרי שהוצג לראשונה ב-Hadoop 2.0. YARN זה ראשי תיבות של Yet Another Resource Negotiator. YARN הוא כלי עיקרי ב-hadoop כיון שהוא מנהל משאבים רבים במערכת ולא רק את ה-MapReduce.

הארכיטקטורה של YARN נראית כך (מתוך האתר הרישמי):


ב-YARN יש ResourceManager אחד שאחראי על חלוקת המשאבים בין האפליקציות במערכת. בכל node יש NodeManager שאחראי על ה-containers שיש בכל Node. המושג container ב-YARN מייצג אוסף של משאבי חומרה (CPU cores, memory, disk, network) שעליהם רצה האפליקציה בפועל. 

ה-NodeManager מנטר את ניצול המשאבים ומדווח ל-ResourceManager כדי שיוכל להחליט על תזמון המשאבים למשימות השונות.

ה-ResourceManager מורכב משני חלקים: Scheduler and ApplicationsManager.

ה-Scheduler אחראי על הקצאת המשאבים לאפליקציות השונות. ואילו ה- ApplicationsManager אחראי על קבלת משימות חדשות (job submissions) ועל ההחלטה איפה המשימות האלו ירוצו. הוא מתקשר עם ה-NodeManager שנמצא ב-node שעליו הוא רוצה להריץ וכך מחליט אם להריץ על ה-node הזה או על node אחר.

רכיב נוסף נקרא ApplicationMaster והוא קיים לכל אפליקציה. ה-ApplicationsManager מריץ את ה-ApplicationMaster של כל אפליקציה ושולט על כל ה-ApplicationMasters. כמו כן הוא מספק service שנועד להריץ מחדש את האפליקציה במקרה של נפילה.

ה-ApplicationMaster אחראי לנטר את מצב האפליקציה ואת ההתקדמות שלה. כמו כן הוא מתקשר עם ה- Scheduler לצורך קבלת הקצאות של משאבים לפי הצורך.

נסכם את סדר הפעולות:
  1. ה-ResourceManager מקבל בקשה מה-client להריץ אפליקציה
  2. ה-ResourceManager מקבל את מצב ה-node מכל NodeManager ומחליט איפה להריץ את האפליקציה
  3. ה-ResourceManager מריץ את ה-ApplicationMaster של אותה אפליקציה (לאו דווקא על אותו node שעליו ירוץ ה-container של האפליקציה, כפי שניתן לראות באיור של הארכיטקטורה)
  4. ה-ApplicationMaster מנטר את ה-container שעליו רצה האפליקציה ומבקש משאבים מה- ResourceManager לפי הצורך


Hadoop Ecosystem

עד כאן עסקנו בחלקים העיקריים של Hadoop. בפוסטים הבאים נעסוק בחלקים נוספים ב-ecosystem של Hadoop. חלקם נועדו להוסיף יכולות וחלקם לפשט את העבודה.
אפשר לחלק את תפקיד החלקים הנוספים הללו ל-5 נושאים עיקריים:
1.     Data management – כולל איפה פיזית ה-data מאוחסן פלוס ניהול ה-Nodes שעליהם מאוחסן המידע. בנושא הזה יש את HDFS ו-YARN שהם חלק מה-core של Hadoop אבל יש גם את HBase שהוא no-SQL DB.
2.     Data access – מדובר על פעולות כמו נירמול המידע, פעולות סטטיסטיות, פעולות SQL כמו join או filter ועוד. הכלים שיעזרו לנו בזה הם MapReduce Hive עבור שאילתות SQL, ו-Pig עבור סקריפטינג.
3.     Data ingestion and integration – בביטוי ingestion הכוונה לתהליך לקיחת ה-data. יכול להיות עבור שימוש מיידי או עבור שמירה ב-DB. בד"כ לא כולל תהליך של המרה ועיבוד המידע. בביטוי integration הכוונה להכנת המידע עבור השימוש שלו הוא מיועד. הכלים לתחומים האלה הם Flume, Sqoop, Kafka, Storm. הם עוזרים בקליטת המידע ממגוון מקורות כמו רשתות חברתיות, שרתי רשת, SQL DB, no-SQL DB, מערכות messaging, Queues, ועוד.
4.     Data Monitoring – בקטגוריה הזו יש כלים שנועדו לנטר את המידע בצורה מרוכזת. הכלים בתחום הזה הם Ambari, Zookeeper, Oozie.

5.     Data governance and Security – מדובר על תהליכים שאחראים על ניהול המידע כך שיהיה זמין, שלם ומאובטח (authentication, Authorization, Encryption). כמו כן ניתן לקבוע סט של כללים על המידע לפי הצורך ולאכוף אותו. הכלים בתחום הזה הם Falcon, Ranger, Knox.



אם אהבתם, תכתבו משהו למטה...

ספריית pandas - חלק 3 - מבני נתונים - DataFrame - חלק ב

Image result for pandas logo

הוספת עמודות על בסיס עמודות אחרות - פונקציית assign

פונקציית assign מאפשרת הוספת עמודות על בסיס עמודות אחרות. הפונקצייה לא משנה את ה-df אלא מחזירה העתק של ה-df עם השינוי החדש.

>>> df = pd.DataFrame({'a':np.arange(1,5),'b':np.arange(10,50,10),
                       'c':np.arange(100,500,100)})
>>> df
   a   b    c
0  1  10  100
1  2  20  200
2  3  30  300
3  4  40  400

>>> df.assign(d = df['a'] + df['b'])
   a   b    c   d
0  1  10  100  11
1  2  20  200  22
2  3  30  300  33
3  4  40  400  44

ניתן גם לתת פונקציה שתחושב על אותו df שעליו פועלים.
לדוגמה:

>>> df.assign(e = lambda x: x['c'] / x['b'] )
   a   b    c     e
0  1  10  100  10.0
1  2  20  200  10.0
2  3  30  300  10.0
3  4  40  400  10.0

בדוגמה הזו x הוא df.
הצורה הזו שימושית במקרה שאין לנו אפשרות לגשת ישירות ל-df למשל כשמשרשרים פקודות.

>>> df.query('a > 2').assign(f = lambda x: x.a * a.b) 
   a   b    c   f
2  3  30  300  12
3  4  40  400  16

בדוגמה הזו ע"י פונקציית query מקבלים df חדש שמכיל רק את השורות שבהם a>2 ועל ה-df החדש הזה יוצרים את עמודה f עם פונקציה פנימית בתוך ה-assign. ה-x בפונקציה הפנימית מייצג את ה-df החדש.

החל מגירסה 0.23.0 (פייתון 3.6) ניתן להתייחס באותו assign לעמודות חדשות שנוצרו באותה הפקודה לפני העמודה הנוכחית, לדוגמה:


>>> df = pd.DataFrame({'a':[1,2,3],
                       'b':[3,4,5]})
>>> df
   a  b
0  1  3
1  2  4
2  3  5

>>> df.assign(c = lambda x: x['a'] + x['b'],
              d = lambda x: x['c'] + x['a'])
   a  b  c   d
0  1  3  4   5
1  2  4  6   8
2  3  5  8  11

כשעמודה d נוצרת היא מסתמכת על עמודה c שנוצרה ממש לפניה באותה פקודת assign.

אינדוקס ובחירת חתך מ-DataFrame


קריאת עמודה

df[col] --> return Series
קריאת שורה ע"י תוית
df.loc[label] --> return Series
קריאת שורה ע"י מיקום
df.iloc[col] --> return Series
קריאת טווח שורות
df[5:10] --> return DataFrame
קריאת שורות ע"י וקטור בוליאני
df[bool_vec] --> return DataFrame

כשקוראים שורה, האינדקסים שלה הם שמות העמודות של ה-df.
לדוגמה:


>>> df = pd.DataFrame({'red':[1,2,3],
                       'blue':[3,4,5],
                       'green':[6,7,8]},index=['a','b','c'])
>>> df
   red  blue  green
a    1     3      6
b    2     4      7
c    3     5      8

>>> df.loc['a']
red      1
blue     3
green    6
Name: a, dtype: int64

>>> df.iloc[1]
red      2
blue     4
green    7
Name: b, dtype: int64


פעולות אריתמטיות ובוליניות

פעולות אריתמטיות עם סקלר יתבצעו על כל האיברים:

>>> df1
   red  blue  green
a    1     4      7
b    2     5      8
c    3     6      9

>>> df1 + 2
   red  blue  green
a    3     6      9
b    4     7     10
c    5     8     11

>>> df1 * 5 
   red  blue  green
a    5    20     35
b   10    25     40
c   15    30     45

פעולות בוליניות גם הם יתבצעו בצורה אינטואיטיבית.
הפעולות הבוליניות הן:
or  --> |
and --> &
xor --> ^
not --> -
לדוגמה:

>>> df1 = pd.DataFrame({'a': [1, 0, 1], 'b': [0, 1, 1]}, dtype = bool)

>>> df2 = pd.DataFrame({'a': [1, 1, 0], 'b': [0, 1, 0]}, dtype = bool)

>>> df1
       a      b
0   True  False
1  False   True
2   True   True

>>> df2
       a      b
0   True  False
1   True   True
2  False  False

>>> df1 | df2
      a      b
0  True  False
1  True   True
2  True   True

>>> -df1
       a      b
0  False   True
1   True  False
2  False  False

היפוך (transpose) של DataFrame

הפיכה של DataFrame כך שהשורות יהיו העמודות ולהפך מתבצעת בדומה ל-np.ndarray פשוט ע"י T:

>>> df1 = pd.DataFrame({'red':[1,2,3],
                        'blue':[4,5,6],
                        'green':[7,8,9]},index=['a','b','c'])

>>> df1
   red  blue  green
a    1     4      7
b    2     5      8
c    3     6      9

>>> df1.T
       a  b  c
red    1  2  3
blue   4  5  6
green  7  8  9


Data alignment

בדומה ל-label alignment שהסברנו על Series בחלק 1 גם בפעולות בין df שונים יש data alignment. ההתאמה מתבצעת לפי עמודות ואינדקסים (שורות) והתוצאה תהיה איחוד של העמודות והאינדקסים הזהים.
לדוגמה:

>>> df1 = pd.DataFrame({'red':[1,2,3],
                        'blue':[4,5,6],
                        'green':[7,8,9]},index=['a','b','c'])
>>> df2 = pd.DataFrame({'red':[10,20,30],
                        'blue':[40,50,60],
                        'yellow':[70,80,90]},index=['d','c','b'])

>>> df1
   red  blue  green
a    1     4      7
b    2     5      8
c    3     6      9

>>> df2
   red  blue  yellow
d   10    40      70
c   20    50      80
b   30    60      90

>>> df1 + df2
   blue  green   red  yellow
a   NaN    NaN   NaN     NaN
b  65.0    NaN  32.0     NaN
c  56.0    NaN  23.0     NaN
d   NaN    NaN   NaN     NaN

בפעולה בין df ל-Series ה-alignment יהיה בין האינדקסים של ה-Series לעמודות של ה-df  ויתבצע broadcasting של ה-Series על פני כל השורות של ה-df.
לדוגמה:

>>> df1
   red  blue  green
a    1     4      7
b    2     5      8
c    3     6      9

>>> df1.iloc[1] # this is Series

red      2
blue     5
green    8
Name: b, dtype: int64

>>> df1 - df1.iloc[1]
   red  blue  green
a   -1    -1     -1
b    0     0      0
c    1     1      1

ביצוע פונקציות NumPy על DataFrame ו-Series

פונקציות NumPy שמתבצעות פר אלמנט ניתן להשתמש ישירות על df או Series.
לדוגמא:

>>> np.exp(df)
>>> np.sin(df)

וכן פונקציות NumPy רבות נוספות:

>>> np.max(df)
>>> np.mean(df)
>>> np.median(df)

פונקציות שפועלות על שני אובייקטים למשל שני Series, יבצעו קודם כל label alignment ואז יבצעו את הפונקציה. למשל הפונקציה np.remainder שמחלקת מערך אחד בשני ומחזירה שארית של כל תא תבצע קודם alignment:

>>> a = pd.Series([10,3,5], index=['a','b','c'])
>>> b = pd.Series([1,2,7], index=['c','b','a'])
>>> np.remainder(a,b)
a    3
b    1
c    0
dtype: int64


עד כאן להפעם. בהצלחה.

אם אהבתם, תכתבו משהו למטה...