需求:功能 A 需要调用第三方 API 获取数据,而第三方 API 自身是异步处理方式,在调用后会返回数据与状态 { data: "查询结果", "status": "正在异步处理中" }
,这样就需要间隔一段时间后再去调用第三方 API 获取数据。为了用户在使用功能 A 时不会因为第三方 API 正在异步处理中而必须等待,将用户请求加入任务队列中,返回部分数据并关闭请求。然后定时从任务队列里中取出任务调用第三方 API,若返回状态为”异步处理中“,将该任务再次加入任务队列,若返回状态为”已处理完毕“,将返回数据入库。
根据以上问题,想到使用 Node.js + Redis sorted set 来实现任务队列。Node.js 实现自身应用 API 用来接受用户请求,合并数据库已存数据与 API 返回的部分数据返回给用户,并将任务加入到任务队列中。利用 Node.js child process 与 cron 定时从任务队列中取出任务执行。
在设计任务队列的过程中需要考虑到的几个问题
- 并行执行多个任务
- 任务唯一性
- 任务成功或失败后的处理
针对以上问题的解决方案
- 并行执行多个任务利用 Promise.all 来实现
- 任务唯一性利用 Redis sorted set 来实现。使用时间戳作为分值可以实现将 sorted set 作为 list 来使用,在加入任务时判断任务是否已经存在,在取出任务执行时将该任务分值设置为 0,每次取出分值大于 0 的任务来执行,可以避免重复执行任务。
- 执行任务成功后删除任务,执行任务失败后将任务分值更新为当前时间时间戳,这样就可以将失败的任务重新加入任务队列尾部
示例代码
// remote_api.js 模拟第三方 API
'use strict';
const app = require('express')();
app.get('/', (req, res) => {
setTimeout(() => {
let arr = [200, 300]; // 200 代表成功,300 代表失败需要重新请求
res.status(200).send({ 'status': arr[parseInt(Math.random() * 2)] });
}, 3000);
});
app.listen('9001', () => {
console.log('API 服务监听端口:9001');
});
// producer.js 自身应用 API,用来接受用户请求并将任务加入任务队列
'use strict';
const app = require('express')();
const redisClient = require('redis').createClient();
const QUEUE_NAME = 'queue:example';
function addTaskToQueue(taskName, callback) {
// 先判断任务是否已经存在,存在:跳过,不存在:加入任务队列
redisClient.zscore(QUEUE_NAME, taskName, (error, task) => {
if (error) {
callback(error);
} else {
if (task) {
console.log('任务已存在,不新增相同任务');
callback(null, task);
} else {
redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
if (error) {
callback(error);
} else {
callback(null, result);
}
});
}
}
});
}
app.get('/', (req, res) => {
let taskName = req.query['task-name'];
addTaskToQueue(taskName, (error, result) => {
if (error) {
console.log(error);
} else {
res.status(200).send('正在查询中......');
}
});
});
app.listen(9002, () => {
console.log('生产者服务监听端口:9002');
});
// consumer.js 定时获取任务并执行
'use strict';
const redisClient = require('redis').createClient();
const request = require('request');
const schedule = require('node-schedule');
const QUEUE_NAME = 'queue:expmple';
const PARALLEL_TASK_NUMBER = 2; // 并行执行任务数量
function getTasksFromQueue(callback) {
// 获取多个任务
redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => {
if (error) {
callback(error);
} else {
// 将任务分值设置为 0,表示正在处理
if (tasks.length > 0) {
let tmp = [];
tasks.forEach((task) => {
tmp.push(0);
tmp.push(task);
});
redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => {
if (error) {
callback(error);
} else {
callback(null, tasks)
}
});
}
}
});
}
function addFailedTaskToQueue(taskName, callback) {
redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
if (error) {
callback(error);
} else {
callback(null, result);
}
});
}
function removeSucceedTaskFromQueue(taskName, callback) {
redisClient.zrem(QUEUE_NAME, taskName, (error, result) => {
if (error) {
callback(error);
} else {
callback(null, result);
}
})
}
function execTask(taskName) {
return new Promise((resolve, reject) => {
let requestOptions = {
'url': 'http://127.0.0.1:9001',
'method': 'GET',
'timeout': 5000
};
request(requestOptions, (error, response, body) => {
if (error) {
resolve('failed');
console.log(error);
addFailedTaskToQueue(taskName, (error) => {
if (error) {
console.log(error);
} else {
}
});
} else {
try {
body = typeof body !== 'object' ? JSON.parse(body) : body;
} catch (error) {
resolve('failed');
console.log(error);
addFailedTaskToQueue(taskName, (error, result) => {
if (error) {
console.log(error);
} else {
}
});
return;
}
if (body.status !== 200) {
resolve('failed');
addFailedTaskToQueue(taskName, (error, result) => {
if (error) {
console.log(error);
} else {
}
});
} else {
resolve('succeed');
removeSucceedTaskFromQueue(taskName, (error, result) => {
if (error) {
console.log(error);
} else {
}
});
}
}
});
});
}
// 定时,每隔 5 秒获取新的任务来执行
let job = schedule.scheduleJob('*/5 * * * * *', () => {
console.log('获取新任务');
getTasksFromQueue((error, tasks) => {
if (error) {
console.log(error);
} else {
if (tasks.length > 0) {
console.log(tasks);
Promise.all(tasks.map(execTask))
.then((results) => {
console.log(results);
})
.catch((error) => {
console.log(error);
});
}
}
});
});
// 更新: 咦,我楼上的呢,怎么没了。。。。原帖如下:
不同意楼上,写 Node 为什么不能缩进 4 格,再就是 if else 怎么了。。很清晰啊。。
谢谢楼主
一般简单的用 list RPUSH + BLPOP 就好啦
@magicdawn 是的,一般情况下可以这样的,我这边的需求是,取出任务时不能移除任务,而是要将任务设置为正在执行状态,避免同一任务执行很多次。如果使用 List,当任务取出时,有相同的新任务进来又会加入队列,又重新执行了一次。我也没有想到什么更好的解决方式,就使用了 Sorted Set 实现了。
@HugoJing 这几天忙着写代码,都没来看,好像错过了什么。
@magicdawn list rpush 这个方案。两个问题,1 是重复任务会被添加 2 是当 node.js 抛错之后,取出的任务在数据库就直接消失了,很不严谨的做法。 @DuanPengfei https://zhuanlan.zhihu.com/p/20293493 《一个简单的 mysql 队列问题》 之前类似的问题我是这么做的。不过这并不是一种推荐的做法,只是我当时是这么做的。
你提的
1. 并行执行多个任务
2. 任务唯一性
3. 任务成功或失败后的处理
三点,我觉得最重要的就是第 2 点。怎么让任务不被 worker 重复拿到。以及还要加上第 4 点,按已被执行的时间排序从而雨露均沾。 第 4 点你是通过 score 做到的。
@DuanPengfei 话说你的方案也存在的问题是:当 node.js 抛错之后,取出的任务在数据库就直接消失了。因为它的 score 为 0 之后,就没人帮它恢复了
??
关于上述提到的执行时间排序的问题,1 是要每次cron运行后,更新任务的时间值 2 是需要恢复过期太久的任务
用 redis 的 score 的话,第2点不容易做到吧?
你现在的架构之上,有两点需要改进地方:
- 保证任务被唯一的 worker 拿到
- worker 挂逼之后,任务不会被一直搁置。
首先,你需要引入一个 touch time 的概念,这个概念就是说,一个任务被 touch 了,那么别的任务需要避开他。任务每次被 touch 的时间,都通过时间来记录一下。
解决 1 问题,引入 redis 的事务机制貌似也不行,我指的 multi exec 之类的。现在你的做法,zrangebyscore 和 zadd 之间有时间差,没法保证两个操作组合起来的原子性。还是可能多个worker拿到同一批任务。而如果用了 multi 的话,你没法知道自己需要 zadd 的对象是哪些。 解决 2 问题,开始引入 touch time 的概念。每个 worker 拿到任务之后,对任务的 score 设成当前。而取任务时,设置一个
zrangebyscore([QUEUE_NAME, 1, new Date().getTime() - 5 * 1000
这样的时间差。
要保证【保证任务被唯一的 worker 拿到】,就得使得 任务获得+任务touchtime更新 这两个操作被原子性的完成。抽象来说,更新+获取 需要原子性完成。 这一点 redis 应该是做不到了,mysql 能做到。
再回头看这个问题,如果我来做。无论量大量小,我还是觉得我文章里面的方式更好。。。。
redis 除非一个 worker 对应一个QUEUE_NAME,否则做不到原子性。
但 mysql 当量大的时候,也存在竞争的问题,不过我的方案已经是行级锁了,每秒 1w qps 轻松能上去。如果高于这个 qps 的话,获取我也要分成不同的 mysql 表或库,也就是进行水平拆分。
@magicdawn 如果可以容忍数据丢失的话,感觉 list 这个方案还行
关注
来自酷炫的 CNodeMD
既然用了redis, 为什么不用sub和pub
@alsotang 一针见血,这两个问题当时在设计时就是存在的,后来存储方案没有改变,只是不再使用 crontab 方式运行,而是守护进程采取批处理的方式,取出一批过期的处理完了再取一批。我要去看看你的方案,努力学习一下,在这方面没有什么经验,第一次做。
@richenlin sub 和 pub 的方式有个问题就是断开连接后再连接后,断开那一时刻的任务都处理不到了,像 alsotang 说的,我这样处理失败了也会变成僵尸任务,不过我们在每次重启后都会把所有 score 为 0 的处理一遍。
我现在都是用mysql做事务,用行锁保证唯一,不然后面重复执行了会有很多问题
恩恩,MySQL 是比较好的方案,毕竟有原子性操作,估计我这边也会迁到 MySQL 上去
@richenlin sub / pub 连 redis 中的持久化都没有…比list更糟
@magicdawn pub /sub 是跨系统跨平台业务分离的异步事件的通知机制,本身并不能作为队列,需要配合 list或者set 来实现持久化,且能够自动发现和广播。 可以自行实现两个队列,分别是生产/消费,以及失败通知队列
@DuanPengfei 对了,用 redis 会不会还存在持久化的问题。。。
@alsotang 单纯从 redis 上讲会存在持久化问题,但是我们公司的 redis 是怎么做的我也不是很清楚,貌似是不用担心 redis 挂掉的问题,而且我这个对数据一致和安全要求不太高的情况还能用,后续我应该会迁移到 MySQL 上,会再仔细研究和参考资料尽量完善一下