一 什么是rxjs?
RxJS(Reactive Extensions for JavaScript)是一個用于響應式編程的 JavaScript 庫。它通過使用可觀察對象(Observables)和操作符(Operators)來處理異步和事件驅動的代碼。
什么是響應式編程? 程序的輸入可以被當成一個數據流,例如 excel表格的累加。
響應式編程世界里知名度最高的框架 - Reactive Extension , 簡稱 RX,指實踐響應式編程的一套工具。在Rx官網有這樣一段文字
An API for asynchronous programming with observable streams.
[圖片]
Rx的概念最初由微軟公司實現并開源,也就是Rx.NET,因為Rx帶來的編程方式大大改進了異步編程模型,在.NET之后,眾多開發者在其他平臺和語言上也實現了Rx的類庫。可見,Rx其實是一個大家族,在這個大家族中,還有用Java實現的RxJava,用C++實現的RxCpp,用Ruby實現的Rx.rb,用Python實現的RxPy,當然,還有這個大家族中最年長的Rx.NET。在本書中,我們介紹的是RxJS,也就是Rx的JavaScript語言實現。
關于設計模式:
任何一種模式,指的都是解決某一個特定類型問題的套路和方法。現實世界的問題復雜多變,往往不是靠單獨一種模式能夠解決的,更需要的是多種模式的組合,RxJS的Observable就是觀察者模式和迭代器模式的組合。
觀察者模式 要解決的問題,就是在一個持續產生事件的系統中,如何分割功能,讓不同模塊只需要處理一部分邏輯,這種分而治之的思想是基本的系統設計概念,當然,“分”很容易,關鍵是如何“治”。觀察者模式對“治”這個問題提的解決方法是這樣,將邏輯分為發布者(Publisher)和觀察者(Observer),其中發布者只管負責產生事件,它會通知所有注冊掛上號的觀察者,而不關心這些觀察者如何處理這些事件,相對的,觀察者可以被注冊上某個發布者,只管接收到事件之后就處理,而不關心這些數據是如何產生的。
迭代器模式(Iterator,也稱為“迭代器”)指的是能夠遍歷一個數據集合的對象,因為數據集合的實現方式很多,可以是一個數組,也可以是一個樹形結構,也可以是一個單向鏈表……迭代器的作用就是提供一個通用的接口,讓使用者完全不用關心這個數據集合的具體實現方式。迭代器另一個容易理解的名字叫游標(cursor),就像是一個移動的指針一樣,從集合中一個元素移到另一個元素,完成對整個集合的遍歷。
- Rxjs 特點
- 使用數據流對現實問題進行抽象
- 擅長處理異步操作
- 把復雜問題抽象成簡單問題的組合
- Rxjs 的基本概念
- 數據源 - 可觀察對象 Observable
import { Observable } from 'rxjs';
//代表“流”的變量標示符,都是用$符號結尾,這是RxJS編程中普遍使用的風格,被稱為“芬蘭式命名法”(Finnish Notation)
const source$ = new Observable((observer)=>{
observer.next(1);
observer.next(2);
observer.complete();
// 異常處理
observer.error('error');
}); - 操作符
通過 pipe - 管道,提供的各種操作符來實現各種數據流的轉化和操作。
包括:創建型/轉化型/過濾型/合并型/錯誤處理型
import { ajax } from 'rxjs/ajax';
import { retry } from 'rxjs/operators';
ajax('https://example.com/api/data').pipe(
// 最多重新訂閱 3 次,如果仍然失敗,則輸出錯誤信息
retry(3)
).subscribe(
response => console.log(response),
error => console.log('Request failed after multiple attempts:', error)
);
- 觀察者 - 訂閱可觀察對象
import { Subject } from 'rxjs';
const source$ = new Subject();
// 訂閱
const sub = source$.subscribe({
next:(data)=>{}
complete:()=>{}
error:(err)=>{}
});
// 取消訂閱
sub.unsubscribe();
3. Rxjs 中的狀態
- 未開始:Observable 尚未開始發出任何值。這是 Observable 的初始狀態。
- 進行中:Observable 正在發出值,可以是零個或多個。在這種狀態下,觀察者可以訂閱并處理這些發出的值。
- 完成:Observable 成功地發出了所有值,并已經終止。在這種狀態下,觀察者可以做相應的清理工作。
- 錯誤:Observable 在發出值的過程中遇到錯誤,終止并發送錯誤通知。觀察者可以處理這個錯誤通知。
注意:rxjs 只有一種終結狀態,要么完成,要么出錯。當 Observable 處于已經完成或錯誤狀態時,是不再允許通過 next 方法發送新的數據的。一旦 Observable 進入完成或錯誤狀態,它就不會再發出任何新的值。如果需要在完成或錯誤狀態后再次發送新的數據,可以使用其他操作符或技術來處理,例如創建一個新的 Observable,或者使用錯誤處理操作符捕獲并轉換錯誤。 - RxJS 與傳統 JavaScript 異步編程之間的對比:
- 響應式編程:RxJS 提供了一種響應式編程的范式,其中數據流被建模為可觀察對象,能夠在時間上發出值。你可以訂閱這些可觀察對象,以便在內部值發生變化時執行相應的操作。 傳統 JavaScript 異步編程: 傳統的 JavaScript 異步編程使用回調函數,事件處理程序或 Promise 來處理異步操作。
- 異步控制:RxJS 提供了一套強大的操作符,用于控制異步代碼的執行順序和并發性。你可以使用操作符如 mergeMap、switchMap 和 concatMap 來處理各種異步操作,并管理它們的執行順序和結果。 傳統 JavaScript 異步編程: 在傳統的 JavaScript 異步編程中,你可能會使用嵌套的回調函數或 Promise 鏈來控制異步操作的順序。
- 組合和轉換操作:RxJS 提供了許多操作符,用于組合和轉換可觀察對象。例如,map 操作符可以將一個可觀察對象的每個值映射到一個新值,filter 操作符可以過濾出符合特定條件的值。這些操作符使得處理數據流變得更加方便和靈活。 傳統 JavaScript 異步編程: 在傳統的 JavaScript 異步編程中,操作數據流通常需要手動編寫循環和條件語句。
- 錯誤處理:RxJS 提供了豐富的錯誤處理機制,例如通過 catchError 操作符捕獲錯誤并返回備用值,或者使用 retry 操作符重新訂閱可觀察對象以重試操作。 傳統 JavaScript 異步編程: 在傳統的 JavaScript 異步編程中,錯誤處理通常使用 try-catch 塊或者 Promise 的 catch 方法進行處理。
總的來說,RxJS 提供了一個強大而靈活的工具集,用于處理異步和事件驅動的代碼。它可以幫助開發者將復雜的異步操作轉化為簡潔、可讀性強的代碼,并能有效地處理錯誤、控制執行順序和進行數據轉換。 - Rxjs 與 Promise 對比 : 加強版的Promise
Promise 的寫法
// Promise 的三種狀態,pending, fulfilled, rejected
// Promise 定義出來 狀態變為pending
const p = new Promise((resolve,reject)=>{
resolve(); // 狀態變為fulfilled
reject(); // 狀態變為 rejected
});
p.then(()=>{}).catch().finally();
// 對比Observable
const s$ = new Observable((observer)=>{
// 對比promise ,observable 可以發送多個值,而Promise 只能發送一個值就會進行完結狀態
// 這兒有點類似 ajax 和 websocket 的味道了
observer.next();
observer.next();
observer.compolete();
observer.error();
});
s$.pipe().subscribe({
next:(v)=>{// 接受數據},
complete:()=>{ // 完成狀態后的回調},
error:(err)=>{//獲取 observer.error() 發送過來的值}
});
// 使用完成后需要取消訂閱
s$.unsubscsribe();
RxJS 和 Promise 都是處理異步編程的強大工具,但它們在一些方面有所不同。下面是 RxJS 相對于 Promise 的一些優點:
- 組合性和靈活性:RxJS 提供了豐富的操作符,允許你以各種方式組合和轉換可觀察對象,使其更加靈活。你可以使用 map、filter、mergeMap 等操作符來處理數據流,而不僅僅是處理單個值。這樣可以使你的代碼更加簡潔、可讀性更強,并且更容易進行復雜的異步操作。
- 錯誤處理:RxJS 提供了一套完善的錯誤處理機制,你可以使用 catchError 操作符捕獲錯誤并返回備用值,或使用 retry 操作符進行錯誤重試。這使得在處理異步操作時更加容易處理和管理錯誤情況。
- 取消和中斷:RxJS 提供了 unsubscribe 方法,可以取消訂閱可觀察對象并中斷正在進行的異步操作。這對于在某些條件滿足時取消異步操作非常有用,而 Promise 并沒有直接提供這樣的機制。
- 動態數據流:RxJS 的可觀察對象是動態的,意味著你可以在需要時動態地添加、移除或改變流中的值。這使得操作流程非常靈活,可以根據實際需要進行動態調整。
需要注意的是,Promise 也有其優勢。Promise 的語法相對簡單,容易上手,并且被廣泛支持和采用。它在處理單個值的異步操作時非常方便。如果你的需求只是處理一次性的異步操作,那么 Promise 可能已經足夠滿足你的需求。
綜上所述,RxJS 相對于 Promise 在處理復雜的、多個值的異步操作以及靈活性方面具有一定的優勢。它提供了更多的工具和操作符,使得處理和組合異步操作更加方便和可讀。然而,選擇使用 RxJS 還是 Promise 取決于你的具體需求和個人偏好。
二 為什么要使用rxjs ? - 優秀的異步數據流處理 (異步具有時效性)
- 豐富的operator 來操控數據流
做組件之間的通信,觀察者模式(Subject直接使用 )。畫布變化事件,主要是將拖拽縮放的快交互,從React的更新體系中剝離出來,因為前者的任務調度影響了整體更新的流暢度
而不能直接定義emit的key - key是動態獲取的,這種情況,采取發布訂閱模式 - 事件總線的形式 - Canvas 模擬事件
- 大量的異步數據更新,并且這些數據之間還存在相互依賴的情況
- 通過 RxJS 整合了異步編程、時序控制以及各種遵循函數式編程思想的 Operators 的特性,優雅的實現了這種復雜的狀態機
三 rxjs語法
生產者 Observable
觀察者 Observer ( .subscribe)
operators
ajax請求 ,websocket,event事件
同步數據源
Of
range(1,100) 產生 1-100 的數據源
From
異步數據源
Interval
Timer
fromEvent
ajax
webSoket
組合數據源
Merge
combineLatest
Observable
被觀察者
const source$ = new Observable((subscriber)=>{
// 為啥不能解構 - 因為this綁定
subscriber.next();
subscriber.next();
subscriber.complete();
subscriber.error();
});
觀察者
source$ .subscribe({
next:()=>{},
complete:()=>{},
error:()=>{}
});
Subject
多播- Subject 是一種特殊類型的可觀察對象(Observable),同時也是觀察者(Observer)。它允許多個觀察者訂閱它,并且可以通過調用 next 方法來向觀察者發送新的值。
數據流既可以 next, complete, error
也可以 subscibe
注意:next發送的值,只能被已經訂閱的觀察者接收到。
注意:
多播也可以用作 takeUntil 的參數,用戶手動來
將流變成完成時
BehaviorSubject
對比Subject
BehaviorSubject 在被訂閱時會立即發送其最近的值給訂閱者 (可以拿到訂閱前最新一次的值)。而Subject不能傳遞初始值,且訂閱后才能接受next發送的值,訂閱之前的值,都拿不到。
of
在 RxJS 中,"of" 是一個創建 Observable 的操作符。它可以用來創建一個包含指定值的 Observable,并且這些值會按順序依次發出。
of(a,b,c) //會依次發送 a,b,c 三個值
of([a,b,c]) // 會發送一個 數組值
of是同步發送值,并且發送值以后狀態變成 complete
Interval (timer)
Interval (1000)每隔1000ms 連續發送數據,從0開始累加
timer(1000)隔1000ms發送數據0,發送一個數據后,進入完成狀態
from
將其他類型的數據源轉或者數據結構化為Observable
- 數組(數組同 of 數組不同,from 數組會依次發送,of 中數組會把數組當成一個整體進行發送)
- 轉化Promise。內部實現會手動調用promise中的 resolve或者reject,然后將返回值進行發。
- 轉化Iterotor 迭代器對象。 上面的數組,也就是迭代器對象,會依次調用next,將迭代器對象中每個值進行發送
fromEvent
將event 事件轉化為 Observable
merge
并行聚合 , 按照發送時間的先后順序,將多個源值并行為一個值。
of (1,2)
of(3,4,5)
Merge 后得到值, 1,2,3,4,5 。按照時間先后順序進行發送,而不會進行組合
—{count} +
一般用于不同的數據源返回相同的數據類型,
。觀察者不關心是哪個數據發送的數據,只關心
數據時間以及數據本身。
race
race(a1$, a2$,a3$) 哪一個流先發送數據,后續就一直訂閱這個流的數據。其他流全部放棄。
combineLatest
combineLatest([s1$, s2$, s3$] ).subscribe(([s1,s2,s3])=>{
// 操作
})
當 s1$, s2$, s3$ 都返回至少一個值時,會發送出一個值[s1,s2,s3],然后每次 三個流中有一個流發送變化都會發送出最新的[s1,s2,s3]
const serch$ = xxx;
const currentPage$ = xxx;
combineLatest([serch$,currentPage$ ]) .subscribe(([serch,currentPage])=>{
// 轉化成其他源文件
});
forkJoin
forkJoin( [a1$,a2$,a3$] ) , 需要等到 a1$, a2$, a3$ 流都進入完成時,最后返回一次 完成時最后一次發送的數據 數組[a1,a2,a3]
其中有一個流未進入完成時,都不會最終返回結果。
可以使用 take(1)讓流進入完成時
有點類似于 Promise.all
animationFrameScheduler
任務調度:類似于 window.requestAnimationFrame 每一幀去去篩選數據
Operator (寫在pipe中)
startWith
以什么開頭,給一個默認值
map
轉化數據 ,輸入的是一個函數
mapTo (已廢除)
轉化為常量。廢除了,直接用map(()=> 常量)來表示,過多的 operator 會增加知識復雜度
scan
類似于reduce積累函數 (可以累積發送的值,也可以累積是第幾次發送的值,并和當次的值進行組合返回)
withLatestFrom
withLatestFrom操作符是一種在源Observable發出值時將其與其他Observable的最新值進行組合的方式。當源Observable完成時,withLatestFrom不會觸發完成回調,并且不會再生成任何結果。source1$.pipe(withLagestFrom(second$)).subscribe(([s1,s2])=>{
// 保證每個source1發送的數據,都對應 second$數據為最新的值
})
注意,在外層源發出數據時,需要確保內層源已經發送出數據了(至少一個),不然取不到值。
也就是,每次外層源發送一個值,就去內層源中取 上次最新發送的值 進行組合。
switchMap
switchMap 接受一個函數作為參數,該函數將源 Observable 的值映射為一個新的 Observable。每當源 Observable 發出新的值時,switchMap 會取消訂閱前一個內部 Observable,并訂閱最新的內部 Observable。這樣可以確保只有最新的內部 Observable 的值被發出,而忽略前一個內部 Observable 的值。
當外層數據產生新值時,內層的數據源訂閱會被取消掉
從而確保內部源使用到的外部源的數據總是最新的
可用于解決翻頁渲染問題。例如,我們翻頁,從第一頁,翻頁到第二頁
又快速翻頁到第三頁。如果出現網絡問題。
switchMap 的使用,當翻頁數據到達第三頁時,如果第二頁還沒有返回數據,訂閱
訂閱會被取消,就算第二頁數據返回了,也不會監聽到。
mergeMap
串行聚合(有先后順序的聚合)
mousedow - mousemove(takeUntil - mouseup)
concatMap
用于將一個外部 Observable 序列中的每個值映射為內部Observable,并按照順序依次訂閱內部Observable。當一個內部Observable完成后,才會開始訂閱下一個內部Observable。(用于同步數據轉 其他數據源,當內部的其他數據源完成時狀態時,才會去訂閱 外部源的另外一個發送過來的數據 - 但 外部同步數據源肯定是沒有延遲的,同時發送了所有數據,但是只是被緩存了,沒有進行訂閱而已)
tap
拿傳過的來值進行一次不影響后續 Stream 的 “純操作”,
通過 tap 操作符進行 RxJS 的 Debug
take
取具體幾個數據,然后滿足條件后數據變成完成時。
take(3), 當流發送3個數據后,進入完成時。
takeUntil
獲取值,直到某個流發送數據為止,并且讓當前流進入完成時狀態
參數為一個函數,函數的返回值為一個新的流,當這個新的流
發送第一個數據的時候。外層的流就會終止訂閱
takeWhile
中一個常用的操作符是takeWhile,它可以根據條件判斷是否繼續發送數據或完成流。
delay
讓流的發送間隔一段時間再繼續發送
delay(300) 延遲300ms
catchError
它用于捕獲 Observable 中的錯誤并處理這些錯誤。
相當于拿到報錯信息,做一層處理,然后return 的值會被,subscribe 的的第二個參數 - 處理報錯信息函數監聽到。
retryWhen
在rxjs中,retryWhen運算符用于重新訂閱一個Observable對象,只有當retryWhen返回的新Observable對象發出complete通知時,才會停止重新訂閱。
類似Promise, 當進入錯誤狀態后,流狀態就會進入異常狀態,不會再進行監聽
retryWhen(errors$ => {
// 控制重試邏輯
return errors$
.pipe(
delay(1000) // 延遲1秒后重新訂閱
)
.pipe(
// 可以添加更多的操作符
take(3) // 重試最多3次
);
})
delayWhen
s1$.pipe(delayWhen(()=> s2$))
當 s2$ 流發送數據時, s1$ 才會發送數據,不過s1數據會不斷累積,在s2發送數據那一刻,全部爆發,一次性發送過來。
distinctUntilChanged
直到源文件發送的值發生改變后,才發送數據
Const currentPage$ = currentSubject$.pipe(distinctUntilChange())
debounceTime
防抖
debounceTime(300)
throttleTime
節流
throttleTime(300)
observeOn
調度器
Import { animationFrameScheduler } from 'rxjs';
- asyncScheduler: 調度器可以在異步上下文中執行操作符和觀察者。它通過使用 setTimeout 來異步調度任務。
- queueScheduler: 調度器會按順序執行任務,并且每個任務之間有一個最小延遲時間。它使用 setTimeout 進行任務調度
- asapScheduler:調度器用于盡快執行任務,優先于 setTimeout。它使用 setImmediate(在支持的環境中)或者 setTimeout(在不支持 setImmediate 的環境中)進行調度。
- animationFrameScheduler: 是用來執行與動畫相關的任務,基于瀏覽器的動畫幀調度器。
- virtualTimeScheduler: 是一個虛擬的調度器,主要用于測試目的。它使用虛擬的時間概念,可以提供對時間的精確控制
使?了asap,是指產?全部數據的時間跨度是215毫秒,?
不是獨占唯?的線程215毫秒,asap把產?每?個數據的?作都通過Micro Task來實現,這多繞了個路,時間跨度當然?,但是這也避免了同步調 ?,在這215毫秒?,其他插?的微任務可以獲得執?的機會,如果是在瀏 覽器中執?,?頁中的渲染和對?戶操作的響應也可以在事件處理的間隙
獲得執?的機會。
使?asap這樣的Scheduler,?的是提?整體的感知性能,?不 是為了縮短單獨?個數據流的執?時間。
asap會盡量使?Micro Task,?async利?的是Macro Task
queue這個Scheduler,如果調?它的schedule函數式參數delay是0,那 它就?同步的?式執?,如果delay參數?于0,那queue的表現其實就和 async?模?樣
調度器
時間調度:requestAnimationFrame - raf
驗證 mergeMap 和 switchMap 的區別
of(1000, 300, 200, 100)
.pipe(
// switchMap((v) => of(v).pipe(delay(v))),
mergeMap((v) => of(v).pipe(delay(v))),
//observeOn(animationFrameScheduler),
)
.subscribe((v) => {
console.log(v);
});
四 什么情況下需要使用rxjs
- 出現多重異步的情況,使用promise 或者async/await 都有點吃力,寫出來的代碼耦合度高、雜亂不易理解。
- 需要使用觀察者模式進行監聽發布時,可以考慮使用rxjs
使用rxjs目的,減少異步書寫難度,減少代碼耦合、增強代碼可維護性。
注意,一般能使用promise、async/await 解決的簡單問題,不建議使用rxjs。
五 案例介紹 - 需求一: 用rxjs 實現一個簡單的 excel 表格
- 需求二:實現一個表單搜索與表格翻頁于一體的功能
- input框輸入要進行 防抖操作
- 翻頁數字必須變化才會進行搜索
- 最后渲染的數據必須同搜索的數據流一致(防止因某些網絡原因或者服務器原因,導致前進行請求的數據后返回的情況)
- 需求三: 你畫我猜 游戲
- 鼠標按下,鼠標移動根據鼠標移動的軌跡進行線條繪制
- 鼠標彈起,停止繪制
- 需求四:鍵盤按鍵觸發彩蛋- 大招
- 比如,需要在3秒內觸發 上下下左上右下 這幾個按鍵就能觸發一個彩蛋。
- 需求五:使用rxjs 實現 canvas 事件系統
參考文檔: - Rxjs 官網(英文)
- Rxjs 官網 (中文翻譯版)
- 學習rxjs 操作符 中文版
- 《深入淺出Rxjs》
- 從業務邏輯聊聊為什么我們需要Rxjs?
- 異步復雜到什么程度才需要使用rxjs?
- 彈珠圖
該文章在 2024/11/12 11:24:54 編輯過