Node.js 与php的session共享小记
发布于 2年前 作者 xujun52011 2100 次浏览 最后一次编辑是 10个月前

前段时间一直在研究 PHP 与 Node.js 的 session 共享问题。最初遇到的问题是：PHP 默认使用 serialize/unserialize 进行序列化与反序列化，所以需要用 JS 来实现 serialize/unserialize（session_encode 及 session_decode 只需要在 serialize/unserialize 的基础上稍稍改写即可）。后面遇到的问题是：使用分布式 memcached 时，hash 算法也需要与 PHP 的 hash 算法保持一致，并且要支持热增减服务器。这一点目前还在纠结，正在看 consistent hashing 算法的相关内容，写完之后也会与大家分享。

PS：本 memcached 模块要比 NPM 上的那个 memcached 模块效率高：ab 1000 压测时，每秒要多处理 100 多个请求（同样的服务器、同样的 connect，只替换了 memcached 模块，取 30 次数据，相信数据还是比较可信的）。

add:今天打开php_memcache.dll的源代码分析, 终于得到了standard策略的算法, 也已用nodejs实现, 下一步就是要把consistent策略的算法实现出来, 这样memcache模块就完整了, 可以与php共享了.

最新更新的代码

但是 consistent 模式下还是不能百分之百地与 PHP 的实现保持一致（即做不到 100% 命中）。

欢迎拍砖

var net = require('net')
 , crypto = require('crypto')
 , zlib = require('zlib')
 , CRLF = "\r\n"
 , FLAG_STRING = 0
 , FLAG_ENCODE = 1
 , FLAG_COMPRESSED = 2
 , FLAG_BINARY = 3
 , CONSISTENT_POINTS = 160
 , CONSISTENT_BUCKETS = 1024, fs = require('fs');

/**
 * Converts a JavaScript (UTF-16) string into a UTF-8 "byte string": a string
 * in which every character code is one UTF-8 byte (0-255).
 *
 * A valid surrogate pair is combined into a single code point and emitted as
 * one 4-byte sequence. An unpaired surrogate keeps the previous behaviour and
 * is emitted as a 3-byte sequence on its own.
 *
 * @param {*} argString Value to encode; coerced to a string with `+ ''`.
 * @returns {string} The byte string, or "" when the input is null/undefined.
 */
function utf8_encode (argString) {

  if (argString === null || typeof argString === "undefined") {
    return "";
  }

  var string = (argString + '');
  var stringl = string.length;
  var utftext = '';
  var start = 0; // index where the pending run of untouched ASCII begins

  for (var n = 0; n < stringl; n++) {
    var c1 = string.charCodeAt(n);

    if (c1 < 128) {
      continue; // ASCII is copied verbatim as part of the current run
    }

    var enc;
    var consumed = 1; // UTF-16 code units used by this character

    if (c1 < 2048) {
      enc = String.fromCharCode((c1 >> 6) | 192, (c1 & 63) | 128);
    } else {
      // Only look ahead when c1 is a high surrogate and a next unit exists.
      var c2 = (c1 >= 0xD800 && c1 <= 0xDBFF && n + 1 < stringl) ? string.charCodeAt(n + 1) : 0;

      if (c2 >= 0xDC00 && c2 <= 0xDFFF) {
        var codePoint = 0x10000 + ((c1 - 0xD800) << 10) + (c2 - 0xDC00);
        enc = String.fromCharCode(
          (codePoint >> 18) | 240,
          ((codePoint >> 12) & 63) | 128,
          ((codePoint >> 6) & 63) | 128,
          (codePoint & 63) | 128
        );
        consumed = 2;
      } else {
        enc = String.fromCharCode((c1 >> 12) | 224, ((c1 >> 6) & 63) | 128, (c1 & 63) | 128);
      }
    }

    // Flush the ASCII run that precedes this character, then the encoding.
    if (n > start) {
      utftext += string.slice(start, n);
    }
    utftext += enc;
    n += consumed - 1;
    start = n + 1;
  }

  if (stringl > start) {
    utftext += string.slice(start, stringl);
  }

  return utftext;
}

/**
 * Returns the MD5 digest of a value as a lowercase hexadecimal string.
 *
 * @param {*} str Value to hash; it is converted with String() first.
 * @returns {string} 32-character hex digest.
 */
function md5(str) {
  return crypto.createHash('md5').update(String(str)).digest('hex');
}

/**
 * Builds the 256-entry lookup table for the reflected CRC-32 polynomial
 * 0xEDB88320. Entries are stored as signed 32-bit integers, which is what
 * the bitwise operators in crc32() work with.
 *
 * @returns {number[]} Lookup table indexed by byte value.
 */
function crc32BuildTable () {
  var table = [];
  for (var n = 0; n < 256; n++) {
    var c = n;
    for (var bit = 0; bit < 8; bit++) {
      c = (c & 1) ? (0xEDB88320 ^ (c >>> 1)) : (c >>> 1);
    }
    table.push(c | 0);
  }
  return table;
}

/**
 * Computes the CRC-32 checksum of a value after UTF-8 encoding it with
 * utf8_encode().
 *
 * The lookup table is built once, on first use, and cached on the function
 * object instead of being re-created (and parsed from hex strings) per call.
 *
 * @param {*} str Value to checksum.
 * @returns {number} The checksum as a SIGNED 32-bit integer; callers that
 *   need the unsigned value must convert it themselves (e.g. `>>> 0`).
 */
function crc32 (str) {
  var table = crc32.table || (crc32.table = crc32BuildTable());
  var bytes = utf8_encode(str);
  var crc = -1;

  for (var i = 0, iTop = bytes.length; i < iTop; i++) {
    crc = (crc >>> 8) ^ table[(crc ^ bytes.charCodeAt(i)) & 0xFF];
  }

  return crc ^ (-1);
}

function memcached(config) {
 var currentServer
    , buffer
    , header
    , cmd
    , callback
    , receiveTimes
    , headLength
    , bodyLength
    , settings = {}
    , config = config || {}
    , defaults = {
    encode : JSON.stringify,
    decode : JSON.parse,
    maxRetryTimes : 5,
    retryDelay : 10,
    consistent : false
   }
    , serverGroup = []
    , connections = []
    , serverStatus = []
    , serverWeight = []
    , serverNodes = []
    , serverBuckets = []
    , bucketPoped = false
    , queryStarted = false//表示当前为空闲状态可以处理
    , queue = [];

 for(var o in defaults) {
    if (config[o] == undefined) {
   settings[o] = defaults[o];
    } else {
   settings[o] = config[o];
    }
 }

 /**
  * "standard" strategy: reduces the key's CRC-32 to a 15-bit value (bits
  * 16-30) and uses it, modulo the number of weight slots, to pick a server.
  *
  * NOTE(review): the PECL memcache extension is believed to map a zero hash
  * to 1 before taking the modulo; confirm against its source if exact parity
  * with PHP is required.
  *
  * @param {string} key Normalised cache key.
  * @returns {number} Index into serverGroup.
  */
 function findServerStandard(key) {
    var slot = ((crc32(key) >> 16) & 0x7fff) % serverWeight.length;
    return serverWeight[slot];
 }

 /**
  * "consistent" strategy: maps a key onto the hash ring built by
  * addServerConsistent().
  *
  * On first use (or after a server was added) the ring points are sorted and
  * CONSISTENT_BUCKETS evenly spaced sample points are resolved to the server
  * owning the first ring point at or after the sample, wrapping around to the
  * first point when the sample lies beyond the last one. A key is then served
  * by bucket `unsigned crc32(key) % CONSISTENT_BUCKETS`.
  *
  * Fixes over the previous version:
  *  - the neighbour test compared against the node object rather than its
  *    `.point`, so it never matched and some buckets got the wrong server;
  *  - the signed hash was made unsigned with `0xffffffff + hash`, which is
  *    off by one (2^32 must be added), shifting negative hashes one bucket;
  *  - the bucket table is cleared before it is rebuilt, so adding a server
  *    after a lookup no longer leaves stale buckets in front.
  *
  * @param {string} key Normalised cache key.
  * @returns {number} Index into serverGroup.
  */
 function findServerConsistent(key) {
    if (!bucketPoped) {
        var step = Math.floor(0xffffffff / CONSISTENT_BUCKETS)
          , nodeCount = serverNodes.length;

        serverNodes.sort(function(a, b){
            return a.point - b.point;
        });

        serverBuckets.length = 0;
        for(var i=0; i<CONSISTENT_BUCKETS; i++) {
            var point = step * i
              , low = 0
              , high = nodeCount;
            // Lower bound: first node whose point is >= the sample point.
            while (low < high) {
                var mid = low + Math.floor((high - low) / 2);
                if (serverNodes[mid].point < point) {
                    low = mid + 1;
                } else {
                    high = mid;
                }
            }
            // Past the last ring point: wrap around to the first one.
            var owner = serverNodes[low === nodeCount ? 0 : low];
            serverBuckets.push(owner ? owner.server : undefined);
        }
        bucketPoped = true;
    }
    // crc32() returns a signed int; `>>> 0` reinterprets it as unsigned.
    return serverBuckets[(crc32(key) >>> 0) % CONSISTENT_BUCKETS];
 }

 /**
  * Registers a server for the "standard" strategy: records its address,
  * reserves its connection/status slots and appends its index to
  * serverWeight once per unit of weight.
  *
  * @param {string} host Server host.
  * @param {number} port Server port.
  * @param {number} weight Number of weight slots to give this server.
  */
 function addServerStandard(host, port, weight) {
    var serverSeq = serverGroup.push({host : host, port : port}) - 1;
    connections.push(null);
    serverStatus.push(false);
    for (var slot = 0; slot < weight; slot += 1) {
        serverWeight.push(serverSeq);
    }
 }

 /**
  * Registers a server for the "consistent" strategy: records its address,
  * reserves its connection/status slots and places
  * `weight * CONSISTENT_POINTS` points for it on the hash ring. Each point is
  * the unsigned CRC-32 of "host:port-i".
  *
  * The signed value returned by crc32() is converted with `>>> 0`; the
  * previous `0xffffffff + hash` was off by one for every negative hash.
  *
  * @param {string} host Server host.
  * @param {number} port Server port.
  * @param {number} weight Multiplier for the number of ring points.
  */
 function addServerConsistent(host, port, weight) {
    serverGroup.push({host : host, port : port});
    connections.push(null);
    serverStatus.push(false);

    var serverSeq = serverGroup.length - 1
      , points = weight * CONSISTENT_POINTS;
    for(var i=0; i<points; i++) {
        serverNodes.push({
            server : serverSeq,
            point : crc32(host + ':' + port + '-' + i) >>> 0
        });
    }
    // Force the bucket table to be rebuilt on the next lookup.
    bucketPoped = false;
 }

 /**
  * Selects the server for a key and stores it in the shared `currentServer`.
  * With exactly one registered server index 0 is used; otherwise the
  * strategy chosen by settings.consistent decides.
  *
  * @param {string} key Normalised cache key.
  * @returns {number} The selected index (also left in currentServer).
  */
 function findServer(key) {
    if (serverGroup.length == 1) {
        currentServer = 0;
    } else if (settings.consistent) {
        currentServer = findServerConsistent.call(this, key);
    } else {
        currentServer = findServerStandard.call(this, key);
    }
    return currentServer;
 }

 /**
  * Marks the client as busy and sends `data` on the connection of the
  * currently selected server.
  *
  * @param {string} data Raw request text.
  */
 function write(data) {
    var socket = connections[currentServer];
    queryStarted = true;
    socket.write(data);
 }

 /**
  * Resets the per-response parsing state and, if a request is waiting at
  * the head of the queue, selects its server and sends it (terminated with
  * CRLF). With an empty queue the client is marked idle instead.
  *
  * The receive buffer is created with Buffer.alloc(0) rather than the
  * deprecated `new Buffer('')` constructor.
  */
 function init() {
    buffer = Buffer.alloc(0);
    header = [];
    cmd = null;
    callback = null;
    receiveTimes = 0;
    headLength = 0;
    bodyLength = 0;

    if (queue.length > 0) {
        // The entry stays queued; it is removed when its response arrives.
        var next = queue[0];
        currentServer = next.server;
        write(next.data + CRLF);
    } else {
        queryStarted = false;
    }
 }

 /**
  * Registers a memcached server with the strategy selected by
  * settings.consistent.
  *
  * @param {string} [host='127.0.0.1'] Server host.
  * @param {number} [port=11211] Server port.
  * @param {number} [weight=1] Relative weight; falsy or negative becomes 1.
  * @returns {memcached} this, for chaining.
  */
 this.addServer = function(host, port, weight) {
    host = host || '127.0.0.1';
    port = port || 11211;
    if (!weight || weight < 0) {
        weight = 1;
    }
    var register = settings.consistent ? addServerConsistent : addServerStandard;
    register.call(this, host, port, weight);
    return this;
 };

 /**
  * Queues a raw command for the currently selected server and makes sure a
  * connection to that server exists or is being opened.
  *
  * Fixes over the previous version:
  *  - the server index is captured when the call is made; the socket
  *    handlers used to read the shared `currentServer`, which may already
  *    point at another server by the time an event fires;
  *  - a failed connection is retried without queueing the command a second
  *    time (the old retry re-entered query(), so the command ran twice).
  *
  * @param {string} data Command text without the trailing CRLF.
  * @param {Function} callback Receives the handler's result for this command.
  * @param {string} [cmd] Handler name prefix; defaults to the first word of
  *   `data`, lower-cased.
  * @param {number} [times=0] Connection attempts already made.
  */
 this.query = function(data, callback, cmd, times) {
    if (serverGroup.length == 0) return;

    var self = this
      , serverIndex = currentServer
      , server = serverGroup[serverIndex]
      , host = server.host
      , port = server.port
      , spaceAt = data.indexOf(' ');

    cmd = cmd || (spaceAt == -1 ? data : data.substr(0, spaceAt)).toLowerCase();

    // Opens the socket for serverIndex; `attempt` counts earlier failures.
    function connect(attempt) {
        // Nothing to do if the server list was replaced (close()) or another
        // call already started a connection for this server.
        if (serverGroup[serverIndex] !== server || connections[serverIndex]) return;

        var conn = net.connect(port, host);
        conn.on('connect', function(){
            serverStatus[serverIndex] = true;
            if (!queryStarted) {
                init();
            }
        });
        conn.on('data', function(chunk){
            self.receive(chunk);
        });
        conn.on('error', function(e){
            connections[serverIndex] = null;
            serverStatus[serverIndex] = false;
            if (attempt >= settings.maxRetryTimes) {
                // Same as before: out of retries, surface the socket error.
                throw e;
            }
            setTimeout(function(){
                connect(attempt + 1);
            }, settings.retryDelay);
        });
        connections[serverIndex] = conn;
    }

    queue.push({
        server : serverIndex,
        cmd : cmd,
        data : data,
        callback : callback
    });

    if (serverStatus[serverIndex]) {
        if (!queryStarted) {
            init();
        }
        return;
    }

    // Not connected yet: the queued command is sent from the 'connect' handler.
    connect(times || 0);
 };

 /**
  * Shared implementation of the storage commands (set/add/replace/append/
  * prepend/cas): normalises the key, selects the server, serialises the value
  * and queues "<cmd> <key> <flags> <lifetime> <bytes> [<ver>]" plus the data.
  *
  * Value handling: numbers and strings are sent as text (FLAG_STRING),
  * Buffers are flagged FLAG_BINARY, everything else goes through
  * settings.encode (FLAG_ENCODE) and falls back to toString()/"" when the
  * encoder throws or yields nothing usable.
  *
  * `val` is now a local variable (it used to leak as an implicit global) and
  * the byte count no longer goes through the deprecated `new Buffer()`.
  *
  * @param {string} cmd Storage command name.
  * @param {*} key Cache key (normalised with setKey).
  * @param {*} value Value to store.
  * @param {Function|number} [callback] Completion callback; when it is not
  *   a function it is taken as the lifetime.
  * @param {number} [lifetime=0] Expiry value sent to the server.
  * @param {*} [ver] CAS token, appended only when defined.
  */
 this.store = function(cmd, key, value, callback, lifetime, ver) {
    var val
      , flags;

    key = this.setKey(key);

    findServer.call(this, key); // select the target server

    if (typeof callback != 'function') {
        lifetime = callback;
        callback = null;
    }

    if (typeof value == 'number' || typeof value == 'string') {
        val = value.toString();
        flags = FLAG_STRING;
    } else if (Buffer.isBuffer(value)) {
        val = value;
        flags = FLAG_BINARY;
    } else {
        try {
            val = settings.encode(value);
            flags = FLAG_ENCODE;
        } catch(e) {
            try {
                val = value.toString();
            } catch(e2) {
                val = '';
            }
            flags = FLAG_STRING;
        }
        // e.g. JSON.stringify(undefined) yields undefined, not a string.
        if (typeof val != 'string') {
            val = '';
            flags = FLAG_STRING;
        }
    }

    var bufferLength = Buffer.isBuffer(val) ? val.length : Buffer.byteLength(val)
      , query = [cmd, key, flags, lifetime || 0, bufferLength];

    if (ver !== undefined) query.push(ver);

    // NOTE: the payload is concatenated into a string here, so a Buffer value
    // is converted with its default toString(); bytes that are not valid
    // UTF-8 are not preserved.
    this.query(query.join(' ') + CRLF + val, callback);
 };

 /**
  * Socket 'data' handler. Accumulates the response of the request at the
  * head of the queue, runs the matching `<cmd>Handler`, and once that
  * handler reports a complete response invokes the request's callback and
  * moves on to the next queued request.
  *
  * Fixes over the previous version: `result` is a local variable (it used to
  * leak as an implicit global); data arriving with nothing queued is ignored
  * instead of raising a TypeError; a decompression failure is delivered to
  * the callback instead of dereferencing an undefined buffer.
  *
  * @param {Buffer} data Chunk read from the socket.
  */
 this.receive = function(data) {
    var self = this
      , result;

    if (receiveTimes == 0) {
        var last = queue.shift();
        if (!last) return; // unsolicited data: no request is waiting
        cmd = last.cmd;
        callback = last.callback;
    }

    buffer = Buffer.concat([buffer, data]);
    receiveTimes++;

    var lines = buffer.toString().split(CRLF);
    header = lines[0].split(/\s/g);
    for(var i=0; i<header.length; i++) {
        header[i] = header[i].toLowerCase();
    }

    result = this[cmd+'Handler']();
    // null/undefined means the response is still incomplete.
    if (result === null || result === undefined) return;

    // Keep our own reference: init() below clears the shared `callback`.
    var done = callback;
    if (typeof done == 'function') {
        try {
            if (result[2] == FLAG_COMPRESSED) {
                zlib.unzip(result[1], function(e, buf){
                    if (e) {
                        result[0] = e;
                        result[1] = '';
                    } else {
                        result[1] = buf.toString();
                    }
                    done.apply(self, result);
                });
            } else {
                done.apply(this, result);
            }
        } catch(e) {
            // Errors thrown by the callback are deliberately ignored, as
            // before, so that init() below still runs and the queue advances.
        }
    }
    init();
 };

 /**
  * Closes every open connection and forgets all registered servers and
  * queued requests.
  *
  * Each piece of state gets its own fresh array. The previous chained
  * assignment (`serverGroup = connections = serverStatus = queue = []`) made
  * all four names refer to ONE array, so a later addServer() pushed
  * addresses, sockets and flags into the same list. The hashing tables and
  * the busy flag are reset too, since they refer to the servers just removed.
  *
  * @returns {memcached} this, for chaining.
  */
 this.close = function(){
    for(var i=0; i<connections.length; i++) {
        if (!connections[i]) continue;
        connections[i].end();
        connections[i].destroy();
    }
    serverGroup = [];
    connections = [];
    serverStatus = [];
    queue = [];
    serverWeight = [];
    serverNodes = [];
    serverBuckets = [];
    bucketPoped = false;
    queryStarted = false;
    return this;
 };

 /**
  * Normalises a cache key: converts it to a string, keeps at most the first
  * 250 characters and replaces every whitespace character with "_".
  *
  * @param {*} key Raw key.
  * @returns {string} Normalised key.
  */
 this.setKey = function(key) {
    var text = key.toString();
    var truncated = text.substr(0, 250);
    return truncated.replace(/\s/g, '_');
 };

 // Storage commands that accept a lifetime: add / set / replace.
 // Each forwards to store() under its own command name and returns this.
 var expiringStoreCommands = ['add', 'set', 'replace'];
 expiringStoreCommands.forEach(function(command) {
    this[command] = function(key, value, callback, lifetime) {
        this.store(command, key, value, callback, lifetime);
        return this;
    };
 }, this);

 // append / prepend always pass a lifetime of 0 to store().
 var concatStoreCommands = ['append', 'prepend'];
 concatStoreCommands.forEach(function(command) {
    this[command] = function(key, value, callback) {
        this.store(command, key, value, callback, 0);
        return this;
    };
 }, this);

 /**
  * Check-and-set: like set, but also sends `ver` as the last field of the
  * command line.
  */
 this.cas = function(key, value, ver, callback, lifetime) {
    this.store('cas', key, value, callback, lifetime, ver);
    return this;
 };

 /**
  * Increments the numeric value stored under `key` by `value`.
  *
  * The key is normalised with setKey(), like every other keyed command, so
  * that server selection and the key sent on the wire match what set()/get()
  * use (previously a key containing whitespace or longer than 250 characters
  * addressed a different entry).
  *
  * @param {*} key Cache key.
  * @param {number} value Amount to add.
  * @param {Function} callback Receives (error, newValue).
  * @returns {memcached} this, for chaining.
  */
 this.incr = function(key, value, callback) {
    key = this.setKey(key);
    findServer.call(this, key);
    this.query('incr ' + key + ' ' + value, callback);
    return this;
 };

 /**
  * Decrements the numeric value stored under `key` by `value`. The key is
  * normalised with setKey(), as in incr().
  *
  * @param {*} key Cache key.
  * @param {number} value Amount to subtract.
  * @param {Function} callback Receives (error, newValue).
  * @returns {memcached} this, for chaining.
  */
 this.decr = function(key, value, callback) {
    key = this.setKey(key);
    findServer.call(this, key);
    this.query('decr ' + key + ' ' + value, callback);
    return this;
 };

 /**
  * Deletes the entry stored under `key`.
  *
  * @param {*} key Cache key (normalised with setKey).
  * @param {Function} callback Receives the deleteHandler result.
  * @returns {memcached} this, for chaining.
  */
 this.delete = function(key, callback) {
    var safeKey = this.setKey(key);
    findServer.call(this, safeKey);
    this.query('delete ' + safeKey, callback);
    return this;
 };

 /**
  * Sends "flush_all" to every registered server and calls `callback` once
  * the number of replies equals the number of servers.
  *
  * @param {Function} callback Receives (error, 'ok'); `error` is the last
  *   error reported by any server, or undefined.
  * @returns {memcached} this, for chaining.
  */
 this.flush = function(callback) {
    var self = this
      , replies = 0
      , lastError = undefined;

    function onReply(e, data) {
        replies += 1;
        if (e) {
            lastError = e;
        }
        if (replies == serverGroup.length) {
            callback.call(self, lastError, 'ok');
        }
    }

    for (var index = 0; index < serverGroup.length; index++) {
        currentServer = index;
        this.query('flush_all', onReply, 'flush');
    }
    return this;
 };

 /**
  * Helper for the diagnostic commands below: they always target the first
  * registered server.
  *
  * @param {memcached} client The client instance.
  * @param {string} command Command text.
  * @param {Function} callback Result callback.
  * @param {string} [handlerName] Handler prefix passed on to query().
  * @returns {memcached} client, for chaining.
  */
 function queryFirstServer(client, command, callback, handlerName) {
    currentServer = 0;
    client.query(command, callback, handlerName);
    return client;
 }

 /** Requests general statistics ("stats"). */
 this.stats = function(callback) {
    return queryFirstServer(this, 'stats', callback);
 };

 /** Requests per-slab item statistics ("stats items"). */
 this.items = function(callback) {
    return queryFirstServer(this, 'stats items', callback, 'items');
 };

 /** Requests slab allocator statistics ("stats slabs"). */
 this.slabs = function(callback){
    return queryFirstServer(this, 'stats slabs', callback, 'slabs');
 };

 /** Requests the key dump of one slab ("stats cachedump <slab> 0"). */
 this.dump = function(slab, callback){
    return queryFirstServer(this, 'stats cachedump '+slab+' 0', callback, 'dump');
 };

 /**
  * Lists the items of the first server by dumping every slab reported by
  * slabs() and concatenating the results.
  *
  * Completion is tracked against the number of dumps actually started
  * rather than the reported `active_slabs` figure, so the callback is always
  * invoked exactly once, including when there are no slabs at all. An error
  * from slabs() is passed to the callback instead of being dereferenced.
  *
  * @param {Function} callback Receives (error, items).
  * @returns {memcached} this, for chaining.
  */
 this.list = function(callback) {
    var self = this;
    this.slabs(function(e, data){
        if (e) {
            callback(e, []);
            return;
        }

        var slabIds = Object.keys(data.slabs)
          , pending = slabIds.length
          , items = [];

        if (pending == 0) {
            callback(undefined, items);
            return;
        }

        slabIds.forEach(function(slabId){
            self.dump(slabId, function(e, dumped){
                if (!e) items = items.concat(dumped);
                pending--;
                if (pending == 0) {
                    callback(undefined, items);
                }
            });
        });
    });
    return this;
 };

 /**
  * Asks the first registered server for its version.
  *
  * @param {Function} callback Receives the versionHandler result.
  * @returns {memcached} this, for chaining.
  */
 this.version = function(callback) {
    var command = 'version';
    currentServer = 0;
    this.query(command, callback);
    return this;
 };

 /**
  * Fetches the value stored under `key`.
  *
  * @param {*} key Cache key (normalised with setKey).
  * @param {Function} callback Receives the getHandler result.
  * @returns {memcached} this, for chaining.
  */
 this.get = function(key, callback) {
    var safeKey = this.setKey(key);
    findServer.call(this, safeKey);
    this.query('get ' + safeKey, callback);
    return this;
 };

 /**
  * Like get(), but issues "gets"; the getsHandler result additionally
  * carries the fifth field of the response header.
  *
  * @param {*} key Cache key (normalised with setKey).
  * @param {Function} callback Receives the getsHandler result.
  * @returns {memcached} this, for chaining.
  */
 this.gets = function(key, callback) {
    var safeKey = this.setKey(key);
    findServer.call(this, safeKey);
    this.query('gets ' + safeKey, callback);
    return this;
 };

 /**
  * Interprets the reply to a storage command.
  *
  * @returns {Array} [undefined, 'stored'] on success, otherwise
  *   [Error(status), status] with the first (lower-cased) header word.
  */
 this.storeHandler = function(){
    var status = header[0];
    return status == 'stored'
        ? [undefined, status]
        : [new Error(status), status];
 };

 /**
  * Interprets the reply to incr/decr, whose success reply is the new value
  * as a decimal number.
  *
  * Any reply that is not purely decimal digits is reported as an error.
  * Previously only 'not_found' was, so other textual replies came back as a
  * "successful" NaN.
  *
  * @returns {Array} [undefined, number] on success, otherwise
  *   [Error(reply), reply].
  */
 this.numberHandler = function(){
    var reply = header[0];
    if (!/^\d+$/.test(reply)) {
        return [new Error(reply), reply];
    }
    return [undefined, Number(reply)];
 };

 // Every storage command is answered in the same format, so their handlers
 // all delegate to storeHandler().
 var storeReplyCommands = ['add', 'set', 'replace', 'append', 'prepend', 'cas'];
 storeReplyCommands.forEach(function(command) {
    this[command + 'Handler'] = function(){
        return this.storeHandler();
    };
 }, this);

 // incr and decr handlers delegate to numberHandler().
 var counterReplyCommands = ['incr', 'decr'];
 counterReplyCommands.forEach(function(command) {
    this[command + 'Handler'] = function(){
        return this.numberHandler();
    };
 }, this);

 /**
  * Interprets the reply to "delete".
  *
  * @returns {Array} [undefined, 'deleted'] on success, otherwise
  *   [Error(status), status].
  */
 this.deleteHandler = function(){
    var status = header[0];
    return status == 'deleted'
        ? [undefined, status]
        : [new Error(status), status];
 };

 /**
  * Interprets the reply to "flush_all".
  *
  * @returns {Array} [undefined, 'ok'] on success, otherwise
  *   [Error(status), status].
  */
 this.flushHandler = function(){
    var status = header[0];
    return status == 'ok'
        ? [undefined, status]
        : [new Error(status), status];
 };

 /**
  * Parses a "stats"-style response from the receive buffer.
  *
  * @returns {Array|null} null until the trimmed response ends with an "END"
  *   line; then [undefined, stats], where each line contributes its second
  *   whitespace-separated field as the name and its third as the value.
  */
 this.statsHandler = function(){
    var lines = buffer.toString().replace(/^\s*|\s*$/g, '').split(CRLF);
    if (lines.pop() != 'END') {
        return null;
    }

    var stats = {};
    lines.forEach(function(text){
        var fields = text.split(/\s+/g);
        stats[fields[1]] = fields[2];
    });
    return [undefined, stats];
 };

 /**
  * Parses a "stats items" response: statistics named "items:<id>:<name>"
  * are grouped into `items[id][name]`; other names are skipped.
  *
  * @returns {Array|null} null while the response is incomplete, otherwise
  *   [undefined, items].
  */
 this.itemsHandler = function(){
    var parsed = this.statsHandler();
    if (parsed === null) return null;

    var stats = parsed[1]
      , pattern = /^items\:(\d+)\:(.+)$/
      , items = {};
    for (var name in stats) {
        var parts = pattern.exec(name);
        if (!parts) continue;
        var slabId = parts[1];
        if (!items[slabId]) {
            items[slabId] = {};
        }
        items[slabId][parts[2]] = stats[name];
    }
    return [undefined, items];
 };

 /**
  * Parses a "stats slabs" response: `active_slabs` (as a number) and
  * `total_malloced` are reported at the top level, and statistics named
  * "<id>:<name>" are grouped into `slabs[id][name]`.
  *
  * @returns {Array|null} null while the response is incomplete, otherwise
  *   [undefined, {active_slabs, total_malloced, slabs}].
  */
 this.slabsHandler = function(){
    var parsed = this.statsHandler();
    if (parsed === null) return null;

    var stats = parsed[1]
      , summary = {
            active_slabs : Number(stats.active_slabs),
            total_malloced : stats.total_malloced,
            slabs : {}
        }
      , pattern = /^(\d+)\:(.+)$/;

    // The two totals are removed before the per-slab entries are grouped.
    delete stats['active_slabs'];
    delete stats['total_malloced'];

    for (var name in stats) {
        var parts = pattern.exec(name);
        if (!parts) continue;
        var slabId = parts[1];
        if (!summary.slabs[slabId]) {
            summary.slabs[slabId] = {};
        }
        summary.slabs[slabId][parts[2]] = stats[name];
    }
    return [undefined, summary];
 };

 /**
  * Parses a "stats cachedump" response. Lines of the form
  * "ITEM <key> [<size> b; <expires> s]" become {key, size, expires}
  * records (all strings); other lines are skipped.
  *
  * @returns {Array|null} null until the trimmed response ends with an "END"
  *   line, otherwise [undefined, items].
  */
 this.dumpHandler = function(){
    var lines = buffer.toString().replace(/^\s*|\s*$/g, '').split(CRLF);
    if (lines.pop() != 'END') {
        return null;
    }

    var pattern = /^ITEM\s([^\s]+)\s\[([^\s]+)\sb;\s([^\s]+)\ss\]$/
      , items = [];
    lines.forEach(function(text){
        var fields = pattern.exec(text);
        if (fields) {
            items.push({
                key : fields[1],
                size : fields[2],
                expires : fields[3]
            });
        }
    });
    return [undefined, items];
 };

 /**
  * Interprets the reply to "version".
  *
  * @returns {Array} [undefined, <second header word>] when the reply starts
  *   with 'version', otherwise [Error(word), word] for the first word.
  */
 this.versionHandler = function(){
    var keyword = header[0];
    if (keyword != 'version') {
        return [new Error(keyword), keyword];
    }
    return [undefined, header[1]];
 };

 /**
  * Parses the response to "get": a header line "VALUE <key> <flags>
  * <bytes>", the data block, and a closing "END" line.
  *
  * Fixes over the previous version:
  *  - a reply that is neither "VALUE" nor "END" is returned as an error;
  *    before, it produced a NaN body length, the response was never
  *    considered complete and the request queue stalled;
  *  - the header is parsed once its line is complete rather than only on
  *    the first chunk, and its length is measured on the received text;
  *  - parseInt gets an explicit radix and the deprecated `new Buffer()` is
  *    replaced by Buffer.byteLength().
  *
  * @returns {Array|null} null while more data is needed, otherwise
  *   [error, value, flag]. A missing key yields [Error('not_found'), ''].
  */
 this.getHandler = function(){
    // headLength is reset to 0 by init(), so 0 means "header not parsed yet".
    if (headLength == 0) {
        var text = buffer.toString()
          , lineEnd = text.indexOf(CRLF);
        if (lineEnd == -1) return null; // header line not complete yet

        // key does not exist
        if (header[0] == 'end') return [new Error('not_found'), ''];
        // any other non-VALUE reply is reported as an error
        if (header[0] != 'value') return [new Error(header[0]), ''];

        headLength = Buffer.byteLength(text.substr(0, lineEnd)) + CRLF.length;
        bodyLength = parseInt(header[3], 10);
        if (isNaN(bodyLength)) return [new Error(header.join(' ')), ''];
    }

    // bytes needed for the whole response: header, body, CRLF, "END", CRLF
    var dataLength = headLength + bodyLength + 'END'.length + 2 * CRLF.length;
    if (buffer.length < dataLength) {
        return null;
    }

    var bodyBuffer = buffer.slice(headLength, headLength + bodyLength)
      , body = bodyBuffer.toString();

    if (header[2] == FLAG_STRING) {
        return [undefined, body, FLAG_STRING];
    } else if (header[2] == FLAG_COMPRESSED) {
        // The raw bytes are handed over; receive() decompresses them.
        return [undefined, bodyBuffer, FLAG_COMPRESSED];
    } else if (header[2] == FLAG_BINARY) {
        return [undefined, body, FLAG_BINARY];
    }

    try {
        return [undefined, settings.decode(body), FLAG_ENCODE];
    } catch(e) {
        // Not decodable: hand back the raw text instead.
        return [undefined, body, FLAG_STRING];
    }
 };

 /**
  * Parses the response to "gets": the getHandler result with the fifth
  * header field appended.
  *
  * @returns {Array|null} null while the response is incomplete.
  */
 this.getsHandler = function(){
    var reply = this.getHandler();
    if (reply !== null) {
        reply.push(header[4]);
    }
    return reply;
 };

}

module.exports = memcached;
2 回复

坐等楼主共享~~。。

已更新, 但是consistent模式下还是不能百分之百的跟php实现统一(即100%的命中)

回到顶部