我們都知道RxJS的Observeable會在subscribe的時候才會執行,所以每一次的subscribe都會執行一次,但是,某些情況下我們並不想要那樣子做,而在RxJS裡面有一個MultiCasting的觀念,主要是用來處理一個Observeable多個Observer的情況時,而不重複執行Observable. 這篇會整理一下關於MultiCasting的相關觀念
假設情境
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 var source = Rx .Observable .interval (1000 ).take (5 )var ObserverA = { next : function (value ){ console .log ('A next ' + value); }, error : function (error ){ console .error ('A error ' + error); }, complete : function ( ){ console .log ('A Complete' );} } source.subscribe (ObserverA ); var ObserverB = { next : function (value ){ console .log ('B next ' + value); }, error : function (error ){ console .error ('B error ' + error); }, complete : function ( ){ console .log ('B Complete' );} } setTimeout (function ( ){ source.subscribe (ObserverB ); },2000 );
這樣子跑出來的結果如下
Subject
Observer A和Observer B都有各自己的結果, 如果,我們想要Observer A和Observer B共用同一個資料流的話,該怎麼處理? 這時候就要借用RxJS裡面的 Subject 這個類型的幫助了
官網是這樣子定義Subject的
A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners.
用法如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 var source = Rx .Observable .interval (1000 ).take (5 )var subject = Rx .Subject .create ();var ObserverA = { next : function (value ){ console .log ('A next ' + value); }, error : function (error ){ console .error ('A error ' + error); }, complete : function ( ){ console .log ('A Complete' );} } subject.subscribe (ObserverA ); source.subscribe (subject); var ObserverB = { next : function (value ){ console .log ('B next ' + value); }, error : function (error ){ console .error ('B error ' + error); }, complete : function ( ){ console .log ('B Complete' );} } setTimeout (function ( ){ subject.subscribe (ObserverB ); },2000 );
這樣Observer A與Observer B就共用同一個資料流的資料了, 但是每次都這樣子寫有點麻煩.
multicast
可以使用multicast的方式將Object.create的方式包起來, 程式碼如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 var source = Rx .Observable .interval (1000 ).take (5 ) .multicast (Rx .Subject .create ()) var ObserverA = { next : function (value ){ console .log ('A next ' + value); }, error : function (error ){ console .error ('A error ' + error); }, complete : function ( ){ console .log ('A Complete' );} } source.connect (); source.subscribe (ObserverA ); var ObserverB = { next : function (value ){ console .log ('B next ' + value); }, error : function (error ){ console .error ('B error ' + error); }, complete : function ( ){ console .log ('B Complete' );} } setTimeout (function ( ){ source.subscribe (ObserverB ); },2000 );
使用multicast這個operator, 必須使用 .connect() 來執行 Observable了,因為,這裡的 source.subscribe
是針對Subject做subscribe而非Observable本身.
publish
publish為mulitcast的變化型, 在mulitcast裡面需要給予一個Rx.Subject, 例如
1 2 3 4 5 6 7 8 .multicast (new Rx .Subject ()) .publish () .multicast (new Rx .ReplaySubject ()) .publishReplay ()
refCount
可是,這樣子寫又有點麻煩,有沒有自動開始結束的寫法. 其實是有的,那就是 refCount
refCount: 啟動條件: subscriber數量大於0時。停止條件: subscriber數量等於0時
recCount makes the multicasted Observable automatically start executing when the first subscriber arrives, and stop executing when the last subscriber leaves.
程式碼如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 var source = Rx .Observable .interval (1000 ) .do (x => console .log ('souce ' + x)) .publish () .refCount () var ObserverA = { next : function (value ){ console .log ('A next ' + value); }, error : function (error ){ console .error ('A error ' + error); }, complete : function ( ){ console .log ('A Complete' );} } var subA = source.subscribe (ObserverA );var ObserverB = { next : function (value ){ console .log ('B next ' + value); }, error : function (error ){ console .error ('B error ' + error); }, complete : function ( ){ console .log ('B Complete' );} } var subBsetTimeout (function ( ){ subB = source.subscribe (ObserverB ); },2000 ); setTimeout (function ( ){ subA.unsubscribe (); console .log ('unsubscribe A' ); },5000 ) setTimeout (function ( ){ subB.unsubscribe (); console .log ('unsubscribe B' ); },7000 )
執行結果如下
share
RxJS有提供更簡便的寫法 share
,share是publish
, refCount
這兩個operator的簡寫,程式碼如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 var source = Rx .Observable .interval (1000 ) .do (x => console .log ('souce ' + x)) .share () var ObserverA = { next : function (value ){ console .log ('A next ' + value); }, error : function (error ){ console .error ('A error ' + error); }, complete : function ( ){ console .log ('A Complete' );} } var subA = source.subscribe (ObserverA );var ObserverB = { next : function (value ){ console .log ('B next ' + value); }, error : function (error ){ console .error ('B error ' + error); }, complete : function ( ){ console .log ('B Complete' );} } var subBsetTimeout (function ( ){ subB = source.subscribe (ObserverB ); },2000 ); setTimeout (function ( ){ subA.unsubscribe (); console .log ('unsubscribe A' ); },5000 ) setTimeout (function ( ){ subB.unsubscribe (); console .log ('unsubscribe B' ); },7000 )
執行結果是一樣的
結論
以上為最基本的multicast的幾種用法,但是這都只是基本款而已,因為RxJS裡面的Subject有好幾種,每一種類型的Subject所呈現的結果又都不一樣,這就待下一篇再來討論