锁的分类 :
线程安全:
线程池:
生产者消费者模型:
'''
锁:由于线程之间随机调度:某线程可能在执行n条后,CPU接着执行其他线程。为了多个线程同时操作一个内存中的资源时不产生混乱,我们使用锁。 为什么加锁:1、用于非线程安全, 2、控制一段代码,确保其不产生调度混乱。
锁种类介绍: 1、Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被其他的线程拥有。
# import threading # import time # # v = [] # lock = threading.Lock() # # def func(arg): # lock.acquire() #获得锁定 # v.append(arg) # time.sleep(0.01) # m = v[-1] # print(arg,m) # lock.release() #释放锁定 # # # for i in range(10): # t =threading.Thread(target=func,args=(i,)) # t.start()
2、RLockRLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。
# import threading # import time # # v = [] # lock = threading.RLock() # def func (arg): # lock.acquire() # lock.acquire() # v.append(arg) # time.sleep(2) # m = v[-1] # print(arg,m) # lock.release() # lock.release() # for i in range(10): # t=threading.Thread(target=func, args=(i,)) # t.start()
3、BoundedSemaphore(1次放N个)信号量 Semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1;调用release() 时内置计数器+1;计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
# import time # import threading # # lock = threading.BoundedSemaphore(6) # def func(arg): # lock.acquire() # print(arg) # time.sleep(1) # lock.release() # # # for i in range(20): # t =threading.Thread(target=func,args=(i,)) # t.start()
4、Condition(1次方法x个) 通常与一个锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。 import time import threading
lock = threading.Condition()
# ############## 方式一 ##############
def func(arg): print('线程进来了') lock.acquire() lock.wait() # 加锁
print(arg) time.sleep(1)
lock.release()
for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
while True: inp = int(input('>>>'))
lock.acquire() lock.notify(inp) lock.release()
# ############## 方式二 ############## """ def xxxx(): print('来执行函数了') input(">>>") # ct = threading.current_thread() # 获取当前线程 # ct.getName() return True
def func(arg): print('线程进来了') lock.wait_for(xxxx) print(arg) time.sleep(1)
for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
"""
5、Event 是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。(如同红灯停,绿灯行)
# import time # import threading # # lock = threading.Event() # # # def func(arg): # print('线程来了') # lock.wait() # 加锁:红灯 # print(arg) # # # for i in range(10): # t =threading.Thread(target=func,args=(i,)) # t.start() # # input(">>>>") # lock.set() # 绿灯 # # # lock.clear() # 再次变红灯 # # for i in range(10): # t =threading.Thread(target=func,args=(i,)) # t.start() # # input(">>>>") # lock.set()
二、 线程安全 线程安全:列表和字典线程安全 # import threading # # v = [] # def func(arg): # v.append(arg) # 线程安全 # print(v) # for i in range(10): # t =threading.Thread(target=func,args=(i,)) # t.start()
三、threadinglocal
可以把local看成是一个“线程-属性字典”的字典,local封装了从自身使用线程作为 key检索对应的属性字典、再使用属性名作为key检索属性值的细节 threading.local作用:内部自动为每一个线程维护一个空间{字典},用于当前线程存取属于自己的值,保证线程之间数据隔离。 # import time # import threading # # DATA_DICT = {} # # def func(arg): # ident = threading.get_ident() # DATA_DICT[ident] = arg # time.sleep(1) # print(DATA_DICT[ident],arg) # # # for i in range(10): # t =threading.Thread(target=func,args=(i,)) # t.start()
四、多线程池
# from concurrent.futures import ThreadPoolExecutor # import time # # def task(a1,a2): # time.sleep(2) # print(a1,a2) # # # 创建了一个线程池(最多5个线程) # pool = ThreadPoolExecutor(5) # # for i in range(40): # # 去线程池中申请一个线程,让线程执行task函数。 # pool.submit(task,i,8)
五、生产者消费者模型 三部件: 生产者 队列,先进先出 扩展: 栈,后进先出 消费者
思考:生产者消费者模型解决了什么问题?不用一直等待的问题。 # # import time # import queue # import threading # q = queue.Queue() # 线程安全 # # def producer(id): # """ # 生产者 # :return: # """ # while True: # time.sleep(2) # q.put('包子') # print('厨师%s 生产了一个包子' %id ) # # for i in range(1,4): # t = threading.Thread(target=producer,args=(i,)) # t.start() # # # def consumer(id): # """ # 消费者 # :return: # """ # while True: # time.sleep(1) # v1 = q.get() # print('顾客 %s 吃了一个包子' % id) # # for i in range(1,3): # t = threading.Thread(target=consumer,args=(i,)) # t.start()
|