精华 Node.js 中使用 Redis 来实现定时任务
发布于 1个月前 作者 xadillax 770 次浏览 来自 分享

原文链接:http://xcoder.in/2015/06/05/scheduled-task-using-redis/

好久没写博文了,最近在跟随着公司大牛们的脚步秘密研发新产品中。

不过前几天有一个小需求的东西可以提出来写一点点小干货儿跟大家分享分享。米娜桑会的就可以忽略了,反正我也是随便写的;如果觉得本文对你有用的话还请多多支持喵。(●´ω`●)ゞ

本文所说的定时任务或者说计划任务并不是很多人想象中的那样,比如说每天凌晨三点自动运行起来跑一个脚本。这种都已经烂大街了,随便一个 Crontab 就能搞定了。

这里所说的定时任务可以说是计时器任务,比如说用户触发了某个动作,那么从这个点开始过二十四小时我们要对这个动作做点什么。那么如果有 1000 个用户触发了这个动作,就会有 1000 个定时任务。于是这就不是 Cron 范畴里面的内容了。

举个最简单的例子,一个用户推荐了另一个用户,我们定一个二十四小时之后的任务,看看被推荐的用户有没有来注册,如果没注册就给他搞一条短信过去。Σ>―(〃°ω°〃)♡→

最初的设想

一开始我是想把这个计时器做在内存里面直接调用的。

考虑到 Node.js 的定时并不是那么准确(无论是 setTimeout 还是 setInterval),所以本来打算自己维护这个定时器队列。

又考虑到 Node.js 原生对象比较耗内存。之前我用 JSON 对象存了一本字典,约十二万多的词条,原文件大概也就五六兆,用 Node.js 的原生对象一存居然有五六百兆的内存占用——所以打算这个定时器队列用 C++ 来写 addon。

考虑到任何时候插入的任务都有可能在已有的任务之前或者之后,所以本来想用 C++ 来写一个小根堆。每次用户来一个任务的时候就将这个任务插入到堆中。

如果按照上述方法的话,再加上对时间要求掐得也不是那么紧,于是就是一个不断的 process.nextTick() 的过程。

process.nextTick() 当中执行这么一个函数:

  1. 从小根堆中不断获取堆顶的任务并处理,一直处理到堆顶任务的执行时间大于当前时间为止。
  2. 继续 process.nextTick() 来让下一个 tick 执行步骤 1 中的流程。

所以最后就是一边往小根堆插入任务,另一边通过不断 process.nextTick() 消费任务的这么一个过程。

最后,为了考虑到程序重启的时候内存数据会丢失,还应该做一个持久化的事情——在每次插入任务的时候顺便往持久化中间件中插一条副本,比如 MySQL、MongoDB、Redis、Riak 等等任何三方依赖。消费任务的时候顺便把中间件中的这条任务数据给删除。

也就是说中间件中永远存的就是当前尚未完成的任务。每当程序重启的时候都先从中间件中把所有任务读取进来重建一下堆,然后就能继续工作了。

如果当时我没有发现 Redis 的这个妙用的话,上述的流程将会是我实现我们定时任务的流程了。

Redis 妙用

在 Redis 的 2.8.0 版本之后,其推出了一个新的特性——键空间消息(Redis Keyspace Notifications),它配合 2.0.0 版本之后的 SUBSCRIBE 就能完成这个定时任务的操作了,不过定时的单位是秒

Publish / Subscribe

Redis 在 2.0.0 之后推出了 Pub / Sub 的指令,大致就是说一边给 Redis 的特定频道发送消息,另一边从 Redis 的特定频道取值——形成了一个简易的消息队列

比如我们可以往 foo 频道推一个消息 bar,那么就可以直接:

PUBLISH foo bar

另一边我们在客户端订阅 foo 频道就能接受到这个消息了。

举个例子,如果在 Node.js 里面使用 ioredis 这个包那么看起来就会像这样:

var Redis = require("ioredis");
var sub = new Redis(/** 连接信息 */);
sub.once("connect", function() {
    // 假设我们需要选择 redis 的 db,因为实际上我们不会去污染默认的 db 0
    sub.select(DB_NUMBER, function(err) {
        if(err) process.exit(4);
        sub.subscribe("foo", function() {
            //... 订阅频道成功
        });
    });
});

// 监听从 `foo` 来的消息
sub.on("message", function(channel, msg) {
    console.log(channel, msg);
});

Redis Keyspace Notifications

在 Redis 里面有一些事件,比如键到期、键被删除等。然后我们可以通过配置一些东西来让 Redis 一旦触发这些事件的时候就往特定的 Channel 推一条消息。

本文所涉及到的需求的话我们所需要关心的事件是 EXPIRE 即过期事件。

大致的流程就是我们给 Redis 的某一个 db 设置过期事件,使其键一旦过期就会往特定频道推消息,我在自己的客户端这边就一直消费这个频道就好了。

以后一来一条定时任务,我们就把这个任务状态压缩成一个键,并且过期时间为距这个任务执行的时间差。那么当键一旦到期,就到了任务该执行的时间,Redis 自然会把过期消息推去,我们的客户端就能接收到了。这样一来就起到了定时任务的作用。

消息类型

当达到一定条件后,有两种类型的这种消息会被触发,用哪个需要自己选了。举个例子,我们删除了在 db 0 中一个叫 foo 的键,那么系统会往两个频道推消息,一个是 del 事件频道推 foo 消息,另一个是 foo 频道推 del 消息,它们小俩口被系统推送的指令分别等价于:

PUBLISH __keyspace@0__:foo del
PUBLISH __keyevent@0__:del foo

其中往 foo 推送 del 的频道名为 __keyspace@0__:foo,即是 "__keyspace@" + DB_NUMBER + "__:" + KEY_NAME;而 del 的频道名为 "__keyevent@" + DB_NUMBER + "__:" + EVENT_NAME

配置

即使你的 Redis 版本达标了,但是 Redis 默认是关闭这个功能的,你需要修改配置文件来打开它,或者直接在 CLI 里面通过指令修改。这里就说说配置文件的修改吧。

如果不想看我在这里罗里吧嗦的,也可以直接去看 Redis 的相关文档

首先打开 Redis 的配置文件,在不同的系统和安装方式下文件位置可能不同,比如通过 brew 安装的 MacOS 下可能是在 /usr/local/etc/redis.conf 下面,通过 apt-get 安装的 Ubuntu 下可能是在 /etc/redis/redis.conf 下,总之找到配置文件。或者自己写一个配置文件,启动的时候指定配置文件地址就好。

然后找到一项叫 notify-keyspace-events 的地方,如果找不到则自行添加,其值可以是 ExKlg 等等。这些字母的具体含义如下所示:

  • K,表示 keyspace 事件,有这个字母表示会往 __keyspace@<db>__ 频道推消息。
  • E,表示 keyevent 事件,有这个字母表示会往 __keyevent@<db>__ 频道推消息。
  • g,表示一些通用指令事件支持,如 DELEXPIRERENAME 等等。
  • $,表示字符串(String)相关指令的事件支持。
  • l,表示列表(List)相关指令事件支持。
  • s,表示集合(Set)相关指令事件支持。
  • h,哈希(Hash)相关指令事件支持。
  • z,有序集(Sorted Set)相关指令事件支持。
  • x,过期事件,与 g 中的 EXPIRE 不同的是,gEXPIRE 是指执行 EXPIRE key ttl 这条指令的时候顺便触发的事件,而这里是指那个 key 刚好过期的这个时间点触发的事件。
  • e,驱逐事件,一个 key 由于内存上限而被驱逐的时候会触发的事件。
  • Ag$lshzxe 的别名。也就是说 AKE 的意思就代表了所有的事件。

结合上述列表我们就能拼凑出自己所需要的事件支持字符串了,在我的需求中我只需要 Ex 就可以满足了,所以配置项就是这样的:

notify-keyspace-events Ex

然后保存配置文件,启动 Redis 就启用了过期事件的支持了。

实践

我们先说任务的创造者吧。由于这里 Redis 的事件只会传键名,并不会传键值,而过期事件触发的时候那个键已经没了,你也无法获取键值,加上我的主系统和任务系统是分布式的,所以就把所有需要的信息往键名塞。

一个最简单的键名设计就是 任务类型 + ":" + JSON.stringify 化后的参数数组;更有甚者可以直接把任务类型替换成所需的函数路径,比如需要执行这个任务的函数在 task/foo/bar 文件下面的 baz 函数,参数 arguments 数组为 [ 1, 2 ],那么键名的设计可以是 task/foo/bar.baz:[1,2],反正我们只需要触发这个键,用不着去查询这个键。等到真正过期了任务系统接收到这个键名的时候再一一解析,得到需要执行 task/foo/bar.baz 这个消息,并且网函数里面传入 [1,2] 这个 arguments

所以当接收到一个定时任务的时候,我们得到消息、函数名、过期时间参数,这个函数可以如下设计:

/** 我们假设 redis 是一个 ioredis 的对象 */

var sampleTaskMaker = function(message, func, timeout) {
    message = JSON.stringify(message);
    console.log("Received a new task:", func, message, "after " + timeout + ".");

    // 这里的 uuid 是 npm 一个包
    // 生成一个唯一 uuid 的目的是为了防止两个任务用了相同的函数和参数,那么
    // 键名可能会重复并覆盖的情况
    // uuid 的文档为 https://www.npmjs.com/package/node-uuid
    //
    // 这里的 ❤️ 是一个分隔符,冒号是分割 uuid 和后面内容的,而 ❤️ 是分割函数名
    // 和消息的
    var key = uuid.v1().replace(/-/g, "") +
        ":❤️" + func + "❤️" + message;
    var content = "";

    redis.multi()
        .set(key, content)
        .expire(key, timeout)
        .exec(function(err) {
            if(err) {
                console.error("Failed to publish EXPIRE EVENT for " + content);
                console.error(err);
                return;
            }
        });
};

Ioredis 的稳定可以点此查看。

然后在任务系统里面的一开始监听这个过期频道:

// assign 是 sugarjs 里面的函数
// 把 db 塞到字符串里面的 {db} 里去
var subscribeKey = "__keyevent@{db}__:expired".assign({ db: 1 });

// 假设 sub 是 ioredis 的对象
sub.once("connect", function() {
    // 假设我们需要选择 redis 的 db,因为实际上我们不会去污染默认的 db 0
    sub.select(1, function(err) {
        if(err) process.exit(4);
        sub.subscribe("foo", function() {
            //... 订阅频道成功
        });
    });
});

// 监听从 `foo` 来的消息
sub.on("message", sampleOnExpired);

注意: 我们这里选择 db 1 是因为一旦开启过期事件监听,那么这个 db 的所有过期事件都会被发送。为了不跟正常使用的 redis 过期键混淆,我们为这个事情专门用一个新的 db。比如我们在自己正常使用的 db 0 里面监听了,那么不是我们任务触发的过期事件也会传过来,这个时候我们解析的键名就不对了。

最后就是我们的 sampleOnExpired 函数了。

var sampleOnExpired = function(channel, key) {
    // UUID:❤️func❤️params
    var body = key.split("❤️");
    if(body.length < 3) return;

    // 取出 body 第一位为 func
    var func = body[1];

    // 推出前两位,后面剩下的有可能是参数里面自带 ❤️ 而被分割,所以要拼回去
    body.shift(); body.shift();
    var params = body.join("❤️");

    // 然后把 params 传入 func 去执行
    // func:
    //   path1/path2.func
    func = func.split(".");
    if(func.length !== 2) {
        console.error("Bad params for task:", func.join("."), "-", params);
        return;
    }

    var path = func[0];
    func = func[1];

    var mod;
    try {
        mod = require("./tasks/" + path);
    } catch(e) {
        console.error("Failed to load module", path);
        console.error(e.stack);
        return;
    }

    process.nextTick(function() {
        try {
            mod[func].apply(null, JSON.parse(params));
        } catch(e) {
            console.error("Failed to call function", path, "-", func, "-", params);
            console.error(e.stack);
        }
    });
};

这个简易的架子搭好后,你只需要去写一堆任务执行函数,然后在生成任务的时候把相应参数传给 sampleTaskMaker 就好了。Redis 会自动过期并且触发事件给你的 sampleOnExpired 函数,然后就会去执行相应的任务处理函数了。

小结

其实这个需求在我们项目目前就是给用户定时发提醒短信用的。如果没有发现 Redis 的这个妙用,我还是会去用第二节里面的方法来写的。其实这期间也有考虑过用 RabbitMQ,不过貌似它的定时消息需要做一些 Hack,比较麻烦,最后就放弃了。

Redis 的这个方法其实是我在谷歌搜出来的,别人在 StackOverflow 回答的答案。我参考了之后用我自己的方法实现了出来,并且把代码的关键部分提取出来整理成这篇小文,还希望能给各位看官一些用吧,望打赏。

如果没有什么用也憋喷我,毕竟我是个蒟蒻。有更好的方法希望留个言,望告知。谢谢。(´,•ω•,)♡

26 回复

不错的文章

我写过的node调度

一样的思路

@i5ting 其实我更希望实现第二节的内容,毕竟我比较喜欢造轮子。奈何项目进度不允许。

@xadillax 借口,哈哈,永远都木有时间

如果我来写,我的做法会是这样:

首先我在 redis 中维护一个 sorted set。score 的值为:YYYYMMDD+HOUR,如 2015061020。当 A 用户邀请了 B 用户之后,我就推算一下 24 小时后,是多少,把 B 用户的值写入 set 中。然后另外起个定时任务,每小时读一次这个 set,读出比当前时间小的 score,批量处理一次。

请问我这个做法相比文中有哪些不足吗?

文中提到的自动过期以及事件提醒等等的功能,无非就是 sql 中的一个 where 语句? table_time < current_time。只是由于使用了 redis,所以要变着法子实现这个功能?

请问 subscribeKey 在哪儿订阅的

@alsotang 不同点在于一个是主动轮询,一个是被动接收消息。

Pull 和 Push 的区别——当前系统(除去外部依赖不谈)通常被动接收的效率比轮询高。这就是为什么网络通信有那么多方法,epoll 和 IOCP 这类效率高点的原因了。

以陈榕的说法讲,这叫鬼子进村策略。一遍遍的询问“鬼子进村了吗?”,“鬼子进村了吗?”… 大量的 cpu 时间都耗了进去。使用 kqueue 这些,变成了派一些个人去站岗,鬼子来了就可以拿到通知,效率自然高了许多。

以及考虑到可扩展性,时间不一定是二十四小时,任务也有很多,并且不是说一个小时跑一次脚本的那种。如果是这样的话 Cron 足矣。

@alsotang 哦看错了,你是脚本形式。我最开始也是考虑到脚本。但是考虑到任务的可扩展性,有很多任务只是要到点处理,而不是说到点之后一个小时内一次性批量处理。

应用场景还是有所不同的。

@xadillax

  1. 在这个地方,你觉得被动接收消息的效率比主动轮询高?效率怎么比?性能?CPU 占用,内存占用?我的方案至少还有的优势是【批量处理】。
  2. 考虑到可拓展性,我方案中的 score 也可以细到毫秒粒度。你所说的到点处理,最终还是受时间粒度所限。一般这类可以延迟执行的任务,到秒级别的时间粒度也够用了吧?我的方案可以按秒级别来轮询 redis。与通过事件监听相比,两者性能我认为相当。

我也不一定要用脚本形式,这个脚本可以写进 app 中,也可以单独提出来。Node 在 IO 上是非阻塞的,所以放哪里都差不多。但从解耦的角度来,app 负责写入任务,脚本负责处理任务,这样的架构我认为更好。

文章中的方案,复杂度比我的方案高了一个量级。是否真的这个必要?

楼主holy high 学习学习

@alsotang 我之前看错了,你的是脚本形式的,我有回复的。

这两种方案是没有可比性的,我的任务系统也是独立的,并不是合在 app 里面的。只是面临的需求不同而已。

@alsotang 我在文章一开始就已经提到脚本的方法了,实际上就是 Cron 大法,并且在一开始就说明了需求问题被我弃用了这个方法,转而将文章的重点放到 Redis 的过期事件当中。

@alsotang 粒度是小时的方法可以类比为我在序中的 Cron 大法。

而你说的把粒度调到秒的时候,就是我文章中的第二节的方法,本来是想用 C++ 来搞最小堆放在任务系统内部,这样还省了三方依赖的 IO 损耗。

只不过最后一种方法才是我要介绍的。

好高级的样子

完全看不懂啊 什么时候才可以到达这样的高度。。 自豪地采用 CNodeJS ionic

最近也在做类似的东西, 批量定时任务 + 精确到秒, 不过我是拿 ruby 写得, 思路跟 @alsotang 基本一致.

定时任务连 redis 都没放, 直接放 MySQL.

个人的想法是, 降低复杂度 + 可扩展性最重要, 性能够用就好. 过早优化是万恶之源, 对不?

@42thcoder 嗯,说的极是。我上面也是提供了三种解决方案,你做的跟我的第二种方法类似。我只是在循序渐进介绍可能的设计方案。

@42thcoder @xadillax 后来你们发送短信的实际需求是用的哪种方案。。?

@alsotang -。 - 短信只是其中一种需求啦,总之这是一个通用的类似的定时任务方案。

其实rabbitmq也不麻烦,应该还要容易一些

一个简单的事情,被你说得好复杂。 主要是使用到了 这个功能吧 http://redisdoc.com/topic/notification.html

我的理解就是,

  1. 任务来了, 2 写入redis
  2. 加上键值过期监听
  3. 监听到过期,任务处理

蒟蒻。。。。

@shanelau -。 - 总得凑字数不是。而且前面的两种方法也是可以借鉴的。

@fish 对,蒟蒻就是本菜。

回到顶部