分享

python 多线程知识全面解析

 风声之家 2021-03-13

Python编程学堂 4天前

非阻塞启动线程















import threadingimport timedef one_thread(name,id):    print("start....")    print(name)    print(id)    time.sleep(5)    print("end...")
print("start thread")threading.Thread(target=one_thread, args=(), kwargs={"name": 111, "id": 222}).start()# args是一个list# kwargs是一个字典,需要对应函数的keyprint("end thread")

  • 得到值如下,线程启动函数后,非阻塞执行







start threadstart....111222end threadend...

多线程并发处理







































import threading
import time

class myThread(threading.Thread): def __init__(self, threadID, name): threading.Thread.__init__(self) self.threadID = threadID self.name = name
def run(self): print_time(self.threadID, self.name)
num = 0def print_time(threadID, name): global num # 每一个线程循环10次,最终总循环次数为30次 for i in range(10): print("start run") time.sleep(2) print(i) num += 1 print("thread_id=%s:name=%s" % (threadID, name))

if __name__ == '__main__': threads = [] # 新增三个线程 for i in range(3): name = "Thread-%d" % i t = myThread(i, name) t.start() threads.append(t) for t in threads: t.join() print("所有线程执行完毕") print("总循环次数为:%s" % num)

  • 打印结果:每次运行三个线程,每个线程循环打印10次










start runstart runstart run00...thread_id=1:name=Thread-1所有线程执行完毕总循环次数为:30

  • 多线程共享资源,可以使用全局变量global

多线程加锁

  • 对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间





































# -*- coding: utf-8 -*-import timeimport threading# 创建锁对象lock = threading.Lock()num = 0
def run(n): global num for i in range(10): # 加锁 为了确保下面代码只能由一个线程从头到尾的执行 # 会阻止多线程的并发执行,所以效率会大大降低 """ lock.acquire() try: num = num - n num = num + n finally: # 解锁 lock.release() """ with lock: time.sleep(2) print("start") num = num + 1 print("==============")

if __name__ == '__main__': t1 = threading.Thread(target=run,args=(6,)) t2 = threading.Thread(target=run,args=(9,)) t1.start() t2.start() t1.join() t2.join() print("num = %s"%(num))

  • 打印结果是每次只能运行一个线程





start==============...num = 20

多线程与队列

  • 我们经常会遇到这样的一个问题,这里有成千上万条数据,每次需要取出其中的一条数据进行处理,那么引入多线程该怎么进行任务分配?

  • 我们可以将数据进行分割然后交给多个线程去跑,可是这并不是一个明智的做法。在这里我们可以使用队列与线程相结合的方式进行任务分配。

  • 队列线程的思想:首先创建一个全局共享的队列,队列中只存在有限个元素,并将所有的数据逐条加入到队列中,并调用队列的join函数进行等待。之后便可以开启若干线程,线程的任务就是不断的从队列中取数据进行处理就可以了。































































import threadingimport timeimport queue
q = queue.Queue(10)
threadLock = threading.Lock()

class myThread(threading.Thread): def __init__(self, threadID, name): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.exitFlag = 0
def run(self): while not self.exitFlag: threadLock.acquire() if not q.empty(): id = q.get() print_time(self.name, id) threadLock.release() else: threadLock.release()

def print_time(threadName, id): print ("%s:%s:%s"%(threadName,time.ctime(time.time()),id)) # pass

# 创建3个线程threads = []for i in range(3): name = "Thread-%d" % i t = myThread(i, name) t.start() threads.append(t)print(threads)
# 新增队列数据for i in range(10000): q_name = "Queue:%d" % i q.put(q_name)
# 等待队列清空while not q.empty(): pass
# 也可以join方法,与上同效# q.join()
# 通知线程,处理完之后关闭for t in threads: t.exitFlag = 1
# 等待所有线程结束之后才退出for t in threads: t.join()
print("Exiting Main Thread")

  • 这里必须要在判断q.empty()前加上线程锁,因为可能会出现这样的一种情况。

  • 某一时刻,队列中还有一个元素,该元素正在被线程A取出,而与此同时线程B正在判断队列q是否为空,而此时线程B中队列q不为空进入后面的操作,但是待B去取元素时,最后一个元素已经被A取出,造成线程等待,显示出被挂起的状态。

  • 我们也可以通过加入q.get(timeout=10)超时操作来弥补这一问题。

  • 打印的结果








[<myThread(Thread-0, started 6568)>, <myThread(Thread-1, started 7724)>, <myThread(Thread-2, started 7796)>]Thread-1:Sat Aug 22 11:36:29 2020:Queue:0Thread-1:Sat Aug 22 11:36:29 2020:Queue:1...Thread-1:Sat Aug 22 11:36:30 2020:Queue:9998Thread-1:Sat Aug 22 11:36:30 2020:Queue:9999Exiting Main Thread

ThreadPoolExecutor线程池的使用

  • 锁依然可以运用到线程池

  • map的使用,接受一个List的数据,会循环调用



















from concurrent.futures.thread import ThreadPoolExecutorimport timenum = 0def print_time(data):    global num    num += 1    time.sleep(2)    print("start_%s" % data)    print("============")data = []for i in range(50):    data.append(i)with ThreadPoolExecutor(10) as pool:    result = pool.map(print_time, data)# 等待所有线程执行完毕for i in result:    passprint("循环次数=%s" % num)

  • 打印结果为:每次启动10个线程,启动了5次







============start_46start_49========================循环次数=50

  • submit接受list的数据,也可以接受字典




















from concurrent.futures.thread import ThreadPoolExecutorfrom concurrent.futures import as_completed
import timedef print_time(data): time.sleep(2) print("start_%s" % data) print("============")data = []for i in range(50): data.append(i)with ThreadPoolExecutor(10) as executor: future_list = [] for i in range(10): # future = executor.submit(print_time,data) future = executor.submit(print_time, {"name": 111, "id": 222}) future_list.append(future) for res in as_completed(future_list): # 这个futrure_list是你future对象的列表 print(res.result())


作者:望月成三人
链接:https://www.jianshu.com/p/74d3119903fc
来源:简书
本文来源于简书,著作权归作者所有。仅用于学习交流,不进行商业用途,如需引用或转载,请注明原出处。如有侵权,请联系删除。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多