线程-锁&池
- queue实现线程通信
# 1.锁!!!!!
# 1.1 互斥锁Lock
线程共享内存,数据存在安全隐患
操作的是全局变量
做以下操作
先计算再赋值才容易出现数据不安全的问题
包括+=/-=//+/*a (lst[0] += 1 dic['key']-=1)
解决线程数据安全问题
加锁Lock!
- 加锁会影响程序的执行效率,保障了数据安全
互斥锁:
- 在同一个线程中,不能连续acquire多次
# 1.2 递归锁 RLock
递归锁在同一个进程中,可以连续多次acquire,不会被阻塞
递归锁占用了更多的资源,没有互斥锁效率高
![img](file:///C:\Users\big cattle\Documents\Tencent Files\694526621\Image\Group\ML}3$2IGV~JDKV20547BCZ1.png)
# 1.2 单例加互斥锁
基本格式
class Foo(object): ____instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: time.sleep(0.1) cls.__instance = object.__new__(cls) return cls.__instance obj1=Foo() obj2=Foo()
1
2
3
4
5
6
7
8
9在线程中创建单例存在隐患!最终单例模式格式!!
import time from threading import Lock class A: __instance = None lock = Lock() def __new__(cls, *args, **kwargs): with cls.lock: if not cls.__instance: time.sleep(0.1) cls.__instance = object.__new__(cls) return cls.__instance def __init__(self,name,age): self.name = name self.age = age def func(): a = A('alex', 84) print(a) from threading import Thread for i in range(10): t = Thread(target=func) t.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 1.3死锁现象
死锁产生条件:
线程里有多把锁
多把锁交替使用
一个线程持有锁1同时在请求锁2,另一个线程持有锁2同时在请求锁1,二者不得到对方的锁都不会放开自己的锁,程序就这样僵持下去了。
死锁解决办法:
- 递归锁解决!
快速解决死锁
- 处理数据效率差
#递归锁解决死锁的的本质是把多个互斥锁变成了一把互斥锁 #递归锁也会发生死锁现象!在多把递归锁交替使用的时候!
1
2
优化代码逻辑(互斥锁解决)!
- 解决死锁速度慢,逻辑优化困难
- 数据处理效率高
#互斥锁
在同一个线程中,一把互斥锁不能连续acquire多次
#递归锁
一把递归锁在同一个进程中多次acquire也不会发生阻塞,占用了更多资源
#死锁现象
死锁产生条件:
1. 线程里有多把锁
2. 多把锁交替使用
线程一直处在阻塞状态,产生死锁
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 2.队列
先进先出队列:
Queue
先来先服务server端
后进先出队列-栈:
- LifQueue (last in first out)
- 算法应用较多
优先级队列:
- PriorityQueue
- 自动排序
- 用户级别
- 告警级别
女神小结:线程很重要
# 进程
# Queue
# 生产者消费者模型
# JoinableQueue
# 生产者消费者模型
# 什么是生产者消费者模型
# 把一个产生数据并且处理数据的过程解耦
# 让生产的数据的过程和处理数据的过程达到一个工作效率上的平衡
# 中间的容器,在多进程中我们使用队列或者可被join的队列,做到控制数据的量
# 当数据过剩的时候,队列的大小会控制这生产者的行为
# 当数据严重不足的时候,队列会控制消费者的行为
# 并且我们还可以通过定期检查队列中元素的个数来调节生产者消费者的个数
# 比如说:一个爬虫,或者一个web程序的server端
# 爬虫
# 请求网页的平均时间是0.3s
# 处理网页代码的时候是0.003s
# 100倍,每启动100个线程生产数据
# 启动一个线程来完成处理数据
# web程序的server端
# 每秒钟有6w条请求
# 一个服务每s中只能处理2000条
# 先写一个web程序,只负责一件事情,就是接收请求,然后把请求放到队列中
# 再写很多个server端,从队列中获取请求,然后处理,然后返回结果
# 线程
# 线程:开销小 数据共享 是进程的一部分,不能独立存在 本身可以利用多核
# GIL锁
# 全局解释器锁
# cpython解释器中的机制
# 导致了在同一个进程中多个线程不能同时利用多核 —— python的多线程只能是并发不能是并行
# threading模块
# 创建线程 :面向函数 面向对象
# 线程中的几个方法:
# 属于线程对象t.start(),t.join()
# 守护线程t.daemon = True 等待所有的非守护子线程都结束之后才结束
# 非守护线程不结束,主线程也不结束
# 主线程结束了,主进程也结束
# 结束顺序 :非守护线程结束 -->主线程结束-->主进程结束
#-->主进程结束 --> 守护线程也结束
# threading模块的函数 :
# current_thread 在哪个线程中被调用,就返回当前线程的对象
# 活着的线程,包括主线程
# enumerate 返回当前活着的线程的对象列表
# active_count 返回当前或者的线程的个数
# 测试
# 进程和线程的效率差,线程的开启、关闭、切换效率更高
# 线程的数据共享的效果
# 操作系统
# 多道 遇到io会切换
# 分时 时间片到了会切换
# 进程的重点
# 进程的特点 :
# 数据隔离 IPC
# 开销大
# 能利用多核
# 进程之间共享数据的安全问题:Lock
# 生产者消费者模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# 3.池
预先开启固定个数的进程数,当任务来临的时候,直接交给已经开好的进程去执行
- 节省了额进程和线程的开启,关闭,切换都需要时间
- 减轻了操作系统调度的负担
concurrent.futures 模块,开启池
# 3.1 进程池
ProcessPoolExcutor类
进程池里的进程数一般为cpu个数或者加一
一个池中的进程个数限制了我们程序的并发个数
#基本格式
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
def func(i,name):
print('start',os.getpid())
time.sleep(random.randint(1,3))
print('end', os.getpid())
return '%s * %s'%(i,os.getpid()) #返回值
if __name__ == '__main__':
p = ProcessPoolExecutor(5) #池中进程数
ret_l = []
for i in range(10):
ret = p.submit(func,i,'alex')
ret_l.append(ret) #提交任务
for ret in ret_l:
print('ret-->',ret.result()) # ret.result() 取返回值, 同步阻塞
p.shutdown() ## 关闭池之后就不能继续提交任务,并且会阻塞,直到已经提交的任务完成
print('main',os.getpid())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 3.2 线程池
ThreadPoolExcutor类
线程池里的线程个数一般为cpu的4到5倍
#基本格式
#类不同,其余格式相同
from concurrent.futures import ThreadPoolExecutor
def func(i):
print('start', os.getpid())
time.sleep(random.randint(1,3))
print('end', os.getpid())
return '%s * %s'%(i,os.getpid())
tp = ThreadPoolExecutor(20)
# ret = tp.map(func,range(20))
# for i in ret:
# print(i)
for i in range(10):
ret = tp.submit(func,i)
tp.shutdown()
print('main')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
检测所有线程结束:
from concurrent.futures import ThreadPoolExecutor,wait
tp = ThreadPoolExecutor(THREAD_POOL_MAX_NUM)
task_list = []
# 读取conf主机列表
for host_item in HOST_LIST:
task = tp.submit(worker, host_item)
task_list.append(task)
wait(task_list,return_when=ALL_COMPLETED) # 阻塞至结束
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 3.3 回调函数
- 执行完子线程任务之后直接调用对应的回调函数
import requests
from concurrent.futures import ThreadPoolExecutor
def func(url):
"""
线程池
:return:
"""
ret=requests.get(url)
ret=ret.text
return {'url':url,'ret':ret}
def foo(ret):
ret=ret.result()
print(ret['url'])
url=url_lst = [
'http://www.baidu.com',
'http://www.cnblogs.com',
'http://www.douban.com',
'http://www.alibaba.com',
'http://www.cnblogs.com/Eva-J/articles/8306047.html',
'http://www.cnblogs.com/Eva-J/articles/7206498.html',
]
for url in url_lst:
t=ThreadPoolExecutor(20)
ret = t.submit(func,url)
ret.add_done_callback(foo)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
是单独开启线程还是池?
如果只是开一个单独的子线程去做一件事,就可以单独开线程
有大量任务等待去做,要求一定的并发,就需要开启线程池去做
根据程序的IO操作判定是否开进程池:
- 有大量的阻塞io使用池
- 爬虫大量用到池
是否开启回调函数:
- 执行完子线程任务之后直接调用对应的回调函数
- 爬取网页,需要等待数据传输和网络上的响应高IO的不需要回调函数,子线程处理
- 分析网页,没有什么io操作,这个操作没有必要再子线程完成,交给回调函数
进程和线程都有锁
-所有线程中能工作的基本都不能在进程中工作
-在进程中能使用的基本在线程中也可以使用
1
2
3
2
3
上次更新: 2023/04/16, 18:35:33