项目地址:https://github.com/miserylee/mmq
安装
npm install mmq
or
yarn add mmq
基本使用
引入MQ模块
const MQ = require('mmq');
首先要定义你的consumers,用于对队列中的消息进行消费。
比如在这里定义一个名为MyConsumer的consumer,接受两个参数param1,param2。
const consumers = {
'MyConsumer': (param1, param2) => {
console.log('I am consumer!', param1, param2);
}
};
如果当前不需要对消息进行消费,是可以不用定义consumers的
比如:现在有两个服务器进程,其中一个为应用服务,负责生产消息,另外一个为消息处理服务,负责处理消息。这时候只用在消息处理服务中定义consumers来初始化MQ对象即可。
在应用服务程序的主入口初始化MQ对象,初始化的过程主要是连接Mongodb的过程。
const mq = MQ.init('My Queue', {
uri: 'mongodb://localhost/mmq',
errorListener: console.error,
messageNameFormat: ({operation, params}) => {
return `${operation} with ${params.length} params`
}
});
可用参数
*uri
mongodb uri,用于连接mongodb,必选参数。mongoOptions = {}
连接mongodb的options。enableDeadQueue = true
是否启用死消息队列,死消息队列用于存储消费失败的消息。默认启用。deadQueueName = 'mmq-deadQueue'
死消息队列的collection名称。queueName = 'mmq-queue'
消息队列的collection名称。visibility = 30
消息可见时间,单位s。即消费者获取一条消息后有多长时间进行消费,如果超过可见时间,将会认为消费失败。如果消费过程较长,将会在visibility / 2
s时进行续时。delay = 0
消息进入消息队列后延迟多长时间(s)才能被消费者获取。maxRetries = 5
消息被进行多少次消费失败后,会进入死消息队列。consumeInterval = 5000
消费周期(ms)。消费者服务会以此周期来轮询消息队列,获取并消费消息。maxConsumption = 5
一次消费周期,最多获取并消费多少条消息。与consumeInterval
共同决定消费速率。errorListener(error)
错误处理函数。messageNameFormat({operation, params, delay})
用于打印log时,对消息名称的格式化。consumers
消费者。如果该进程不需要进行消费动作,则不需要定义该参数。
该对象会缓存在MQ.instances
中,可以使用getInstance()
方法再次获取该对象
const mq = MQ.getInstance(name);
init
方法会在初始化成功后会emit出一个initialized
事件,可以监听该事件来进行后续的处理,比如向消息队列中推入消息。
mq.on('initialized', _ => {
mq.appendMessage({
operation: 'MyConsumer',
params: ['Hello', 'world']
});
});
一条message
包括两个属性
operation
对应了消费者的名字,即该消息将由哪一个consumer进行消费params
参数队列,即consumer传入的参数
可以使用appendMessages
来一次性进行多条消息的推入
mq.appendMessages([{
operation: 'MyConsumer',
params: ['I am', 'message 1']
}, {
operation: 'MyConsumer',
params: ['I am', 'message 2']
}]);
也可以不用监听initialized
事件,在MQ对象初始化完成之前,也可以推入消息,消息会暂存在mq. _retianedMessages
中,一旦MQ初始化完成,暂存消息会一次性全部推入消息队列。
在消费服务的程序主入口初始化MQ,此时需要传入consumers参数用于消费。
const mq = MQ.init('My Queue', {
uri: 'mongodb://localhost/mmq',
errorListener: console.error,
messageNameFormat: ({operation, params}) => {
return `${operation} with ${params.length} params`
},
consumers
});
获取一条消息并进行消费。
mq.consume();
或者你可以开启一个计时器来进行周期性的消费,不过mmq已经帮你处理好了
mq.run();
至此,你的消息队列服务就部署好了,给程序添加环境变量DEBUG=mq:*
来运行,这样可以打印出消息队列运行过程中的log了。
欢迎issue及PR! 项目地址:https://github.com/miserylee/mmq
mark,学习了, 我之前都是使用ascoltatori实现的同样的功能; 用到的是mongodb的固定集合的队列特性
@dfsq1311 固定集合的话,如果消息量过大,会丢失消息的吧? 我觉得还是直接全部消息都存储起来,定期对已消费的消息进行clean比较靠谱。