并发请求update如何保证数据的一致性?
发布于 4 个月前 作者 Fov6363 848 次浏览 来自 问答

一.情景介绍: 1.有一个接口提供update数据的功能,但这个接口在update时,需要做一些额外的操作,即需要通过先find后update的方式来修改数据,这样在并发的情况下如何保证数据的一致性? 2.使用mongoose 二.相关伪代码如下:

const user = await user.find({"id":id}).exec();
//要对user的属性做一些修改
user.xxx = xxxxxx;

await user.save();

当这个接口被并发调用时,就会出现值覆盖的问题,比如第一个请求是把user.age + 1,第二个请求同第一个请求,但实际效果user.age可能只加了1,因为第二个请求取到的user.age时,第一个请求还没有执行完save操作,导致第二个请求读到了脏数据,所以后面的值会覆盖前面的值. 三.局限性 1.中间的这部分修改操作是必须的,且无法转换成mongo原生支持的操作符(指$inc等),通常见于利用原属性值计算新值,比如user.age = (new Date().getTime() - user.birthday).必须在内存中计算属性值. 2.这个接口是并发调用,不能很好的实现队列的机制. 3.mongodb不支持行级锁,且没有暴露出来,所以通过mongodb的锁机制去搞这个感觉也没什么思路 四.方法一 1.把最后那个user.save()改成 user.findOneAndUpdate({“id”:id,“updated_at”:user.updated_at},{update属性}) ,返回找不到对应的document时,可以重新走一遍这个流程,如果返回数据了,证明修改成功,这个依赖的是每次update操作,都会更新其updated_at字段,变相的用程序实现乐观锁,这样做会不会陷入死循环状态?这样会不会有其它问题? 2.希望各位大神有其它的思路和方法指教一下.

10 回复

mongodb其实不是很适合做这类操作,因为它只支持文档级别的原子操作,并且没有事务,由于node单线程,天生异步,因此node里面没有锁的概念,而且node也不需要锁。所以在这种情况下只能考虑从第三方来实现,比如可以借助redis实现一个分布式锁。

@nullcc 基于redis实现一个分布式锁,刚才刷其它网站时看到了,对redis的分布式锁没有经验,我要去看下如何实现…因为目前的这个项目是以cluster模式启动了(以后可能是发展为集群),所以借助redis来实现分布式锁我觉得是很不错的方向.感觉回答.

@Fov6363 具体来说,你有一个资源A,你想通过一个接口来更新这个资源A,并且在这个接口中除了更新A还有一些其他操作。比如我们需要先从DB中获取A,做一些相关操作,计算出A的一个值,然后真正更新A。这个过程中有可能产生竞争,导致数据一致性有问题。 如果用redis实现一个分布式锁,考虑最简单的情况,一般来说mongodb的文档都有一个_id,就拿这个_id做key,value为1,表示同一时间只能有一个操作方操作资源A。redis的incr和decr操作都是原子的,所以在你要操作这个资源A的时候,要像在多线程编程中先acquire lock,操作完毕要release lock。这是最基本的情况,如果要做到稳定高效,你还要考虑获取锁超时、执行失败时release lock的问题。大致是这个思路。

@nullcc 非常感谢,这个过程我明白,关键是您说的获取锁超时,执行失败释放锁的问题不太好处理,我在看 redlock这个包,看是否能满足我现在的需求.

@Fov6363 这个库应该是可以用的。文档的第一句已经表明了它的用处: This is a node.js implementation of the redlock algorithm for distributed redis locks. It provides strong guarantees in both single-redis and multi-redis environments, and provides fault tolerance through use of multiple independent redis instances or clusters.

具体到例子里面也看到了一些ttl的设置,error handling之类的,仔细研究一下用法就行。

const debug = require('debug');
const Promise = require('bluebird');


const http          = require('http');
var client1         = require('redis').createClient(6379, '127.0.0.1');
var Redlock         = require('redlock');
const Db            = require('mongodb').Db;
const MongoClient   = require('mongodb').MongoClient;
const Server        = require('mongodb').Server;
var collection;

var redlock = new Redlock(
    // you should have one client for each redis node
    // in your cluster
    [client1],
    {
        // the expected clock drift; for more details
        // see http://redis.io/topics/distlock
        driftFactor: 0.01, // time in ms

        // the max number of times Redlock will attempt
        // to lock a resource before erroring
        retryCount:  10,

        // the time in ms between attempts
        retryDelay:  200, // time in ms

        // the max time in ms randomly added to retries
        // to improve performance under high contention
        // see https://www.awsarchitectureblog.com/2015/03/backoff.html
        retryJitter:  200 // time in ms
    }
);
const hostname = '127.0.0.1';
const port = 3000;

const server = http.createServer((req, res) => {
    res.statusCode = 200;
    res.setHeader('Content-Type', 'text/plain');
    // res.end('Hello World\n');
    update(res);
});
var resource = "1219924275036161";
var ttl = 10000;

MongoClient.connect('mongodb://localhost:27017/baas',async function(err, db) {

    // Create a collection
    collection = db.collection('users');
    // Insert the docs


    server.listen(port, hostname, () => {
        console.log(`Server running at http://${hostname}:${port}/`);
    });
});

function update(res) {
    console.log('-----');
    redlock.lock(resource,ttl).then(lock => {
        collection.find({'id':1219924275036161}).toArray(function (err,result) {
            let age = result[0].user_data.age;
            age += 1;
            // age = 0;

            collection.updateOne({'id':1219924275036161},{'$set':{'user_data.age':age}},function (err,result) {
                lock.unlock();
                res.end('nihao');
            })
        });
    })

}

赋上一个小demo,其中运行了两次,第一次不使用redlock包,第二次使用redloack包.使用wrk工具测试如下: 第一次

➜  wrk wrk -t4 -c200 -d30s --latency  http://127.0.0.1:3000
Running 30s test @ http://127.0.0.1:3000
  4 threads and 200 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    82.95ms   35.47ms 408.34ms   83.17%
    Req/Sec   620.24    185.87     1.15k    67.79%
  Latency Distribution
     50%   74.00ms
     75%   95.12ms
     90%  121.55ms
     99%  236.99ms
  73916 requests in 30.10s, 9.16MB read
  Socket errors: connect 0, read 67, write 0, timeout 0
Requests/sec:   2455.65
Transfer/sec:    311.75KB

最后的age为976,起始为0. 第二次:

➜  wrk wrk -t4 -c200 -d30s --latency  http://127.0.0.1:3000
Running 30s test @ http://127.0.0.1:3000
  4 threads and 200 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   155.27ms  341.91ms   2.00s    86.25%
    Req/Sec   165.63    144.66     0.91k    76.46%
  Latency Distribution
     50%    1.38ms
     75%    3.83ms
     90%  658.86ms
     99%    1.50s
  18377 requests in 30.09s, 2.28MB read
  Socket errors: connect 0, read 87, write 0, timeout 64
Requests/sec:    610.74
Transfer/sec:     77.53KB

QPS显示下降的很厉害,age此时为19380,起始值为976,其中在运行时,报一些未捕捉reject的错误,应该是这些错误导致了实际结果与预测结果有误.

第二个例子里的

Socket errors: connect 0, read 87, write 0, timeout 64

出现了64个连接超时的,这部分影响也很大。 还有就是一些参数的调优,因为这个锁是独占锁,可以计算出在一段时间内,一共能够成功进行几次操作。

./wrk -t4 -c200 -d30s --latency  http://127.0.0.1:3000
Running 30s test @ http://127.0.0.1:3000
  4 threads and 200 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    52.63ms  209.61ms   2.00s    93.22%
    Req/Sec   437.32    414.88     1.65k    73.17%
  Latency Distribution
     50%  766.00us
     75%    0.88ms
     90%    1.51ms
     99%    1.17s
  30797 requests in 30.09s, 3.82MB read
  Socket errors: connect 0, read 90, write 0, timeout 32
Requests/sec:   1023.42
Transfer/sec:    129.93KB

@Fov6363 我机器上的结果。

@nullcc 收到,很强

回到顶部