基于mongodb实现的消息队列mmq(Mongodb-Message-Queue)
发布于 23 天前 作者 miserylee 319 次浏览 来自 分享

项目地址:https://github.com/miserylee/mmq

安装

npm install mmq

or

yarn add mmq

基本使用

引入MQ模块

const MQ = require('mmq');

首先要定义你的consumers,用于对队列中的消息进行消费。

比如在这里定义一个名为MyConsumer的consumer,接受两个参数param1,param2。

const consumers = {
  'MyConsumer': (param1, param2) => {
    console.log('I am consumer!', param1, param2);
  }
};

如果当前不需要对消息进行消费,是可以不用定义consumers的

比如:现在有两个服务器进程,其中一个为应用服务,负责生产消息,另外一个为消息处理服务,负责处理消息。这时候只用在消息处理服务中定义consumers来初始化MQ对象即可。

在应用服务程序的主入口初始化MQ对象,初始化的过程主要是连接Mongodb的过程。

const mq = MQ.init('My Queue', {
  uri: 'mongodb://localhost/mmq',
  errorListener: console.error,
  messageNameFormat: ({operation, params}) => {
    return `${operation} with ${params.length} params`
  }
});

可用参数

  • *uri mongodb uri,用于连接mongodb,必选参数。
  • mongoOptions = {} 连接mongodb的options。
  • enableDeadQueue = true 是否启用死消息队列,死消息队列用于存储消费失败的消息。默认启用。
  • deadQueueName = 'mmq-deadQueue' 死消息队列的collection名称。
  • queueName = 'mmq-queue' 消息队列的collection名称。
  • visibility = 30 消息可见时间,单位s。即消费者获取一条消息后有多长时间进行消费,如果超过可见时间,将会认为消费失败。如果消费过程较长,将会在visibility / 2s时进行续时。
  • delay = 0 消息进入消息队列后延迟多长时间(s)才能被消费者获取。
  • maxRetries = 5 消息被进行多少次消费失败后,会进入死消息队列。
  • consumeInterval = 5000 消费周期(ms)。消费者服务会以此周期来轮询消息队列,获取并消费消息。
  • maxConsumption = 5 一次消费周期,最多获取并消费多少条消息。与consumeInterval共同决定消费速率。
  • errorListener(error) 错误处理函数。
  • messageNameFormat({operation, params, delay}) 用于打印log时,对消息名称的格式化。
  • consumers 消费者。如果该进程不需要进行消费动作,则不需要定义该参数。

该对象会缓存在MQ.instances中,可以使用getInstance()方法再次获取该对象

const mq = MQ.getInstance(name);

init方法会在初始化成功后会emit出一个initialized事件,可以监听该事件来进行后续的处理,比如向消息队列中推入消息。

mq.on('initialized', _ => {
	mq.appendMessage({
		operation: 'MyConsumer',
		params: ['Hello', 'world']
	});
});

一条message包括两个属性

  • operation 对应了消费者的名字,即该消息将由哪一个consumer进行消费
  • params 参数队列,即consumer传入的参数

可以使用appendMessages来一次性进行多条消息的推入

mq.appendMessages([{
  operation: 'MyConsumer',
  params: ['I am', 'message 1']
}, {
  operation: 'MyConsumer',
  params: ['I am', 'message 2']
}]);

也可以不用监听initialized事件,在MQ对象初始化完成之前,也可以推入消息,消息会暂存在mq. _retianedMessages中,一旦MQ初始化完成,暂存消息会一次性全部推入消息队列。

在消费服务的程序主入口初始化MQ,此时需要传入consumers参数用于消费。

const mq = MQ.init('My Queue', {
  uri: 'mongodb://localhost/mmq',
  errorListener: console.error,
  messageNameFormat: ({operation, params}) => {
    return `${operation} with ${params.length} params`
  },
  consumers
});

获取一条消息并进行消费。

mq.consume();

或者你可以开启一个计时器来进行周期性的消费,不过mmq已经帮你处理好了

mq.run();

至此,你的消息队列服务就部署好了,给程序添加环境变量DEBUG=mq:*来运行,这样可以打印出消息队列运行过程中的log了。

test

欢迎issue及PR! 项目地址:https://github.com/miserylee/mmq

2 回复

mark,学习了, 我之前都是使用ascoltatori实现的同样的功能; 用到的是mongodb的固定集合的队列特性

@dfsq1311 固定集合的话,如果消息量过大,会丢失消息的吧? 我觉得还是直接全部消息都存储起来,定期对已消费的消息进行clean比较靠谱。

回到顶部