RxJS - The ReactiveX library for JavaScript





אם הגעת לכאן אני מעריך שאתה יודע על מה מדובר, אבל בכל זאת ניתן הקדמה קצרה.
אם ניכנס לאתר הרשמי של ReactiveX נראה את הסיסמה הבאה:




אז מה בעצם אומר המשפט הזה? ההסבר הוא פשוט, ReactiveX הוא שם כולל ל-API בשביל תכנות אסינכרוני עם observable streams. 
אז קודם צריך להבין ש-ReactiveX נוגע לתכנות אסינכרוני. ז"א לקוד שמחכה שמשהו יקרה וכשהוא קורה הקוד צריך להגיב למה שקרה. למשל קוד שמחכה ללחיצה על כפתור מסוים, או לתגובה מהשרת.
סבבה, אבל מה זה בכלל observable streams? פרוש המילה observe זה להתבונן או להבחין. כשיש לנו קוד שצריך להתבונן על זרם של מידע שמגיע אלינו מדי פעם ולהגיב למה שהגיע פה נכנסת האפשרות לשימוש ב-ReactiveX. אם למשל השרת שלי מדיי פעם מעדכן לי רשימה במידע נוסף ואני צריך לפלטר את מה שהגיע או לעשות פעולה אחרת כתלות במידע החדש, פה אני צריך קוד שיאזין לאותו מקור ויגיב בצורה הנכונה. אותו המקור שעליו אנו מתבוננים נקרא observable. וכיון שמדובר במקור מידע שיכול להתעדכן במידע חדש מדיי פעם לכן הוא נקרא observable stream.
אבל ReactiveX זה השם הכולל של ה-API. את אותו API מימשו לשפות שנות:



במאמר זה נתמקד בספריית RxJS שהיא מימוש ה-API לשפת JavaScript.


התקנות

נתחיל מהדברים שאנחנו צריכים להתקין כדי לעבוד עם RxJS.
נתחיל מספריה שיש בה רק קובץ index.html שמכיל שלד של דף html בסיסי.


index.html

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
<!DOCTYPE html>
<html>
<head>
 <title>RxJs</title>
</head>
<body>
 <div>Hello Israel</div>
</body>
</html>

באותה ספריה ב-command line נכתוב:

npm init

כדי לאתחל קובץ package.json וניתן איזה שם שנירצה בשדה name כשנתבקש.
לאחר מכן נתקין את RxJS ע"י הפקודה:

npm install rxjs --save

נשתמש ב"save--" כדי שזה יכתב לקובץ package.json. ספריית ה-RxJS תותקן בספריית  node_modules.

עכשיו נתקין מספר כלים שיעזרו לנו בפיתוח:


npm install webpack webpack-dev-server typescript typings ts-loader --save-dev


  1. webpack - כלי שבונה ומאגד את כל הקוד שלנו לקובץ אחד שישלח אל ה-browser
  2. webpack-dev-server - זה בעצם web server לפיתוח, שאנו נשתמש בו כדי לשלוח קבצים ל-browser דרך http
  3. typescript - נתקין את הקומפיילר של TypeScript כי אנחנו נכתוב ב-TypeScript
  4. typings - כלי שיעזור לנו בכתיבת TypeScript 
  5. ts-loader - כלי של webpack שעוזר לו לעבוד עם TypeScript
בסיום הפקודה כתבנו save-dev-- כדי שכל ההתקנות האלו יהיו תקפות רק לפיתוח ולא ל-product.

עכשיו יש לנו צורך בדבר נוסף, אנחנו צריכים קובץ של types definition של TypeScript כדי להשתמש בכל היכולות של RxJS. כיון ש-RxJS נכתב לשימוש ב-ES6 הוא עושה שימוש ביכולות החדשות של ES6. הקומפיילר של TypeScript לא מכיר את ה-APIs והפונקציות החדשות, אבל ניתן להכיר לו אותן אם נכתוב את הפקודה הבאה:

node_modules\.bin\typings install dt~es6-shim --global --save

ה-dt זה ראשי תיבות של definitly types, ולאחר מכן יש את שם החבילה שאנחנו רוצים להתקין es6-shim.
כתבנו global-- כיון שה-type definitions האלו מייצגים הגדרות גלובליות לצורך ה-JS.
בנוסף כתבנו save-- כיון שזה יגרום ל-typings ליצור קובץ typings.json שיכלול את כל ה-typings שאנחנו נתקין.



קינפוג המערכת

אנחנו צריכים לקנפג 2 רכיבים במערכת שלנו. הראשון הוא את הקומפיילר של TypeScript והשני הוא ה-webpack.
קינפוג הקומפיילר של TypeScript מתבצע בקובץ tsconfig.json.
קינפוג ה-webpack מתבצע בקובץ webpack.config.js.

tsconfig.json
{
    "compilerOptions": {
        "target": "es5",
        "module": "commonjs",
        "sourceMap": true
    }
}

target - מציין שהקומפילציה של TypeScript תהיה עבור הפורמט גרסת es5 של JavaScript.
module - ה- commonjs זה פורמט של module ש-webpack מבין.
sourceMap - עוזר לנו בדיבאג של האפליקציה.



הקובץ webpack.config.js יכיל קוד ש-webpack יריץ בסביבת ה-node והוא נועד לייצא אובייקט קונפיגורציה.

webpack.config.js
module.exports = {
    entry: "./main",
    output: { filename: "app.js" },
    module: {
        loaders: [
            {
                test: /.ts$/,
                loader: "ts-loader"
            }
        ]
    },
    resolve: {
        extensions: [".ts", ".js"]
    }
}

entry - מציין את קובץ ה-bootstrap של האפליקציה. זהו הקובץ שמתחיל את כל האפליקציה. ה-webpack ינתח את הקובץ הזה ואת הקבצים שהוא תלוי בהם וישתמש במידע הזה כדי להפוך את האפליקציה שלי לחבילה אחת.

output - מציין את שם הקובץ שיווצר ע"י webpack. בקובץ ה-index.html נוסיף את הקובץ הזה בשורה שלאחר 
<div>Hello Israel</div> בצורה הבאה: 
<script src="app.js"></script>

module - פה אנחנו כותבים את ה-module loaders שיש לנו. ה-loaders ב-webpack הם כלים שיכולים להשתמש בסוגים שונים של קבצים לעבד אותם ולתת ל-webpack את התוצאה כדי שיוכל לאגד הכל יחד לקובץ js יחיד. כיון שאנו נשתמש רק ב-TypeScript לכן נכתוב בשדה שנקרא test את ה-regex שמתאים לסיומת של קבצי TypeScript. ה-regex יהיה /$ts./. הדבר הזה מגדיר ל-webpack שעל כל קובץ ts הוא צריך להשתמש ב-loader שנקרא "ts-loader" שאותו התקנו בחלק של ההתקנות לעיל. ה-ts-loader הוא בעצם plugin של webpack שיודע איך להשתמש בקומפיילר של TypeScript כדי לעבד את קבצי ה-ts. ולאחר ש-TypeScript סיים להמיר את קובץ ה-ts לקובץ js אז webpack ייקח את הקובץ שנוצר ויכניס אותו לתוך הקובץ app.js.

resolve - בחלק שנקרא extensions נגדיר את סיומות הקבצים ש-webpack צריך להשתמש בהם. יש גרסאות של webpack שדורשות גם גרשיים ריקים ולכן נכתוב:

extensions: ["", ".ts", ".js"]

נעשה דבר נוסף שיקל עלינו את השימוש בכלים הללו. בקובץ package.json נכניס בשדה scripts סקריפט חדש כך שברגע שנכתוב npm start הפקודות שיתבצעו הם: 
webpack-dev-server - זה ייתן לנו שרת פשוט שיגיש את כל הקבצים שלנו בפורט 8080
watch --inline-- הדבר הזה יגרום לכך שכל פעם שנשנה קובץ ts ונשמור אותו אז ה-browser יעשה refresh ויקבל את השינויים החדשים.

סקריפט נוסף שנירצה יקרא postinstall. הסקריפט הזה ירוץ אוטומטי כל פעם אחרי שנריץ פקודת npm install. הפקודה שנכתוב היא typings install. הדבר הזה יגרום לכך שכל פעם שמישהו ייקח את הפרויקט הזה ויריץ npm install בהתחלה כדי להתקין את כל הספריות הנצרכות, אוטומטית גם ירוץ לו typings install ויותקן לו ה-es6-shim שהתקנו לעיל.
קובץ ה-packge.json שלנו יראה כך:
package.json
{
  "name": "psrsjx5",
  "version": "1.0.0",
  "description": "",
  "scripts": {
    "start": "webpack-dev-server --watch --inline",
    "postinstall": "typings install"
  },
  "dependencies": {
    "rxjs": "^5.2.0"
  },
  "devDependencies": {
    "ts-loader": "^2.0.3",
    "typescript": "^2.2.2",
    "typings": "^2.1.0",
    "webpack": "^2.3.2",
    "webpack-dev-server": "^2.4.2"
  }
}

לאחר כל זה נבדוק את הסביבה שלנו ע"י הפקודה npm start. 


npm start

נפתח ב-browser חלון בכתובת localhost:8080 ונראה את הסביבה פועלת ואת המילים Hello Israel.
עכשיו הכל מוכן להתחיל לעבוד עם RxJS.


Observable and Observer

ב-RxJS יש שני סוגים  יישויות עיקריות, ה-observable וה-observer. 
ה-observable זה בעצם הדבר שעליו אנחנו מסתכלים ופועל בצורה אסינכרונית. 
ה-observer הוא הצופה שמחכה לקבל את הקריאות האסינכרוניות ולטפל בהם.
נדגים את הפעילות שלהם ע"י דוגמה:


import {Observable} from "rxjs";

let dataStream = [11, 22, 33];
let source = Observable.from(dataStream);  

class myObserver {
 next (value) {
  console.log(`the value is ${value}`);
 }
 error (e) {
  console.log(`the error is ${e}`);
 }
 complete () {
  console.log(`the stream is completed`);
 }
}

source.subscribe(new myObserver());

בדוגמה זו ה- observable הוא מערך של מספרים. ה-observer משויך ל-observable ע"י הפונקציה subscribe.
הפונקציה next היא הפונקציה שתיקרא ע"י ה-observer כאשר מגיע איזשהו data.
הפונקציה error תיקרא אם יש איזשהו בעיה ולכן נזרק exception. ה-e הוא אובייקט של error שהגיע עם הקריאה ל-error.
הפונקציה complete תיקרא ע"י ה-observable אם הוא סיים את כל מה שהיה לו לשלוח (לדוגמה במקרה שה-observable רוצה לשלוח מערך שלם והוא סיים לשלוח את כל תוכן המערך). לא כל observable משתמש בפונקציה הזו כי לא תמיד יש סוף למה שנשלח (לדוגמה אם מאזינים להקלקות של עכבר זה משהו שלא אמור לקרוא לפונקציה complete).

צורה מקוצרת לבניית observer:


import {Observable} from "rxjs";

let dataStream = [11, 22, 33];
let source = Observable.from(dataStream);  

source.subscribe(
    value => console.log(`value: ${value}`),
    e => console.log(`error: ${e}`),
    () => console.log("complete")   
);

ע"י שימוש בפונקציה subscribe וב-arrow functions ניתן לכתוב את שלושת הפונקציות בצורה מקוצרת.


הפונקציה Observable.create

במקום להשתמש בפונקציה from ניתן להשתמש בפונקציה create שהיא פונקציה בסיסית יותר ועל ידה אפשר לשלוט טוב יותר ב-observer. 
הפונקציה create מקבלת פונקציה, והפונקציה הזו מקבלת כפרמטר observer object.

import {Observable} from "rxjs";

let numbers = [11, 22, 33];
let source = Observable.create(observer => {

    for (let n of numbers) {
    
        if(n > 20) {
            observer.error("some error");
        }

        observer.next(n);
    }

    observer.complete();
});

source.subscribe(
    value => console.log(`value: ${value}`),
    e => console.log(`error: ${e}`),
    () => console.log("complete")   
);

בדוגמה הזו רואים שניתן לשלוט על הקריאות ל-observer, מתי נקרא next, מתי נקרא error ומתי נקרא complete.
אם נריץ את הקוד הזה מה שיודפס לקונסול זה:

value: 11
error: some error

מפה ניתן להבין שה-observer לא ממשיך הלאה לאחר קריאה ל-error. ולכן לא הודפסו הערכים 22 ו-33, וההודעה של complete. קריאה ל-observer.error דומה לזריקה של exception שבעצם עוצרת את הפעילות על מה שמגיע מה-observable.


הסבר על מספר אופרטורים של observable

האופרטור map משנה כל ערך שמגיע מה-obserable על פי מה שכתבנו ב-map.
האופרטור filter מציין איזשהו כלל שעל פיו מחליטים האם אנחנו רוצים את הערך שהגיע או לא.

דוגמה:

import {Observable} from "rxjs";

let numbers = [11, 22, 33];
let source = Observable.create(observer => {

    let index = 0;
    let produceValue = () => {
        observer.next(numbers[index++]);

        if(index < numbers.length) {
            setTimeout(produceValue, 1000);
        }
        else {
            observer.complete();
        }
    }

    produceValue();

}).map(n => n * 2)
  .filter(n => n > 20);
  

source.subscribe(
    value => console.log(`value: ${value}`),
    e => console.log(`error: ${e}`),
    () => console.log("complete")   
);

ה-map יגרום לכך שכל ערך שמגיע יוכפל ב-2.
ה-filter יגרום לכך שרק ערכים שגדולים מ-20 יגיעו ל-observer.

אופרטור שימושי ופשוט נוסף הוא האופרטור delay. הוא משהה את התגובה של ה-observer במספר מילי שניות לפי מה שנכתוב לו. לדוגמה, אם נרצה השהיה של חצי שנייה נכתוב:
.delay(500)

מעקב אחרי events ע"י fromEvent

ע"י שימוש ב-fromEvent ניתן ליצור observable לאלמנט של DOM. אנחנו צריכים לספק לפונקציה הזו שני פרמטרים. הראשון הוא האלמנט של ה-DOM שאנחנו רוצים ליצור לו observer. והשני זה ה-event שלו אנו רוצים להאזין.
אם לדוגמה נרצה לעקוב אחרי כל תזוזה של העכבר ולהדפיס את המיקום שלו:



import {Observable} from "rxjs";

let source = Observable.fromEvent(document, "mousemove");

source.subscribe(
    value => console.log(value),
    e => console.log(`error: ${e}`),
    () => console.log("complete")   
);

אם נריץ את זה נראה שבקונסול מודפסים לנו הערכים הבאים: isTrusted, screenX, screenY, clientX, clientY.
על גבי זה ניתן להשתמש באופרטורים האחרים שלמדנו כמו map ו-filter. 
לדוגמה, אם נירצה להדפיס רק את המיקום של העכבר בחלון (clientX, clientY) ורק כאשר ערכי ה-X קטנים מ-300, אז נעשה זאת כך:



import {Observable} from "rxjs";

let source = Observable.fromEvent(document, "mousemove")
   .map((e: MouseEvent) => {
    return{
     x: e.clientX,
     y: e.clientY
          }
   })
   .filter(f => f.x < 300);

source.subscribe(
    value => console.log(value),
    e => console.log(`error: ${e}`),
    () => console.log("complete")   
);


עבודה מול שרת

לשימוש ב-RxJS לעבודה מול שרתים יש מספר יתרונות. RxJS מציע הרבה פונקציות שעוזרות לנו לממש בקלות כל מיני אפשרויות מול שרתים. בנוסף, RxJS מבצע אופטימיזציה לעבודה מול שרתים. למשל אם יש observable ששולח פקודת http get לשרת, אבל אף אחד לא נרשם (subscribe) ל-observer שלו אז לא תתבצע שום קריאת http get כדי לחסוך בתעבורת רשת.

דוגמה פשוטה לעבודה מול שרת:
נכין קובץ json שיכיל שמות של מספר מוצרים והוא יהיה כביכול המידע שחוזר מהשרת שלנו.


products.json

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
[
   {
     "title": "productName1"
   },
   {
     "title": "productName2"
   },
   {
     "title": "productName3"
   },
]

נבנה דף שמכיל כפתור שבלחיצה עליו נשלחת בקשת http get לשרת כדי לקחת את המידע מ-products.json ולהציג אותו על המסך. דף ה-html שלנו יהיה:


index.html
<!DOCTYPE html>
<html>
<head>
 <title>RxJs</title>
</head>
<body>
 <button id="myButton">Get Products</button>
 <div id="output"></div>
 <script src="app.js"></script>
</body>
</html>

והקוד שלנו ב-main.ts יהיה:

main.ts
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import {Observable} from "rxjs";

let myButton = document.getElementById("myButton");
let output = document.getElementById("output");

let click = Observable.fromEvent(myButton, "click");

function loadProducts(url: string){
 return Observable.create(observer => {
  let xhr = new XMLHttpRequest();

  xhr.addEventListener("load", () => {
   let data = JSON.parse(xhr.responseText);
   observer.next(data);
   observer.complete();
  });

  xhr.open("GET", url);
  xhr.send();  
 });
}

function renderData(data){
 data.forEach(d => {
  let div = document.createElement("div");
  div.innerText = d.title;
  output.appendChild(div);
 })

}

click.flatMap(e => loadProducts("products.json"))
  .subscribe(
     renderData,
     e => console.log(`error: ${e}`),
     () => console.log("complete")   
);

הסבר:
בקוד שלעיל יצרנו observable בשם click שמאזין ל-event של לחיצה על כפתור "myButton". ברגע שלוחצים על הכפתור הזה נקראת הפונקציה loadProducts שמחזירה גם היא observable. ה-observable הזה שולח בקשת http get ל-products.json ומאזין לחזרה של המידע. ברגע שהמידע מתקבל הוא קורא לפונקציה next של ה-observer שלו עם המידע שהתקבל. פונקציית ה-next של ה-obvserver נקראת renderData והיא מציגה את המידע על המסך.
ב-observable שנקרא click עשינו שימוש בפונקציית flatMap. הפונקציה הזו מממשת פונקציית next (שבעצם קוראת ל-loadProducts) וגורמת לכך שה-observable שחוזר מהפונקציה יקרא לפונקציית ה-subscribe שבשורה 33. ניתן לקרוא הסבר על flatMap באתר הרישמי. אם היינו משתמשים בפונקציית map במקום flatMap, אז ה-subscribe שבשורה 33 היה מתייחס ל-observable שנקרא click ולא ל-observable שחוזר מ-loadProducts. הפונקציה flatMap היא פונקציה חשובה ושימושית ולכן כדאי להבין אותה לעומק.


Retries

במקרה שהשרת מחזיר לנו הודעת שגיאה נשתמש בפונקציית ה-error של ה-observer ונשלח אליה את סטטוס השגיאה שהשרת החזיר. בנוסף ננסה לשלוח לשרת בקשת GET עוד כמה פעמים למקרה שהייתה בעיה רגעית.
הקוד הבא מתבסס על הקוד שהובא לעיל ומדגים את הטיפול בשגיאה מהשרת (הטקסט המודגש מראה את השינויים העיקריים):


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import {Observable} from "rxjs";

let myButton = document.getElementById("myButton");
let output = document.getElementById("output");

let click = Observable.fromEvent(myButton, "click");

function loadProducts(url: string){
 return Observable.create(observer => {
  let xhr = new XMLHttpRequest();

  xhr.addEventListener("load", () => {
   if(xhr.status === 200){
    let data = JSON.parse(xhr.responseText);
    observer.next(data);
    observer.complete();
   }
   else{
    observer.error(xhr.statusText);
   }
  });

  xhr.open("GET", url);
  xhr.send();  
 }).retry(3);
}

function renderData(data){
 data.forEach(d => {
  let div = document.createElement("div");
  div.innerText = d.title;
  output.appendChild(div);
 })

}

click.flatMap(e => loadProducts("someNotExistFile.json"))
  .subscribe(
     renderData,
     e => console.log(`the error is: ${e}`),
     () => console.log("complete")   
);

ביקשנו מהשרת קובץ שלא קיים ולכן השרת החזיר לנו שגיאה שהעמוד לא נמצא (שגיאת 404). את סטטוס השגיאה העברנו לפונקציית ה-error של ה-observer שמדפיסה לקונסול את סטטוס השגיאה. בנוסף עשינו 3 ניסיונות חוזרים לבקש מהשרת את אותה הבקשה ע"י שימוש בפונקציית retry של RxJS.

בעולם האמיתי בדרך כלל נרצה לבצע ניסיונות חוזרים אחרי השהיה מסוימת. כדי לבצע את זה נשתמש בפונקציה retryWhen. הפונקציה הזו מורכבת ולכן נדגים את השימוש בה ולאחר מכן נסביר אותה:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import {Observable} from "rxjs";

let myButton = document.getElementById("myButton");
let output = document.getElementById("output");

let click = Observable.fromEvent(myButton, "click");

function loadProducts(url: string){
 return Observable.create(observer => {
  let xhr = new XMLHttpRequest();

  xhr.addEventListener("load", () => {
   if(xhr.status === 200){
    let data = JSON.parse(xhr.responseText);
    observer.next(data);
    observer.complete();
   }
   else{
    observer.error(xhr.statusText);
   }
  });

  xhr.open("GET", url);
  xhr.send(); 
 }).retryWhen(retryPolicy({attempts: 3, delay: 1000}));
}

function retryPolicy({attempts = 5, delay = 2000}){
 return function(errorFromLoadProduct){
  return errorFromLoadProduct
  .scan((acc, value)=>{
   console.log(acc, value);
   return acc+1;
  },0)
  .takeWhile(acc => acc < attempts)
  .delay (delay);
 }
}


function renderData(data){
 data.forEach(d => {
  let div = document.createElement("div");
  div.innerText = d.title;
  output.appendChild(div);
 })

}

click.flatMap(e => loadProducts("someNotExistFile.json"))
  .subscribe(
     renderData,
     e => console.log(`the error is: ${e}`),
     () => console.log("complete")   
);

הפונקציה retryWhen מקבלת כפרמטר פונקציה שמקבלת observable ומחזירה observable.
ה-observable שהפונקציה הזו מקבלת הוא ה-observable שמכיל את ה-error שהתקבל מה-observable האבא. ה-input הזה מתבצע אוטומטית ע"י RxJS אנחנו רק צריכים לתת ל-observable שמתקבל שם. במקרה שלנו נקרא לו errorFromLoadProduct. ולכן בכל פעם ששורה 19 תיקרא, ישלח ה-xhr.statusText ל-errorFromLoadProduct. ולכן תהיה לנו גישה רק לנתוני ה-error ולא לנתוני ה-success.
על פי ה-observable שמוחזר מהפונקציה שמחזירה retryPolicy יוחלט האם לעשות ניסיון נוסף (retry) או לא. ולכן אם ה-observable המוחזר יחזיר איזשהו ערך אז ה-retryPolicy יבצע ניסיון נוסף של הפעולה הקודמת. אבל אם ה-observable המוחזר ייצר error או קריאת complete אז ה-retryPolicy לא יבצע ניסיון נוסף.
ניסיתי להסביר את זה בצורה הכי מובנת שיכולתי אבל אני יודע שזה עדיין מסובך, ולכן אני ממליץ לקרוא את הסבר הזה כמה פעמים תוך כדי התבוננות בקוד ולנסות גם לכתוב את זה בעצמכם, ואז ההסבר יישב לכם טוב טוב בראש.

כדי שנקבל את מה שרצינו היה צורך להשתמש בעוד כמה אופרטורים שיופעלו על ה-observable שנקרא errorFromLoadProduct:

האופרטור scan - משמש בד"כ לספירה של דברים. במקרה שלנו לספירה של כמות ה-retries. האופרטור scan מקבל כ-Input פונקציה ופרמטר. הפונקציה מקבלת 2 פרמטרים. הראשון acc ישמש אותנו כמשתנה שסופר את כמות ה-retries. והפרמטר value מקבל לתוכו את הערך שנשלח מה-observable שלנו (errorFromLoadProduct). במקרה שלנו הערך שנשלח הוא xhr.statusText. את שני הערכים הדפסנו לקונסול, ואת הערך של acc החזרנו בתוספת אחד כדי לספור את כמות ה-retries. 
הפרמטר השני שהאופרטור scan מקבל הוא הערך ההתחלתי של acc. במקרה שלנו איפסתי אותו.

האופרטור takeWhile - קובע האם לקרוא ל-complete של ה-observable הנוכחי. היא מקבלת פונקציה שמחזירה true או false. אם היא מחזירה true אז ה-observable הנוכחי ימשיך לרוץ. אם היא מחזירה false אז ייקרא ה-complete של ה-observable הנוכחי. במקרה שלנו יצרנו פונקציה שמקבלת את acc שמחזיק את כמות ה-retries ומחזירה true כל עוד acc קטן ממספר הניסיונות שהגדרנו.

האופרטור delay - פשוט יוצר השהיה של מספר המילי-שניות ששולחים לו.
כדי ליצור תוכנה יותר גנרית, כתבנו את הפונקציה retryPolicy עם שני פרמטרים, הראשון זה מספר הניסיונות (עם default של 5) והשני זה אורך זמן ההשהיה (עם default של 2000) .
כדי להבין את כל הדברים האלו מומלץ מאוד לממש את זה ולשחק עם כל האופרטורים האלו.


הבדלים בין promises ל-observables

כשמתלבטים בין שימוש ב-observables ל-promises צריך להבין את ההבדלים ביניהם:
  1. observable יכול לשלוח כמות אירועים אינסופית. לעומת זאת promise שולח אירוע בודד ומסתיים. לדוגמה, טיפול באירוע של הקלקות עכבר או תזוזת העכבר יכול להיות מאתגר בשימוש ב-promise כיון שנצטרך ליצור כל הזמן promises נוספים. לעומת זאת עם observable זה די פשוט כפי שראינו לעיל.
  2. בשימוש ב-promises אין לנו את כל האופרטורים שה-observable מספק לנו ועוזרים לנו בטיפול באירועים, כמו map, flatMap, retryWhen ועוד.


בחירת האופרטור הנכון

ב-RxJS ישנם המון אופרטורים, וצריך לדעת איך לבחור נכון את האופרטור המתאים ביותר לצורך שלי. באתר הרשמי עשו עבודה מצוינת בכלל בכל התיעוד ובפרט בעניין בחירת האופרטורים. 
לעניין תיעוד האופרטורים, על כל אופרטור יש הסבר שכולל איור שמסביר את פעולת האופרטור.
לדוגמה עבור repeat האיור הוא פשוט:

החלק העליון מייצג אירוע/נתון שהגיע, החלק האמצעי הוא האופרטור המדובר, והחלק התחתון מייצג את התוצאה לאחר פעולת האופרטור. במקרה של repeat קל לראות שהנתון שנכנס לאופרטור יוצא מספר פעמים לפי הגדרת ה-repeat.

ניקח עוד אופרטור שקצת יותר מורכב כדי להסביר את האיורים. 
אופרטור concat משרשר לנו את האירועים/נתונים שנכנסו. והאיור שלו הוא:

כפי שרואים באיור, האופרטור הזה יודע לקחת רצף של אירועים ממקור 1 ולשרשר אליהם רצף אירועים ממקור 2 כאשר האירועים ממקור 2 יתחילו רק לאחר סיום האירועים ממקור 1.בשונה מ-concat יש את אופרטור merge שמאחד אירועים ממקורות שונים לזרם אחד של אירועים לפי סדר הגעתם. והאיור המדגים אותו הוא:

לגבי בחירת האופרטור הנכון לנו, גם פה עשו עבודת תיעוד מצוינת.
דבר ראשון שמאוד עוזר זה חלוקת האופרטורים לפי קטגוריות כדי שנוכל למצוא את האופרטור הנחוץ ביתר קלות.
דבר שני, יצרו עץ החלטות שעוזר לנו להחליט על האופרטור המתאים. זה דומה למצב שבו אתה מסביר לחבר שלך מה בדיוק אתה צריך, ולפי משפט ההסבר שלך אתה מגיע לאופרטור המתאים. 
למשל: אם הצורך שלי נשמע ככה "אני רוצה ליצור observable שמייצר סדרה של פרטים בצורה שחוזרת על עצמה" או באנגלית - 
I want to create a new Observable that emits a sequence of items repeatedly
במקרה הזה העץ מוביל אותנו לשימוש באופרטור repeat.


טיפול ב-errors

כפי שתיארנו בתחילת המאמר, ל-observer יש פונקציית error שנקראת ברגע שנזרק איזשהו error מה-observable. בחלק זה ננסה להבין יותר לעומק את הטיפול הנכון ב-errors.
נתחיל בדוגמה פשוטה:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import {Observable} from "rxjs";

let source = Observable.create(observer => {
 observer.next(1);
 observer.next(2);
 observer.error('something went wrong!!!');
 observer.next(3);
});

source.subscribe(
 value => console.log(`the value is ${value}`)
);


בדוגמה הזו נקראה הפונקציה next עם הערך 1 ולאחר מכן עם הערך 2. ואז נקראה פונקציית ה-error, ולאחר מכן נקראה פונקציית next עם הערך 3. מה שנקבל בקונסול זה את התוצאה הבאה:



כפי שניתן לראות הגיע הערך 1 ולאחריו 2 ואז הקונסול מדווח שהגיע שגיאה שלא נתפסה עם הטקסט "!!!something went wrong". מכיון שלא מימשנו את פונקציית ה-error אז השגיאה מדווחת כשגיאה שלא נתפסה. בנוזף יש לשים לב שהערך 3 לא הגיע בכלל. וזה מכיון ש-error קוטע את המשך הדיווחים מה-observable.

נתקדם הלאה לדוגמה טובה יותר:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import {Observable} from "rxjs";

let source = Observable.create(observer => {
 observer.next(1);
 observer.next(2);
 observer.error('something went wrong!!!');
 observer.next(3);
 observer.complete();
});

source.subscribe(
 value => console.log(`the value is ${value}`),
 e => console.log(`an error arrived: ${e}`),
 () => console.log(`complete is here!`)
);

בדוגמה הזו כבר יש מימוש של פונקציית ה-error. ומה שנראה בקונסול הוא:




פה כבר רואים שמגיע ה-error והוא לא מדווח כשגיאה שלא נתפסה כיון שפונקציית ה-error שמממשנו מטפלת בו. גם במצב הזה, ברגע שמגיע error הוא עוצר את המשך הדיווחים מה-observable.

יש לציין שאם יזרק error שלא מתוך ה-observable לדוגמה בצורה הבא:
throw new Error("special error");
הוא לא ייתפס על ידי פונקציית ה-error של ה-observer שלנו אלא ידווח כשגיאה שלא נתפסה.

נביא דוגמה נוספת תוך שימוש באופרטורים נוספים.


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import {Observable} from "rxjs";

let source = Observable.merge(
 Observable.of(1),
 Observable.from([2,3,4]),
 Observable.throw(new Error('new error')),
 Observable.of(5)
);

source.subscribe(
 value => console.log(`the value is ${value}`),
 e => console.log(`an error arrived: ${e}`),
 () => console.log(`complete is here!`)
);

בדוגמה הזו השתמשנו באופרטור merge שמשרשר סדרה של observables. ה-observable הראשון שנוצר ע"י שימוש בפונקציה of שמייצרת observable שמשגר את הערכים שה-of קיבל כ-input.
לאחר מכן יש שימוש בפונקציית from שמייצר observable ממערך של נתונים.
לאחר מכן זורקים error ע"י פונקציית throw. ה-error הזה יטופל ע"י פונקציית ה-error שלנו ולכן בקונסול נקבל את המצב הבא:


לפעמים יש מצב שאנחנו מקבלים error אבל לא רוצים שהוא יעצור את כל שאר הדיווחים מה-observable. יש מספר אפשרויות לגרום לכך. קודם כל ניתן להשתמש בפונקציה onErrorResumeNext.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import {Observable} from "rxjs";

let source = Observable.onErrorResumeNext(
 Observable.of(1),
 Observable.from([2,3,4]),
 Observable.throw(new Error('new error')),
 Observable.of(5)
);

source.subscribe(
 value => console.log(`the value is ${value}`),
 e => console.log(`an error arrived: ${e}`),
 () => console.log(`complete is here!`)
);

במצב הזה, גם כשנזרק error ימשיכו להתקבל שאר הדיווחים מאותו ה-observable.

ישנה אפשרות נוספת ע"י שימוש בפונקציית catch. פונקציית catch תופסת את ה-error ובתוכה ניתן להחזיר בתגובה ל-error איזשהו observable אחר ולאחריו תקרא פונקציית ה-complete אבל לא שאר הדיווחים מאותו מקור. לדוגמה:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import {Observable} from "rxjs";

let source = Observable.merge(
 Observable.of(1),
 Observable.from([2,3,4]),
 Observable.throw(new Error('new error')),
 Observable.of(5)
).catch(e => {
 console.log(`an error caught: ${e}`);
 return Observable.of(88);
});

source.subscribe(
 value => console.log(`the value is ${value}`),
 e => console.log(`an error arrived: ${e}`),
 () => console.log(`complete is here!`)
);

במקרה הזה בקונסול נקבל:

כפי שרואים ברגע שהגיע error מדווח לנו שהוא נתפס ובתגובה קיבלנו את הערך 88. אבל הערך 5 לא הגיע אלא ישר לאחריו נקראה פונקציית ה-complete.


Unsubscribe from observable

במאמר הזה ראינו שיש אפשרות להתחבר ל-observable ע"י הפונקציה subscribe. מה שלא הזכרנו זה את האפשרות להתנתק מ-observable. הפונקציה subscribe מחזירה לנו subscruption שאותו ניתן לשמור במשתנה ולהשתמש בו לאחר מכן לצורך התנתקות (unsubscribe). 

לצורך הדגמה נחזור לדוגמה שהבאנו לעיל:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import {Observable} from "rxjs";

let myButton = document.getElementById("myButton");
let output = document.getElementById("output");

let click = Observable.fromEvent(myButton, "click");

function loadProducts(url: string){
 return Observable.create(observer => {
  let xhr = new XMLHttpRequest();

  xhr.addEventListener("load", () => {
   let data = JSON.parse(xhr.responseText);
   observer.next(data);
   observer.complete();
  });

  xhr.open("GET", url);
  xhr.send();  
 });
}

function renderData(data){
 data.forEach(d => {
  let div = document.createElement("div");
  div.innerText = d.title;
  output.appendChild(div);
 })

}

click.flatMap(e => loadProducts("products.json"))
  .subscribe(
     renderData,
     e => console.log(`error: ${e}`),
     () => console.log("complete")   
);

בקוד שלעיל יצרנו observable בשם click שמאזין ל-event של לחיצה על כפתור "myButton". ברגע שלוחצים על הכפתור הזה נקראת הפונקציה loadProducts שמחזירה גם היא observable. ה-observable הזה שולח בקשת http get ל-products.json ומאזין לחזרה של המידע. ברגע שהמידע מתקבל הוא קורא לפונקציה next של ה-observer שלו עם המידע שהתקבל. פונקציית ה-next של ה-obvserver נקראת renderData והיא מציגה את המידע על המסך.

כדי להדגים את ההתנתקות, נשנה קצת את הקוד:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import {Observable} from "rxjs";

let output = document.getElementById("output");

function loadProducts(url: string){
 return Observable.create(observer => {
  let xhr = new XMLHttpRequest();

  let parseAndContinue = ()=>{
 let data = JSON.parse(xhr.responseText);
    observer.next(data);
    observer.complete();
  }
  xhr.addEventListener("load", parseAndContinue);

  xhr.open("GET", url);
  xhr.send();  

  return () => {
   console.log("unsubscribe done")
   xhr.removeEventListener("load",parseAndContinue);
   xhr.abort();
  }
 });
}

function renderData(data){
 data.forEach(d => {
  let div = document.createElement("div");
  div.innerText = d.title;
  output.appendChild(div);
 })
}

let mySubscription = loadProducts("productss.json")
.subscribe(
     renderData,
     e => console.log(`error: ${e}`),
     () => console.log("complete")   
);

mySubscription.unsubscribe();
console.log(mySubscription);

בקוד הזה אנחנו קוראים לפונקציה loadProducts בשורה 35 בלי תלות בלחיצה על הכפתור. בשורה 42 אנחנו מתנתקים מה-observable ובשורה 43 מדפיסים את ה-subscription.
פונקציית ה-unsubscribe מבצעת את הפונקציה שמוחזרת מפונקציית ה-create של ה-observable (מודגשת בצהוב). 
ולכן מה שנראה בקונסול זה את הדבר הבא:



כפי שניתן לראות קריאת ה-HTTP GET התבטלה בגלל שפונקציית ה-unsubscribe ביצעה את הפקודה:

xhr.abort();

אם תנסו את זה בעצמכם לא תמיד תראו את הקריאה מתבטלת כי לפעמים הקריאה מספיקה להתבצע לפני הביטול. אבל אם תעשו כמה ניסיונות תראו את זה מדיי פעם קורה.

סיכום

במאמר זה למדנו את היסודות של RxJS. ספריית ה-RxJS מספקת המון פונקציות נוספות ואותן אפשר למצוא בתיעוד באתר הרשמי. כמובן שיש עוד הרבה מה להרחיב וללמוד על ספרייה זו אבל לאחר קריאת מאמר זה ניתן להתחיל להיכנס לראש של RxJS שמכוון אותנו לתכנות בצורה מסויימת עם כל היתרונות והחסרונות שבכך. 

בהצלחה!







29 תגובות:

  1. אנונימי23/11/17 14:11

    תודה רבה!
    מאמר חשוב ומוסיף המון הבנה

    השבמחק
    תשובות
    1. תודה על התגובה. אתה הראשון שמגיב לי בבלוג!!! יאללה אני הולך לחגוג.

      מחק
  2. אנונימי1/2/18 09:04

    בכנות, עוד לא קראתי את המאמר ואני עוד אחזור ואקרא אותו (קיוויתי שמישהו יכתוב בעברית על הספרייה הזו), אבל לדעתי ההתחלה, בה אתה מסביר על npm, ts, webpack קצת לא שייכת מסיבה אחת שיש לה 2 כיוונים:
    אם אני באתי ללמוד rxjs, סביר להניח שאני כבר מכיר (לפחות באופן כללי) מה זה npm init, --save וכו'.
    אם אני בכל זאת לא מכיר את הנ"ל - 2 שורות על הנושא לא באמת יסבירו לי מה קורה פה. (וזה גם נראה מפחיד ומאיים, שצריך ללמוד את כל הדברים האלה בשביל להשתמש בספרייה, אבל כידוע זה לא נכון, כל אלה כלים שהורדת כדי שיהיה לך נוח יותר. זה הכל).

    עלה והצלח!

    השבמחק
  3. רפי עבודה מצוינת !! מאמר נפלא! תודה על ההשקעה:)

    השבמחק
    תשובות
    1. בשמחה. אכן זו היתה השקעה לא קטנה

      מחק
  4. תודה על ההסבר המפורט!

    השבמחק
  5. אלווווווווווווווווווווווף!!!!!!

    השבמחק
  6. אנונימי4/4/19 15:47

    תודה רבה! אחלה של הסבר

    השבמחק
  7. אנונימי7/7/19 12:44

    מאמר מצוין!!!
    תודה רבה

    השבמחק
  8. אנונימי1/1/20 11:42

    הבלוג שלך פשוט מצויין. בזמנו הגעתי לרשומה הזו ולא הייתי כל כך זקוק לה, כעת אני כותב יותר באנגולר וממש עזרת לי להבין יותר מה אני עושה.
    על הדרך הגעתי למאמר שלך על רואוטינג באנגולר, והפיסקה על כך שככל שהpath כללי יותר הוא צריך להיות מאוחר יותר, פתחה לי את העיניים. יש לי באג בפרוייקט קודם מלפני כמה חודשים שאף פעם לא הצלחתי להבין למה הוא לא עובד נכון, וכעת אני קולט שזו הבעיה.
    תודה רבה.

    השבמחק
    תשובות
    1. ממש משמח אותי לשמוע שהבלוג עזר לך!
      בזכות תגובות כאלה אני ב"ה ישקיע עוד זמן לכתוב על דברים נוספים כי בזמן האחרון הזנחתי פה את העסק.
      בהצלחה!

      מחק
  9. תגובה זו הוסרה על ידי המחבר.

    השבמחק
  10. אנונימי20/4/20 22:26

    אין על ההסברים
    הרבה זמן חיפשתי הסבר לכל הנושא שנחשפתי אליו וסוף כל סוף מצאתי
    איזה הקלה להבין באמת מה כותבים
    אין מילים....
    מחכים לבלוגים נוספים בכל התחומים :)

    השבמחק
    תשובות
    1. משמח מאוד לשמוע! ב"ה נכתוב על עוד נושאים

      מחק
  11. אם יצא לך אשמח ל RxJava

    השבמחק
  12. אנונימי26/7/20 17:21

    המון תודה עזר לי מאוד להבין את הספרייה הזו
    אתה מסייע מאוד !!

    השבמחק
    תשובות
    1. שמח מאוד שעזרתי אנונימי יקר

      מחק
  13. תודה רפי!
    המאמרים בבלוג מוסברים בצורה מאירת עיניים ומפשיטים את הדברים, נהניתי מאד!

    השבמחק
  14. תודה רפי!
    המאמרים בבלוג מוסברים בצורה מאירת עיניים ומפשיטים את הדברים, נהניתי מאד!

    השבמחק
  15. יישר כח. מצויין שיש הסבר בעברית שאפשר להבין...

    השבמחק
    תשובות
    1. בשמחה אבי. באמת זו ספריה לא פשוטה

      מחק
  16. אנונימי13/3/22 03:01

    תודה על ההסבר!! עזר לי.

    השבמחק
  17. תודה על מאמר מצוין!
    סיכום מעולה ומפשט לנושא מורכב.

    השבמחק