分享

用Python多线程实现生产者消费者模式

 _王文波 2017-04-17

限时干货下载:添加微信公众号“数据玩家「fbigdata

回复【2】免费获取「完整数据分析资料,包括SPSS\SAS\SQL\EXCEL\Project!

pre-ipo新三板企业投资机会,请联系微.信.号:6048856

什么是生产者消费者模式

软件开发的过程中,经常碰到这样的场景: 某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数、线程、进程等)。产生数据的模块称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式。

结构图如下

为了大家容易理解,我们举一个寄信的例子。假设你要寄一封信,大致过程如下:

1、你把信写好----相当于生产者生产数据

2、你把信放入邮箱----相当于生产者把数据放入缓冲区

3、邮递员把信从邮箱取出,做相应处理----相当于消费者把数据取出缓冲区,处理数据

生产者消费者模式的优点

  • 解耦

假设生产者和消费者分别是两个线程。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。如果未来消费者的代码发生变化,可能会影响到生产者的代码。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。

举个例子,我们去邮局投递信件,如果不使用邮箱(也就是缓冲区),你必须得把信直接交给邮递员。有同学会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须得认识谁是邮递员,才能把信给他。这就产生了你和邮递员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员换人了,你还要重新认识一下(相当于消费者变化导致修改生产者代码)。而邮箱相对来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱耦合)。

  • 并发

由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区通信的,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。

继续上面的例子,如果我们不使用邮箱,就得在邮局等邮递员,直到他回来,把信件交给他,这期间我们啥事儿都不能干(也就是生产者阻塞)。或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。

  • 支持忙闲不均

    当生产者制造数据快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中,慢慢处理掉。而不至于因为消费者的性能造成数据丢失或影响生产者生产。

我们再拿寄信的例子,假设邮递员一次只能带走1000封信,万一碰上情人节(或是圣诞节)送贺卡,需要寄出去的信超过了1000封,这时候邮箱这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮箱中,等下次过来时再拿走。

通过上面的介绍大家应该已经明白了生产者消费者模式。

Python中的多线程编程

在实现生产者消费者模式之前,我们先学习下Python中的多线程编程。 线程是操作系统直接支持的执行单元,高级语言通常都内置多线程的支持,Python也不例外,并且Python的线程是真正的Posix Thread,而不是模拟出来的线程。 Python的标准库提供了两个模块:thread和threading,thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。

下面我们先看一段在Python中实现多线程的代码。

  1. import time,threading

  2. #线程代码

  3. classTaskThread(threading.Thread):

  4. def __init__(self,name):

  5. threading.Thread.__init__(self,name=name)

  6. def run(self):

  7. print('thread %s is running...' % self.getName)

  8. for i in range(6):

  9. print('thread %s >>> %s' % (self.getName, i))

  10. time.sleep(1)

  11. print('thread %s finished.' % self.getName)

  12. taskthread = TaskThread('TaskThread')

  13. taskthread.start

  14. taskthread.join

下面是程序的执行结果:

  1. thread TaskThreadis running...

  2. thread TaskThread >>> 0

  3. thread TaskThread >>> 1

  4. thread TaskThread >>> 2

  5. thread TaskThread >>> 3

  6. thread TaskThread >>> 4

  7. thread TaskThread >>> 5

  8. thread TaskThread finished.

TaskThread类继承自threading模块中的Thread线程类。构造函数的name参数指定线程的名字,通过重载基类run函数实现具体任务。

在简单熟悉了Python的线程后,下面我们实现一个生产者消费者模shi。

  1. fromQueueimportQueue

  2. import random,threading,time

  3. #生产者类

  4. classProducer(threading.Thread):

  5. def __init__(self, name,queue):

  6. threading.Thread.__init__(self, name=name)

  7. self.data=queue

  8. def run(self):

  9. for i in range(5):

  10. print('%s is producing %d to the queue!' % (self.getName, i))

  11. self.data.put(i)

  12. time.sleep(random.randrange(10)/5)

  13. print('%s finished!' % self.getName)

  14. #消费者类

  15. classConsumer(threading.Thread):

  16. def __init__(self,name,queue):

  17. threading.Thread.__init__(self,name=name)

  18. def run(self):

  19. for i in range(5):

  20. val = self.data.get

  21. print('%s is consuming. %d in the queue is consumed!' % (self.getName,val))

  22. 10))

  23. print('%s finished!' % self.getName)

  24. def main:

  25. queue = Queue

  26. producer = Producer('Producer',queue)

  27. consumer = Consumer('Consumer',queue)

  28. producer.start

  29. consumer.start

  30. producer.join

  31. consumer.join

  32. print'All threads finished!'

  33. if __name__ == '__main__':

  34. main

执行结果可能如下:

  1. Produceris producing 0 to the queue!

  2. Consumeris consuming. 0in the queue is consumed!

  3. Produceris producing 1 to the queue!

  4. Produceris producing 2 to the queue!

  5. Consumeris consuming. 1in the queue is consumed!

  6. Consumeris consuming. 2in the queue is consumed!

  7. Produceris producing 3 to the queue!

  8. Produceris producing 4 to the queue!

  9. Producer finished!

  10. Consumeris consuming. 3in the queue is consumed!

  11. Consumeris consuming. 4in the queue is consumed!

  12. Consumer finished!

  13. All threads finished!

因为多线程是抢占式执行的,所以打印出的运行结果不一定和上面的完全一致。

小结

本例通过Python实现了一个简单的生产者消费者模型。Python中的Queue模块已经提供了对线程同步的支持,所以本文并没有涉及锁、同步、死锁等多线程问题。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多