来自:mjsws > 馆藏分类
配色: 字号:
ZeroMQ(java)的负载均衡
2018-12-08 | 阅:  转:  |  分享 
  
ZeroMQ(java)的负载均衡我们在实际的应用中最常遇到的场景如下:A向B发送请求,B向A返回结果。。。。但是这种场景就会很容易变成这个
样子:很多A向B发送请求,所以B要不断的处理这些请求,所以就会很容易想到对B进行扩展,由多个B来处理这些请求,那么这里就出现了另外
一个问题:B对请求处理的速度可能不同,那么B之间他们的负载也是不同的,那么应该如何对请求进行分发就成了一个比较重要的问题。。。也就
变成了负载均衡的问题了。。。其实最好的负载均衡解决方案也很简单:绝大多数的任务都是独立的,这里中间层可以将A发送过来的请求先缓存起
来,然后B的行为就是主动的找中间层获取请求处理,然后返回,再获取。。。。也就是中间层只是做一个请求的缓存。。。由B自己来掌控合适来
处理请求,也就是当B已经处理完了任务之后,自己去主动获取。。。而不是由中间层自己去主动分发。。。。嗯,那么在ZeroMQ中应该如何
实现这种模式呢,恩其实还挺简单的,如下图:这里由两个Router来作为中间层,具体的数据流程如下:(1)中间层启动,Worker连
接Backend,向其发送Request请求(ready),这个时候中间层就能够知道哪一个worker现在是空闲的,将其保存起来(
放到worker队列),可以处理请求638棋牌http://www.dadiqipaigw.cnworker的执行流程就是sen
d(发送ready)---recv(获取请求),(2)Client端向Fronted发送请求,中间层将请求缓存到一个任务队列(3)
中间层从任务队里里面取出一个任务,将其发送给worker队列中的一个worker,并将其从woker队列中移除(4)worker处
理完以后,发送执行结果,也就是send,中间层收到woker的数据之后,将其发送给相应的client,然后在讲这个worker放
到worker队列中,表示当前这个worker可用。。。。好了,前面就基本上介绍了整个结构用ZeroMQ应该是怎么实现的,那么接下
来就直接来上代码吧:packagebalance;importjava.util.LinkedList;importorg.
zeromq.ZFrame;importorg.zeromq.ZMQ;importorg.zeromq.ZMsg;public
classBalance{publicstaticclassClient{publicvoidstart(){
newThread(newRunnable(){publicvoidrun(){//TODOAuto-generat
edmethodstubZMQ.Contextcontext=ZMQ.context(1);ZMQ.Socketsoc
ket=context.socket(ZMQ.REQ);socket.connect("ipc://front");//连接
router,想起发送请求for(inti=0;i1000;i++){socket.send("hello".ge
tBytes(),0);//发送hello请求Stringbb=newString(socket.recv());/
/获取返回的数据System.out.println(bb);}socket.close();context.term();}})
.start();}}publicstaticclassWorker{publicvoidstart(){newT
hread(newRunnable(){publicvoidrun(){//TODOAuto-generatedme
thodstubZMQ.Contextcontext=ZMQ.context(1);ZMQ.Socketsocket=
context.socket(ZMQ.REQ);socket.connect("ipc://back");//连接,用于获取要
处理的请求,并发送回去处理结果socket.send("ready".getBytes());//发送ready,表示当前可用w
hile(!Thread.currentThread().isInterrupted()){ZMsgmsg=ZMsg.r
ecvMsg(socket);//获取需要处理的请求,其实这里msg最外面的标志frame是router对分配给client的标
志frameZFramerequest=msg.removeLast();//最后一个frame其实保存的就是实际的请求数
据,这里将其移除,待会用新的frame代替ZFrameframe=newZFrame("hellofjs".getByt
es());msg.addLast(frame);//将刚刚创建的frame放到msg的最后,worker将会收到msg.sen
d(socket);//将数据发送回去乐淘棋牌http://www.letaoqpyx.com}socket.close();
context.term();}}).start();}}publicstaticclassMiddle{private
LinkedList?workers;privateLinkedList?requests;privateZMQ.Contex
tcontext;privateZMQ.Pollerpoller;publicMiddle(){this.workers
=newLinkedList();this.requests=newLinkedList();this.context
=ZMQ.context(1);this.poller=newZMQ.Poller(2);}publicvoidst
art(){ZMQ.Socketfronted=this.context.socket(ZMQ.ROUTER);//创建
一个router,用于接收client发送过来的请求,以及向client发送处理结果ZMQ.Socketbackend=th
is.context.socket(ZMQ.ROUTER);//创建一个router,用于向后面的worker发送数据,然后接收
处理的结果fronted.bind("ipc://front");//监听,等待client的连接backend.bind("i
pc://back");//监听,等待worker连接//创建pollItemZMQ.PollItemfitem=new
ZMQ.PollItem(fronted,ZMQ.Poller.POLLIN);ZMQ.PollItembitem=new
ZMQ.PollItem(backend,ZMQ.Poller.POLLIN);this.poller.register(fi
tem);//注册pollItemthis.poller.register(bitem);while(!Thread.curr
entThread().isInterrupted()){this.poller.poll();if(fitem.isRead
able()){//表示前面有请求发过来了ZMsgmsg=ZMsg.recvMsg(fitem.getSocket())
;//获取client发送过来的请求,这里router会在实际请求上面套一个连接的标志framethis.requests.ad
dLast(msg);//将其挂到请求队列}if(bitem.isReadable()){//这里表示worker发送数据
过来了ZMsgmsg=ZMsg.recvMsg(bitem.getSocket());//获取msg,这里也会在实际发送的
数据前面包装一个连接的标志frame//这里需要注意,这里返回的是最外面的那个frame,另外它还会将后面的接着的空的标志fram
e都去掉乐淘棋牌http://www.jiekeqipai.netZFrameworkerID=msg.unwrap();
//把外面那层包装取下来,也就是router对连接的标志framethis.workers.addLast(workerID);
//将当前的worker的标志frame放到worker队列里面,表示这个worker可以用了ZFramereadyOrAdd
ress=msg.getFirst();//这里获取标志frame后面的数据,如果worker刚刚启动,那么应该是发送过来的
ready,if(newString(readyOrAddress.getData()).equals("ready")){
//表示是worker刚刚启动,发过来的readymsg.destroy();}else{msg.send(fronted)
;//表示是worker处理完的返回结果,那么返回给客户端}}while(this.workers.size()0&&t
his.requests.size()0){ZMsgrequest=this.requests.removeFirst(
);ZFrameworker=this.workers.removeFirst();request.wrap(worker)
;//在request前面包装一层,把可以用的worker的标志frame包装上,这样router就会发给相应的worker的连
接request.send(backend);//将这个包装过的消息发送出去}}fronted.close();backend.
close();this.context.term();}}publicstaticvoidmain(Stringargs[]){Workerworker=newWorker();worker.start();Clientclient=newClient();client.start();Middlemiddle=newMiddle();middle.start();}}其实根据前面已经提出来的实现原理来编写代码还是比较顺利的,中途也没有遇到什么问题。。。不过要理解这部分要比较了解ZeroMQ的数据格式才行
献花(0)
+1
(本文系mjsws首藏)