大致描述一下问题的情况:客户端会产生大量的“消息”存到mongo数据库作为一个“消息池”,然后程序会每次读取一定量(上百个)的“消息”进行处理,会有异步处理,所以会用到async.quene来做并发控制(每次并发执行5个左右)。现在问题出现了:如果多进程的话,mongo“消息池”中的“消息”有可能被读出来多次的,一开始的思路是从mongo“消息池”读取“消息”的时候直接把已经读出来的标记一下状态,但是mongoose里面貌似只有findOneAndUpdate这样一次读一个的操作。不知道把问题说清楚了没,求指导。
貌似MongoDB后面的版本提供了基于文档的锁
这一句是对 mongo 2.8 的更新,出现了理解错误。
现在的锁是数据库级别的,2.8 只是锁的级别到了文档级别,但是锁是不可被操作的。这种文档级别的锁的性质,只能被想办法利用。
在数据库没有锁的情况下,真的想不出该如何解决这种问题。
其实这种并发读写的问题一直是用“锁”这个概念在解决的,我在开发的过程中遇到过类似的场景,我们是把元信息存在 mysql 里面,然后具体的数据放在 nosql 里。这样我们读的时候可以利用 sql 的锁特性,然后再去 nosql 里面取出更多数据。
不过话说我们的消息数据并不大,我们只是为了不重复读才用的 sql,不知你的场景是不是写入太频繁?
可以考虑在应用程序这一级提供分布式锁。例如,用Redis + redis-warlock或者redis-lock模块实现分布式锁。其中,redis-warlock
实现的是这个分布式锁算法。
每个进程:
- 请求加锁,如果成功则继续,失败则等待一段时间再重新申请
- 读取数据库未被标记为已读的记录
- 把读到的记录标记为已读,写回到数据库中
- 释放锁
- 每五个一组,处理本次读到的记录
另外,TokuMX是MongDB的一个变种,提供了更快的写操作和文档级别的锁机制。存储同样大小的数据量,TokuMX占用的空间明显要小。
嗯 谢谢你 我们也本来打算用这个思想自己写锁的 既然有这样的模块下来可以看看研究研究 不过现在我们这样做的:我们把收集到的消息放到redis的list里面 然后各个进程来一个一个的pop 目前测的速度能满足需求 不知道各位怎么看
没看明白。 如果是要并发从同一个数据源提出不重复的内容,把要取的内容编号就可以了。 进程1,2,3,4,5,每个读5个。 分别读1~5, 6~10, … 26~30,31~35,… 51~55,56~60,… 每个序列按n×5递增计算。 数据源新的内容编号放在后面。 读数据不需要锁。