本文通过开发一个基于web的dns查询api接口来介绍一下如何利用rabbitmq解耦web接口与后台处理逻辑以及实现负载均衡。利用nodejs搭建了一个web服务器,接收客户端发送的json请求(包含一个域名string),web服务器将请求传递到rabbitmq消息队列中,队列的消费者为一个python程序,用来处理dns查询,并将程序返回给指定的消息队列中,回传给服务器,服务器接收到消息后返回给客户端。
Web服务器端代码
首先使用npm init -y
创建package.json文件,执行 npm install express body-parser amqplib --save
安装所需的依赖。我在使用NODEJS开发RABBITMQ程序中介绍了如何搭建rabbitmq的开发环境以及一些基本的环境配置,和使用docker来快速启动一个rabbitmq实例的内容。
服务器端代码如下:
全部的服务器端代码如下,之后我们会详细的介绍每个部分的内容。
var express = require('express');
var app = express();
var bodyParser = require('body-parser');
var formatMessage=function(message,err){
return {"Message":message||"","Error":err||""+""};
}
// amqp code start point
var amqp = require('amqplib/callback_api');
var amqpConn= null;
var pubChannel = null;
function startConnRabbitMQ(){
amqp.connect("amqp://mike:123456@localhost", function(err, conn) {
if(err){
console.log("[x]Error:"+err);
return
}
conn.on("error",function(err){
if(err.message!=="Connection closing"){
console.log("[x]Error:"+err);
return
}
});
conn.on("close",function(){
console.log("[x]Rabbitmq is closed ,trying restart connect after 1s...")
return setTimeout(startConnRabbitMQ,1000);
});
amqpConn=conn;
console.log("[*]Rabbitmq connect is ready now!");
startChannel();
});
}
function startChannel(){
amqpConn.createConfirmChannel(function(err,ch){
if(err){
console.log("[x]Error:"+err);
return
}
ch.on("error",function(err){
console.log("[x]Error:"+err);
return
});
ch.on("close",function(){
console.log("[*]Rabbitmq channel is closed")
return
});
ch.assertQueue("dns_queue", {durable: false});
pubChannel=ch;
console.log("[*]Rabbitmq channel is ready now!")
})
}
function generateUuid() {
return Math.random().toString() +
Math.random().toString() +
Math.random().toString();
}
startConnRabbitMQ();
// parse application/x-www-form-urlencoded
app.use(bodyParser.urlencoded({ extended: false }));
// parse application/json
app.use(bodyParser.json());
app.get("/dns",function(req,res){
console.log("Get request for dns");
});
app.post("/dns",function(req,res){
// res.json(req.body.domain);
if(typeof req.body.domain==='undefined'){
res.status(400).json({"Error":"No domain data was posted!"});
return
}
// 传递给后端的rabbitmq服务器
try {
pubChannel.assertQueue('', {exclusive: true}, function(err, q) {
var corr = generateUuid();
var domain = req.body.domain;
console.log(' [x] Requesting dns for (%s)', domain);
pubChannel.consume(q.queue, function(msg) {
if (msg.properties.correlationId == corr) {
var response_data=msg.content.toString();
console.log(' [.] Got %s',response_data);
res.status(200).json({"Message":response_data});
}
}, {noAck: true});
pubChannel.sendToQueue('dns_queue',
new Buffer(domain),
{ correlationId: corr, replyTo: q.queue });
});
} catch (e) {
console.log(e)
res.status(500).json({"Error":e})
return
}
});
var server= app.listen(5000,function(){
var host = server.address().address;
var port = server.address().port;
console.log('DNS App is listening at http://%s:%s', host, port);
});
代码中的startConnRabbitMQ()函数中调用amqp库来连接我们的rabbitmq服务器,注意这里的连接参数中需要依赖于你自己的环境设置,基本上规则按照amqp://用户名:密码@服务器:端口/虚拟路由
的方式声明,如果按照默认设置则直接为amqp://localhost即可。由于amqp库基于了node的event机制开发,我们可以绑定基本的事件,比如关闭时操作(这里我们设置断开重连),出错的时候的操作。 一切处理完毕后将连接实例赋予全局变量amqpConn。
function startConnRabbitMQ(){
amqp.connect("amqp://mike:123456@localhost", function(err, conn) {
if(err){
console.log("[x]Error:"+err);
return
}
conn.on("error",function(err){
if(err.message!=="Connection closing"){
console.log("[x]Error:"+err);
return
}
});
conn.on("close",function(){
console.log("[x]Rabbitmq is closed ,trying restart connect after 1s...")
return setTimeout(startConnRabbitMQ,1000);
});
amqpConn=conn;
console.log("[*]Rabbitmq connect is ready now!");
startChannel();
});
}
其中的startChannel()函数用来创建一个channel,通过建立channel来复用一个tcp连接,允许我们在一个程序中定义多个channel来完成数据传递,而不用绑定多个tcp连接。利用createConfirmChannel来创建一个带有确认回复的通道,并绑定基本的错误和关闭事件。我们在其中声明了dns_queue队列,其中设置durable为false,代表不关心是否丢失数据(rabbitmq重启或者出错的时候)
function startChannel(){
amqpConn.createConfirmChannel(function(err,ch){
if(err){
console.log("[x]Error:"+err);
return
}
ch.on("error",function(err){
console.log("[x]Error:"+err);
return
});
ch.on("close",function(){
console.log("[*]Rabbitmq channel is closed")
return
});
ch.assertQueue("dns_queue", {durable: false});
pubChannel=ch;
console.log("[*]Rabbitmq channel is ready now!")
})
}
function generateUuid() {
return Math.random().toString() +
Math.random().toString() +
Math.random().toString();
}
startConnRabbitMQ();
generateUuid()是一个uuid生成的函数,我们也可以使用node的uuid库来直接生成,调用startConnRabbitMQ()启动服务器去连接rabbitmq,创建必要的channel.
// parse application/x-www-form-urlencoded
app.use(bodyParser.urlencoded({ extended: false }));
// parse application/json
app.use(bodyParser.json());
app.get("/dns",function(req,res){
console.log("Get request for dns");
});
上述代码我们利用body-parse中间件来管理post的json数据,设置路由get和post数据到指定的路由上做进一步的处理操作。
app.post("/dns",function(req,res){
// res.json(req.body.domain);
if(typeof req.body.domain==='undefined'){
res.status(400).json({"Error":"No domain data was posted!"});
return
}
当有数据传递进来的时候我们进行简单的判断是否为符合要求的操作(json数据包含domain域数据),传递到后端的服务器中的数据确保是经过过滤操作的。
// 传递给后端的rabbitmq服务器
pubChannel.assertQueue('', {exclusive: true}, function(err, q) {
var corr = generateUuid();
var domain = req.body.domain;
console.log(' [x] Requesting dns for (%s)', domain);
pubChannel.consume(q.queue, function(msg) {
if (msg.properties.correlationId == corr) {
var response_data=msg.content.toString();
console.log(' [.] Got %s',response_data);
res.status(200).json({"Message":response_data});
}
}, {noAck: true});
...
});
pubChannel包含了我们之前连接时创建的channel对象,我们在这个channel上声明一个队列,这个队列是一个带有随机名称的队列(第一个参数queue为空,此处为创建一个新的带有随机值名称的队列),设置exclusive为true,设置该队列仅仅限于此次连接使用,不允许其他来访问该队列。声明完毕后的回调函数中包含了q代表了该队列的信息。本地的channel利用pubChannel.consume()函数在刚才我们信创建的队列上监听数据,因为我们直到数据如果处理完毕肯定会发送到这个新的队列上来。
pubChannel.sendToQueue('dns_queue',
new Buffer(domain),
{ correlationId: corr, replyTo: q.queue });
});
} catch (e) {
console.log(e)
res.status(500).json({"Error":e})
return
}
本地的channel利用pubChannel.consume()函数在刚才我们信创建的队列上监听数据,因为我们直到数据如果处理完毕肯定会发送到这个新的队列上来。
本地的channel将域名信息以及返回数据需要发送的新的队列利用sendToQueue函数传递到rabbitmq中。这里的辅助信息中包含了{ correlationId: corr, replyTo: q.queue }); 一个uuid和一个新创建的队列内容
启动web服务器:
$ node app.js
DNS App is listening at http://:::5000
[*]Rabbitmq connect is ready now!
[*]Rabbitmq channel is ready now!
后台服务代码dns-worker.py
客户端通过rabbitmq来获得队列上的请求,请求将利用dnspython包进行处理,获得域名对应的服务器ip地址,客户端的运行环境包括pika(pip install pika
)处理rabbitmq的通信,利用dnspython(pip install dnspython
)来处理dns请求。整个的后台处理代码如下:
#!/usr/bin/env python
import pika
import json
import dns.resolver #import the module
myResolver = dns.resolver.Resolver() #create a new instance named 'myResolver'
parameters = pika.URLParameters('amqp://mike:123456@localhost')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue='dns_queue')
def getDnsResponse(domain):
myAnswers = myResolver.query(domain, "A")
return myAnswers[0]
def on_request(ch, method, props, body):
domain = str(body)
print(" [.] DNS request:(%s)" % domain)
response = getDnsResponse(domain)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='dns_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming()
后台代码比较简单,代码分析如下
import dns.resolver #import the module
myResolver = dns.resolver.Resolver() #create a new instance named 'myResolver'
获得一个resolver来管理后续的dns查询请求(务必先安装dnspython包)
parameters = pika.URLParameters('amqp://mike:123456@localhost')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue='dns_queue')
这里创建一个rabbitmq的连接,同样需要创建channel,并且声明一个dns_queue队列。
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='dns_queue')
这里我们声明了qos参数为prefetch_count为1,代表该channel当本地有一个请求尚未处理完的时候不会再发送到这里来,防止堆积。设定on_request为处理函数,一旦接到消息的时候,调用该函数来处理请求。
def on_request(ch, method, props, body):
domain = str(body)
print(" [.] DNS request:(%s)" % domain)
response = getDnsResponse(domain)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
这里我们的on_request函数包含了四个参数,第一个为channel,第二个为method,包含了我们下面需要的delivery_tag参数,代表需要确认的消息的序列号。props包含了我们之前设定的两个参数correlation_id,和reply_to。利用这些参数我们可以将处理后的消息路由到指定的队列上并使用basic_ack确认该消息已被处理。
启动后台服务器:
$ python dns-worker.py
[x] Awaiting RPC requests
集成测试
测试我们使用简单的curl命令来执行 使用命令行去获得请求响应:
curl -i -H "Content-Type:application/json" -X POST -d '{"domain":"www.jsmean.com"}' http://localhost:5000/dns
测试效果
上面的两个区域是启动的两个dns-worker,左下为web服务器的响应,右下为客户端发送json数据的请求。