Rx 学习笔记(2)

Author Avatar
Splendour 4月 19, 2017

前言

在前文,我们通过 Observable 的相关方法创建了 Observable,并且用 subscribe 方法实现了订阅。然而我们发现,每次 subscribe ,都会执行 create 方法,产生一个新的 observer,只有当该 observer 调用 next 方法 emit 了一个值之后,该 subscriber 才会触发接收到事件。也即不同的 subscriber 是不能共享 observer 的值的。

var observer$$;
var customStream = Rx.Observable.create(function(observer) {
  observer$$ = observer;
});
var sub1 = customStream.subscribe(function(value) {
  console.log('1 ', value);
});
observer$$.next(1);
setTimeout(function() {
  var sub2 = customStream.subscribe(function(value) {
    console.log('2 ', value);
  });
  observer$$.next(2);
}, 1000);
setTimeout(function() {
  observer$$.next(3);
}, 2000);

// console
1 1
2 2
2 3

当 customerStream 被 subscribe 多次之后,每一个被订阅的 observer 都拥有单独的一份数据,在某些场景下是很有意义的。然而,在一个 SPA 应用中,有时候我们需要实现数据的共享,即利用 store 存储数据供不同的组件同时使用,此时就需要使用另一个很有用的类——Subject

Subject

A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners.

从文档的描述很容易理解,Subject 相对于 Observable 来说,提供了广播数据的功能,相当于共用一份 observer,接收来自多方的订阅。

  • Every Subject is an Observable. 每一个 subject 都可以被订阅并提供一个 observer,只不过这个 observer 不会拥有一份单独的数据,而是被加入到了一个 observer 列表里面去,共同享有同一份数据。
  • Every Subject is an Observer. 每一个 sbuject 都有 next、error、complete 方法,调用后将会广播数据到每一个 observer
var subject = new Rx.Subject();
var sub1 = subject.subscribe(function(value) {
  console.log('1 ', value);
});
var sub2 = subject.subscribe(function(value) {
  console.log('2 ', value);
});
subject.next(1);
setTimeout(function() {
  subject.next(2);
}, 1000);

// console
1 1
2 1
1 2
2 2

几种类型的 Subject

BehaviorSubject

  • BehaviorSubject 是一种特殊的 Subject,会存储它 emit 的最后一个值,并且在有新的 subscriber 时,能够获取到这个值
  • 我们可以在构造函数中给它设置初始值
  • BehaviorSubject 通常用来作为页面中的 store,存储可共享的数据
var subject = new Rx.BehaviorSubject(0);
var sub1 = subject.subscribe(function(value) {
  console.log('1 ', value);
});
subject.next(1);
subject.next(2);
console.log(subject.getValue());
var sub2 = subject.subscribe(function(value) {
  console.log('2 ', value);
});
subject.next(3);

// console
1 0
1 1
1 2
2
2 2
1 3
2 3

可以看到,我们给 subject 设置了初始值,在 sub1 subscribe 时,获取到了初始值;在 sub2 subscribe 时,马上获得之前的最后一个值;之后 subject emit 值的时候,sub1 sub2 均能获取到该值。另外,BehaviorSubject 提供了 getValue 方法,可以方便地获取到当前的值,非常实用。

ReplaySubject

  • ReplySubject 和 Behavior 有点类似,也具有存储功能
  • ReplySubject 可以存储指定个数的最新数据
  • 当 ReplySubject 有了新的 subscriber,可以获取到 replay 的几个值
var subject = new Rx.ReplaySubject(3);
var sub1 = subject.subscribe(function(value) {
  console.log('1 ', value);
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
var sub2 = subject.subscribe(function(value) {
  console.log('2 ', value);
});
subject.next(5)

// console
1 1
1 2
1 3
1 4
2 2
2 3
2 4
1 5
2 5

可以看到,我们给 subject 设置了 replay 次数 为3,则在执行了 4 次 next 之后,存储了最新的三次,并且在 sub2 subscirbe 时,获取到了这三个数据。往后的 emit 按照 subject 的规则继续进行下去。
另外,ReplaySubject 也接受第二个参数,是一个时间间隔,表示会存储最近的 N 毫秒之内的值,当然最大 replay 次数也是由第一个参数决定的。

AsyncSubject

  • AsyncSubject 只会将最后一个值给到 subscriber
  • AsyncSubject 只有在收到 complete 之后才会将值给出
var subject = new Rx.AsyncSubject();
var sub1 = subject.subscribe(function(value) {
  console.log('1 ', value);
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
var sub2 = subject.subscribe(function(value) {
  console.log('2 ', value);
});
subject.next(5);
subject.complete();

// console
1 5
2 5

AsyncSubject 有点像 Observable 的 last 方法,但区别是只会在收到 complete 之后才会将最终值给到

总结

Subject 是 Rxjs 中非常重要,而且也是非常常用的一种类型。在构建 Web 应用时,我们通常会隔离出 data 层和 view 层,那么 data 层的数据是很有可能被 view 层复用的,这个时候就需要 subject 来作为数据流,不同的 view subscribe 这个流来共享数据。另外,BehaviorSubject 这个类,可以用来作为 store,存储数据,并且随时获取到最新的值,非常实用。