Node.js + Redis Sorted Set 任务队列

需求:功能 A 需要调用第三方 API 获取数据,而第三方 API 自身是异步处理方式,在调用后会返回数据与状态 { data: "查询结果", "status": "正在异步处理中" },这样就需要间隔一段时间后再去调用第三方 API 获取数据。为了用户在使用功能 A 时不会因为第三方 API 正在异步处理中而必须等待,将用户请求加入任务队列中,返回部分数据并关闭请求。然后定时从任务队列里中取出任务调用第三方 API,若返回状态为”异步处理中“,将该任务再次加入任务队列,若返回状态为”已处理完毕“,将返回数据入库。

根据以上问题,想到使用 Node.js + Redis sorted set 来实现任务队列。Node.js 实现自身应用 API 用来接受用户请求,合并数据库已存数据与 API 返回的部分数据返回给用户,并将任务加入到任务队列中。利用 Node.js child process 与 cron 定时从任务队列中取出任务执行。

在设计任务队列的过程中需要考虑到的几个问题

  1. 并行执行多个任务
  2. 任务唯一性
  3. 任务成功或失败后的处理

针对以上问题的解决方案

  1. 并行执行多个任务利用 Promise.all 来实现
  2. 任务唯一性利用 Redis sorted set 来实现。使用时间戳作为分值可以实现将 sorted set 作为 list 来使用,在加入任务时判断任务是否已经存在,在取出任务执行时将该任务分值设置为 0,每次取出分值大于 0 的任务来执行,可以避免重复执行任务。
  3. 执行任务成功后删除任务,执行任务失败后将任务分值更新为当前时间时间戳,这样就可以将失败的任务重新加入任务队列尾部

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// remote_api.js 模拟第三方 API
'use strict';

const app = require('express')();

app.get('/', (req, res) => {
setTimeout(() => {
let arr = [200, 300]; // 200 代表成功,300 代表失败需要重新请求
res.status(200).send({ 'status': arr[parseInt(Math.random() * 2)] });
}, 3000);
});

app.listen('9001', () => {
console.log('API 服务监听端口:9001');
});

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// producer.js 自身应用 API,用来接受用户请求并将任务加入任务队列
'use strict';

const app = require('express')();
const redisClient = require('redis').createClient();

const QUEUE_NAME = 'queue:example';

function addTaskToQueue(taskName, callback) {
// 先判断任务是否已经存在,存在:跳过,不存在:加入任务队列
redisClient.zscore(QUEUE_NAME, taskName, (error, task) => {
if (error) {
console.log(error);
} else {
if (task) {
console.log('任务已存在,不新增相同任务');
callback(null, task);
} else {
redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
if (error) {
callback(error);
} else {
callback(null, result);
}
});
}
}
});
}

app.get('/', (req, res) => {
let taskName = req.query['task-name'];
addTaskToQueue(taskName, (error, result) => {
if (error) {
console.log(error);
} else {
res.status(200).send('正在查询中......');
}
});
});

app.listen(9002, () => {
console.log('生产者服务监听端口:9002');
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// consumer.js 定时获取任务并执行
'use strict';

const redisClient = require('redis').createClient();
const request = require('request');
const schedule = require('node-schedule');

const QUEUE_NAME = 'queue:expmple';
const PARALLEL_TASK_NUMBER = 2; // 并行执行任务数量

function getTasksFromQueue(callback) {
// 获取多个任务
redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => {
if (error) {
callback(error);
} else {
// 将任务分值设置为 0,表示正在处理
if (tasks.length > 0) {
let tmp = [];
tasks.forEach((task) => {
tmp.push(0);
tmp.push(task);
});
redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => {
if (error) {
callback(error);
} else {
callback(null, tasks)
}
});
}
}
});
}

function addFailedTaskToQueue(taskName, callback) {
redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
if (error) {
callback(error);
} else {
callback(null, result);
}
});
}

function removeSucceedTaskFromQueue(taskName, callback) {
redisClient.zrem(QUEUE_NAME, taskName, (error, result) => {
if (error) {
callback(error);
} else {
callback(null, result);
}
})
}

function execTask(taskName) {
return new Promise((resolve, reject) => {
let requestOptions = {
'url': 'http://127.0.0.1:9001',
'method': 'GET',
'timeout': 5000
};
request(requestOptions, (error, response, body) => {
if (error) {
resolve('failed');
console.log(error);
addFailedTaskToQueue(taskName, (error) => {
if (error) {
console.log(error);
} else {

}
});
} else {
try {
body = typeof body !== 'object' ? JSON.parse(body) : body;
} catch (error) {
resolve('failed');
console.log(error);
addFailedTaskToQueue(taskName, (error, result) => {
if (error) {
console.log(error);
} else {

}
});
return;
}
if (body.status !== 200) {
resolve('failed');
addFailedTaskToQueue(taskName, (error, result) => {
if (error) {
console.log(error);
} else {

}
});
} else {
resolve('succeed');
removeSucceedTaskFromQueue(taskName, (error, result) => {
if (error) {
console.log(error);
} else {

}
});
}
}
});
});
}

// 定时,每隔 5 秒获取新的任务来执行
let job = schedule.scheduleJob('*/5 * * * * *', () => {
console.log('获取新任务');
getTasksFromQueue((error, tasks) => {
if (error) {
console.log(error);
} else {
if (tasks.length > 0) {
console.log(tasks);

Promise.all(tasks.map(execTask))
.then((results) => {
console.log(results);
})
.catch((error) => {
console.log(error);
});

}
}
});
});
Node.js + Redis Sorted Set 延时任务池 Systemd Service 中 一个`-`的困惑
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×