אם הגעת לכאן אני מעריך שאתה יודע על מה מדובר, אבל בכל זאת ניתן הקדמה קצרה.
אם ניכנס לאתר הרשמי של ReactiveX נראה את הסיסמה הבאה:
אז מה בעצם אומר המשפט הזה? ההסבר הוא פשוט, ReactiveX הוא שם כולל ל-API בשביל תכנות אסינכרוני עם observable streams.
אז קודם צריך להבין ש-ReactiveX נוגע לתכנות אסינכרוני. ז"א לקוד שמחכה שמשהו יקרה וכשהוא קורה הקוד צריך להגיב למה שקרה. למשל קוד שמחכה ללחיצה על כפתור מסוים, או לתגובה מהשרת.
סבבה, אבל מה זה בכלל observable streams? פרוש המילה observe זה להתבונן או להבחין. כשיש לנו קוד שצריך להתבונן על זרם של מידע שמגיע אלינו מדי פעם ולהגיב למה שהגיע פה נכנסת האפשרות לשימוש ב-ReactiveX. אם למשל השרת שלי מדיי פעם מעדכן לי רשימה במידע נוסף ואני צריך לפלטר את מה שהגיע או לעשות פעולה אחרת כתלות במידע החדש, פה אני צריך קוד שיאזין לאותו מקור ויגיב בצורה הנכונה. אותו המקור שעליו אנו מתבוננים נקרא observable. וכיון שמדובר במקור מידע שיכול להתעדכן במידע חדש מדיי פעם לכן הוא נקרא observable stream.
אבל ReactiveX זה השם הכולל של ה-API. את אותו API מימשו לשפות שנות:
במאמר זה נתמקד בספריית RxJS שהיא מימוש ה-API לשפת JavaScript.
התקנות
נתחיל מהדברים שאנחנו צריכים להתקין כדי לעבוד עם RxJS.נתחיל מספריה שיש בה רק קובץ index.html שמכיל שלד של דף html בסיסי.
index.html
1 2 3 4 5 6 7 8 9 10 | <!DOCTYPE html>
<html>
<head>
<title>RxJs</title>
</head>
<body>
<div>Hello Israel</div>
</body>
</html>
|
npm init
כדי לאתחל קובץ package.json וניתן איזה שם שנירצה בשדה name כשנתבקש.
לאחר מכן נתקין את RxJS ע"י הפקודה:
npm install rxjs --save
נשתמש ב"save--" כדי שזה יכתב לקובץ package.json. ספריית ה-RxJS תותקן בספריית node_modules.
עכשיו נתקין מספר כלים שיעזרו לנו בפיתוח:
npm install webpack webpack-dev-server typescript typings ts-loader --save-dev
- webpack - כלי שבונה ומאגד את כל הקוד שלנו לקובץ אחד שישלח אל ה-browser
- webpack-dev-server - זה בעצם web server לפיתוח, שאנו נשתמש בו כדי לשלוח קבצים ל-browser דרך http
- typescript - נתקין את הקומפיילר של TypeScript כי אנחנו נכתוב ב-TypeScript
- typings - כלי שיעזור לנו בכתיבת TypeScript
- ts-loader - כלי של webpack שעוזר לו לעבוד עם TypeScript
בסיום הפקודה כתבנו save-dev-- כדי שכל ההתקנות האלו יהיו תקפות רק לפיתוח ולא ל-product.
עכשיו יש לנו צורך בדבר נוסף, אנחנו צריכים קובץ של types definition של TypeScript כדי להשתמש בכל היכולות של RxJS. כיון ש-RxJS נכתב לשימוש ב-ES6 הוא עושה שימוש ביכולות החדשות של ES6. הקומפיילר של TypeScript לא מכיר את ה-APIs והפונקציות החדשות, אבל ניתן להכיר לו אותן אם נכתוב את הפקודה הבאה:
node_modules\.bin\typings install dt~es6-shim --global --save
ה-dt זה ראשי תיבות של definitly types, ולאחר מכן יש את שם החבילה שאנחנו רוצים להתקין es6-shim.
כתבנו global-- כיון שה-type definitions האלו מייצגים הגדרות גלובליות לצורך ה-JS.
בנוסף כתבנו save-- כיון שזה יגרום ל-typings ליצור קובץ typings.json שיכלול את כל ה-typings שאנחנו נתקין.
קינפוג המערכת
אנחנו צריכים לקנפג 2 רכיבים במערכת שלנו. הראשון הוא את הקומפיילר של TypeScript והשני הוא ה-webpack.קינפוג הקומפיילר של TypeScript מתבצע בקובץ tsconfig.json.
קינפוג ה-webpack מתבצע בקובץ webpack.config.js.
tsconfig.json
{ "compilerOptions": { "target": "es5", "module": "commonjs", "sourceMap": true } }
target - מציין שהקומפילציה של TypeScript תהיה עבור הפורמט גרסת es5 של JavaScript.
module - ה- commonjs זה פורמט של module ש-webpack מבין.
sourceMap - עוזר לנו בדיבאג של האפליקציה.
הקובץ webpack.config.js יכיל קוד ש-webpack יריץ בסביבת ה-node והוא נועד לייצא אובייקט קונפיגורציה.
webpack.config.js
module.exports = { entry: "./main", output: { filename: "app.js" }, module: { loaders: [ { test: /.ts$/, loader: "ts-loader" } ] }, resolve: { extensions: [".ts", ".js"] } }
entry - מציין את קובץ ה-bootstrap של האפליקציה. זהו הקובץ שמתחיל את כל האפליקציה. ה-webpack ינתח את הקובץ הזה ואת הקבצים שהוא תלוי בהם וישתמש במידע הזה כדי להפוך את האפליקציה שלי לחבילה אחת.
output - מציין את שם הקובץ שיווצר ע"י webpack. בקובץ ה-index.html נוסיף את הקובץ הזה בשורה שלאחר <div>Hello Israel</div> בצורה הבאה:
<script src="app.js"></script>
module - פה אנחנו כותבים את ה-module loaders שיש לנו. ה-loaders ב-webpack הם כלים שיכולים להשתמש בסוגים שונים של קבצים לעבד אותם ולתת ל-webpack את התוצאה כדי שיוכל לאגד הכל יחד לקובץ js יחיד. כיון שאנו נשתמש רק ב-TypeScript לכן נכתוב בשדה שנקרא test את ה-regex שמתאים לסיומת של קבצי TypeScript. ה-regex יהיה /$ts./. הדבר הזה מגדיר ל-webpack שעל כל קובץ ts הוא צריך להשתמש ב-loader שנקרא "ts-loader" שאותו התקנו בחלק של ההתקנות לעיל. ה-ts-loader הוא בעצם plugin של webpack שיודע איך להשתמש בקומפיילר של TypeScript כדי לעבד את קבצי ה-ts. ולאחר ש-TypeScript סיים להמיר את קובץ ה-ts לקובץ js אז webpack ייקח את הקובץ שנוצר ויכניס אותו לתוך הקובץ app.js.
resolve - בחלק שנקרא extensions נגדיר את סיומות הקבצים ש-webpack צריך להשתמש בהם. יש גרסאות של webpack שדורשות גם גרשיים ריקים ולכן נכתוב:
extensions: ["", ".ts", ".js"]
נעשה דבר נוסף שיקל עלינו את השימוש בכלים הללו. בקובץ package.json נכניס בשדה scripts סקריפט חדש כך שברגע שנכתוב npm start הפקודות שיתבצעו הם:
webpack-dev-server - זה ייתן לנו שרת פשוט שיגיש את כל הקבצים שלנו בפורט 8080
watch --inline-- הדבר הזה יגרום לכך שכל פעם שנשנה קובץ ts ונשמור אותו אז ה-browser יעשה refresh ויקבל את השינויים החדשים.
סקריפט נוסף שנירצה יקרא postinstall. הסקריפט הזה ירוץ אוטומטי כל פעם אחרי שנריץ פקודת npm install. הפקודה שנכתוב היא typings install. הדבר הזה יגרום לכך שכל פעם שמישהו ייקח את הפרויקט הזה ויריץ npm install בהתחלה כדי להתקין את כל הספריות הנצרכות, אוטומטית גם ירוץ לו typings install ויותקן לו ה-es6-shim שהתקנו לעיל.
קובץ ה-packge.json שלנו יראה כך:
package.json
{ "name": "psrsjx5", "version": "1.0.0", "description": "", "scripts": { "start": "webpack-dev-server --watch --inline", "postinstall": "typings install" }, "dependencies": { "rxjs": "^5.2.0" }, "devDependencies": { "ts-loader": "^2.0.3", "typescript": "^2.2.2", "typings": "^2.1.0", "webpack": "^2.3.2", "webpack-dev-server": "^2.4.2" } }
לאחר כל זה נבדוק את הסביבה שלנו ע"י הפקודה npm start.
npm start
נפתח ב-browser חלון בכתובת localhost:8080 ונראה את הסביבה פועלת ואת המילים Hello Israel.
עכשיו הכל מוכן להתחיל לעבוד עם RxJS.
Observable and Observer
ב-RxJS יש שני סוגים יישויות עיקריות, ה-observable וה-observer.ה-observable זה בעצם הדבר שעליו אנחנו מסתכלים ופועל בצורה אסינכרונית.
ה-observer הוא הצופה שמחכה לקבל את הקריאות האסינכרוניות ולטפל בהם.
נדגים את הפעילות שלהם ע"י דוגמה:
import {Observable} from "rxjs"; let dataStream = [11, 22, 33]; let source = Observable.from(dataStream); class myObserver { next (value) { console.log(`the value is ${value}`); } error (e) { console.log(`the error is ${e}`); } complete () { console.log(`the stream is completed`); } }
source.subscribe(new myObserver());
בדוגמה זו ה- observable הוא מערך של מספרים. ה-observer משויך ל-observable ע"י הפונקציה subscribe.
הפונקציה next היא הפונקציה שתיקרא ע"י ה-observer כאשר מגיע איזשהו data.
הפונקציה error תיקרא אם יש איזשהו בעיה ולכן נזרק exception. ה-e הוא אובייקט של error שהגיע עם הקריאה ל-error.
הפונקציה complete תיקרא ע"י ה-observable אם הוא סיים את כל מה שהיה לו לשלוח (לדוגמה במקרה שה-observable רוצה לשלוח מערך שלם והוא סיים לשלוח את כל תוכן המערך). לא כל observable משתמש בפונקציה הזו כי לא תמיד יש סוף למה שנשלח (לדוגמה אם מאזינים להקלקות של עכבר זה משהו שלא אמור לקרוא לפונקציה complete).
צורה מקוצרת לבניית observer:
import {Observable} from "rxjs"; let dataStream = [11, 22, 33]; let source = Observable.from(dataStream); source.subscribe( value => console.log(`value: ${value}`), e => console.log(`error: ${e}`), () => console.log("complete") );
ע"י שימוש בפונקציה subscribe וב-arrow functions ניתן לכתוב את שלושת הפונקציות בצורה מקוצרת.
הפונקציה Observable.create
במקום להשתמש בפונקציה from ניתן להשתמש בפונקציה create שהיא פונקציה בסיסית יותר ועל ידה אפשר לשלוט טוב יותר ב-observer.הפונקציה create מקבלת פונקציה, והפונקציה הזו מקבלת כפרמטר observer object.
import {Observable} from "rxjs"; let numbers = [11, 22, 33]; let source = Observable.create(observer => { for (let n of numbers) { if(n > 20) { observer.error("some error"); } observer.next(n); } observer.complete(); }); source.subscribe( value => console.log(`value: ${value}`), e => console.log(`error: ${e}`), () => console.log("complete") );
בדוגמה הזו רואים שניתן לשלוט על הקריאות ל-observer, מתי נקרא next, מתי נקרא error ומתי נקרא complete.
אם נריץ את הקוד הזה מה שיודפס לקונסול זה:
value: 11error: some error
מפה ניתן להבין שה-observer לא ממשיך הלאה לאחר קריאה ל-error. ולכן לא הודפסו הערכים 22 ו-33, וההודעה של complete. קריאה ל-observer.error דומה לזריקה של exception שבעצם עוצרת את הפעילות על מה שמגיע מה-observable.
הסבר על מספר אופרטורים של observable
האופרטור map משנה כל ערך שמגיע מה-obserable על פי מה שכתבנו ב-map.האופרטור filter מציין איזשהו כלל שעל פיו מחליטים האם אנחנו רוצים את הערך שהגיע או לא.
דוגמה:
import {Observable} from "rxjs"; let numbers = [11, 22, 33]; let source = Observable.create(observer => { let index = 0; let produceValue = () => { observer.next(numbers[index++]); if(index < numbers.length) { setTimeout(produceValue, 1000); } else { observer.complete(); } } produceValue(); }).map(n => n * 2) .filter(n => n > 20); source.subscribe( value => console.log(`value: ${value}`), e => console.log(`error: ${e}`), () => console.log("complete") );
ה-map יגרום לכך שכל ערך שמגיע יוכפל ב-2.
ה-filter יגרום לכך שרק ערכים שגדולים מ-20 יגיעו ל-observer.
אופרטור שימושי ופשוט נוסף הוא האופרטור delay. הוא משהה את התגובה של ה-observer במספר מילי שניות לפי מה שנכתוב לו. לדוגמה, אם נרצה השהיה של חצי שנייה נכתוב:
.delay(500)
מעקב אחרי events ע"י fromEvent
ע"י שימוש ב-fromEvent ניתן ליצור observable לאלמנט של DOM. אנחנו צריכים לספק לפונקציה הזו שני פרמטרים. הראשון הוא האלמנט של ה-DOM שאנחנו רוצים ליצור לו observer. והשני זה ה-event שלו אנו רוצים להאזין.אם לדוגמה נרצה לעקוב אחרי כל תזוזה של העכבר ולהדפיס את המיקום שלו:
import {Observable} from "rxjs"; let source = Observable.fromEvent(document, "mousemove"); source.subscribe( value => console.log(value), e => console.log(`error: ${e}`), () => console.log("complete") );
אם נריץ את זה נראה שבקונסול מודפסים לנו הערכים הבאים: isTrusted, screenX, screenY, clientX, clientY.
על גבי זה ניתן להשתמש באופרטורים האחרים שלמדנו כמו map ו-filter.
לדוגמה, אם נירצה להדפיס רק את המיקום של העכבר בחלון (clientX, clientY) ורק כאשר ערכי ה-X קטנים מ-300, אז נעשה זאת כך:
import {Observable} from "rxjs"; let source = Observable.fromEvent(document, "mousemove") .map((e: MouseEvent) => { return{ x: e.clientX, y: e.clientY } }) .filter(f => f.x < 300); source.subscribe( value => console.log(value), e => console.log(`error: ${e}`), () => console.log("complete") );
עבודה מול שרת
לשימוש ב-RxJS לעבודה מול שרתים יש מספר יתרונות. RxJS מציע הרבה פונקציות שעוזרות לנו לממש בקלות כל מיני אפשרויות מול שרתים. בנוסף, RxJS מבצע אופטימיזציה לעבודה מול שרתים. למשל אם יש observable ששולח פקודת http get לשרת, אבל אף אחד לא נרשם (subscribe) ל-observer שלו אז לא תתבצע שום קריאת http get כדי לחסוך בתעבורת רשת.דוגמה פשוטה לעבודה מול שרת:
נכין קובץ json שיכיל שמות של מספר מוצרים והוא יהיה כביכול המידע שחוזר מהשרת שלנו.
products.json
1 2 3 4 5 6 7 8 9 10 11 | [ { "title": "productName1" }, { "title": "productName2" }, { "title": "productName3" }, ] |
נבנה דף שמכיל כפתור שבלחיצה עליו נשלחת בקשת http get לשרת כדי לקחת את המידע מ-products.json ולהציג אותו על המסך. דף ה-html שלנו יהיה:
index.html
<!DOCTYPE html> <html> <head> <title>RxJs</title> </head> <body> <button id="myButton">Get Products</button> <div id="output"></div> <script src="app.js"></script> </body> </html>
והקוד שלנו ב-main.ts יהיה:
main.ts
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | import {Observable} from "rxjs"; let myButton = document.getElementById("myButton"); let output = document.getElementById("output"); let click = Observable.fromEvent(myButton, "click"); function loadProducts(url: string){ return Observable.create(observer => { let xhr = new XMLHttpRequest(); xhr.addEventListener("load", () => { let data = JSON.parse(xhr.responseText); observer.next(data); observer.complete(); }); xhr.open("GET", url); xhr.send(); }); } function renderData(data){ data.forEach(d => { let div = document.createElement("div"); div.innerText = d.title; output.appendChild(div); }) } click.flatMap(e => loadProducts("products.json")) .subscribe( renderData, e => console.log(`error: ${e}`), () => console.log("complete") ); |
הסבר:
בקוד שלעיל יצרנו observable בשם click שמאזין ל-event של לחיצה על כפתור "myButton". ברגע שלוחצים על הכפתור הזה נקראת הפונקציה loadProducts שמחזירה גם היא observable. ה-observable הזה שולח בקשת http get ל-products.json ומאזין לחזרה של המידע. ברגע שהמידע מתקבל הוא קורא לפונקציה next של ה-observer שלו עם המידע שהתקבל. פונקציית ה-next של ה-obvserver נקראת renderData והיא מציגה את המידע על המסך.
ב-observable שנקרא click עשינו שימוש בפונקציית flatMap. הפונקציה הזו מממשת פונקציית next (שבעצם קוראת ל-loadProducts) וגורמת לכך שה-observable שחוזר מהפונקציה יקרא לפונקציית ה-subscribe שבשורה 33. ניתן לקרוא הסבר על flatMap באתר הרישמי. אם היינו משתמשים בפונקציית map במקום flatMap, אז ה-subscribe שבשורה 33 היה מתייחס ל-observable שנקרא click ולא ל-observable שחוזר מ-loadProducts. הפונקציה flatMap היא פונקציה חשובה ושימושית ולכן כדאי להבין אותה לעומק.
Retries
במקרה שהשרת מחזיר לנו הודעת שגיאה נשתמש בפונקציית ה-error של ה-observer ונשלח אליה את סטטוס השגיאה שהשרת החזיר. בנוסף ננסה לשלוח לשרת בקשת GET עוד כמה פעמים למקרה שהייתה בעיה רגעית.הקוד הבא מתבסס על הקוד שהובא לעיל ומדגים את הטיפול בשגיאה מהשרת (הטקסט המודגש מראה את השינויים העיקריים):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | import {Observable} from "rxjs"; let myButton = document.getElementById("myButton"); let output = document.getElementById("output"); let click = Observable.fromEvent(myButton, "click"); function loadProducts(url: string){ return Observable.create(observer => { let xhr = new XMLHttpRequest(); xhr.addEventListener("load", () => { if(xhr.status === 200){ let data = JSON.parse(xhr.responseText); observer.next(data); observer.complete(); } else{ observer.error(xhr.statusText); } }); xhr.open("GET", url); xhr.send(); }).retry(3); } function renderData(data){ data.forEach(d => { let div = document.createElement("div"); div.innerText = d.title; output.appendChild(div); }) } click.flatMap(e => loadProducts("someNotExistFile.json")) .subscribe( renderData, e => console.log(`the error is: ${e}`), () => console.log("complete") ); |
ביקשנו מהשרת קובץ שלא קיים ולכן השרת החזיר לנו שגיאה שהעמוד לא נמצא (שגיאת 404). את סטטוס השגיאה העברנו לפונקציית ה-error של ה-observer שמדפיסה לקונסול את סטטוס השגיאה. בנוסף עשינו 3 ניסיונות חוזרים לבקש מהשרת את אותה הבקשה ע"י שימוש בפונקציית retry של RxJS.
בעולם האמיתי בדרך כלל נרצה לבצע ניסיונות חוזרים אחרי השהיה מסוימת. כדי לבצע את זה נשתמש בפונקציה retryWhen. הפונקציה הזו מורכבת ולכן נדגים את השימוש בה ולאחר מכן נסביר אותה:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | import {Observable} from "rxjs"; let myButton = document.getElementById("myButton"); let output = document.getElementById("output"); let click = Observable.fromEvent(myButton, "click"); function loadProducts(url: string){ return Observable.create(observer => { let xhr = new XMLHttpRequest(); xhr.addEventListener("load", () => { if(xhr.status === 200){ let data = JSON.parse(xhr.responseText); observer.next(data); observer.complete(); } else{ observer.error(xhr.statusText); } }); xhr.open("GET", url); xhr.send(); }).retryWhen(retryPolicy({attempts: 3, delay: 1000})); } function retryPolicy({attempts = 5, delay = 2000}){ return function(errorFromLoadProduct){ return errorFromLoadProduct .scan((acc, value)=>{ console.log(acc, value); return acc+1; },0) .takeWhile(acc => acc < attempts) .delay (delay); } } function renderData(data){ data.forEach(d => { let div = document.createElement("div"); div.innerText = d.title; output.appendChild(div); }) } click.flatMap(e => loadProducts("someNotExistFile.json")) .subscribe( renderData, e => console.log(`the error is: ${e}`), () => console.log("complete") ); |
הפונקציה retryWhen מקבלת כפרמטר פונקציה שמקבלת observable ומחזירה observable.
ה-observable שהפונקציה הזו מקבלת הוא ה-observable שמכיל את ה-error שהתקבל מה-observable האבא. ה-input הזה מתבצע אוטומטית ע"י RxJS אנחנו רק צריכים לתת ל-observable שמתקבל שם. במקרה שלנו נקרא לו errorFromLoadProduct. ולכן בכל פעם ששורה 19 תיקרא, ישלח ה-xhr.statusText ל-errorFromLoadProduct. ולכן תהיה לנו גישה רק לנתוני ה-error ולא לנתוני ה-success.
על פי ה-observable שמוחזר מהפונקציה שמחזירה retryPolicy יוחלט האם לעשות ניסיון נוסף (retry) או לא. ולכן אם ה-observable המוחזר יחזיר איזשהו ערך אז ה-retryPolicy יבצע ניסיון נוסף של הפעולה הקודמת. אבל אם ה-observable המוחזר ייצר error או קריאת complete אז ה-retryPolicy לא יבצע ניסיון נוסף.
ניסיתי להסביר את זה בצורה הכי מובנת שיכולתי אבל אני יודע שזה עדיין מסובך, ולכן אני ממליץ לקרוא את הסבר הזה כמה פעמים תוך כדי התבוננות בקוד ולנסות גם לכתוב את זה בעצמכם, ואז ההסבר יישב לכם טוב טוב בראש.
כדי שנקבל את מה שרצינו היה צורך להשתמש בעוד כמה אופרטורים שיופעלו על ה-observable שנקרא errorFromLoadProduct:
הפרמטר השני שהאופרטור scan מקבל הוא הערך ההתחלתי של acc. במקרה שלנו איפסתי אותו.
האופרטור takeWhile - קובע האם לקרוא ל-complete של ה-observable הנוכחי. היא מקבלת פונקציה שמחזירה true או false. אם היא מחזירה true אז ה-observable הנוכחי ימשיך לרוץ. אם היא מחזירה false אז ייקרא ה-complete של ה-observable הנוכחי. במקרה שלנו יצרנו פונקציה שמקבלת את acc שמחזיק את כמות ה-retries ומחזירה true כל עוד acc קטן ממספר הניסיונות שהגדרנו.
האופרטור delay - פשוט יוצר השהיה של מספר המילי-שניות ששולחים לו.
כדי ליצור תוכנה יותר גנרית, כתבנו את הפונקציה retryPolicy עם שני פרמטרים, הראשון זה מספר הניסיונות (עם default של 5) והשני זה אורך זמן ההשהיה (עם default של 2000) .
כדי להבין את כל הדברים האלו מומלץ מאוד לממש את זה ולשחק עם כל האופרטורים האלו.
הבדלים בין promises ל-observables
כשמתלבטים בין שימוש ב-observables ל-promises צריך להבין את ההבדלים ביניהם:- observable יכול לשלוח כמות אירועים אינסופית. לעומת זאת promise שולח אירוע בודד ומסתיים. לדוגמה, טיפול באירוע של הקלקות עכבר או תזוזת העכבר יכול להיות מאתגר בשימוש ב-promise כיון שנצטרך ליצור כל הזמן promises נוספים. לעומת זאת עם observable זה די פשוט כפי שראינו לעיל.
- בשימוש ב-promises אין לנו את כל האופרטורים שה-observable מספק לנו ועוזרים לנו בטיפול באירועים, כמו map, flatMap, retryWhen ועוד.
בחירת האופרטור הנכון
ב-RxJS ישנם המון אופרטורים, וצריך לדעת איך לבחור נכון את האופרטור המתאים ביותר לצורך שלי. באתר הרשמי עשו עבודה מצוינת בכלל בכל התיעוד ובפרט בעניין בחירת האופרטורים.לעניין תיעוד האופרטורים, על כל אופרטור יש הסבר שכולל איור שמסביר את פעולת האופרטור.
לדוגמה עבור repeat האיור הוא פשוט:
החלק העליון מייצג אירוע/נתון שהגיע, החלק האמצעי הוא האופרטור המדובר, והחלק התחתון מייצג את התוצאה לאחר פעולת האופרטור. במקרה של repeat קל לראות שהנתון שנכנס לאופרטור יוצא מספר פעמים לפי הגדרת ה-repeat.
ניקח עוד אופרטור שקצת יותר מורכב כדי להסביר את האיורים.
אופרטור concat משרשר לנו את האירועים/נתונים שנכנסו. והאיור שלו הוא:
כפי שרואים באיור, האופרטור הזה יודע לקחת רצף של אירועים ממקור 1 ולשרשר אליהם רצף אירועים ממקור 2 כאשר האירועים ממקור 2 יתחילו רק לאחר סיום האירועים ממקור 1.בשונה מ-concat יש את אופרטור merge שמאחד אירועים ממקורות שונים לזרם אחד של אירועים לפי סדר הגעתם. והאיור המדגים אותו הוא:
לגבי בחירת האופרטור הנכון לנו, גם פה עשו עבודת תיעוד מצוינת.דבר שני, יצרו עץ החלטות שעוזר לנו להחליט על האופרטור המתאים. זה דומה למצב שבו אתה מסביר לחבר שלך מה בדיוק אתה צריך, ולפי משפט ההסבר שלך אתה מגיע לאופרטור המתאים.
למשל: אם הצורך שלי נשמע ככה "אני רוצה ליצור observable שמייצר סדרה של פרטים בצורה שחוזרת על עצמה" או באנגלית -
I want to create a new Observable that emits a sequence of items repeatedly
במקרה הזה העץ מוביל אותנו לשימוש באופרטור repeat.
נתחיל בדוגמה פשוטה:
כפי שניתן לראות הגיע הערך 1 ולאחריו 2 ואז הקונסול מדווח שהגיע שגיאה שלא נתפסה עם הטקסט "!!!something went wrong". מכיון שלא מימשנו את פונקציית ה-error אז השגיאה מדווחת כשגיאה שלא נתפסה. בנוזף יש לשים לב שהערך 3 לא הגיע בכלל. וזה מכיון ש-error קוטע את המשך הדיווחים מה-observable.
נתקדם הלאה לדוגמה טובה יותר:
בדוגמה הזו כבר יש מימוש של פונקציית ה-error. ומה שנראה בקונסול הוא:
פה כבר רואים שמגיע ה-error והוא לא מדווח כשגיאה שלא נתפסה כיון שפונקציית ה-error שמממשנו מטפלת בו. גם במצב הזה, ברגע שמגיע error הוא עוצר את המשך הדיווחים מה-observable.
ישנה אפשרות נוספת ע"י שימוש בפונקציית catch. פונקציית catch תופסת את ה-error ובתוכה ניתן להחזיר בתגובה ל-error איזשהו observable אחר ולאחריו תקרא פונקציית ה-complete אבל לא שאר הדיווחים מאותו מקור. לדוגמה:
במקרה הזה בקונסול נקבל:
כפי שרואים ברגע שהגיע error מדווח לנו שהוא נתפס ובתגובה קיבלנו את הערך 88. אבל הערך 5 לא הגיע אלא ישר לאחריו נקראה פונקציית ה-complete.
לצורך הדגמה נחזור לדוגמה שהבאנו לעיל:
בקוד שלעיל יצרנו observable בשם click שמאזין ל-event של לחיצה על כפתור "myButton". ברגע שלוחצים על הכפתור הזה נקראת הפונקציה loadProducts שמחזירה גם היא observable. ה-observable הזה שולח בקשת http get ל-products.json ומאזין לחזרה של המידע. ברגע שהמידע מתקבל הוא קורא לפונקציה next של ה-observer שלו עם המידע שהתקבל. פונקציית ה-next של ה-obvserver נקראת renderData והיא מציגה את המידע על המסך.
כדי להדגים את ההתנתקות, נשנה קצת את הקוד:
בקוד הזה אנחנו קוראים לפונקציה loadProducts בשורה 35 בלי תלות בלחיצה על הכפתור. בשורה 42 אנחנו מתנתקים מה-observable ובשורה 43 מדפיסים את ה-subscription.
פונקציית ה-unsubscribe מבצעת את הפונקציה שמוחזרת מפונקציית ה-create של ה-observable (מודגשת בצהוב).
ולכן מה שנראה בקונסול זה את הדבר הבא:
כפי שניתן לראות קריאת ה-HTTP GET התבטלה בגלל שפונקציית ה-unsubscribe ביצעה את הפקודה:
אם תנסו את זה בעצמכם לא תמיד תראו את הקריאה מתבטלת כי לפעמים הקריאה מספיקה להתבצע לפני הביטול. אבל אם תעשו כמה ניסיונות תראו את זה מדיי פעם קורה.
בהצלחה!
טיפול ב-errors
כפי שתיארנו בתחילת המאמר, ל-observer יש פונקציית error שנקראת ברגע שנזרק איזשהו error מה-observable. בחלק זה ננסה להבין יותר לעומק את הטיפול הנכון ב-errors.נתחיל בדוגמה פשוטה:
1 2 3 4 5 6 7 8 9 10 11 12 | import {Observable} from "rxjs"; let source = Observable.create(observer => { observer.next(1); observer.next(2); observer.error('something went wrong!!!'); observer.next(3); }); source.subscribe( value => console.log(`the value is ${value}`) ); |
בדוגמה הזו נקראה הפונקציה next עם הערך 1 ולאחר מכן עם הערך 2. ואז נקראה פונקציית ה-error, ולאחר מכן נקראה פונקציית next עם הערך 3. מה שנקבל בקונסול זה את התוצאה הבאה:
כפי שניתן לראות הגיע הערך 1 ולאחריו 2 ואז הקונסול מדווח שהגיע שגיאה שלא נתפסה עם הטקסט "!!!something went wrong". מכיון שלא מימשנו את פונקציית ה-error אז השגיאה מדווחת כשגיאה שלא נתפסה. בנוזף יש לשים לב שהערך 3 לא הגיע בכלל. וזה מכיון ש-error קוטע את המשך הדיווחים מה-observable.
נתקדם הלאה לדוגמה טובה יותר:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | import {Observable} from "rxjs"; let source = Observable.create(observer => { observer.next(1); observer.next(2); observer.error('something went wrong!!!'); observer.next(3); observer.complete(); }); source.subscribe( value => console.log(`the value is ${value}`), e => console.log(`an error arrived: ${e}`), () => console.log(`complete is here!`) ); |
בדוגמה הזו כבר יש מימוש של פונקציית ה-error. ומה שנראה בקונסול הוא:
פה כבר רואים שמגיע ה-error והוא לא מדווח כשגיאה שלא נתפסה כיון שפונקציית ה-error שמממשנו מטפלת בו. גם במצב הזה, ברגע שמגיע error הוא עוצר את המשך הדיווחים מה-observable.
יש לציין שאם יזרק error שלא מתוך ה-observable לדוגמה בצורה הבא:
throw new Error("special error");
הוא לא ייתפס על ידי פונקציית ה-error של ה-observer שלנו אלא ידווח כשגיאה שלא נתפסה.
נביא דוגמה נוספת תוך שימוש באופרטורים נוספים.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | import {Observable} from "rxjs"; let source = Observable.merge( Observable.of(1), Observable.from([2,3,4]), Observable.throw(new Error('new error')), Observable.of(5) ); source.subscribe( value => console.log(`the value is ${value}`), e => console.log(`an error arrived: ${e}`), () => console.log(`complete is here!`) ); |
בדוגמה הזו השתמשנו באופרטור merge שמשרשר סדרה של observables. ה-observable הראשון שנוצר ע"י שימוש בפונקציה of שמייצרת observable שמשגר את הערכים שה-of קיבל כ-input.
לאחר מכן יש שימוש בפונקציית from שמייצר observable ממערך של נתונים.
לאחר מכן זורקים error ע"י פונקציית throw. ה-error הזה יטופל ע"י פונקציית ה-error שלנו ולכן בקונסול נקבל את המצב הבא:
לאחר מכן יש שימוש בפונקציית from שמייצר observable ממערך של נתונים.
לאחר מכן זורקים error ע"י פונקציית throw. ה-error הזה יטופל ע"י פונקציית ה-error שלנו ולכן בקונסול נקבל את המצב הבא:
לפעמים יש מצב שאנחנו מקבלים error אבל לא רוצים שהוא יעצור את כל שאר הדיווחים מה-observable. יש מספר אפשרויות לגרום לכך. קודם כל ניתן להשתמש בפונקציה onErrorResumeNext.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | import {Observable} from "rxjs"; let source = Observable.onErrorResumeNext( Observable.of(1), Observable.from([2,3,4]), Observable.throw(new Error('new error')), Observable.of(5) ); source.subscribe( value => console.log(`the value is ${value}`), e => console.log(`an error arrived: ${e}`), () => console.log(`complete is here!`) ); |
במצב הזה, גם כשנזרק error ימשיכו להתקבל שאר הדיווחים מאותו ה-observable.
ישנה אפשרות נוספת ע"י שימוש בפונקציית catch. פונקציית catch תופסת את ה-error ובתוכה ניתן להחזיר בתגובה ל-error איזשהו observable אחר ולאחריו תקרא פונקציית ה-complete אבל לא שאר הדיווחים מאותו מקור. לדוגמה:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | import {Observable} from "rxjs"; let source = Observable.merge( Observable.of(1), Observable.from([2,3,4]), Observable.throw(new Error('new error')), Observable.of(5) ).catch(e => { console.log(`an error caught: ${e}`); return Observable.of(88); }); source.subscribe( value => console.log(`the value is ${value}`), e => console.log(`an error arrived: ${e}`), () => console.log(`complete is here!`) ); |
כפי שרואים ברגע שהגיע error מדווח לנו שהוא נתפס ובתגובה קיבלנו את הערך 88. אבל הערך 5 לא הגיע אלא ישר לאחריו נקראה פונקציית ה-complete.
Unsubscribe from observable
במאמר הזה ראינו שיש אפשרות להתחבר ל-observable ע"י הפונקציה subscribe. מה שלא הזכרנו זה את האפשרות להתנתק מ-observable. הפונקציה subscribe מחזירה לנו subscruption שאותו ניתן לשמור במשתנה ולהשתמש בו לאחר מכן לצורך התנתקות (unsubscribe).לצורך הדגמה נחזור לדוגמה שהבאנו לעיל:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | import {Observable} from "rxjs"; let myButton = document.getElementById("myButton"); let output = document.getElementById("output"); let click = Observable.fromEvent(myButton, "click"); function loadProducts(url: string){ return Observable.create(observer => { let xhr = new XMLHttpRequest(); xhr.addEventListener("load", () => { let data = JSON.parse(xhr.responseText); observer.next(data); observer.complete(); }); xhr.open("GET", url); xhr.send(); }); } function renderData(data){ data.forEach(d => { let div = document.createElement("div"); div.innerText = d.title; output.appendChild(div); }) } click.flatMap(e => loadProducts("products.json")) .subscribe( renderData, e => console.log(`error: ${e}`), () => console.log("complete") ); |
בקוד שלעיל יצרנו observable בשם click שמאזין ל-event של לחיצה על כפתור "myButton". ברגע שלוחצים על הכפתור הזה נקראת הפונקציה loadProducts שמחזירה גם היא observable. ה-observable הזה שולח בקשת http get ל-products.json ומאזין לחזרה של המידע. ברגע שהמידע מתקבל הוא קורא לפונקציה next של ה-observer שלו עם המידע שהתקבל. פונקציית ה-next של ה-obvserver נקראת renderData והיא מציגה את המידע על המסך.
כדי להדגים את ההתנתקות, נשנה קצת את הקוד:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | import {Observable} from "rxjs"; let output = document.getElementById("output"); function loadProducts(url: string){ return Observable.create(observer => { let xhr = new XMLHttpRequest(); let parseAndContinue = ()=>{ let data = JSON.parse(xhr.responseText); observer.next(data); observer.complete(); } xhr.addEventListener("load", parseAndContinue); xhr.open("GET", url); xhr.send(); return () => { console.log("unsubscribe done") xhr.removeEventListener("load",parseAndContinue); xhr.abort(); } }); } function renderData(data){ data.forEach(d => { let div = document.createElement("div"); div.innerText = d.title; output.appendChild(div); }) } let mySubscription = loadProducts("productss.json") .subscribe( renderData, e => console.log(`error: ${e}`), () => console.log("complete") ); mySubscription.unsubscribe(); console.log(mySubscription); |
פונקציית ה-unsubscribe מבצעת את הפונקציה שמוחזרת מפונקציית ה-create של ה-observable (מודגשת בצהוב).
ולכן מה שנראה בקונסול זה את הדבר הבא:
כפי שניתן לראות קריאת ה-HTTP GET התבטלה בגלל שפונקציית ה-unsubscribe ביצעה את הפקודה:
xhr.abort();
סיכום
במאמר זה למדנו את היסודות של RxJS. ספריית ה-RxJS מספקת המון פונקציות נוספות ואותן אפשר למצוא בתיעוד באתר הרשמי. כמובן שיש עוד הרבה מה להרחיב וללמוד על ספרייה זו אבל לאחר קריאת מאמר זה ניתן להתחיל להיכנס לראש של RxJS שמכוון אותנו לתכנות בצורה מסויימת עם כל היתרונות והחסרונות שבכך.בהצלחה!
תודה רבה!
השבמחקמאמר חשוב ומוסיף המון הבנה
תודה על התגובה. אתה הראשון שמגיב לי בבלוג!!! יאללה אני הולך לחגוג.
מחקבכנות, עוד לא קראתי את המאמר ואני עוד אחזור ואקרא אותו (קיוויתי שמישהו יכתוב בעברית על הספרייה הזו), אבל לדעתי ההתחלה, בה אתה מסביר על npm, ts, webpack קצת לא שייכת מסיבה אחת שיש לה 2 כיוונים:
השבמחקאם אני באתי ללמוד rxjs, סביר להניח שאני כבר מכיר (לפחות באופן כללי) מה זה npm init, --save וכו'.
אם אני בכל זאת לא מכיר את הנ"ל - 2 שורות על הנושא לא באמת יסבירו לי מה קורה פה. (וזה גם נראה מפחיד ומאיים, שצריך ללמוד את כל הדברים האלה בשביל להשתמש בספרייה, אבל כידוע זה לא נכון, כל אלה כלים שהורדת כדי שיהיה לך נוח יותר. זה הכל).
עלה והצלח!
רפי עבודה מצוינת !! מאמר נפלא! תודה על ההשקעה:)
השבמחקבשמחה. אכן זו היתה השקעה לא קטנה
מחקתודה על ההסבר המפורט!
השבמחקבשמחה
מחקאלווווווווווווווווווווווף!!!!!!
השבמחקתווווווווווווווווודה
מחקתודה רבה! אחלה של הסבר
השבמחקשמח לשמוע שההסבר טוב
מחקמאמר מצוין!!!
השבמחקתודה רבה
בשמחה
מחקהבלוג שלך פשוט מצויין. בזמנו הגעתי לרשומה הזו ולא הייתי כל כך זקוק לה, כעת אני כותב יותר באנגולר וממש עזרת לי להבין יותר מה אני עושה.
השבמחקעל הדרך הגעתי למאמר שלך על רואוטינג באנגולר, והפיסקה על כך שככל שהpath כללי יותר הוא צריך להיות מאוחר יותר, פתחה לי את העיניים. יש לי באג בפרוייקט קודם מלפני כמה חודשים שאף פעם לא הצלחתי להבין למה הוא לא עובד נכון, וכעת אני קולט שזו הבעיה.
תודה רבה.
ממש משמח אותי לשמוע שהבלוג עזר לך!
מחקבזכות תגובות כאלה אני ב"ה ישקיע עוד זמן לכתוב על דברים נוספים כי בזמן האחרון הזנחתי פה את העסק.
בהצלחה!
תגובה זו הוסרה על ידי המחבר.
השבמחקאין על ההסברים
השבמחקהרבה זמן חיפשתי הסבר לכל הנושא שנחשפתי אליו וסוף כל סוף מצאתי
איזה הקלה להבין באמת מה כותבים
אין מילים....
מחכים לבלוגים נוספים בכל התחומים :)
משמח מאוד לשמוע! ב"ה נכתוב על עוד נושאים
מחקאם יצא לך אשמח ל RxJava
השבמחקהמון תודה עזר לי מאוד להבין את הספרייה הזו
השבמחקאתה מסייע מאוד !!
שמח מאוד שעזרתי אנונימי יקר
מחקתודה רפי!
השבמחקהמאמרים בבלוג מוסברים בצורה מאירת עיניים ומפשיטים את הדברים, נהניתי מאד!
שמח מאוד לשמוע :-)
מחקתודה רפי!
השבמחקהמאמרים בבלוג מוסברים בצורה מאירת עיניים ומפשיטים את הדברים, נהניתי מאד!
תודה על התגובה. כיף לשמוע
מחקיישר כח. מצויין שיש הסבר בעברית שאפשר להבין...
השבמחקבשמחה אבי. באמת זו ספריה לא פשוטה
מחקתודה על ההסבר!! עזר לי.
השבמחקתודה על מאמר מצוין!
השבמחקסיכום מעולה ומפשט לנושא מורכב.