刘沙河 刘沙河
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希

花开半夏,半夏花开
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希
  • Python基础

  • Python进阶

  • Python并发编程

    • 并发编程初识
    • 进程
    • Process模块&锁&进程之间通信
    • 生产者消费者模型
    • 线程-锁&池
      • 1.锁!!!!!
        • 1.1 互斥锁Lock
        • 1.2 递归锁 RLock
        • 1.2 单例加互斥锁
        • 1.3死锁现象
      • 2.队列
      • 3.池
        • 3.1 进程池
        • 3.2 线程池
        • 3.3 回调函数
    • 协程
  • Django

  • Flask

  • 爬虫

  • Python
  • Python并发编程
bigox
2021-03-22
目录

线程-锁&池

  • queue实现线程通信

# 1.锁!!!!!

# 1.1 互斥锁Lock

  • 线程共享内存,数据存在安全隐患

    1. 操作的是全局变量

    2. 做以下操作

      • 先计算再赋值才容易出现数据不安全的问题

      • 包括+=/-=//+/*a (lst[0] += 1 dic['key']-=1)

  • 解决线程数据安全问题

  • 加锁Lock!

    • 加锁会影响程序的执行效率,保障了数据安全
  • 互斥锁:

    • 在同一个线程中,不能连续acquire多次

# 1.2 递归锁 RLock

  • 递归锁在同一个进程中,可以连续多次acquire,不会被阻塞

  • 递归锁占用了更多的资源,没有互斥锁效率高

    ![img](file:///C:\Users\big cattle\Documents\Tencent Files\694526621\Image\Group\ML}3$2IGV~JDKV20547BCZ1.png)

# 1.2 单例加互斥锁

  • 基本格式

    class Foo(object):
        ____instance = None
        def __new__(cls, *args, **kwargs):
            if not cls.__instance:
                    time.sleep(0.1)
                    cls.__instance = object.__new__(cls)
            return cls.__instance
    obj1=Foo()
    obj2=Foo()
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
  • 在线程中创建单例存在隐患!最终单例模式格式!!

    import time
    from threading import Lock
    class A:
        __instance = None
        lock = Lock()
        def __new__(cls, *args, **kwargs):
            with cls.lock:
                if not cls.__instance:
                    time.sleep(0.1)
                    cls.__instance = object.__new__(cls)
            return cls.__instance
        def __init__(self,name,age):
            self.name = name
            self.age = age
    
    def func():
        a = A('alex', 84)
        print(a)
    
    from threading import Thread
    for i in range(10):
        t = Thread(target=func)
        t.start()
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23

# 1.3死锁现象

  • 死锁产生条件:

    1. 线程里有多把锁

    2. 多把锁交替使用

      一个线程持有锁1同时在请求锁2,另一个线程持有锁2同时在请求锁1,二者不得到对方的锁都不会放开自己的锁,程序就这样僵持下去了。

  • 死锁解决办法:

    1. 递归锁解决!
    • 快速解决死锁

      • 处理数据效率差
      #递归锁解决死锁的的本质是把多个互斥锁变成了一把互斥锁
      #递归锁也会发生死锁现象!在多把递归锁交替使用的时候!
      
      1
      2
    1. 优化代码逻辑(互斥锁解决)!

      • 解决死锁速度慢,逻辑优化困难
      • 数据处理效率高
#互斥锁
在同一个线程中,一把互斥锁不能连续acquire多次
#递归锁
一把递归锁在同一个进程中多次acquire也不会发生阻塞,占用了更多资源
#死锁现象
死锁产生条件:
1. 线程里有多把锁
2. 多把锁交替使用
线程一直处在阻塞状态,产生死锁
1
2
3
4
5
6
7
8
9

# 2.队列

  • 先进先出队列:

    • Queue

    • 先来先服务server端

  • 后进先出队列-栈:

    • LifQueue (last in first out)
    • 算法应用较多
  • 优先级队列:

    • PriorityQueue
    • 自动排序
    • 用户级别
    • 告警级别
女神小结:线程很重要

# 进程
    # Queue
        # 生产者消费者模型
    # JoinableQueue
        # 生产者消费者模型
    # 什么是生产者消费者模型
        # 把一个产生数据并且处理数据的过程解耦
        # 让生产的数据的过程和处理数据的过程达到一个工作效率上的平衡
        # 中间的容器,在多进程中我们使用队列或者可被join的队列,做到控制数据的量
            # 当数据过剩的时候,队列的大小会控制这生产者的行为
            # 当数据严重不足的时候,队列会控制消费者的行为
            # 并且我们还可以通过定期检查队列中元素的个数来调节生产者消费者的个数
        # 比如说:一个爬虫,或者一个web程序的server端
            # 爬虫
                # 请求网页的平均时间是0.3s
                # 处理网页代码的时候是0.003s
                # 100倍,每启动100个线程生产数据
                # 启动一个线程来完成处理数据
            # web程序的server端
                # 每秒钟有6w条请求
                    # 一个服务每s中只能处理2000条
                    # 先写一个web程序,只负责一件事情,就是接收请求,然后把请求放到队列中
                    # 再写很多个server端,从队列中获取请求,然后处理,然后返回结果
# 线程
    # 线程:开销小 数据共享 是进程的一部分,不能独立存在 本身可以利用多核
    # GIL锁
        # 全局解释器锁
        # cpython解释器中的机制
        # 导致了在同一个进程中多个线程不能同时利用多核 —— python的多线程只能是并发不能是并行
    # threading模块
        # 创建线程 :面向函数 面向对象
        # 线程中的几个方法:
            # 属于线程对象t.start(),t.join()
            # 守护线程t.daemon = True 等待所有的非守护子线程都结束之后才结束
                # 非守护线程不结束,主线程也不结束
                # 主线程结束了,主进程也结束
                # 结束顺序 :非守护线程结束 -->主线程结束-->主进程结束
                            #-->主进程结束 --> 守护线程也结束
            # threading模块的函数 :
                # current_thread 在哪个线程中被调用,就返回当前线程的对象
                # 活着的线程,包括主线程
                    # enumerate 返回当前活着的线程的对象列表
                    # active_count 返回当前或者的线程的个数
            # 测试
                # 进程和线程的效率差,线程的开启、关闭、切换效率更高
                # 线程的数据共享的效果
# 操作系统
    # 多道 遇到io会切换
    # 分时 时间片到了会切换
# 进程的重点
    # 进程的特点 :
        # 数据隔离  IPC
        # 开销大
        # 能利用多核
        # 进程之间共享数据的安全问题:Lock
# 生产者消费者模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

# 3.池

  • 预先开启固定个数的进程数,当任务来临的时候,直接交给已经开好的进程去执行

    • 节省了额进程和线程的开启,关闭,切换都需要时间
    • 减轻了操作系统调度的负担
  • concurrent.futures 模块,开启池

# 3.1 进程池

  • ProcessPoolExcutor类

  • 进程池里的进程数一般为cpu个数或者加一

  • 一个池中的进程个数限制了我们程序的并发个数

#基本格式
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(i,name):
    print('start',os.getpid())
    time.sleep(random.randint(1,3))
    print('end', os.getpid())
    return '%s * %s'%(i,os.getpid())  #返回值
if __name__ == '__main__':
    p = ProcessPoolExecutor(5)		#池中进程数
    ret_l = []
    for i in range(10):
        ret = p.submit(func,i,'alex')
        ret_l.append(ret)				#提交任务
    for ret in ret_l:		
        print('ret-->',ret.result())  # ret.result()	取返回值, 同步阻塞
    p.shutdown()  ## 关闭池之后就不能继续提交任务,并且会阻塞,直到已经提交的任务完成
    print('main',os.getpid())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 3.2 线程池

  • ThreadPoolExcutor类

  • 线程池里的线程个数一般为cpu的4到5倍

#基本格式
#类不同,其余格式相同
from concurrent.futures import ThreadPoolExecutor
def func(i):
    print('start', os.getpid())
    time.sleep(random.randint(1,3))
    print('end', os.getpid())
    return '%s * %s'%(i,os.getpid())
tp = ThreadPoolExecutor(20)
# ret = tp.map(func,range(20))
# for i in ret:
#     print(i)
for i in range(10):
    ret = tp.submit(func,i)
tp.shutdown()
print('main')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

检测所有线程结束:

from concurrent.futures import ThreadPoolExecutor,wait


tp = ThreadPoolExecutor(THREAD_POOL_MAX_NUM)
task_list = []
# 读取conf主机列表
for host_item in HOST_LIST:
  task = tp.submit(worker, host_item)
  task_list.append(task)
  wait(task_list,return_when=ALL_COMPLETED) # 阻塞至结束
1
2
3
4
5
6
7
8
9
10

# 3.3 回调函数

  • 执行完子线程任务之后直接调用对应的回调函数
import requests
from concurrent.futures import ThreadPoolExecutor

def func(url):
    """
    线程池
    :return:
    """
    ret=requests.get(url)
    ret=ret.text
    return {'url':url,'ret':ret}
def foo(ret):
    ret=ret.result()
    print(ret['url'])

url=url_lst = [
    'http://www.baidu.com',  
    'http://www.cnblogs.com', 
    'http://www.douban.com',  
    'http://www.alibaba.com',
    'http://www.cnblogs.com/Eva-J/articles/8306047.html',
    'http://www.cnblogs.com/Eva-J/articles/7206498.html',
]
for url in url_lst:
    t=ThreadPoolExecutor(20)
    ret = t.submit(func,url)
    ret.add_done_callback(foo)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
  • 是单独开启线程还是池?

    • 如果只是开一个单独的子线程去做一件事,就可以单独开线程

    • 有大量任务等待去做,要求一定的并发,就需要开启线程池去做

    • 根据程序的IO操作判定是否开进程池:

      • 有大量的阻塞io使用池
      • 爬虫大量用到池
  • 是否开启回调函数:

    • 执行完子线程任务之后直接调用对应的回调函数
    • 爬取网页,需要等待数据传输和网络上的响应高IO的不需要回调函数,子线程处理
    • 分析网页,没有什么io操作,这个操作没有必要再子线程完成,交给回调函数
进程和线程都有锁
-所有线程中能工作的基本都不能在进程中工作
-在进程中能使用的基本在线程中也可以使用
1
2
3
#Python#
上次更新: 2023/04/16, 18:35:33
生产者消费者模型
协程

← 生产者消费者模型 协程→

最近更新
01
go与http代理
05-24
02
自制申威架构k8s-reloader
12-06
03
Docker Buildx 教程
12-01
更多文章>
Theme by Vdoing | Copyright © 2020-2024 小刘扎扎 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式