A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners.
This means a Subject is both an Observable and an Observer. This article focuses on the characteristics of each type of Subject.
Besides the basic Subject, RxJS provides the following Subject types: BehaviorSubject, ReplaySubject, and AsyncSubject.
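To make the "registry of listeners" idea concrete, here is a minimal sketch of a Subject in plain JavaScript. It is not the RxJS implementation: `MiniSubject` and `makeObserver` are hypothetical names used only for illustration, and error handling is left out. It only shows that one object can act as an Observer (`next`, `complete`) and as an Observable (`subscribe`) at the same time.

```javascript
// Minimal sketch of a Subject. NOT the RxJS implementation;
// MiniSubject is a hypothetical name used only for illustration.
class MiniSubject {
  constructor() {
    this.observers = []; // the registry of listeners
    this.closed = false;
  }

  // Observable side: register a listener.
  subscribe(observer) {
    if (this.closed) {
      observer.complete();
      return;
    }
    this.observers.push(observer);
  }

  // Observer side: forward each value to every registered listener.
  next(value) {
    if (this.closed) return;
    this.observers.slice().forEach(o => o.next(value));
  }

  complete() {
    if (this.closed) return;
    this.closed = true;
    this.observers.slice().forEach(o => o.complete());
    this.observers = [];
  }
}

const log = [];
const makeObserver = name => ({
  next: v => log.push(name + ' next ' + v),
  complete: () => log.push(name + ' complete'),
});

const subject = new MiniSubject();
subject.subscribe(makeObserver('A'));
subject.next(0);
subject.next(1);
subject.subscribe(makeObserver('B')); // late subscriber: misses 0 and 1
subject.next(2);
subject.complete();

console.log(log.join('\n'));
// A next 0
// A next 1
// A next 2
// B next 2
// A complete
// B complete
```

Observer B receives only the values emitted after it subscribed, which is the behavior the other Subject types modify.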
Subject
Code

    var source = Rx.Observable.interval(1000).take(5)
        .do(function (value) { console.log('source ' + value); });

    var subject = new Rx.Subject();

    var ObserverA = {
        next: function (value) { console.log('A next ' + value); },
        error: function (error) { console.error('A error ' + error); },
        complete: function () { console.log('A Complete'); }
    };

    var ObserverB = {
        next: function (value) { console.log('B next ' + value); },
        error: function (error) { console.error('B error ' + error); },
        complete: function () { console.log('B Complete'); }
    };

    // The subject subscribes to the source and relays its values to its own observers
    source.subscribe(subject);

    subject.subscribe(ObserverA);
    console.log('ObserverA subscribed');

    // ObserverB subscribes 2 seconds later
    setTimeout(function () {
        subject.subscribe(ObserverB);
        console.log('ObserverB subscribed');
    }, 2000);

Output
BehaviorSubject
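BehaviorSubjects are useful for representing "values over time". For instance, an event stream of birthdays is a Subject, but the stream of a person's age would be a BehaviorSubject.
A BehaviorSubject stores the most recent value, so an Observer that subscribes later immediately learns the current value.
For that reason, a BehaviorSubject requires an initial value when it is created.
The RxJS code is as follows: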
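The "current value" rule can be sketched in a few lines of plain JavaScript. This is an illustrative stand-in, not the RxJS class: `MiniBehaviorSubject` is a hypothetical name, and completion and error handling are omitted.

```javascript
// Minimal sketch of the BehaviorSubject idea. NOT the RxJS implementation;
// MiniBehaviorSubject is a hypothetical name used only for illustration.
class MiniBehaviorSubject {
  constructor(initialValue) {
    this.value = initialValue; // the "current value", required up front
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
    observer.next(this.value); // a new subscriber immediately gets the current value
  }

  next(value) {
    this.value = value; // remember the latest value for future subscribers
    this.observers.slice().forEach(o => o.next(value));
  }
}

const log = [];
const subject = new MiniBehaviorSubject(0);

subject.subscribe({ next: v => log.push('A next ' + v) }); // A receives the initial 0
subject.next(1);
subject.next(2);
subject.subscribe({ next: v => log.push('B next ' + v) }); // B receives the current 2
subject.next(3);

console.log(log.join('\n'));
// A next 0
// A next 1
// A next 2
// B next 2
// A next 3
// B next 3
```

Because a subscriber always receives a value on subscription, the constructor cannot leave the current value undefined, which is why the initial value is mandatory.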
    var source = Rx.Observable.interval(1000).take(5)
        .do(function (value) { console.log('source ' + value); });

    var subject = new Rx.BehaviorSubject(0); // 0 is the initial value

    var ObserverA = {
        next: function (value) { console.log('A next ' + value); },
        error: function (error) { console.error('A error ' + error); },
        complete: function () { console.log('A Complete'); }
    };

    var ObserverB = {
        next: function (value) { console.log('B next ' + value); },
        error: function (error) { console.error('B error ' + error); },
        complete: function () { console.log('B Complete'); }
    };

    source.subscribe(subject);

    subject.subscribe(ObserverA);
    console.log('ObserverA subscribed');

    setTimeout(function () {
        subject.subscribe(ObserverB);
        console.log('ObserverB subscribed');
    }, 2000);

Output
ReplaySubject
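A ReplaySubject records multiple values from the Observable execution and replays them to new subscribers.
A ReplaySubject is similar to a BehaviorSubject in that a new subscriber can receive values emitted before it subscribed, but it can replay more than one of them.
A ReplaySubject accepts the following parameters when it is created:

    var subject = new Rx.ReplaySubject(bufferSize, windowTime);

bufferSize: the maximum number of values the ReplaySubject keeps for replay
windowTime: how long, in milliseconds, a buffered value remains available for replay
Code (without the windowTime parameter)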
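How the two parameters interact can be sketched in plain JavaScript. This is an illustrative stand-in, not the RxJS class: `MiniReplaySubject`, its `trim` helper, the injected `now` clock, and `replayAt2100` are all hypothetical, and the clock is faked so the result does not depend on real timers. The values are stamped at 500 ms steps to mimic `interval(500)`, and the late subscriber arrives at 2100 ms (an assumed time, chosen to stay clear of the 2000 ms boundary).

```javascript
// Minimal sketch of the ReplaySubject buffering rules. NOT the RxJS implementation;
// MiniReplaySubject and the injected `now` clock are hypothetical.
class MiniReplaySubject {
  constructor(bufferSize = Infinity, windowTime = Infinity, now = Date.now) {
    this.bufferSize = bufferSize;
    this.windowTime = windowTime;
    this.now = now;
    this.buffer = []; // entries of the form { time, value }
    this.observers = [];
  }

  // Drop entries older than windowTime, then keep only the newest bufferSize entries.
  trim() {
    const cutoff = this.now() - this.windowTime;
    this.buffer = this.buffer.filter(entry => entry.time > cutoff);
    if (this.buffer.length > this.bufferSize) {
      this.buffer = this.buffer.slice(this.buffer.length - this.bufferSize);
    }
  }

  next(value) {
    this.buffer.push({ time: this.now(), value: value });
    this.trim();
    this.observers.slice().forEach(o => o.next(value));
  }

  subscribe(observer) {
    this.trim();
    this.buffer.forEach(entry => observer.next(entry.value)); // replay first
    this.observers.push(observer);
  }
}

// Fake clock so the example is deterministic.
let clock = 0;
const fakeNow = () => clock;

// Emit 0..3 at 500, 1000, 1500, 2000 ms, then subscribe at 2100 ms
// and return what the late subscriber gets replayed.
function replayAt2100(bufferSize, windowTime) {
  const subject = new MiniReplaySubject(bufferSize, windowTime, fakeNow);
  for (let i = 0; i < 4; i++) {
    clock = (i + 1) * 500;
    subject.next(i);
  }
  clock = 2100;
  const replayed = [];
  subject.subscribe({ next: v => replayed.push(v) });
  return replayed;
}

const sizeOnly = replayAt2100(3, Infinity); // only bufferSize limits the replay
const sizeAndWindow = replayAt2100(3, 700); // windowTime also expires old values

console.log(sizeOnly);      // [ 1, 2, 3 ]
console.log(sizeAndWindow); // [ 2, 3 ]
```

With a 700 ms window, the value stamped at 1000 ms is already 1100 ms old when the subscriber arrives, so it is dropped even though the buffer still has room for three values.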
    var source = Rx.Observable.interval(500).take(7)
        .do(function (value) { console.log('source ' + value); });

    var subject = new Rx.ReplaySubject(3);

    var ObserverA = {
        next: function (value) { console.log('A next ' + value); },
        error: function (error) { console.error('A error ' + error); },
        complete: function () { console.log('A Complete'); }
    };

    var ObserverB = {
        next: function (value) { console.log('B next ' + value); },
        error: function (error) { console.error('B error ' + error); },
        complete: function () { console.log('B Complete'); }
    };

    source.subscribe(subject);

    subject.subscribe(ObserverA);
    console.log('ObserverA subscribed');

    setTimeout(function () {
        subject.subscribe(ObserverB);
        console.log('ObserverB subscribed');
    }, 2000);

Output
Code (with the windowTime parameter)
    var source = Rx.Observable.interval(500).take(7)
        .do(function (value) { console.log('source ' + value); });

    var subject = new Rx.ReplaySubject(3, 700);

    var ObserverA = {
        next: function (value) { console.log('A next ' + value); },
        error: function (error) { console.error('A error ' + error); },
        complete: function () { console.log('A Complete'); }
    };

    var ObserverB = {
        next: function (value) { console.log('B next ' + value); },
        error: function (error) { console.error('B error ' + error); },
        complete: function () { console.log('B Complete'); }
    };

    source.subscribe(subject);

    subject.subscribe(ObserverA);
    console.log('ObserverA subscribed');

    setTimeout(function () {
        subject.subscribe(ObserverB);
        console.log('ObserverB subscribed');
    }, 2000);

Output
AsyncSubject
An AsyncSubject emits only the last value of the Observable execution, and only once that execution completes.
Code
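The "last value, only on completion" rule can be sketched in plain JavaScript. This is an illustrative stand-in, not the RxJS class: `MiniAsyncSubject` is a hypothetical name, and error handling is omitted. Observer C, which subscribes after completion, is an extra case added here to show that the stored value is still delivered.

```javascript
// Minimal sketch of the AsyncSubject idea. NOT the RxJS implementation;
// MiniAsyncSubject is a hypothetical name used only for illustration.
class MiniAsyncSubject {
  constructor() {
    this.hasValue = false;
    this.value = undefined;
    this.completed = false;
    this.observers = [];
  }

  subscribe(observer) {
    if (this.completed) {
      // Already completed: hand over the stored value right away.
      if (this.hasValue) observer.next(this.value);
      observer.complete();
      return;
    }
    this.observers.push(observer);
  }

  // Values are only remembered, never forwarded while the source is running.
  next(value) {
    if (this.completed) return;
    this.value = value;
    this.hasValue = true;
  }

  // On completion, emit the last value (if any), then complete.
  complete() {
    if (this.completed) return;
    this.completed = true;
    const observers = this.observers;
    this.observers = [];
    if (this.hasValue) observers.forEach(o => o.next(this.value));
    observers.forEach(o => o.complete());
  }
}

const log = [];
const makeObserver = name => ({
  next: v => log.push(name + ' next ' + v),
  complete: () => log.push(name + ' complete'),
});

const subject = new MiniAsyncSubject();
subject.subscribe(makeObserver('A'));
[0, 1, 2].forEach(v => subject.next(v));
subject.subscribe(makeObserver('B'));
[3, 4, 5, 6].forEach(v => subject.next(v));
subject.complete();
subject.subscribe(makeObserver('C')); // subscribes after completion

console.log(log.join('\n'));
// A next 6
// B next 6
// A complete
// B complete
// C next 6
// C complete
```

No observer sees the intermediate values 0 through 5, regardless of when it subscribed.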
    var source = Rx.Observable.interval(500).take(7)
        .do(function (value) { console.log('source ' + value); });

    var subject = new Rx.AsyncSubject(); // takes no initial value

    var ObserverA = {
        next: function (value) { console.log('A next ' + value); },
        error: function (error) { console.error('A error ' + error); },
        complete: function () { console.log('A Complete'); }
    };

    var ObserverB = {
        next: function (value) { console.log('B next ' + value); },
        error: function (error) { console.error('B error ' + error); },
        complete: function () { console.log('B Complete'); }
    };

    source.subscribe(subject);

    subject.subscribe(ObserverA);
    console.log('ObserverA subscribed');

    setTimeout(function () {
        subject.subscribe(ObserverB);
        console.log('ObserverB subscribed');
    }, 2000);

Output
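How BehaviorSubject, ReplaySubject, and AsyncSubject relate to publish
The previous post mentioned that publish is a variant of multicast, and that multicast can be given any type of Subject. Which publish variant corresponds to each Subject type? The mapping is as follows: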
| Subject Type | Multicasting Operator |
| --- | --- |
| Rx.Subject | publish() |
| Rx.BehaviorSubject | publishBehavior(initValue) |
| Rx.ReplaySubject | publishReplay(bufferSize, windowTime) |
| Rx.AsyncSubject | publishLast() |
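The mapping can be read as "each publish variant is multicast with a particular Subject plugged in". The sketch below illustrates that reading in plain JavaScript under loud assumptions: `MiniSubject`, `MiniBehaviorSubject`, and the standalone `multicast`, `publish`, and `publishBehavior` functions are hypothetical stand-ins rather than the RxJS operators, and a source is modelled as a plain function that pushes values into an observer.

```javascript
// Hypothetical stand-ins, NOT the RxJS classes or operators.
class MiniSubject {
  constructor() { this.observers = []; }
  subscribe(observer) { this.observers.push(observer); }
  next(value) { this.observers.slice().forEach(o => o.next(value)); }
}

class MiniBehaviorSubject extends MiniSubject {
  constructor(initialValue) { super(); this.value = initialValue; }
  subscribe(observer) { super.subscribe(observer); observer.next(this.value); }
  next(value) { this.value = value; super.next(value); }
}

// multicast: route one source execution through a shared subject.
// `source` is modelled as a function that pushes values into an observer.
function multicast(source, subject) {
  return {
    subscribe: observer => subject.subscribe(observer),
    connect: () => source(subject), // start the single shared execution
  };
}

// Each publish variant only chooses which subject multicast uses.
const publish = source => multicast(source, new MiniSubject());
const publishBehavior = (source, initValue) =>
  multicast(source, new MiniBehaviorSubject(initValue));

const source = observer => { [1, 2, 3].forEach(v => observer.next(v)); };

const plainLog = [];
const published = publish(source);
published.subscribe({ next: v => plainLog.push(v) });
published.connect();

const behaviorLog = [];
const behavior = publishBehavior(source, 0);
behavior.subscribe({ next: v => behaviorLog.push(v) });
behavior.connect();

console.log(plainLog);    // [ 1, 2, 3 ]
console.log(behaviorLog); // [ 0, 1, 2, 3 ]
```

The only difference between the two results is the initial 0, which comes from the subject chosen by the publish variant rather than from the source.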
References