简析redux技术栈(二):认识saga的buffer和chanel
flytam Lv4


我们知道redux-saga 也是通过中间件的形式与 redux 本身连接起来。例如下面使用了redux-saga的react项目需要以下这样的 初始化

1
2
3
4
5
6
7
8
9
10
11
12
function configureStore(initialState) {
// 运行返回一个redux middleware
const sagaMiddleware = createSagaMiddleware();
return {
...createStore(
reducer,
initialState,
applyMiddleware(middleware1, middleware2, sagaMiddleware)
),
runSaga: sagaMiddleware.run
};
}

所以分析 redux-saga 的第一步,就从 redux-saga 的中间件开始。我们平时写代码在 react 中与 saga 进行交互,都是dispatch一个action到与我们的 saga 逻辑进行交互。翻看createSagaMiddleware源码,可以很清晰的看到,这就是使用了中间件后,我们每次dispatch一个 action 后,在 saga 中间件内会往channelput这个action,进而触发我们 saga 里面的逻辑。就实现了 react 组件和 saga 的交互了。那么这个stdChannel是什么呢

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 省略一些多余部分
function sagaMiddlewareFactory({ channel = stdChannel() } = {}) {
//...
function sagaMiddleware({ getState, dispatch }) {
return next => action => {
const result = next(action);
// 实现了react和saga的交互
channel.put(action);
return result;
};
}
//...
return sagaMiddleware;
}

在了解 saga 的运行机制之前,先学习 redux-saga 源码内部的两个比较常用的数据结构bufferchanel

buffer

buffer 是一个固定长度类似队列的数据结构,它有四种类型(下面介绍),对外暴露了几个函数,如下

  • put 用来缓存 action
  • take 取出一个 action
  • isEmpty 判断 buffer 是否为空
  • flush 取出缓存的内的所有 action

我们知道如果我们直接使用数组的 push/unshift(pop/shift)函数实现队列的话,当我们出队列的时候时间复杂度是o(n)。而这里的 buffer 实现是比较巧妙的。数据存储是使用定长数组。通过pushIndexpopIndex标识位来记录出入队列的位置,它们的初始值都是 0,出队列的时候直接把popIndex位置空,然后值+1。入队列则是pushIndex+1。这样,无论take还是put,时间复杂度都是o(1)

pushIndex达到了 buffer 的长度的时候,buffer 的处理会根据 buffer 类型不同进行处理

1、ON_OVERFLOW_THROW:超出限制直接报错

2、ON_OVERFLOW_SLIDE:类似于环状队列,达到长度限制后,从索引 0 继续存储。

3、ON_OVERFLOW_EXPAND:达到限制后,长度自动变大 2 倍。

4、ON_OVERFLOW_DROP:达到限制后,后续的都丢弃

chanel

chanel 的实现是类似发布/订阅的设计模式。chanel.take(taker)存入一个 taker 函数,chanel.put(action)时,取出 cb 函数执行,action 是用来消费 taker 的

  • 普通 chanel(单播)

特点:当put一个 action 时,如果没有taker的时候,会将这个 action 存起来,存 action 是用了上面提到的buffer这个数据结构。等到有 taker 的时候可以马上调用 action。

一个简化版的单播 chanel 实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class Chanel {
constructor() {
// 存action
this.buffers = [];
// 存taker
this.takers = [];
this.isClosed = false;
}
take(cb) {
if (this.isClosed) {
return;
}
if (this.buffers.length > 0) {
cb(this.buffers.shift());
} else {
this.takers.push(cb);
}
}
put(action) {
if (this.takers.length === 0) {
this.buffers.push(action);
} else {
this.takers.shift()(action);
}
}
close() {
if (this.isClosed) {
return;
}
}
}

eventChanel 是在普通 Chanel 基础上实现,是用来用于订阅外部的事件源。chanel的一些使用参考可以看文档

简化的 eventChanel 实现如下,其实给订阅函数传进一个函数,调用这个函数可以往 Chanel 内 put 东西。

1
2
3
4
5
6
7
8
9
10
11
12
13
class eventChanel extends Chanel {
constructor(subscribe) {
super();
this.unscribe = subscribe(action => {
super.put(action);
});
}

close() {
this.unscribe();
this.isClosed = true;
}
}
  • 多播(multiCast) chanel

从上面的中间件源码可以看到,redux-saga 默认情况下的ChanelstdChannelstdChannel就是基于多播 chanel (multiCastChanel)实现,只不过添加了redux-saga本身的调度系统。multiCastChanel和 nodejs 的eventEmiter是非常类似的,multiCastChaneltake类似于eventEmiteronce,multiCastChanelput类似于eventEmiteremit

通俗的理解,saga 内 multiCastChanel 和 Chanel 最大的区别是,multiCastChanel 不能存 action,只能存 taker,能根据 action 的 type 判断是否执行 taker;chanel 可以缓存 action 和 taker,接收到 action 马上触发 taker,不会判断 type,类似于两个人对话的样子(单播)

一个简化版的 multiCastChanel 实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Chanel {
constructor() {
this.isClosed = false;
this.takers = [];
}
put(action) {
if (this.isClosed) {
return;
}
const takers = this.takers;
for (let i = 0, len = takers.length; i < len; i++) {
if (!takers[i].MATCH || action.type === takers[i].MATCH) {
takers[i](action);
takers.splice(takers.indexOf(takers[i]), 1);
}
}
}
take(cb, match) {
cb["MATCH"] = match;
this.takers.push(cb);
}
close() {
this.isClosed = true;
}
}

源码中的 stdChanel 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
export function stdChannel() {
const chan = multicastChannel();
const { put } = chan;
chan.put = input => {
// saga的action,不进入调度状态
if (input[SAGA_ACTION]) {
put(input);
return;
}
asap(() => {
put(input);
});
};
return chan;
}

上面代码中的multicastChannel和我们的简化版 chanel 原理是一样的。我们可以看到,stdChanel是对multicastChannelput方法进行了重写。只是对于非 saga 内置action使用asap(() => { put(input); });进行调用,这个asap方法其实是 saga 内部调度系统的一个执行函数,它的作用是如果当前 saga 是空闲状态,则执行我们的回调;如果是挂起状态则将回调存进任务队列中。后面会专门介绍 saga 的调度系统。

  • Post title:简析redux技术栈(二):认识saga的buffer和chanel
  • Post author:flytam
  • Create time:2019-08-18 21:59:27
  • Post link:https://blog.flytam.vip/简析redux技术栈(二):认识saga的buffer和chanel.html
  • Copyright Notice:All articles in this blog are licensed under BY-NC-SA unless stating additionally.