博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
异步消息队列zeromq实现服务器间高性能通信
阅读量:7246 次
发布时间:2019-06-29

本文共 4470 字,大约阅读时间需要 14 分钟。

ZeroMQ 是一个很有个性的项目,它原来是定位为“史上最快消息队列”,所以名字里面有“MQ”两个字母,但是后来逐渐演变发展,慢慢淡化了消息队列的身影,改称为消息内核,或者消息层了。从网络通信的角度看,它处于会话层之上,应用层之下,有了它,你甚至不需要自己写一行的socket函数调用就能完成复杂的网络通信工作。

服务端 :

import zeromqimport console;var context = zeromq.context()var responder = context.zmq_socket_reply() //创建套接字responder.bind(  "tcp://*:5559")console.log("服务端已启动")do {console.log("服务端收到消息",responder.recv() );responder.send("World")}while( sleep (1) )

客户端:

import zeromqimport console;var context = zeromq.context()var requester = context.zmq_socket_request();requester.connect( "tcp://localhost:5559" )requester.send("Hello");  //发送消息var str = requester.recv(); //接收字符串console.log ("客户端收到消息 ", str );context.term(); //关闭

三种基本模式(它有很多种)

1. 请求应答模式(req rep

消息双向的,有来有往,req端请求的消息,rep端必须答复给req

2. 订阅发布模式 (sub pub

消息单向的,有去无回的。可按照发布端可发布制定主题的消息,订阅端可订阅喜欢的主题,订阅端只会收到自己已经订阅的主题。发布端发布一条消息,可被多个订阅端同事收到。

3. push pull模式

消息单向的,也是有去无回的。push的任何一个消息,始终只会有一个pull端收到消息.

后续的代理模式和路由模式等都是在三种基本模式上面的扩展或变异。

阻塞 和 非阻塞

以上三种基本模式都支持阻塞模式和非阻塞模式。

req 和 rep的阻塞模式是这样的(其实跟原生的socket实现也非常像)

大家用过socket的,客户端要是先启动的话,会连接失败,或者是短时间内有超时问题。

224720261.png

服务端

# Echo server programimport socketHOST = ''                 # Symbolic name meaning all available interfacesPORT = 50007              # Arbitrary non-privileged ports = socket.socket(socket.AF_INET, socket.SOCK_STREAM)s.bind((HOST, PORT))s.listen(1)conn, addr = s.accept()print 'Connected by', addrwhile 1:data = conn.recv(1024)if not data: breakconn.sendall(data)conn.close()

客户端:

# Echo client programimport socketHOST = 'daring.cwi.nl'    # The remote hostPORT = 50007              # The same port as used by the servers = socket.socket(socket.AF_INET, socket.SOCK_STREAM)s.connect((HOST, PORT))s.sendall('Hello, world')data = s.recv(1024)s.close()print 'Received', repr(data)

如果使用ActiveMQ/RabbitMQ之类的有代理MQ系统,只要保证MQ代理最先启动, 就可以保证系统的正常运行。而对于无代理的ZeroMQ来说,似乎比较难办。 在刚刚开始使用ZeroMQ时,我也一直担心这个问题,总是小心翼翼地首先启动调 用bind指令的程序,然后启动执行connect指令的程序。这样其实只是利用了 ZeroMQ的高速数据传输能力,以及ZeroMQ对IPC和socket的良好封装特性,还是 没有解决进程启动顺序的问题。后来,偶然实验了一下,发现bind程序和 connect程序无论谁先启动,其实都不影响整个系统的正常运行。

咱们再用mq测试下

import zmqcontext = zmq.Context()# Socket to talk to serverprint "Connecting to hello world server…"socket = context.socket(zmq.REQ)socket.connect ("tcp://localhost:5555")# Do 10 requests, waiting each time for a responsefor request in range (10):print "Sending request ", request,"…"socket.send ("Hello")# Get the reply.message = socket.recv()print "Received reply ", request, "[", message, "]"

import zmqimport timecontext = zmq.Context()socket = context.socket(zmq.REP)socket.bind("tcp://*:5555")while True:#  Wait for next request from clientmessage = socket.recv()print "Received request: ", message#  Do some 'work'time.sleep (1)        #   Do some 'work'#  Send reply back to clientsocket.send("World")

225025331.png

可以看出发布者绑定绑定一个端口,订阅者通过连接发布者接受订阅的消息。

官网描述这种模式要注意以下几点:

####

这里的Publish-Subscribe模型是一个很典型的PUB-SUB模型,即发布者(Publisher)只能发送数据,它发送时指明发送数据的类型,而订阅者(Subscriber)则只接收它关心的类型的消息。

###

1. pub/sub模式下,sub事实上可以连接多个pub,每次只连接一个connect,所以接收到的消息可以是叫错的,以至于不会单个pub掩盖了其他pub

2. 如果存在某个pub没有被任何sub连接,则该pub会丢弃所有的消息

3. 如果你采用tcp的连接方式,sub很慢,消息将会堆积在pub,后期会对该问题有个较好的解决

4. 目前的而版本,过滤发生在sub端,而不是pub端,意思就是说一个pub会发送所有的消息到所有的sub, 由sub决定是要drop这个msg.

zeromq是lib库,部署完成后可自行编写server和client,编译时指定-lzmq即可

读速度 | 写速度

send 1000000 message cost 659 ms | recv 50000 message cost 36 ms
send 1000000 message cost 597 ms | recv 50000 message cost 33 ms
send 1000000 message cost 735 ms | recv 50000 message cost 34 ms
send 1000000 message cost 727 ms | recv 50000 message cost 33 ms
send 1000000 message cost 741 ms | recv 50000 message cost 33 ms
send 1000000 message cost 798 ms | recv 50000 message cost 32 ms
send 1000000 message cost 665 ms | recv 50000 message cost 34 ms
平均读取速度 147w | 平均写入速度 144w /s

为题提高性能 可以用gevent框架

import geventfrom gevent_zeromq import zmq# Global Contextcontext = zmq.Context() #它是GreenContext的一个简写,确保greenlet化socketdef server():server_socket = context.socket(zmq.REQ) #创建一个socket,使用mq类型模式REQ/REP(请求/回复,服务器是请求),还有PUB/SUB(发布/订阅),push/pull等server_socket.bind("tcp://127.0.0.1:5000") #绑定socketfor request in range(1,10):server_socket.send("Hello")print('Switched to Server for ', request)server_socket.recv()  #这里发生上下文切换def client():client_socket = context.socket(zmq.REP)  (客户端是回复)client_socket.connect("tcp://127.0.0.1:5000")  #连接server的socket端口for request in range(1,10):client_socket.recv()print('Switched to Client for ', request)client_socket.send("World")publisher = gevent.spawn(server)client    = gevent.spawn(client)gevent.joinall([publisher, client])

转载地址:http://rhjbm.baihongyu.com/

你可能感兴趣的文章
dns解析外网域名很慢
查看>>
Netty系列之Netty可靠性分析
查看>>
oracle中裸设备的使用
查看>>
JXLS-----JXLS读取Excel实例
查看>>
华为hybrid-vlan
查看>>
Spring HandlerInterceptor的使用
查看>>
如何更加便捷地在Eclipse上开发GO语言
查看>>
OpenWrt分支LEDE嵌入式无线路由系统定制-2.自定义固件
查看>>
unity之dotween
查看>>
Android ListView复杂列表优化实践
查看>>
CSS的十八般技巧
查看>>
LoadRunner对脚本的的几点建议
查看>>
VNCserver安装即使用
查看>>
MA5680T以太网口工作模式
查看>>
Project Server 2013新手入门 (七)管理项目风险和问题
查看>>
Hive学习 参考网址
查看>>
逆波兰表达式
查看>>
使用 C++ 处理 JSON 数据交换格式
查看>>
html模板
查看>>
this web application instance has been stopped ...
查看>>