模仿koajs的middware设计思想实现异步任务链式处理

最近在接触koajs,发现它的middware设计非常精巧。 今天模仿它实现一下异步任务链式处理。

koajs使用举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
const Koa = require('koa');
const app = new Koa();

// x-response-time
app.use(async (ctx, next) => {
//前置逻辑!!!
const start = Date.now();

//等待后续middware全部处理完毕!!!
await next();

//后置逻辑!!!
const ms = Date.now() - start;
ctx.set('X-Response-Time', `${ms}ms`);
});

// logger
app.use(async (ctx, next) => {
const start = Date.now();
await next();
const ms = Date.now() - start;
console.log(`${ctx.method} ${ctx.url} - ${ms}`);
});

// response
app.use(async ctx => {
ctx.body = 'Hello World';
});

app.listen(3000);

以上代码三个app.use的使用,添加了3个middware,这三个middware都是async函数,因为async函数的特性,这些middware最终在执行时会表现出与同步行为相似的效果。 每个middware内部,在执行await next()之前,都是它的前置逻辑,执行await next()会让当前middware像同步函数一样被阻塞住,只有后续的middwares全部处理完毕,它的后置逻辑才得以执行。

同步实现

在学习vue-router源码时,它里面有一个类似的任务链设计来处理那些hooks:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
function runSequence(tasks, fn, cb) {
function run(index) {
const task = tasks[index]
if (task) {
fn(task, function runNext() {
run(index + 1)
})
} else if (index >= tasks.length) {
cb && cb()
} else {
run(index + 1)
}
}

run(0)
}

这个方式的使用举例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
let hooks = [
function (ctx, next) {
console.log(1)
ctx.from = 1
next()
console.log(2)
},
function (ctx, next) {
console.log(3)
ctx.from = ctx.from + ' 2'
next()
console.log(4)
},
function (ctx, next) {
console.log(5)
ctx.from = ctx.from + ' 3'
next()
console.log(6)
}
]

let ctx = {}
runSequence(hooks, function runSequenceFn(hook, next) {
hook(ctx, function hookNext(res) {
if (res === false) {
return
}

next()
})
})

// 1
// 3
// 5
// 6
// 4
// 2

console.log(ctx) // {from: '1 2 3'}

这个实现思想里面,有两层next的处理。第一层在runSequence的内部,有一个runNext,它会在作为runSequenceFn的第二个参数传入,第二层是hookNext,在调用hook时,作为它的第二个参数传入。 以上使用举例中,在hooks数组内添加了三个task,从打印可以看到这三个task的前置、后置逻辑在真正执行时的先后顺序:1 3 5 6 4 2

这个实现方式有一个缺陷,就是各个task中必须调用next,否则下一个task就得不到执行,可以稍微改善一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
let hooks = [
function (ctx, next) {
console.log(1)
ctx.from = 1
next()
console.log(2)
},
// 这个hook内部没有了next调用
function (ctx, next) {
console.log(3)
ctx.from = ctx.from + ' 2'
console.log(4)
},
function (ctx, next) {
console.log(5)
ctx.from = ctx.from + ' 3'
next()
console.log(6)
}
]

let ctx = {}
runSequence(hooks, function runSequenceFn(hook, next) {
let nextCalled = false
hook(ctx, function hookNext(res) {
if (res === false) {
return
}

nextCalled = true
next()
})

if(!nextCalled) {
next()
}
})

// 1
// 3
// 4
// 5
// 6
// 2

console.log(ctx) // {from: '1 2 3'}

不过因为第二个task没有next调用的原因,所以console.log(3)console.log(4)现在都是前置逻辑,而原来console.log(4)属于后置逻辑。

实现async任务链

将同步改为异步,只需要将原来的同步函数,全部都改为async函数即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
async function runSequence(tasks, fn) {
async function run(index) {
const task = tasks[index]
if (task) {
await fn(task, async function runNext() {
await run(index + 1)
})
} else if (index >= tasks.length) {
return
} else {
await run(index + 1)
}
}

await run(0)
}

const long = () => new Promise(resolve => setTimeout(resolve, 1000))
const now = Date.now()

let hooks = [
async function (ctx, next) {
console.log('===>1', Date.now() - now)
await long()
ctx.from = 1
await next()
await long()
console.log('1<===', Date.now() - now)
},
async function (ctx, next) {
console.log('===>2', Date.now() - now)
await long()
ctx.from = ctx.from + ' 2'
await next()
await long()
console.log('2<===', Date.now() - now)
},
async function (ctx, next) {
console.log('===>3', Date.now() - now)
await long()
ctx.from = ctx.from + ' 3'
await next()
await long()
console.log('3<===', Date.now() - now)
}
]

let ctx = {}
runSequence(hooks, async function runSequenceFn(hook, next) {
let nextCalled = false
await hook(ctx, async function hookNext(res) {
nextCalled = true
await next()
})

if (!nextCalled) {
await next()
}
}).then(() => {
// ===>1 1
// ===>2 1002
// ===>3 2004
// 3<=== 4014
// 2<=== 5020
// 1<=== 6025
console.log(ctx) // {from: '1 2 3'}
})

async函数是异步处理用同步方式进行表达的设计,所以以上代码,按照同步逻辑来理解即可。

koa源码

koajs中这个处理时利用koa-compose来处理的,这是一个非常简单的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
function compose(middleware) {
if (!Array.isArray(middleware)) throw new TypeError('Middleware stack must be an array!')

return function (context, next) {
// last called middleware #
let index = -1
return dispatch(0)
function dispatch(i) {
if (i <= index) return Promise.reject(new Error('next() called multiple times'))
index = i
let fn = middleware[i]
if (i === middleware.length) fn = next
if (!fn) return Promise.resolve()
try {
return Promise.resolve(fn(context, dispatch.bind(null, i + 1)))
} catch (err) {
return Promise.reject(err)
}
}
}
}

基于这个compose函数,也能实现async的异步任务链:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
const long = () => new Promise(resolve => setTimeout(resolve, 1000))
const now = Date.now()

let hooks = [
async function (ctx, next) {
console.log('===>1', Date.now() - now)
await long()
ctx.from = 1
await next()
await long()
console.log('1<===', Date.now() - now)
},
async function (ctx, next) {
console.log('===>2', Date.now() - now)
await long()
ctx.from = ctx.from + ' 2'
await next()
await long()
console.log('2<===', Date.now() - now)
},
async function (ctx, next) {
console.log('===>3', Date.now() - now)
await long()
ctx.from = ctx.from + ' 3'
await next()
await long()
console.log('3<===', Date.now() - now)
}
]

let ctx = {}
let runSequence = compose(hooks)
runSequence(ctx).then(() => {
// ===>1 1
// ===>2 1002
// ===>3 2004
// 3<=== 4014
// 2<=== 5020
// 1<=== 6025
console.log(ctx) // {from: '1 2 3'}
})