刘沙河 刘沙河
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希

花开半夏,半夏花开
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希
  • Python基础

  • Python进阶

  • Python并发编程

    • 并发编程初识
    • 进程
    • Process模块&锁&进程之间通信
    • 生产者消费者模型
      • 内容回顾补充
      • 1.生产者消费者模型(fcfc重要)
      • 2.进程之间的数据共享(了解)
      • 3.初识线程(fcfc重要)
    • 线程-锁&池
    • 协程
  • Django

  • Flask

  • 爬虫

  • Python
  • Python并发编程
bigox
2021-03-22
目录

生产者消费者模型

# 内容回顾补充

  • io操作:

    • i (input) 向内存输入 input/read/recv/recvfrom/accept/connect/close
    • o (output) 向内存输出 print/write/send/sendto/close
  • start /terminate /join

    • start /terminate 异步非阻塞
    • join 同步阻塞
  • IPC 进程之间通讯

    • 常见内置ipc通讯机制:

      • 队列Queue和管道pipe
    • 第三方公共提供的IPC机制(消息中间键)

      • redis
      • memcache
      • kafka
      • rabbitmq
    • 第三方IPC机制与内置IPC机制

      • 第三方IPC机制--并发需求
      • 第三方IPC机制--高可用
      • 第三方IPC机制--断电保存数据
      • 第三方IPC机制--解耦
    • 耦合关系是指某两个事物之间如果存在一种相互作用、相互影响的关系,那么这种关系就称"耦合关系"。

    • 程序的解耦: 把写在一起的大功能分开多个小的功能处理

      • 提高代码的复用性,可读性

# 1.生产者消费者模型(fcfc重要)

  生产者消费者模型当中有两大类重要的角色,一个是生产者(负责造数据的任务),另一个是消费者(接收造出来的数据进行进一步的操作)。

为什么要使用生产者消费者模型?
     在并发编程中,如果生产者处理速度很快,而消费者处理速度比较慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个等待的问题,就引入了生产者与消费者模型。让它们之间可以不停的生产和消费。

实现生产者消费者模型三要素:
    1、生产者
    2、消费者
    3、队列(或其他的容器,但队列不用考虑锁的问题)

什么时候用这个模型?
程序中出现明显的两类任务,一类任务是负责生产,另外一类任务是负责处理生产的数据的(如爬虫)

用该模型的好处?
1、实现了生产者与消费者的解耦和
2、平衡了生产力与消费力,就是生产者一直不停的生产,消费者可以不停的消费,因为二者不再是直接沟通的,而是跟队列沟通的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  • 生产者(producer):生产数据
  • 消费者(consumer):处理数据
  • 一个进程就是一个生产者或者消费者
  • 生产者与消费者之间的容器就是队列,
  • 应用:
    • 爬虫提高效率
#示例
import time
import random
from multiprocessing import Process,Queue

def producer(q,name,food):
    for i in range(10):				  #限制了队列数据值为10个
        time.sleep(random.random())    
        fd = '%s%s'%(food,i)  
        q.put(fd)
        print('%s生产了一个%s'%(name,food))

        
def consumer(q,name):
    while True:						#一直取值
        food = q.get()				
        if not food:break				#取到空时,子进程结束
        time.sleep(random.randint(1,3))
        print('%s吃了%s'%(name,food))


def cp(c_count,p_count):
    q = Queue(10)				#限制队列的值的个数为10个
    for i in range(c_count):		#创建消费者
        Process(target=consumer, args=(q, 'alex')).start()
    p_l = []
    for i in range(p_count):		#创建生产者,并加入列表P-l
        p1 = Process(target=producer, args=(q, 'wusir', '泔水'))
        p1.start()
        p_l.append(p1)
    for p in p_l:p.join()			#结束所有生产者
    for i in range(c_count):#在生产者全部关闭后放入None到队列,当get取到None时,表示进程结束
        q.put(None)
if __name__ == '__main__':
    cp(2,3)						#要生产2个消费者和3个生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 2.进程之间的数据共享(了解)

  • 在mulprocessing中有一个manger类
    • manger封装了所有进程相关的 数据共享 数据传递等操作
    • 对字典.列表.这种数据操作的时候会有数据不安全的风险,需要加锁解决问题
    • 尽量减少这种数据共享操作

# 3.初识线程(fcfc重要)

  • 线程:开销小,数据共享 是进程的一部分,cpu最小的调度单位

  • 进程:开销大,数据隔离,是cpu的最小资源分配单位

  • cpython解释器 不能实现多线程利用多核

    • cpython解释器中特殊的垃圾回收机制
    • GIL 全局解释器锁:保证了整个python程序中,只能有一个线程被cpu执行
    • GIL锁导致了线程不能并行,可以并发,所以使用多线程并不影响高io的操作,只会对高计算型的程序有效率上的影响.
    • 遇到高计算时:多进程加上多线程
    • cpython和pypy有GIL锁.jpython和python没有
  • web框架基本都是多线程

    # thread类(线程)
    
    1.启动线程
    
    - 开启子线程\开启多个子线程\面向对象启动子线程参照multiprocessing格式去写(不需要再写name==main)
    
    - 开启一个线程速度非常快
    - 线程之间数据共享
    - 线程数据较进程来说很快
    - 线程和进程异步
    - 主线程要等待子线程结束之后结束
    - join阻塞,等待子线程结束
    - **主线程如果结束了,主进程也就结束了**
    - .ident线程id
    - 不要在子线程里修改全局变量
    - 守护线程一直等到所有的非守护线程都结束之后才结束,除了守护了主线程的代码之外还守护子线程
    
    2.current_threand   水性杨花 在哪一个线程里,threading.current_thread()得到的就是这个当前线程的信息,
    3.ident获取线程id
    
    4.terminate不能从主线程结束一个子线程
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
#开启线程
import os
import time
from threading import Thread
# multiprocessing 是完全仿照这threading的类写的
def func():
    print('start son thread')
    time.sleep(1)
    print('end son thread',os.getpid())
# 启动线程 start
Thread(target=func).start()
print('start',os.getpid())
time.sleep(0.5)
print('end',os.getpid())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 守护线程
import time
from threading import Thread
def son1():
    while True:
        time.sleep(0.5)
        print('in son1')
def son2():
    for i in range(5):
        time.sleep(1)
        print('in son2')
t =Thread(target=son1)
t.daemon = True
t.start()
Thread(target=son2).start()
time.sleep(3)
# 守护线程一直等到所有的非守护线程都结束之后才结束
# 除了守护了主线程的代码之外也会守护子线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 等待所有线程结束

def main():
    li = []
    for i in range(5):
        t = Thread(target=consumer_demo)
        t.daemon = True
        t.start()
        li.append(t)
    for i in li:
        i.join()
1
2
3
4
5
6
7
8
9
10
11
#Python#
上次更新: 2023/04/16, 18:35:33
Process模块&锁&进程之间通信
线程-锁&池

← Process模块&锁&进程之间通信 线程-锁&池→

最近更新
01
go与http代理
05-24
02
自制申威架构k8s-reloader
12-06
03
Docker Buildx 教程
12-01
更多文章>
Theme by Vdoing | Copyright © 2020-2024 小刘扎扎 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式