分享

04消息队列zmq的发布者-订阅者的计算π的简单程序。

 小世界的野孩子 2021-06-23
# 小知识:计算π的其中一个方法是,随机的向一个边长为n的正方形中撒豆子。
# 然后看这些豆子是否在以n为半径的四分之一圆内,正方形面积:n*n,四分之一圆的面积:π*n*n/4
# 因此落在四分之一圆内的概率为π/4,这样我们就能算出π的值。
# 我们这个程序是来讲述zmq发布-订阅过程的流程,

# 通过计算π值为例子来进行计算。
# 首先发布者bitsource随机生成一个字符串,字符串偶数位为纵坐标,奇数位为横坐标。
# 一个字符串表示一个点,将奇数位转换为横坐标,将偶数位转换为纵坐标,然后通过计算
# 这个点到原点的距离,来判断是否为以半径为B的圆内,通过大量的模拟,来进行计算π。



import random, threading, time, zmq
B = 32 # number of bits of precision in each random integer

def ones_and_zeros(digits):
"""Express `n` in at least `d` binary digits, with no special prefix."""
# getrandbits()
# 方法返回指定大小(以位为单位)的整数。
return bin(random.getrandbits(digits)).lstrip('0b').zfill(digits)

# 发布者
def bitsource(zcontext, url):
# 发布订阅函数,发布者。
"""Produce random points in the unit square."""
# 创造一个对象。发布者对象
zsock = zcontext.socket(zmq.PUB)
# 绑定URL
zsock.bind(url)
while True:
# 持续不断的一直发送这里是32位的二进制字符。
# 这里一个字符串代表一个点。
zsock.send_string(ones_and_zeros(B * 2))
# 停顿0.01秒。
time.sleep(0.01)
# 订阅者1
def always_yes(zcontext, in_url, out_url):
"""Coordinates in the lower-left quadrant are inside the unit circle."""
# 创造一个订阅者对象。
isock = zcontext.socket(zmq.SUB)
# 连接发布者的URL
isock.connect(in_url)
# 设置过滤条件,接收00开头的
# 如果开头为00的话,那么这个点x、y值都不会超过半径的一半
# 一定在四分之一圆内。
isock.setsockopt(zmq.SUBSCRIBE, b'00')
# 推送。
osock = zcontext.socket(zmq.PUSH)
# 连接接收推送的URL
osock.connect(out_url)
while True:
# 接收订阅者的消息。
isock.recv_string()
# 推送给发布者消息。因此我们这里直接发送Y
osock.send_string('Y')
# 订阅者2
def judge(zcontext, in_url, pythagoras_url, out_url):
"""Determine whether each input coordinate is inside the unit circle."""
# # 创造一个订阅者对象。
isock = zcontext.socket(zmq.SUB)
# 连接URL
isock.connect(in_url)
# 设置接收订阅的过滤条件。
for prefix in b'01', b'10', b'11':
isock.setsockopt(zmq.SUBSCRIBE, prefix)
# 设置一个响应对象。
psock = zcontext.socket(zmq.REQ)
psock.connect(pythagoras_url)
# 设置一个推送对象。
osock = zcontext.socket(zmq.PUSH)
osock.connect(out_url)
# 这里用了勾股定理,是两个落在坐标轴上的点,平方和。
unit = 2 ** (B * 2)
# 这里需要计算是否在四分之一圆内。
while True:
# 接收发布者的消息。
bits = isock.recv_string()
# 提取这个点的x坐标,y坐标。
n, m = int(bits[::2], 2), int(bits[1::2], 2)
# 发送给客户请求端
psock.send_json((n, m))
# 然后接受客户请求端发送过来处理过的数据。
sumsquares = psock.recv_json()
# 判断是否在圆内。
osock.send_string('Y' if sumsquares < unit else 'N')
# 请求端,
def pythagoras(zcontext, url):
"""Return the sum-of-squares of number sequences."""
zsock = zcontext.socket(zmq.REP)
zsock.bind(url)
while True:
# 这里先请求数据,然后将请求的数据进行处理发送出去。
numbers = zsock.recv_json()
zsock.send_json(sum(n * n for n in numbers))
# 汇总,进行计算π的值。
def tally(zcontext, url):
"""Tally how many points fall within the unit circle, and print pi."""
zsock = zcontext.socket(zmq.PULL)
zsock.bind(url)
# 这里是如果接受到一个Y p+4,接受到一个N ,q + 1
# 然后计算比值,这个就是我们算出来的π的值。
p = q = 0
while True:
decision = zsock.recv_string()
q += 1
if decision == 'Y':
p += 4
print(decision, p / q)

# 我们使用多线程的方式,上边的每一个函数开一个线程,。
def start_thread(function, *args):
thread = threading.Thread(target=function, args=args)
thread.daemon = True # so you can easily Ctrl-C the whole program
thread.start()

def main(zcontext):
pubsub = 'tcp://127.0.0.1:6700'
reqrep = 'tcp://127.0.0.1:6701'
pushpull = 'tcp://127.0.0.1:6702'
start_thread(bitsource, zcontext, pubsub)
start_thread(always_yes, zcontext, pubsub, pushpull)
start_thread(judge, zcontext, pubsub, reqrep, pushpull)
start_thread(pythagoras, zcontext, reqrep)
start_thread(tally, zcontext, pushpull)
# 这个是主线程,主线程结束,其他的多线程也就要结束。
time.sleep(30)

if __name__ == '__main__':
main(zmq.Context())

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约