简介 Json Messaging是使用node.js技术构建的发布/订阅类型的消息服务器,具有如下特性: 1、支持TCP和WebSocket协议; 2、传输帧使用Json格式; 3、可以使用正则表达式订阅消息目的地,正则表达式中可以包含“捕获”,所有目的地匹配该正则表达式的消息,连同目的地的“捕获”都将发送到订阅方; 4、一个客户端可以订阅多个消息目的地; 5、为了简化设计,服务器端不持久化消息。 项目 https://sourceforge.net/projects/jsonmessaging/ 下载 https://sourceforge.net/projects/jsonmessaging/files/ 致谢 Json Messaging消息服务器使用了很多第三方的框架和技术,感谢他们辛勤的工作。 Json:http://www.json.org node.js:http://nodejs.org node-uuid:https://github.com/broofa/node-uuid WebSocket-Node:https://github.com/Worlize/WebSocket-Node 使用方法 1、编译安装node.js; 2、打开server/config.js,可以配置TCP和WebSocket端口; 3、启动消息服务器:node server/server.js。 例子 使用最新版本的Firefox或者Chrome打开下面的HTML文件可以发送和接收消息。 <!DOCTYPE html>
Json Messaging Example
div#output {
border: 1px solid #000;
width: 960px;
height: 450px;
overflow: auto;
background-color: #333;
color: #6cf;
}</p>
strong {
color: #f66;
}
input#input {
border: 1px solid #000;
width: 640px;
}
button {
border: 1px solid #000;
width: 100px;
}
</style>
<script>
// connect to the Json Messaging server and return an ‘connection’ object
function connect(host, port, messageListener, errorListener) {
window.WebSocket = window.WebSocket || window.MozWebSocket;
if (!window.WebSocket) {
alert('Your browser does not support WebSocket.');
return null;
}
var connection = new WebSocket('ws://' + host + ':' + port);
connection.onmessage = function(message) {
try {
var parsed = JSON.parse(message.data);
switch (parsed.type) {
case 'message':
if (messageListener) {
messageListener(parsed.content, parsed.match);
}
break;
case 'error':
if (errorListener) {
errorListener(parsed.content);
}
break;
default:
throw new Error('Unknown message type ' + parsed.type);
break;
}
} catch (e) {
console.warn(e);
alert(e);
}
};
connection.publish = function(content, destination) {
connection.send(JSON.stringify({
type: 'publish',
destination: destination,
content: content
}));
};
connection.subscribe = function(destination) {
connection.send(JSON.stringify({
type: 'subscribe',
destination: destination
}));
};
connection.unsubscribe = function(destination) {
connection.send(JSON.stringify({
type: 'unsubscribe',
destination: destination
}));
};
return connection;
}
// the 'connection' object
var connection = null;
var output = null;
var input = null;
// initialize
window.onload = function() {
output = document.getElementById('output');
input = document.getElementById('input');
// connect to the local server
connection = connect(
'localhost',
8155,
// message handler
function(content, match) {
output.innerHTML += ('<strong>Message: </strong>' + content + '<br>\n');
},
// error handler
function(content) {
output.innerHTML += ('<strong>Error: </strong>' + content + '<br>\n');
}
);
// subscribe a topic
connection.onopen = function() {
connection.subscribe('test');
};
};
function _send() {
connection.publish(input.value, 'test');
}
function _clear() {
output.innerHTML = '';
}
</script>
Send Clear
下面的C程序发送三条HelloWorld消息,第一条是英文,第二条是中文,第三条是Unicode转义的中文。注意源代码必须以UTF-8编码保存。
include
include
include
include
include
include
include
include
int main(int argc, char ** argv) { int fd; struct sockaddrin addr; int ret; const char publishframe1[] = "{\"type\":\"publish\",\"destination\":\"test\",\"content\":\"Hello World\"}"; const char publishframe2[] = "{\"type\":\"publish\",\"destination\":\"test\",\"content\":\"你好世界\"}"; const char publishframe_3[] = "{\"type\":\"publish\",\"destination\":\"test\",\"content\":\"\u4f60\u597d\u4e16\u754c\"}";
fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(“127.0.0.1”);
addr.sin_port = htons(8153);
ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
printf("%d\n", ret);
ret = write(fd, publish_frame_1, sizeof(publish_frame_1));
printf("%d\n", ret);
ret = write(fd, publish_frame_2, sizeof(publish_frame_2));
printf("%d\n", ret);
ret = write(fd, publish_frame_3, sizeof(publish_frame_3));
printf("%d\n", ret);
} 帧格式 消息服务器的应用层数据帧采用Json格式,使用UTF-8编码的纯文本,在TCP协议中,使用'\0'作为帧间分隔,在WebSocket协议中遵循WebSocket草案标准。 帧格式有5类,其中,客户端到服务器端的3类,服务器端到客户端的2类。 客户端到服务器端 发布帧 客户端发送一条消息到服务器的目的地中,所有连接到服务器并且订阅了该目的地(正则表达式匹配)的客户端都能接收到该消息。 帧格式为: { "type": "publish", "destination": <消息目的地>, "content": <消息内容> } 其中,消息目的地为字符串类型;消息内容同样也必须是Json格式的。 订阅帧 客户端订阅服务器的一个目的地,所有匹配该目的地的消息都会发送到该客户端。 帧格式为: { "type": "subscribe", "destination": <消息目的地> } 其中,消息目的地可以是正则表达式,且正则表达式中可以含有“捕获”,服务器会使用该正则表达式匹配发送的消息目的地,如果符合,则会把该消息连同匹配结果一并发给客户端,在下面的“消息帧”介绍中有具体的例子。 同一个客户端可以订阅多个目的地。 取消订阅帧 客户端取消订阅服务器的一个目的地。 帧格式为: { "type": "subscribe", "destination": <消息目的地> } 其中,消息目的地等于订阅帧中的消息目的地。 当客户端断开连接后,服务器端会自动取消该客户端的所有订阅。 服务器端到客户端 消息帧 一旦消息目的地匹配,服务器端会把匹配结果连同消息内容发给客户端。 帧格式为: { "type": "message", "match": <匹配结果>, "content": <消息内容> } 匹配结果为一个数组,至少包含一个元素,即订阅的消息目的地;如果订阅的消息目的地是正则表达式且其中含有“捕获”,那么从第二往后的元素为捕获结果,参考JavaScript 正则表达式规范。 举例: 假设设备的网口状态信息在消息服务中发布,规定目的地格式为:“/devices/<设备名>/<网口名>”;消息内容为:“down”表示停止、“up”表示启动。 下面两个发布帧,表示设备a的第1个网口停止了,而设备b的第0个网口启动了: {"type":"publish","destination":"/devices/a/if1","content":"down"} {"type":"publish","destination":"/devices/b/if0","content":"up"} 如果客户端订阅目的地为“/devices/.”,那么它将能收到所有设备的所有网口的状态消息,接收到的消息帧如下: {"type":"message","match":["/devices/a/if1"],"content":"down"} {"type":"message","match":["/devices/b/if0"],"content":"up"} 如果想在程序中更方便地对设备和网口做分类处理,可以把订阅目的地改为“/devices/(.)/(.*)”,其中小括号即为“捕获”。 接收到的消息帧会变为: {"type":"message","match":["/devices/a/if1","a","if1"],"content":"down"} {"type":"message","match":["/devices/b/if0","b","if0"],"content":"up"} 可以看到,match中增加了捕获的结果。 错误帧 如果服务器端产生错误,例如客户端发送的帧超长、非Json格式等,将会向客户端返回错误帧。 帧格式为: { "type": "error", "content": <错误内容> } 客户端可以对错误进行相应的处理。 源代码结构 server.js 程序入口。 config.js 全局配置,其中的udpPort并没有使用,因为UDP难以知晓客户端状态,所以不打算实现UDP协议。 log.js 控制台日志,相比其它第三方的日志模块的特点是使用简单,而且能够输出日志产生的源代码的位置,便于调试。 protocol.js 协议帧的包装。 exchange.js 负责处理发布和订阅的消息,是服务器代码的核心部分。 tcp.js TCP协议的实现。 ws.js WebSocket协议的实现。