Rx 学习笔记(1)

Author Avatar
Splendour 3月 20, 2017

Reactive Programming

Reactive Programming 其实就是处理异步数据流

  • 在 Reactive 的世界里,所有的东西都可以当做一个 stream,比如变量,用户的输入,属性,缓存,数据结构等等

A stream is a sequence of ongoing events ordered in time

  • stream 是一序列按时间排序的正在发生的事件,各个事件会在发生时 emit 出来,比如一个 ajax 请求,在返回时会 emit 一个 value 或者 error

  • 我们可以使用观察者模式,采用监听的方法捕获已经 emit 的事件,通过回调函数的方法来执行响应

监听 stream 也就是所谓的 subscribing ;回调函数就是所谓的 observers ;而 stream 也就是所谓的 subject (observable)

初探 Rxjs

从 npm 上下载的 rxjs 版本是 5.2.0,以下代码均基于此版本

简单变量

Rx.Observable.of 方法接收一个简单变量,构建一个 stream,emit 一个值,我们需要订阅它,回调函数就是它 emit 的那个值,从而进行其他操作

var Rx = require('rxjs/Rx');
var singleValue = Rx.Observable.of('I am single value');
console.log(singleValue);
// ScalarObservable { _isScalar: true, value: 'I am single value', scheduler: null }

singleValue.subscribe(function(value) {
  console.log(value);
  // I am single value
});

看上去好像比原来直接定义一个值复杂了,但这也是 rx 的思路,将所有东西都变成一个 stream,提供很多有趣的便捷的操作函数,我们继续往下一探究竟

自定义 stream

Rx.Observable.create 方法接收一个参数,该参数为一个接收一个 Observer(观察者) 对象的方法,该方法中,可以用 next 来实现 emit,用 error 抛出错误,用 complete 结束 stream

var customStream = Rx.Observable.create(function(observer) {
  observer.next('I am a custom stream');
  observer.complete();
});
customStream.subscribe(function(value) {
  console.log(value);
  // I am a custom stream
}, function(err) {
  console.log(err);
  // nothing
}, function() {
  console.log('Complete');
  // Complete
});

如果我们在中间抛出了错误

var customStream = Rx.Observable.create(function(observable) {
  observable.next('I am a custom stream');
  observable.error('I am an error');
  observable.complete();
});
customStream.subscribe(function(value) {
  console.log(value);
  // I am a custom stream
}, function(err) {
  console.log(err);
  // I am an error
}, function() {
  console.log('Complete');
  // nothing
});

可以总结:只要有 next 就会 emit value,只要有 error 就会抛出错误,并且中断该 stream;只要没有 error,则会继续执行直到遇到 complete

Promise 转化为 stream

我们先定义一个简单的 Promise

var simplePromise = new Promise(function(resolve, reject) {
  setTimeout(function() {
    resolve('I am simple promise');
  }, 1000);
});

Rx.Observable.fromPromise 方法接收一个 Promise,转化为 stream,resolve 相当于 emit,reject 相当于 error,resolve 之后自动执行 complete

streamFromPromise.subscribe(function(value) {
  console.log(value);
  // I am simple promise
}, function(err) {
  console.log(err);
}, function() {
  console.log('Complete');
  // Complete
});

map

stream 提供了 map 方法,能将每一个 emit 的值进行转换

var singleValue = Rx.Observable.of('I am single value');
singleValue
  .map(function(value) {
    return value + ' and map';
  })
  .subscribe(function(value) {
    console.log(value);
    // I am single value and map
  });

stream 的流式操作和转换函数有很多,而且可以链式调用,特别方便。如果是需要根据 value 创建一个新的 stream,而我们只想要在 subscribe 时获取到新的 stream emit 的值,则可以用 flatMap 方法

function promiseWithParams(value) {
  return new Promise(function(resolve, reject) {
    setTimeout(function() {
      resolve(value);
    }, 2000);
  });
}

singleValue
  .flatMap(function(value) {
    return Rx.Observable.fromPromise(promiseWithParams(value));
  })
  .subscribe(function(value) {
    console.log(value);
    // I am single value
  });

merge

merge 方法能将两个 stream 进行合并

Rx.Observable.merge(singleValue, streamFromPromise)
  .subscribe(function(value) {
    console.log(value);
    // I am single value
    // I am simple promise
  });

即两个 stream,只要有 emit 值,合并后的 stream 就会触发 emit

还有另的写法,结果是一样的

singleValue.merge(streamFromPromise)
  .subscribe(function(value) {
    console.log(value);
  });

我们可以看出,这个很像是给 streamFromPromise 赋了一个初始值,在执行异步操作后又给了返回值,Rx 提供了一个方法来给 stream 赋初始值

streamFromPromise.startWith('initial value')
  .subscribe(function(value) {
    console.log(value);
    // initial value
    // I am simple promise
  });

总结

以上就是对 Rxjs 的初步学习,主要是理解了其思路,将所有的东西都化作 stream 去操作,可以有很多便捷的方法来组织和聚合数据,最后绑定到 view 上,实现对界面展示的控制。