LOGO OA教程 ERP教程 模切知識交流 PMS教程 CRM教程 開發文檔 其他文檔  
 
網站管理員

聊聊響應式編程的 JavaScript 庫 RXJS

admin
2024年11月11日 10:50 本文熱度 629

一 什么是rxjs?
RxJS(Reactive Extensions for JavaScript)是一個用于響應式編程的 JavaScript 庫。它通過使用可觀察對象(Observables)和操作符(Operators)來處理異步和事件驅動的代碼。
什么是響應式編程? 程序的輸入可以被當成一個數據流,例如 excel表格的累加。
響應式編程世界里知名度最高的框架 - Reactive Extension , 簡稱 RX,指實踐響應式編程的一套工具。在Rx官網有這樣一段文字
An API for asynchronous programming with observable streams.

[圖片]
Rx的概念最初由微軟公司實現并開源,也就是Rx.NET,因為Rx帶來的編程方式大大改進了異步編程模型,在.NET之后,眾多開發者在其他平臺和語言上也實現了Rx的類庫。可見,Rx其實是一個大家族,在這個大家族中,還有用Java實現的RxJava,用C++實現的RxCpp,用Ruby實現的Rx.rb,用Python實現的RxPy,當然,還有這個大家族中最年長的Rx.NET。在本書中,我們介紹的是RxJS,也就是Rx的JavaScript語言實現。
關于設計模式:
任何一種模式,指的都是解決某一個特定類型問題的套路和方法。現實世界的問題復雜多變,往往不是靠單獨一種模式能夠解決的,更需要的是多種模式的組合,RxJS的Observable就是觀察者模式和迭代器模式的組合。
觀察者模式 要解決的問題,就是在一個持續產生事件的系統中,如何分割功能,讓不同模塊只需要處理一部分邏輯,這種分而治之的思想是基本的系統設計概念,當然,“分”很容易,關鍵是如何“治”。觀察者模式對“治”這個問題提的解決方法是這樣,將邏輯分為發布者(Publisher)和觀察者(Observer),其中發布者只管負責產生事件,它會通知所有注冊掛上號的觀察者,而不關心這些觀察者如何處理這些事件,相對的,觀察者可以被注冊上某個發布者,只管接收到事件之后就處理,而不關心這些數據是如何產生的。
迭代器模式(Iterator,也稱為“迭代器”)指的是能夠遍歷一個數據集合的對象,因為數據集合的實現方式很多,可以是一個數組,也可以是一個樹形結構,也可以是一個單向鏈表……迭代器的作用就是提供一個通用的接口,讓使用者完全不用關心這個數據集合的具體實現方式。迭代器另一個容易理解的名字叫游標(cursor),就像是一個移動的指針一樣,從集合中一個元素移到另一個元素,完成對整個集合的遍歷。

  1. Rxjs 特點
  2. 使用數據流對現實問題進行抽象
  3. 擅長處理異步操作
  4. 把復雜問題抽象成簡單問題的組合
  5. Rxjs 的基本概念
  6. 數據源 - 可觀察對象 Observable
    import { Observable } from 'rxjs';
    //代表“流”的變量標示符,都是用$符號結尾,這是RxJS編程中普遍使用的風格,被稱為“芬蘭式命名法”(Finnish Notation)
    const source$ = new Observable((observer)=>{
    observer.next(1);
    observer.next(2);
    observer.complete();
    // 異常處理
    observer.error('error');
    });
  7. 操作符
    通過 pipe - 管道,提供的各種操作符來實現各種數據流的轉化和操作。
    包括:創建型/轉化型/過濾型/合并型/錯誤處理型
    import { ajax } from 'rxjs/ajax';
    import { retry } from 'rxjs/operators';

ajax('https://example.com/api/data').pipe(
// 最多重新訂閱 3 次,如果仍然失敗,則輸出錯誤信息
retry(3)
).subscribe(
response => console.log(response),
error => console.log('Request failed after multiple attempts:', error)
);

  1. 觀察者 - 訂閱可觀察對象
    import { Subject } from 'rxjs';

const source$ = new Subject();
// 訂閱
const sub = source$.subscribe({
next:(data)=>{}
complete:()=>{}
error:(err)=>{}
});
// 取消訂閱
sub.unsubscribe();
3. Rxjs 中的狀態

  1. 未開始:Observable 尚未開始發出任何值。這是 Observable 的初始狀態。
  2. 進行中:Observable 正在發出值,可以是零個或多個。在這種狀態下,觀察者可以訂閱并處理這些發出的值。
  3. 完成:Observable 成功地發出了所有值,并已經終止。在這種狀態下,觀察者可以做相應的清理工作。
  4. 錯誤:Observable 在發出值的過程中遇到錯誤,終止并發送錯誤通知。觀察者可以處理這個錯誤通知。
    注意:rxjs 只有一種終結狀態,要么完成,要么出錯。當 Observable 處于已經完成或錯誤狀態時,是不再允許通過 next 方法發送新的數據的。一旦 Observable 進入完成或錯誤狀態,它就不會再發出任何新的值。如果需要在完成或錯誤狀態后再次發送新的數據,可以使用其他操作符或技術來處理,例如創建一個新的 Observable,或者使用錯誤處理操作符捕獲并轉換錯誤。
  5. RxJS 與傳統 JavaScript 異步編程之間的對比:
  6. 響應式編程:RxJS 提供了一種響應式編程的范式,其中數據流被建模為可觀察對象,能夠在時間上發出值。你可以訂閱這些可觀察對象,以便在內部值發生變化時執行相應的操作。 傳統 JavaScript 異步編程: 傳統的 JavaScript 異步編程使用回調函數,事件處理程序或 Promise 來處理異步操作。
  7. 異步控制:RxJS 提供了一套強大的操作符,用于控制異步代碼的執行順序和并發性。你可以使用操作符如 mergeMap、switchMap 和 concatMap 來處理各種異步操作,并管理它們的執行順序和結果。 傳統 JavaScript 異步編程: 在傳統的 JavaScript 異步編程中,你可能會使用嵌套的回調函數或 Promise 鏈來控制異步操作的順序。
  8. 組合和轉換操作:RxJS 提供了許多操作符,用于組合和轉換可觀察對象。例如,map 操作符可以將一個可觀察對象的每個值映射到一個新值,filter 操作符可以過濾出符合特定條件的值。這些操作符使得處理數據流變得更加方便和靈活。 傳統 JavaScript 異步編程: 在傳統的 JavaScript 異步編程中,操作數據流通常需要手動編寫循環和條件語句。
  9. 錯誤處理:RxJS 提供了豐富的錯誤處理機制,例如通過 catchError 操作符捕獲錯誤并返回備用值,或者使用 retry 操作符重新訂閱可觀察對象以重試操作。 傳統 JavaScript 異步編程: 在傳統的 JavaScript 異步編程中,錯誤處理通常使用 try-catch 塊或者 Promise 的 catch 方法進行處理。
    總的來說,RxJS 提供了一個強大而靈活的工具集,用于處理異步和事件驅動的代碼。它可以幫助開發者將復雜的異步操作轉化為簡潔、可讀性強的代碼,并能有效地處理錯誤、控制執行順序和進行數據轉換。
  10. Rxjs 與 Promise 對比 : 加強版的Promise
    Promise 的寫法
    // Promise 的三種狀態,pending, fulfilled, rejected
    // Promise 定義出來 狀態變為pending
    const p = new Promise((resolve,reject)=>{
    resolve(); // 狀態變為fulfilled
    reject(); // 狀態變為 rejected
    });
    p.then(()=>{}).catch().finally();

// 對比Observable
const s$ = new Observable((observer)=>{
// 對比promise ,observable 可以發送多個值,而Promise 只能發送一個值就會進行完結狀態
// 這兒有點類似 ajax 和 websocket 的味道了
observer.next();
observer.next();
observer.compolete();
observer.error();
});
s$.pipe().subscribe({
next:(v)=>{// 接受數據},
complete:()=>{ // 完成狀態后的回調},
error:(err)=>{//獲取 observer.error() 發送過來的值}
});
// 使用完成后需要取消訂閱
s$.unsubscsribe();
RxJS 和 Promise 都是處理異步編程的強大工具,但它們在一些方面有所不同。下面是 RxJS 相對于 Promise 的一些優點:

  1. 組合性和靈活性:RxJS 提供了豐富的操作符,允許你以各種方式組合和轉換可觀察對象,使其更加靈活。你可以使用 map、filter、mergeMap 等操作符來處理數據流,而不僅僅是處理單個值。這樣可以使你的代碼更加簡潔、可讀性更強,并且更容易進行復雜的異步操作。
  2. 錯誤處理:RxJS 提供了一套完善的錯誤處理機制,你可以使用 catchError 操作符捕獲錯誤并返回備用值,或使用 retry 操作符進行錯誤重試。這使得在處理異步操作時更加容易處理和管理錯誤情況。
  3. 取消和中斷:RxJS 提供了 unsubscribe 方法,可以取消訂閱可觀察對象并中斷正在進行的異步操作。這對于在某些條件滿足時取消異步操作非常有用,而 Promise 并沒有直接提供這樣的機制。
  4. 動態數據流:RxJS 的可觀察對象是動態的,意味著你可以在需要時動態地添加、移除或改變流中的值。這使得操作流程非常靈活,可以根據實際需要進行動態調整。
    需要注意的是,Promise 也有其優勢。Promise 的語法相對簡單,容易上手,并且被廣泛支持和采用。它在處理單個值的異步操作時非常方便。如果你的需求只是處理一次性的異步操作,那么 Promise 可能已經足夠滿足你的需求。
    綜上所述,RxJS 相對于 Promise 在處理復雜的、多個值的異步操作以及靈活性方面具有一定的優勢。它提供了更多的工具和操作符,使得處理和組合異步操作更加方便和可讀。然而,選擇使用 RxJS 還是 Promise 取決于你的具體需求和個人偏好。
    二 為什么要使用rxjs ?
  5. 優秀的異步數據流處理 (異步具有時效性)
  6. 豐富的operator 來操控數據流
    做組件之間的通信,觀察者模式(Subject直接使用 )。畫布變化事件,主要是將拖拽縮放的快交互,從React的更新體系中剝離出來,因為前者的任務調度影響了整體更新的流暢度
    而不能直接定義emit的key - key是動態獲取的,這種情況,采取發布訂閱模式 - 事件總線的形式
  7. Canvas 模擬事件
  8. 大量的異步數據更新,并且這些數據之間還存在相互依賴的情況
  9. 通過 RxJS 整合了異步編程、時序控制以及各種遵循函數式編程思想的 Operators 的特性,優雅的實現了這種復雜的狀態機

三 rxjs語法
生產者 Observable
觀察者 Observer ( .subscribe)
operators
ajax請求 ,websocket,event事件
同步數據源
Of
range(1,100) 產生 1-100 的數據源
From
異步數據源
Interval
Timer
fromEvent
ajax
webSoket

組合數據源
Merge
combineLatest

Observable
被觀察者
const source$ = new Observable((subscriber)=>{
// 為啥不能解構 - 因為this綁定
subscriber.next();
subscriber.next();
subscriber.complete();
subscriber.error();
});
觀察者
source$ .subscribe({
next:()=>{},
complete:()=>{},
error:()=>{}
});

Subject
多播- Subject 是一種特殊類型的可觀察對象(Observable),同時也是觀察者(Observer)。它允許多個觀察者訂閱它,并且可以通過調用 next 方法來向觀察者發送新的值。
數據流既可以 next, complete, error
也可以 subscibe
注意:next發送的值,只能被已經訂閱的觀察者接收到。
注意:
多播也可以用作 takeUntil 的參數,用戶手動來
將流變成完成時
BehaviorSubject
對比Subject
BehaviorSubject 在被訂閱時會立即發送其最近的值給訂閱者 (可以拿到訂閱前最新一次的值)。而Subject不能傳遞初始值,且訂閱后才能接受next發送的值,訂閱之前的值,都拿不到。

of
在 RxJS 中,"of" 是一個創建 Observable 的操作符。它可以用來創建一個包含指定值的 Observable,并且這些值會按順序依次發出。
of(a,b,c) //會依次發送 a,b,c 三個值
of([a,b,c]) // 會發送一個 數組值
of是同步發送值,并且發送值以后狀態變成 complete

Interval (timer)
Interval (1000)每隔1000ms 連續發送數據,從0開始累加
timer(1000)隔1000ms發送數據0,發送一個數據后,進入完成狀態

from

將其他類型的數據源轉或者數據結構化為Observable

  1. 數組(數組同 of 數組不同,from 數組會依次發送,of 中數組會把數組當成一個整體進行發送)
  2. 轉化Promise。內部實現會手動調用promise中的 resolve或者reject,然后將返回值進行發。
  3. 轉化Iterotor 迭代器對象。 上面的數組,也就是迭代器對象,會依次調用next,將迭代器對象中每個值進行發送

fromEvent
將event 事件轉化為 Observable

merge
并行聚合 , 按照發送時間的先后順序,將多個源值并行為一個值。
of (1,2)
of(3,4,5)
Merge 后得到值, 1,2,3,4,5 。按照時間先后順序進行發送,而不會進行組合
—{count} +

一般用于不同的數據源返回相同的數據類型,
。觀察者不關心是哪個數據發送的數據,只關心
數據時間以及數據本身。

race
race(a1$, a2$,a3$) 哪一個流先發送數據,后續就一直訂閱這個流的數據。其他流全部放棄。

combineLatest

combineLatest([s1$, s2$, s3$] ).subscribe(([s1,s2,s3])=>{
// 操作
})
當 s1$, s2$, s3$ 都返回至少一個值時,會發送出一個值[s1,s2,s3],然后每次 三個流中有一個流發送變化都會發送出最新的[s1,s2,s3]
const serch$ = xxx;
const currentPage$ = xxx;
combineLatest([serch$,currentPage$ ]) .subscribe(([serch,currentPage])=>{
// 轉化成其他源文件

});
forkJoin

forkJoin( [a1$,a2$,a3$] ) , 需要等到 a1$, a2$, a3$ 流都進入完成時,最后返回一次 完成時最后一次發送的數據 數組[a1,a2,a3]
其中有一個流未進入完成時,都不會最終返回結果。
可以使用 take(1)讓流進入完成時
有點類似于 Promise.all

animationFrameScheduler
任務調度:類似于 window.requestAnimationFrame 每一幀去去篩選數據

Operator (寫在pipe中)
startWith
以什么開頭,給一個默認值

map
轉化數據 ,輸入的是一個函數

mapTo (已廢除)
轉化為常量。廢除了,直接用map(()=> 常量)來表示,過多的 operator 會增加知識復雜度

scan

類似于reduce積累函數 (可以累積發送的值,也可以累積是第幾次發送的值,并和當次的值進行組合返回)

withLatestFrom
withLatestFrom操作符是一種在源Observable發出值時將其與其他Observable的最新值進行組合的方式。當源Observable完成時,withLatestFrom不會觸發完成回調,并且不會再生成任何結果。source1$.pipe(withLagestFrom(second$)).subscribe(([s1,s2])=>{
// 保證每個source1發送的數據,都對應 second$數據為最新的值
})
注意,在外層源發出數據時,需要確保內層源已經發送出數據了(至少一個),不然取不到值。
也就是,每次外層源發送一個值,就去內層源中取 上次最新發送的值 進行組合。

switchMap

switchMap 接受一個函數作為參數,該函數將源 Observable 的值映射為一個新的 Observable。每當源 Observable 發出新的值時,switchMap 會取消訂閱前一個內部 Observable,并訂閱最新的內部 Observable。這樣可以確保只有最新的內部 Observable 的值被發出,而忽略前一個內部 Observable 的值。

當外層數據產生新值時,內層的數據源訂閱會被取消掉
從而確保內部源使用到的外部源的數據總是最新的

可用于解決翻頁渲染問題。例如,我們翻頁,從第一頁,翻頁到第二頁
又快速翻頁到第三頁。如果出現網絡問題。
switchMap 的使用,當翻頁數據到達第三頁時,如果第二頁還沒有返回數據,訂閱
訂閱會被取消,就算第二頁數據返回了,也不會監聽到。

mergeMap

串行聚合(有先后順序的聚合)

mousedow - mousemove(takeUntil - mouseup)

concatMap

用于將一個外部 Observable 序列中的每個值映射為內部Observable,并按照順序依次訂閱內部Observable。當一個內部Observable完成后,才會開始訂閱下一個內部Observable。(用于同步數據轉 其他數據源,當內部的其他數據源完成時狀態時,才會去訂閱 外部源的另外一個發送過來的數據 - 但 外部同步數據源肯定是沒有延遲的,同時發送了所有數據,但是只是被緩存了,沒有進行訂閱而已)

tap
拿傳過的來值進行一次不影響后續 Stream 的 “純操作”,
通過 tap 操作符進行 RxJS 的 Debug

take

取具體幾個數據,然后滿足條件后數據變成完成時。
take(3), 當流發送3個數據后,進入完成時。

takeUntil

獲取值,直到某個流發送數據為止,并且讓當前流進入完成時狀態

參數為一個函數,函數的返回值為一個新的流,當這個新的流
發送第一個數據的時候。外層的流就會終止訂閱
takeWhile
中一個常用的操作符是takeWhile,它可以根據條件判斷是否繼續發送數據或完成流。

delay

讓流的發送間隔一段時間再繼續發送
delay(300) 延遲300ms

catchError
它用于捕獲 Observable 中的錯誤并處理這些錯誤。
相當于拿到報錯信息,做一層處理,然后return 的值會被,subscribe 的的第二個參數 - 處理報錯信息函數監聽到。

retryWhen
在rxjs中,retryWhen運算符用于重新訂閱一個Observable對象,只有當retryWhen返回的新Observable對象發出complete通知時,才會停止重新訂閱。
類似Promise, 當進入錯誤狀態后,流狀態就會進入異常狀態,不會再進行監聽
retryWhen(errors$ => {
// 控制重試邏輯
return errors$
.pipe(
delay(1000) // 延遲1秒后重新訂閱
)
.pipe(
// 可以添加更多的操作符
take(3) // 重試最多3次
);
})

delayWhen

s1$.pipe(delayWhen(()=> s2$))
當 s2$ 流發送數據時, s1$ 才會發送數據,不過s1數據會不斷累積,在s2發送數據那一刻,全部爆發,一次性發送過來。

distinctUntilChanged
直到源文件發送的值發生改變后,才發送數據
Const currentPage$ = currentSubject$.pipe(distinctUntilChange())
debounceTime
防抖
debounceTime(300)

throttleTime
節流
throttleTime(300)

observeOn

調度器
Import { animationFrameScheduler } from 'rxjs';

  1. asyncScheduler: 調度器可以在異步上下文中執行操作符和觀察者。它通過使用 setTimeout 來異步調度任務。
  2. queueScheduler: 調度器會按順序執行任務,并且每個任務之間有一個最小延遲時間。它使用 setTimeout 進行任務調度
  3. asapScheduler:調度器用于盡快執行任務,優先于 setTimeout。它使用 setImmediate(在支持的環境中)或者 setTimeout(在不支持 setImmediate 的環境中)進行調度。
  4. animationFrameScheduler: 是用來執行與動畫相關的任務,基于瀏覽器的動畫幀調度器。
  5. virtualTimeScheduler: 是一個虛擬的調度器,主要用于測試目的。它使用虛擬的時間概念,可以提供對時間的精確控制
    使?了asap,是指產?全部數據的時間跨度是215毫秒,?
    不是獨占唯?的線程215毫秒,asap把產?每?個數據的?作都通過Micro Task來實現,這多繞了個路,時間跨度當然?,但是這也避免了同步調 ?,在這215毫秒?,其他插?的微任務可以獲得執?的機會,如果是在瀏 覽器中執?,?頁中的渲染和對?戶操作的響應也可以在事件處理的間隙
    獲得執?的機會。
    使?asap這樣的Scheduler,?的是提?整體的感知性能,?不 是為了縮短單獨?個數據流的執?時間。

asap會盡量使?Micro Task,?async利?的是Macro Task
queue這個Scheduler,如果調?它的schedule函數式參數delay是0,那 它就?同步的?式執?,如果delay參數?于0,那queue的表現其實就和 async?模?樣
調度器
時間調度:requestAnimationFrame - raf
驗證 mergeMap 和 switchMap 的區別
of(1000, 300, 200, 100)
.pipe(
// switchMap((v) => of(v).pipe(delay(v))),
mergeMap((v) => of(v).pipe(delay(v))),
//observeOn(animationFrameScheduler),
)
.subscribe((v) => {
console.log(v);
});
四 什么情況下需要使用rxjs

  1. 出現多重異步的情況,使用promise 或者async/await 都有點吃力,寫出來的代碼耦合度高、雜亂不易理解。
  2. 需要使用觀察者模式進行監聽發布時,可以考慮使用rxjs
    使用rxjs目的,減少異步書寫難度,減少代碼耦合、增強代碼可維護性。
    注意,一般能使用promise、async/await 解決的簡單問題,不建議使用rxjs。
    五 案例介紹
  3. 需求一: 用rxjs 實現一個簡單的 excel 表格
  4. 需求二:實現一個表單搜索與表格翻頁于一體的功能
  5. input框輸入要進行 防抖操作
  6. 翻頁數字必須變化才會進行搜索
  7. 最后渲染的數據必須同搜索的數據流一致(防止因某些網絡原因或者服務器原因,導致前進行請求的數據后返回的情況)
  8. 需求三: 你畫我猜 游戲
  9. 鼠標按下,鼠標移動根據鼠標移動的軌跡進行線條繪制
  10. 鼠標彈起,停止繪制
  11. 需求四:鍵盤按鍵觸發彩蛋- 大招
  12. 比如,需要在3秒內觸發 上下下左上右下 這幾個按鍵就能觸發一個彩蛋。
  13. 需求五:使用rxjs 實現 canvas 事件系統
    參考文檔:
  14. Rxjs 官網(英文)
  15. Rxjs 官網 (中文翻譯版)
  16. 學習rxjs 操作符 中文版
  17. 《深入淺出Rxjs》
  18. 從業務邏輯聊聊為什么我們需要Rxjs?
  19. 異步復雜到什么程度才需要使用rxjs?
  20. 彈珠圖

該文章在 2024/11/12 11:24:54 編輯過
關鍵字查詢
相關文章
正在查詢...
點晴ERP是一款針對中小制造業的專業生產管理軟件系統,系統成熟度和易用性得到了國內大量中小企業的青睞。
點晴PMS碼頭管理系統主要針對港口碼頭集裝箱與散貨日常運作、調度、堆場、車隊、財務費用、相關報表等業務管理,結合碼頭的業務特點,圍繞調度、堆場作業而開發的。集技術的先進性、管理的有效性于一體,是物流碼頭及其他港口類企業的高效ERP管理信息系統。
點晴WMS倉儲管理系統提供了貨物產品管理,銷售管理,采購管理,倉儲管理,倉庫管理,保質期管理,貨位管理,庫位管理,生產管理,WMS管理系統,標簽打印,條形碼,二維碼管理,批號管理軟件。
點晴免費OA是一款軟件和通用服務都免費,不限功能、不限時間、不限用戶的免費OA協同辦公管理系統。
Copyright 2010-2025 ClickSun All Rights Reserved

黄频国产免费高清视频,久久不卡精品中文字幕一区,激情五月天AV电影在线观看,欧美国产韩国日本一区二区
中文字幕日本乱码仑区在线 | 亚洲日本久久一区二区va | 日本三级香港三级三级人!妇久 | 日韩亚洲欧美一二三区 | 亚洲中文色资源 | 久久综合精品国产丝袜长腿 |