Rx 学习笔记(1)
Reactive Programming
Reactive Programming 其实就是处理异步数据流
- 在 Reactive 的世界里,所有的东西都可以当做一个 stream,比如变量,用户的输入,属性,缓存,数据结构等等
A stream is a sequence of ongoing events ordered in time
stream 是一序列按时间排序的正在发生的事件,各个事件会在发生时 emit 出来,比如一个 ajax 请求,在返回时会 emit 一个 value 或者 error
我们可以使用观察者模式,采用监听的方法捕获已经 emit 的事件,通过回调函数的方法来执行响应
监听 stream 也就是所谓的 subscribing ;回调函数就是所谓的 observers ;而 stream 也就是所谓的 subject (observable)
初探 Rxjs
从 npm 上下载的 rxjs 版本是 5.2.0,以下代码均基于此版本
简单变量
Rx.Observable.of
方法接收一个简单变量,构建一个 stream,emit 一个值,我们需要订阅它,回调函数就是它 emit 的那个值,从而进行其他操作
var Rx = require('rxjs/Rx');
var singleValue = Rx.Observable.of('I am single value');
console.log(singleValue);
// ScalarObservable { _isScalar: true, value: 'I am single value', scheduler: null }
singleValue.subscribe(function(value) {
console.log(value);
// I am single value
});
看上去好像比原来直接定义一个值复杂了,但这也是 rx 的思路,将所有东西都变成一个 stream,提供很多有趣的便捷的操作函数,我们继续往下一探究竟
自定义 stream
Rx.Observable.create
方法接收一个参数,该参数为一个接收一个 Observer(观察者) 对象的方法,该方法中,可以用 next 来实现 emit,用 error 抛出错误,用 complete 结束 stream
var customStream = Rx.Observable.create(function(observer) {
observer.next('I am a custom stream');
observer.complete();
});
customStream.subscribe(function(value) {
console.log(value);
// I am a custom stream
}, function(err) {
console.log(err);
// nothing
}, function() {
console.log('Complete');
// Complete
});
如果我们在中间抛出了错误
var customStream = Rx.Observable.create(function(observable) {
observable.next('I am a custom stream');
observable.error('I am an error');
observable.complete();
});
customStream.subscribe(function(value) {
console.log(value);
// I am a custom stream
}, function(err) {
console.log(err);
// I am an error
}, function() {
console.log('Complete');
// nothing
});
可以总结:只要有 next 就会 emit value,只要有 error 就会抛出错误,并且中断该 stream;只要没有 error,则会继续执行直到遇到 complete
Promise 转化为 stream
我们先定义一个简单的 Promise
var simplePromise = new Promise(function(resolve, reject) {
setTimeout(function() {
resolve('I am simple promise');
}, 1000);
});
Rx.Observable.fromPromise
方法接收一个 Promise,转化为 stream,resolve 相当于 emit,reject 相当于 error,resolve 之后自动执行 complete
streamFromPromise.subscribe(function(value) {
console.log(value);
// I am simple promise
}, function(err) {
console.log(err);
}, function() {
console.log('Complete');
// Complete
});
map
stream 提供了 map
方法,能将每一个 emit 的值进行转换
var singleValue = Rx.Observable.of('I am single value');
singleValue
.map(function(value) {
return value + ' and map';
})
.subscribe(function(value) {
console.log(value);
// I am single value and map
});
stream 的流式操作和转换函数有很多,而且可以链式调用,特别方便。如果是需要根据 value 创建一个新的 stream,而我们只想要在 subscribe 时获取到新的 stream emit 的值,则可以用 flatMap
方法
function promiseWithParams(value) {
return new Promise(function(resolve, reject) {
setTimeout(function() {
resolve(value);
}, 2000);
});
}
singleValue
.flatMap(function(value) {
return Rx.Observable.fromPromise(promiseWithParams(value));
})
.subscribe(function(value) {
console.log(value);
// I am single value
});
merge
merge
方法能将两个 stream 进行合并
Rx.Observable.merge(singleValue, streamFromPromise)
.subscribe(function(value) {
console.log(value);
// I am single value
// I am simple promise
});
即两个 stream,只要有 emit 值,合并后的 stream 就会触发 emit
还有另的写法,结果是一样的
singleValue.merge(streamFromPromise)
.subscribe(function(value) {
console.log(value);
});
我们可以看出,这个很像是给 streamFromPromise 赋了一个初始值,在执行异步操作后又给了返回值,Rx 提供了一个方法来给 stream 赋初始值
streamFromPromise.startWith('initial value')
.subscribe(function(value) {
console.log(value);
// initial value
// I am simple promise
});
总结
以上就是对 Rxjs 的初步学习,主要是理解了其思路,将所有的东西都化作 stream 去操作,可以有很多便捷的方法来组织和聚合数据,最后绑定到 view 上,实现对界面展示的控制。