生产者消费者模型
# 内容回顾补充
io操作:
- i (input) 向内存输入 input/read/recv/recvfrom/accept/connect/close
- o (output) 向内存输出 print/write/send/sendto/close
start /terminate /join
- start /terminate 异步非阻塞
- join 同步阻塞
IPC 进程之间通讯
常见内置ipc通讯机制:
- 队列Queue和管道pipe
第三方公共提供的IPC机制(消息中间键)
- redis
- memcache
- kafka
- rabbitmq
第三方IPC机制与内置IPC机制
- 第三方IPC机制--并发需求
- 第三方IPC机制--高可用
- 第三方IPC机制--断电保存数据
- 第三方IPC机制--解耦
耦合关系是指某两个事物之间如果存在一种相互作用、相互影响的关系,那么这种关系就称"耦合关系"。
程序的解耦: 把写在一起的大功能分开多个小的功能处理
- 提高代码的复用性,可读性
# 1.生产者消费者模型(fcfc重要)
生产者消费者模型当中有两大类重要的角色,一个是生产者(负责造数据的任务),另一个是消费者(接收造出来的数据进行进一步的操作)。
为什么要使用生产者消费者模型?
在并发编程中,如果生产者处理速度很快,而消费者处理速度比较慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个等待的问题,就引入了生产者与消费者模型。让它们之间可以不停的生产和消费。
实现生产者消费者模型三要素:
1、生产者
2、消费者
3、队列(或其他的容器,但队列不用考虑锁的问题)
什么时候用这个模型?
程序中出现明显的两类任务,一类任务是负责生产,另外一类任务是负责处理生产的数据的(如爬虫)
用该模型的好处?
1、实现了生产者与消费者的解耦和
2、平衡了生产力与消费力,就是生产者一直不停的生产,消费者可以不停的消费,因为二者不再是直接沟通的,而是跟队列沟通的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- 生产者(producer):生产数据
- 消费者(consumer):处理数据
- 一个进程就是一个生产者或者消费者
- 生产者与消费者之间的容器就是队列,
- 应用:
- 爬虫提高效率
#示例
import time
import random
from multiprocessing import Process,Queue
def producer(q,name,food):
for i in range(10): #限制了队列数据值为10个
time.sleep(random.random())
fd = '%s%s'%(food,i)
q.put(fd)
print('%s生产了一个%s'%(name,food))
def consumer(q,name):
while True: #一直取值
food = q.get()
if not food:break #取到空时,子进程结束
time.sleep(random.randint(1,3))
print('%s吃了%s'%(name,food))
def cp(c_count,p_count):
q = Queue(10) #限制队列的值的个数为10个
for i in range(c_count): #创建消费者
Process(target=consumer, args=(q, 'alex')).start()
p_l = []
for i in range(p_count): #创建生产者,并加入列表P-l
p1 = Process(target=producer, args=(q, 'wusir', '泔水'))
p1.start()
p_l.append(p1)
for p in p_l:p.join() #结束所有生产者
for i in range(c_count):#在生产者全部关闭后放入None到队列,当get取到None时,表示进程结束
q.put(None)
if __name__ == '__main__':
cp(2,3) #要生产2个消费者和3个生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 2.进程之间的数据共享(了解)
- 在mulprocessing中有一个manger类
- manger封装了所有进程相关的 数据共享 数据传递等操作
- 对字典.列表.这种数据操作的时候会有数据不安全的风险,需要加锁解决问题
- 尽量减少这种数据共享操作
# 3.初识线程(fcfc重要)
线程:开销小,数据共享 是进程的一部分,cpu最小的调度单位
进程:开销大,数据隔离,是cpu的最小资源分配单位
cpython解释器 不能实现多线程利用多核
- cpython解释器中特殊的垃圾回收机制
- GIL 全局解释器锁:保证了整个python程序中,只能有一个线程被cpu执行
- GIL锁导致了线程不能并行,可以并发,所以使用多线程并不影响高io的操作,只会对高计算型的程序有效率上的影响.
- 遇到高计算时:多进程加上多线程
- cpython和pypy有GIL锁.jpython和python没有
web框架基本都是多线程
# thread类(线程) 1.启动线程 - 开启子线程\开启多个子线程\面向对象启动子线程参照multiprocessing格式去写(不需要再写name==main) - 开启一个线程速度非常快 - 线程之间数据共享 - 线程数据较进程来说很快 - 线程和进程异步 - 主线程要等待子线程结束之后结束 - join阻塞,等待子线程结束 - **主线程如果结束了,主进程也就结束了** - .ident线程id - 不要在子线程里修改全局变量 - 守护线程一直等到所有的非守护线程都结束之后才结束,除了守护了主线程的代码之外还守护子线程 2.current_threand 水性杨花 在哪一个线程里,threading.current_thread()得到的就是这个当前线程的信息, 3.ident获取线程id 4.terminate不能从主线程结束一个子线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#开启线程
import os
import time
from threading import Thread
# multiprocessing 是完全仿照这threading的类写的
def func():
print('start son thread')
time.sleep(1)
print('end son thread',os.getpid())
# 启动线程 start
Thread(target=func).start()
print('start',os.getpid())
time.sleep(0.5)
print('end',os.getpid())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 守护线程
import time
from threading import Thread
def son1():
while True:
time.sleep(0.5)
print('in son1')
def son2():
for i in range(5):
time.sleep(1)
print('in son2')
t =Thread(target=son1)
t.daemon = True
t.start()
Thread(target=son2).start()
time.sleep(3)
# 守护线程一直等到所有的非守护线程都结束之后才结束
# 除了守护了主线程的代码之外也会守护子线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 等待所有线程结束
def main():
li = []
for i in range(5):
t = Thread(target=consumer_demo)
t.daemon = True
t.start()
li.append(t)
for i in li:
i.join()
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
上次更新: 2023/04/16, 18:35:33