前段时间一直在研究 PHP 与 Node.js 的 session 共享问题。最初遇到的问题是:PHP 默认使用 serialize/unserialize 进行序列化和反序列化,所以需要用 JS 来实现 serialize/unserialize(session_encode 及 session_decode 只需要在 serialize/unserialize 的基础上稍稍改写即可)。后面遇到的问题是:使用分布式 memcached 时,hash 算法也需要与 PHP 的 hash 算法保持一致,还要支持热增减服务器。这一点目前还在纠结,正在看 consistent hashing 算法的相关内容,写完之后也会与大家分享。
PS:本 memcached 模块比 NPM 上的那个 memcached 模块效率更高,ab 1000 压测时每秒要多处理 100 多个请求(同样的服务器、同样的 connect,只替换了 memcached 模块,取 30 次数据,相信数据还是比较可信的)。
补充:今天打开 php_memcache.dll 的源代码分析,终于弄清了 standard 策略的算法,并且已经用 Node.js 实现。下一步就是把 consistent 策略的算法实现出来,这样 memcache 模块就完整了,可以与 PHP 共享了。
最新更新的代码
但是 consistent 模式下还不能百分之百地与 PHP 的实现保持一致(即做不到 100% 的命中)。
欢迎拍砖
// Node core modules used below: TCP client sockets, hashing, zlib decompression.
var net = require('net')
, crypto = require('crypto')
, zlib = require('zlib')
// Line terminator appended to commands and used to split responses.
, CRLF = "\r\n"
// Values written into the flags field by store() and checked by getHandler().
, FLAG_STRING = 0
, FLAG_ENCODE = 1
, FLAG_COMPRESSED = 2
, FLAG_BINARY = 3
// Ring points created per unit of weight in addServerConsistent().
, CONSISTENT_POINTS = 160
// Number of precomputed buckets filled by findServerConsistent().
// NOTE(review): `fs` is not referenced anywhere in the visible source.
, CONSISTENT_BUCKETS = 1024, fs = require('fs');
/**
 * Converts a JavaScript (UTF-16) string into a "byte string": a string in
 * which every character code is one byte of the UTF-8 encoding of the input.
 *
 * A valid surrogate pair is combined into a single code point and emitted as
 * one 4-byte UTF-8 sequence. Previously each half was emitted as its own
 * 3-byte sequence, which is not UTF-8 and produced different bytes (and
 * therefore different hashes) than a UTF-8 string in another runtime.
 * An unpaired surrogate is still emitted as a 3-byte sequence, as before.
 *
 * @param {*} argString - Value to encode; it is coerced with `+ ''`.
 * @returns {string} The byte string, or '' for null/undefined.
 */
function utf8_encode (argString) {
  if (argString === null || typeof argString === "undefined") {
    return "";
  }
  var string = (argString + '');
  var utftext = '';
  var stringl = string.length;
  // [start, end) is the pending run of plain ASCII not yet copied to the output.
  var start = 0;
  var end = 0;
  for (var n = 0; n < stringl; n++) {
    var c1 = string.charCodeAt(n);
    var enc = null;
    if (c1 < 128) {
      end++;
    } else if (c1 < 2048) {
      enc = String.fromCharCode((c1 >> 6) | 192, (c1 & 63) | 128);
    } else if ((c1 & 0xFC00) === 0xD800 && n + 1 < stringl &&
        (string.charCodeAt(n + 1) & 0xFC00) === 0xDC00) {
      // High surrogate followed by a low surrogate: rebuild the code point.
      var c2 = string.charCodeAt(n + 1);
      var codePoint = ((c1 & 0x3FF) << 10) + (c2 & 0x3FF) + 0x10000;
      enc = String.fromCharCode(
        (codePoint >> 18) | 240,
        ((codePoint >> 12) & 63) | 128,
        ((codePoint >> 6) & 63) | 128,
        (codePoint & 63) | 128
      );
      n++; // the low surrogate has been consumed as well
    } else {
      enc = String.fromCharCode((c1 >> 12) | 224, ((c1 >> 6) & 63) | 128, (c1 & 63) | 128);
    }
    if (enc !== null) {
      if (end > start) {
        utftext += string.slice(start, end);
      }
      utftext += enc;
      start = end = n + 1;
    }
  }
  if (end > start) {
    utftext += string.slice(start, stringl);
  }
  return utftext;
}
/**
 * Returns the MD5 digest of `str` as a hexadecimal string.
 * The argument is converted with String() before hashing.
 */
function md5(str) {
  return crypto
    .createHash('md5')
    .update(String(str))
    .digest('hex');
}
/**
 * Computes the CRC-32 checksum (reflected polynomial 0xEDB88320) of the UTF-8
 * bytes of `str`, as produced by utf8_encode().
 *
 * The 256-entry lookup table is generated once, as numbers, and cached on the
 * function. Previously an array of 256 hex strings was rebuilt on every call
 * and each lookup went through a string-to-number coercion.
 *
 * @param {*} str - Value to checksum.
 * @returns {number} The checksum as a SIGNED 32-bit integer; use `>>> 0` to
 *          obtain the unsigned value.
 */
function crc32 (str) {
  var table = crc32.table;
  if (!table) {
    table = crc32.table = [];
    for (var n = 0; n < 256; n++) {
      var entry = n;
      for (var bit = 0; bit < 8; bit++) {
        entry = (entry & 1) ? (0xEDB88320 ^ (entry >>> 1)) : (entry >>> 1);
      }
      table[n] = entry >>> 0;
    }
  }
  var bytes = utf8_encode(str);
  var crc = -1;
  for (var i = 0, iTop = bytes.length; i < iTop; i++) {
    crc = (crc >>> 8) ^ table[(crc ^ bytes.charCodeAt(i)) & 0xFF];
  }
  return crc ^ (-1);
}
/**
 * Memcached client constructor.
 *
 * @param {Object} [config] - Optional overrides for the keys of `defaults`:
 *   encode / decode (used for non-string values by store() and getHandler()),
 *   maxRetryTimes and retryDelay (used by the reconnect logic in query(),
 *   the delay is passed to setTimeout), and consistent (selects the
 *   consistent-hashing functions instead of the standard ones).
 */
function memcached(config) {
// Index into serverGroup of the server the next query() call targets.
var currentServer
// Bytes received so far for the response being read.
, buffer
// Lower-cased, whitespace-split first line of the current response.
, header
// Command name and callback of the request currently being answered.
, cmd
, callback
// Number of data chunks received for the current response.
, receiveTimes
// Byte lengths of the response's first line and of its value, set by getHandler().
, headLength
, bodyLength
// Effective options: every key of `defaults`, overridden by `config` when set.
, settings = {}
, config = config || {}
, defaults = {
encode : JSON.stringify,
decode : JSON.parse,
maxRetryTimes : 5,
retryDelay : 10,
consistent : false
}
// Parallel arrays indexed by server number: address, socket, connected flag.
, serverGroup = []
, connections = []
, serverStatus = []
// Standard strategy: one entry (a server index) per unit of weight.
, serverWeight = []
// Consistent strategy: {server, point} ring nodes and the derived bucket table.
, serverNodes = []
, serverBuckets = []
// True once serverBuckets has been built from the current serverNodes.
, bucketPoped = false
, queryStarted = false // false means idle: the next queued request may be sent
// Pending requests in send order; entries are {server, cmd, data, callback}.
, queue = [];
// Copy every known option, falling back to the default when config leaves it unset.
for(var o in defaults) {
if (config[o] == undefined) {
settings[o] = defaults[o];
} else {
settings[o] = config[o];
}
}
/**
 * Standard strategy: picks a server index for `key` from the weighted slot
 * list. Bits 16..30 of the key's CRC-32 select the slot, modulo the number
 * of slots in serverWeight.
 */
function findServerStandard(key) {
  var slot = ((crc32(key) >> 16) & 0x7fff) % (serverWeight.length);
  return serverWeight[slot];
}
/**
 * Consistent strategy: returns the server index owning `key`.
 *
 * On first use after the ring changed, the ring is sorted and a table of
 * CONSISTENT_BUCKETS entries is built: bucket i belongs to the first ring
 * node whose point is >= step * i, wrapping to the lowest node when no such
 * node exists. Lookups then index the table with the key's unsigned CRC-32.
 *
 * Fixes over the previous version:
 *  - the search compared the probe against a node object instead of its
 *    `.point`, so the exact-match test never succeeded past index 0 and the
 *    wrong node could be chosen;
 *  - negative CRCs were made unsigned with `0xffffffff + hash`, which is one
 *    less than the real unsigned value;
 *  - rebuilding appended to the old table, so stale buckets stayed in use;
 *  - an empty ring threw instead of yielding no server.
 *
 * @param {string} key - Cache key.
 * @returns {number|undefined} Server index, or undefined when the ring is empty.
 */
function findServerConsistent(key) {
  var nodesLength = serverNodes.length;
  if (nodesLength == 0) {
    return undefined;
  }
  if (!bucketPoped) {
    var step = Math.floor(0xffffffff / CONSISTENT_BUCKETS)
      , buckets = [];
    serverNodes.sort(function(a, b){
      return a.point - b.point;
    });
    for (var i = 0; i < CONSISTENT_BUCKETS; i++) {
      var point = step * i
        , low = 0
        , high = nodesLength;
      // Lower-bound binary search: first node with node.point >= point.
      while (low < high) {
        var mid = (low + high) >>> 1;
        if (serverNodes[mid].point < point) {
          low = mid + 1;
        } else {
          high = mid;
        }
      }
      // Past the last node means wrap around to the start of the ring.
      buckets.push(serverNodes[low == nodesLength ? 0 : low].server);
    }
    serverBuckets = buckets;
    bucketPoped = true;
  }
  // crc32() returns a signed value; `>>> 0` reinterprets it as unsigned.
  var hash = crc32(key) >>> 0;
  return serverBuckets[hash % CONSISTENT_BUCKETS];
}
/**
 * Registers a server for the standard strategy: records its address, reserves
 * its connection and status entries, and adds its index to the weighted slot
 * list once per unit of `weight`.
 */
function addServerStandard(host, port, weight) {
  var serverIndex = serverGroup.push({host : host, port : port}) - 1;
  connections.push(null);
  serverStatus.push(false);
  for (var added = 0; added < weight; added += 1) {
    serverWeight.push(serverIndex);
  }
}
/**
 * Registers a server for the consistent strategy: records its address,
 * reserves its connection and status entries, and places
 * weight * CONSISTENT_POINTS nodes on the ring. Node i sits at the unsigned
 * CRC-32 of "host:port-i". The bucket table is marked stale afterwards.
 *
 * Fix: negative CRCs were converted with `0xffffffff + hash`, which is one
 * less than the true unsigned value; `>>> 0` gives the exact value.
 *
 * @param {string} host
 * @param {number} port
 * @param {number} weight - Multiplier for the number of ring nodes.
 */
function addServerConsistent(host, port, weight) {
  serverGroup.push({host : host, port : port});
  connections.push(null);
  serverStatus.push(false);
  var points = weight * CONSISTENT_POINTS
    , serverSeq = serverGroup.length - 1;
  for (var i = 0; i < points; i++) {
    serverNodes.push({
      server : serverSeq,
      // crc32() is signed; the ring is ordered by the unsigned value.
      point : crc32(host + ':' + port + '-' + i) >>> 0
    });
  }
  // Force findServerConsistent() to rebuild its buckets on the next lookup.
  bucketPoped = false;
}
/**
 * Selects the server for `key`, stores its index in `currentServer` and
 * returns it. With exactly one registered server that server is used
 * directly; otherwise the strategy chosen by settings.consistent decides.
 */
function findServer(key) {
  var locate;
  if (serverGroup.length == 1) {
    currentServer = 0;
    return currentServer;
  }
  locate = settings.consistent ? findServerConsistent : findServerStandard;
  currentServer = locate.call(this, key);
  return currentServer;
}
/**
 * Marks the client as busy and writes `data` to the socket of the server
 * selected by `currentServer`.
 */
function write(data) {
  var socket = connections[currentServer];
  queryStarted = true;
  socket.write(data);
}
/**
 * Resets the per-response state and, if requests are waiting, sends the one
 * at the head of the queue to its server; otherwise marks the client idle.
 * The head entry stays in the queue; receive() removes it when the first
 * response chunk arrives.
 *
 * Change: the empty receive buffer is no longer created with the deprecated
 * `new Buffer('')` constructor.
 */
function init() {
  // Buffer.concat([]) yields a zero-length Buffer without the deprecated constructor.
  buffer = Buffer.concat([]);
  header = [];
  cmd = null;
  callback = null;
  receiveTimes = 0;
  headLength = 0;
  bodyLength = 0;
  if (queue.length > 0) {
    var next = queue[0];
    currentServer = next.server;
    write(next.data + CRLF);
  } else {
    queryStarted = false;
  }
}
/**
 * Registers a memcached server with the strategy selected by
 * settings.consistent.
 *
 * @param {string} [host] - Falls back to '127.0.0.1' when falsy.
 * @param {number} [port] - Falls back to 11211 when falsy.
 * @param {number} [weight] - Falls back to 1 when falsy or negative.
 * @returns this client, for chaining.
 */
this.addServer = function(host, port, weight) {
  var register = settings.consistent ? addServerConsistent : addServerStandard;
  host = host || '127.0.0.1';
  port = port || 11211;
  if (!weight || weight < 0) {
    weight = 1;
  }
  register.call(this, host, port, weight);
  return this;
};
/**
 * Queues a raw protocol command for the server currently selected in
 * `currentServer` and makes sure a connection to that server exists.
 * Does nothing when no server is registered.
 *
 * Fixes over the previous version:
 *  - the socket event handlers used the shared `currentServer` at event
 *    time, so a connect/error event could update the state of whichever
 *    server happened to be selected last; they now use the index captured
 *    when the connection was created;
 *  - a retry re-enqueued the request although the first attempt had already
 *    enqueued it, so the command was sent (and its callback invoked) once
 *    per attempt.
 *
 * @param {string} data - Command text without the trailing CRLF.
 * @param {Function} callback - Invoked by receive() with the handler result.
 * @param {string} [cmd] - Handler prefix; defaults to the lower-cased first
 *        word of `data`.
 * @param {number} [times] - Internal retry counter; callers should omit it.
 * @throws The socket error, from inside the 'error' event handler, once
 *         `times` has reached settings.maxRetryTimes.
 */
this.query = function(data, callback, cmd, times) {
  if (serverGroup.length == 0) return;
  // Capture the target now; `currentServer` is shared and changes with every request.
  var serverIndex = currentServer
    , conn = connections[serverIndex]
    , server = serverGroup[serverIndex]
    , host = server.host
    , port = server.port
    , idx = data.indexOf(' ')
    , self = this;
  cmd = cmd || (idx == -1 ? data : data.substr(0, idx)).toLowerCase();
  times = times || 0;
  // Only the first attempt enqueues; a retry merely reconnects.
  if (times == 0) {
    queue.push({
      server : serverIndex,
      cmd : cmd,
      data : data,
      callback : callback
    });
  }
  if (serverStatus[serverIndex]) {
    if (!queryStarted) {
      init();
    }
    return;
  }
  // A connection attempt is already under way; its 'connect' handler drains the queue.
  if (conn) return;
  conn = net.connect(port, host);
  conn.on('connect', function(){
    serverStatus[serverIndex] = true;
    if (!queryStarted) {
      init();
    }
  });
  conn.on('data', function(chunk){
    self.receive(chunk);
  });
  conn.on('error', function(e){
    connections[serverIndex] = null;
    serverStatus[serverIndex] = false;
    if (times >= settings.maxRetryTimes) {
      throw e;
    }
    // NOTE(review): if a request was in flight when the socket failed,
    // queryStarted stays true and nothing restarts the queue — confirm
    // whether that case needs handling.
    setTimeout(function(){
      // query() reads the target from currentServer, so restore it first.
      currentServer = serverIndex;
      self.query(data, callback, cmd, times + 1);
    }, settings.retryDelay);
  });
  connections[serverIndex] = conn;
};
/**
 * Builds and queues a storage command (set/add/replace/append/prepend/cas).
 *
 * The value is turned into text and tagged with a flag:
 *  - Buffer           -> its string form, FLAG_BINARY
 *  - number or string -> String(value), FLAG_STRING
 *  - anything else    -> settings.encode(value), FLAG_ENCODE; if the encoder
 *                        throws or yields null/undefined, value.toString()
 *                        (or '' if that throws too) with FLAG_STRING.
 *
 * Fixes over the previous version:
 *  - `val` was assigned without a declaration (implicit global);
 *  - an encoder result of undefined (e.g. JSON.stringify(undefined)) made
 *    the Buffer constructor throw;
 *  - the byte count is now measured on the exact text that is sent, so the
 *    declared length always matches the bytes written;
 *  - the deprecated `new Buffer()` constructor is no longer used.
 *
 * NOTE(review): the command is assembled as a string, so a Buffer holding
 * bytes that are not valid UTF-8 is not stored verbatim.
 *
 * @param {string} cmd - Storage command name.
 * @param {*} key - Normalised with setKey().
 * @param {*} value - Value to store.
 * @param {Function} [callback] - If not a function it is taken as `lifetime`.
 * @param {number} [lifetime] - Expiry field; defaults to 0.
 * @param {*} [ver] - Appended to the command line when defined (cas).
 */
this.store = function(cmd, key, value, callback, lifetime, ver) {
  var payload, flags, query;
  key = this.setKey(key);
  findServer.call(this, key); // select the target server
  if (typeof callback != 'function') {
    lifetime = callback;
    callback = null;
  }
  if (Buffer.isBuffer(value)) {
    payload = value.toString();
    flags = FLAG_BINARY;
  } else if (typeof value == 'number' || typeof value == 'string') {
    payload = String(value);
    flags = FLAG_STRING;
  } else {
    flags = FLAG_ENCODE;
    try {
      payload = settings.encode(value);
    } catch (encodeError) {
      payload = undefined;
    }
    if (payload === undefined || payload === null) {
      // Encoding failed: fall back to the value's own string form.
      flags = FLAG_STRING;
      try {
        payload = value.toString();
      } catch (stringifyError) {
        payload = '';
      }
    }
    payload = String(payload);
  }
  query = [cmd, key, flags, lifetime || 0, Buffer.byteLength(payload)];
  if (ver !== undefined) query.push(ver);
  this.query(query.join(' ') + CRLF + payload, callback);
};
/**
 * Handles one chunk of socket data. On the first chunk of a response the
 * request at the head of the queue is taken as the one being answered. The
 * chunk is appended to the receive buffer, the first line is split into the
 * lower-cased `header`, and the `<cmd>Handler` method is asked for a result.
 * A null/undefined result means "incomplete, wait for more data"; otherwise
 * the callback is invoked with the result and the next request is started.
 * Results flagged FLAG_COMPRESSED are unzipped before the callback runs.
 *
 * Fixes over the previous version:
 *  - `result` was assigned without a declaration (implicit global);
 *  - a failed unzip called toString() on an undefined buffer inside the
 *    asynchronous callback; the error is now passed to the callback instead;
 *  - data arriving while no request is pending is ignored instead of throwing;
 *  - a missing callback is skipped explicitly.
 *
 * @param {Buffer} data - Chunk received from the socket.
 */
this.receive = function(data) {
  var self = this
    , result
    , done;
  if (receiveTimes == 0) {
    var last = queue.shift();
    if (!last) return; // nothing was asked for; drop the unsolicited data
    cmd = last.cmd;
    callback = last.callback;
  }
  buffer = Buffer.concat([buffer, data]);
  receiveTimes++;
  header = buffer.toString().split(CRLF)[0].split(/\s/g);
  for (var i = 0; i < header.length; i++) {
    header[i] = header[i].toLowerCase();
  }
  result = this[cmd + 'Handler']();
  if (result === null || result === undefined) return;
  // init() below clears `callback`, so keep our own reference.
  done = callback;
  if (result[2] == FLAG_COMPRESSED) {
    zlib.unzip(result[1], function(e, buf){
      if (e) {
        result[0] = e;
      } else {
        result[1] = buf.toString();
      }
      if (typeof done == 'function') {
        done.apply(self, result);
      }
    });
  } else if (typeof done == 'function') {
    try {
      done.apply(this, result);
    } catch (e) {
      // Deliberately ignored, as before, so that a throwing callback cannot
      // stop the remaining queued requests from being sent.
    }
  }
  init();
};
/**
 * Closes every open connection and forgets all registered servers and
 * pending requests.
 *
 * Fixes over the previous version:
 *  - serverGroup, connections, serverStatus and queue were all assigned the
 *    SAME array, so any later addServer()/query() call pushed unrelated
 *    values into one shared list; each now gets its own array;
 *  - the routing tables (serverWeight, serverNodes, serverBuckets) kept
 *    indexes of servers that no longer exist; they are cleared as well;
 *  - the per-response state and busy flag are reset via init(), so the
 *    client can be reused after new servers are added.
 *
 * @returns this client, for chaining.
 */
this.close = function(){
  for (var i = 0; i < serverGroup.length; i++) {
    if (!connections[i]) continue;
    connections[i].end();
    connections[i].destroy();
    connections[i] = null;
    serverStatus[i] = false;
  }
  serverGroup = [];
  connections = [];
  serverStatus = [];
  queue = [];
  serverWeight = [];
  serverNodes = [];
  serverBuckets = [];
  bucketPoped = false;
  // With an empty queue this only resets the response state and marks the client idle.
  init();
  return this;
};
/**
 * Normalises a key: converts it with toString(), keeps at most the first
 * 250 characters and replaces every whitespace character with '_'.
 */
this.setKey = function(key) {
  var text = key.toString();
  var clipped = text.substring(0, 250);
  return clipped.replace(/\s/g, '_');
};
/**
 * Issues an "add" storage command through store().
 * Arguments are forwarded unchanged; returns this client for chaining.
 */
this.add = function(key, value, callback, lifetime) {
  var client = this;
  client.store('add', key, value, callback, lifetime);
  return client;
};
/**
 * Issues a "set" storage command through store().
 * Arguments are forwarded unchanged; returns this client for chaining.
 */
this.set = function(key, value, callback, lifetime) {
  var client = this;
  client.store('set', key, value, callback, lifetime);
  return client;
};
/**
 * Issues a "replace" storage command through store().
 * Arguments are forwarded unchanged; returns this client for chaining.
 */
this.replace = function(key, value, callback, lifetime) {
  var client = this;
  client.store('replace', key, value, callback, lifetime);
  return client;
};
/**
 * Issues an "append" storage command through store(), always with a
 * lifetime of 0. Returns this client for chaining.
 */
this.append = function(key, value, callback) {
  var client = this;
  client.store('append', key, value, callback, 0);
  return client;
};
/**
 * Issues a "prepend" storage command through store(), always with a
 * lifetime of 0. Returns this client for chaining.
 */
this.prepend = function(key, value, callback) {
  var client = this;
  client.store('prepend', key, value, callback, 0);
  return client;
};
/**
 * Issues a "cas" storage command through store(), passing `ver` as the
 * extra trailing field of the command line. Returns this client for chaining.
 */
this.cas = function(key, value, ver, callback, lifetime) {
  var client = this;
  client.store('cas', key, value, callback, lifetime, ver);
  return client;
};
/**
 * Sends "incr <key> <value>" to the server owning the key.
 *
 * Fix: the key is now normalised with setKey() like in get/set/delete.
 * Before, a key containing whitespace broke the command line, and a key
 * that setKey() would alter could be routed to a different server than the
 * one it was stored on.
 *
 * @param {*} key - Cache key.
 * @param {number} value - Amount placed in the command.
 * @param {Function} callback - Receives the numberHandler() result.
 * @returns this client, for chaining.
 */
this.incr = function(key, value, callback) {
  var safeKey = this.setKey(key);
  findServer.call(this, safeKey);
  this.query('incr ' + safeKey + ' ' + value, callback);
  return this;
};
/**
 * Sends "decr <key> <value>" to the server owning the key.
 *
 * Fix: the key is now normalised with setKey() like in get/set/delete.
 * Before, a key containing whitespace broke the command line, and a key
 * that setKey() would alter could be routed to a different server than the
 * one it was stored on.
 *
 * @param {*} key - Cache key.
 * @param {number} value - Amount placed in the command.
 * @param {Function} callback - Receives the numberHandler() result.
 * @returns this client, for chaining.
 */
this.decr = function(key, value, callback) {
  var safeKey = this.setKey(key);
  findServer.call(this, safeKey);
  this.query('decr ' + safeKey + ' ' + value, callback);
  return this;
};
/**
 * Sends "delete <key>" for the normalised key to the server that
 * findServer() selects for it. Returns this client for chaining.
 */
this.delete = function(key, callback) {
  var safeKey = this.setKey(key);
  findServer.call(this, safeKey);
  this.query('delete ' + safeKey, callback);
  return this;
};
/**
 * Queues "flush_all" on every registered server. Once as many replies as
 * there are servers have arrived, `callback` is invoked with the last error
 * seen (or undefined) and the string 'ok'. Returns this client for chaining.
 */
this.flush = function(callback) {
  var self = this
    , answered = 0
    , lastError = undefined;
  function onFlushed(e) {
    answered += 1;
    if (e) lastError = e;
    if (answered == serverGroup.length) {
      callback.call(self, lastError, 'ok');
    }
  }
  for (var index = 0; index < serverGroup.length; index += 1) {
    currentServer = index;
    this.query('flush_all', onFlushed, 'flush');
  }
  return this;
};
/**
 * Queues "stats" on the first registered server (index 0).
 * Returns this client for chaining.
 */
this.stats = function(callback) {
  var client = this;
  currentServer = 0;
  client.query('stats', callback);
  return client;
};
/**
 * Queues "stats items" on the first registered server (index 0); the reply
 * is parsed by itemsHandler(). Returns this client for chaining.
 */
this.items = function(callback) {
  var client = this;
  currentServer = 0;
  client.query('stats items', callback, 'items');
  return client;
};
/**
 * Queues "stats slabs" on the first registered server (index 0); the reply
 * is parsed by slabsHandler(). Returns this client for chaining.
 */
this.slabs = function(callback){
  var client = this;
  currentServer = 0;
  client.query('stats slabs', callback, 'slabs');
  return client;
};
/**
 * Queues "stats cachedump <slab> 0" on the first registered server
 * (index 0); the reply is parsed by dumpHandler(). Returns this client for
 * chaining.
 */
this.dump = function(slab, callback){
  var client = this
    , command = 'stats cachedump ' + slab + ' 0';
  currentServer = 0;
  client.query(command, callback, 'dump');
  return client;
};
/**
 * Lists the items of every slab reported by slabs(): each slab is dumped
 * with dump() and the resulting entries are concatenated.
 *
 * Fixes over the previous version:
 *  - completion was detected by comparing against the server's
 *    `active_slabs` figure although one dump is issued per key of
 *    `data.slabs`; if the two differed the callback never ran (or ran
 *    early). The number of dumps actually issued is counted instead;
 *  - with no slabs at all the callback was never invoked; it now receives
 *    an empty list;
 *  - an error from slabs() is passed on instead of dereferencing the
 *    missing data.
 *
 * @param {Function} callback - callback(error, items); dump errors for
 *        individual slabs are skipped, as before.
 * @returns this client, for chaining.
 */
this.list = function(callback) {
  var self = this;
  this.slabs(function(e, data){
    if (e || !data) {
      callback(e || new Error('no slab data'), []);
      return;
    }
    var slabIds = Object.keys(data.slabs || {})
      , pending = slabIds.length
      , items = [];
    if (pending == 0) {
      callback(undefined, items);
      return;
    }
    slabIds.forEach(function(slabId){
      self.dump(slabId, function(dumpError, dumped){
        if (!dumpError) items = items.concat(dumped);
        pending -= 1;
        if (pending == 0) {
          callback(undefined, items);
        }
      });
    });
  });
  return this;
};
/**
 * Queues "version" on the first registered server (index 0).
 * Returns this client for chaining.
 */
this.version = function(callback) {
  var client = this;
  currentServer = 0;
  client.query('version', callback);
  return client;
};
/**
 * Sends "get <key>" for the normalised key to the server that findServer()
 * selects for it. Returns this client for chaining.
 */
this.get = function(key, callback) {
  var safeKey = this.setKey(key);
  findServer.call(this, safeKey);
  this.query('get ' + safeKey, callback);
  return this;
};
/**
 * Sends "gets <key>" for the normalised key to the server that findServer()
 * selects for it. Returns this client for chaining.
 */
this.gets = function(key, callback) {
  var safeKey = this.setKey(key);
  findServer.call(this, safeKey);
  this.query('gets ' + safeKey, callback);
  return this;
};
/**
 * Interprets the reply to a storage command. Returns [undefined, status]
 * when the first header word is 'stored', otherwise [Error(status), status].
 */
this.storeHandler = function(){
  var status = header[0];
  return status == 'stored'
    ? [undefined, status]
    : [new Error(status), status];
};
/**
 * Interprets the reply to incr/decr.
 *
 * Fix: only 'not_found' was treated as a failure, so any other non-numeric
 * reply (for example a client_error line) was returned as a success with
 * the value NaN. Every reply whose first word is not a number is now
 * reported as an error carrying the whole reply line.
 *
 * @returns {Array} [undefined, number] on success, otherwise
 *          [Error, firstHeaderWord].
 */
this.numberHandler = function(){
  var status = header[0]
    , value = Number(status);
  if (status == 'not_found' || isNaN(value)) {
    return [new Error(header.join(' ')), status];
  }
  return [undefined, value];
};
// Replies to "add" are interpreted by the shared storage-reply handler.
this.addHandler = function(){
  var outcome = this.storeHandler();
  return outcome;
};
// Replies to "set" are interpreted by the shared storage-reply handler.
this.setHandler = function(){
  var outcome = this.storeHandler();
  return outcome;
};
// Replies to "replace" are interpreted by the shared storage-reply handler.
this.replaceHandler = function(){
  var outcome = this.storeHandler();
  return outcome;
};
// Replies to "append" are interpreted by the shared storage-reply handler.
this.appendHandler = function(){
  var outcome = this.storeHandler();
  return outcome;
};
// Replies to "prepend" are interpreted by the shared storage-reply handler.
this.prependHandler = function(){
  var outcome = this.storeHandler();
  return outcome;
};
// Replies to "cas" are interpreted by the shared storage-reply handler.
this.casHandler = function(){
  var outcome = this.storeHandler();
  return outcome;
};
// Replies to "incr" are interpreted by the shared numeric-reply handler.
this.incrHandler = function(){
  var outcome = this.numberHandler();
  return outcome;
};
// Replies to "decr" are interpreted by the shared numeric-reply handler.
this.decrHandler = function(){
  var outcome = this.numberHandler();
  return outcome;
};
/**
 * Interprets the reply to "delete". Returns [undefined, status] when the
 * first header word is 'deleted', otherwise [Error(status), status].
 */
this.deleteHandler = function(){
  var status = header[0];
  return status == 'deleted'
    ? [undefined, status]
    : [new Error(status), status];
};
/**
 * Interprets the reply to "flush_all". Returns [undefined, status] when the
 * first header word is 'ok', otherwise [Error(status), status].
 */
this.flushHandler = function(){
  var status = header[0];
  return status == 'ok'
    ? [undefined, status]
    : [new Error(status), status];
};
/**
 * Parses a "stats"-style reply from the receive buffer. Returns null until
 * the trimmed reply ends with an 'END' line. Otherwise returns
 * [undefined, stats], where each remaining line is split on whitespace and
 * its second field becomes the key, its third field the value.
 */
this.statsHandler = function(){
  var lines = buffer.toString().replace(/^\s*|\s*$/g, '').split(CRLF);
  if (lines.pop() != 'END') {
    return null;
  }
  var stats = {};
  lines.forEach(function(entry){
    var fields = entry.split(/\s+/g);
    stats[fields[1]] = fields[2];
  });
  return [undefined, stats];
};
/**
 * Parses a "stats items" reply. Builds on statsHandler() and regroups every
 * stat named "items:<id>:<field>" into items[id][field]; other stats are
 * skipped. Returns null while the reply is incomplete, otherwise
 * [undefined, items].
 */
this.itemsHandler = function(){
  var parsed = this.statsHandler();
  if (parsed === null) return null;
  var stats = parsed[1]
    , grouped = {}
    , pattern = /^items\:(\d+)\:(.+)$/;
  for (var name in stats) {
    var parts = pattern.exec(name);
    if (!parts) continue;
    var slabId = parts[1];
    if (!grouped[slabId]) grouped[slabId] = {};
    grouped[slabId][parts[2]] = stats[name];
  }
  return [undefined, grouped];
};
/**
 * Parses a "stats slabs" reply. Builds on statsHandler(): `active_slabs`
 * (as a number) and `total_malloced` are reported as such, and every stat
 * named "<id>:<field>" is regrouped into slabs[id][field]. Returns null
 * while the reply is incomplete.
 */
this.slabsHandler = function(){
  var parsed = this.statsHandler();
  if (parsed === null) return null;
  var stats = parsed[1]
    , perSlab = {}
    , pattern = /^(\d+)\:(.+)$/;
  // The two summary stats never match the pattern, so they are skipped here.
  for (var name in stats) {
    var parts = pattern.exec(name);
    if (!parts) continue;
    var slabId = parts[1];
    if (!perSlab[slabId]) perSlab[slabId] = {};
    perSlab[slabId][parts[2]] = stats[name];
  }
  return [
    undefined,
    {
      active_slabs : Number(stats.active_slabs),
      total_malloced : stats.total_malloced,
      slabs : perSlab
    }
  ];
};
/**
 * Parses a "stats cachedump" reply. Returns null until the trimmed reply
 * ends with an 'END' line. Otherwise returns [undefined, items], with one
 * {key, size, expires} entry per line matching
 * "ITEM <key> [<size> b; <expires> s]"; other lines are skipped.
 */
this.dumpHandler = function(){
  var lines = buffer.toString().replace(/^\s*|\s*$/g, '').split(CRLF);
  if (lines.pop() != 'END') {
    return null;
  }
  var pattern = /^ITEM\s([^\s]+)\s\[([^\s]+)\sb;\s([^\s]+)\ss\]$/
    , items = [];
  lines.forEach(function(text){
    var parts = pattern.exec(text);
    if (parts) {
      items.push({
        key : parts[1],
        size : parts[2],
        expires : parts[3]
      });
    }
  });
  return [undefined, items];
};
/**
 * Interprets the reply to "version". Returns [undefined, secondHeaderWord]
 * when the first header word is 'version', otherwise
 * [Error(status), status].
 */
this.versionHandler = function(){
  var status = header[0];
  if (status != 'version') {
    return [new Error(status), status];
  }
  return [undefined, header[1]];
};
/**
 * Parses the reply to "get". On the first chunk the header line is
 * inspected: 'end' means the key does not exist; otherwise the header's
 * byte length and the value's byte length (fourth header field) are
 * recorded. Once the buffer holds header + value + the closing
 * CRLF "END" CRLF, the value is returned according to its flag.
 *
 * Changes over the previous version:
 *  - an error / client_error / server_error reply is reported as an error;
 *    before, it produced a NaN length and the handler returned null
 *    forever, stalling every queued request;
 *  - the header's byte length is measured with Buffer.byteLength() instead
 *    of allocating through the deprecated `new Buffer()` constructor;
 *  - parseInt() is given an explicit radix.
 *
 * NOTE(review): FLAG_BINARY values are returned as a decoded string, not as
 * the raw Buffer — confirm whether callers rely on that.
 *
 * @returns {Array|null} [error, value, flag], or null while incomplete.
 */
this.getHandler = function(){
  if (receiveTimes == 1) {
    // No such key.
    if (header[0] == 'end') return [new Error('not_found'), ''];
    if (header[0] == 'error' || header[0] == 'client_error' || header[0] == 'server_error') {
      return [new Error(header.join(' ')), ''];
    }
    headLength = Buffer.byteLength(header.join(' ') + CRLF);
    bodyLength = parseInt(header[3], 10);
  }
  // Total bytes expected for a complete reply.
  var dataLength = headLength + bodyLength + 'END'.length + 2 * CRLF.length;
  if (buffer.length < dataLength) {
    return null;
  }
  var bodyBuffer = buffer.slice(headLength, headLength + bodyLength)
    , body = bodyBuffer.toString();
  if (header[2] == FLAG_STRING) {
    return [undefined, body, FLAG_STRING];
  }
  if (header[2] == FLAG_COMPRESSED) {
    // Left as raw bytes; receive() unzips them.
    return [undefined, bodyBuffer, FLAG_COMPRESSED];
  }
  if (header[2] == FLAG_BINARY) {
    return [undefined, body, FLAG_BINARY];
  }
  try {
    return [undefined, settings.decode(body), FLAG_ENCODE];
  } catch(e) {
    // Not decodable: hand back the raw text instead.
    return [undefined, body, FLAG_STRING];
  }
};
/**
 * Parses the reply to "gets": the getHandler() result with the fifth header
 * field appended. Returns null while the reply is incomplete.
 */
this.getsHandler = function(){
  var result = this.getHandler();
  if (result !== null) {
    result.push(header[4]);
  }
  return result;
};
}
module.exports = memcached;