刘沙河 刘沙河
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希

花开半夏,半夏花开
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希
  • Kafka

    • 消息队列MQ总结
      • MQ作用
      • MQ优缺点
      • 常见MQ对比
      • MQ高可用性
        • 1. RabbitMQ高可用性
        • 2. kafka实现高可用
      • 重复消费问题
      • 数据丢失问题
        • 1. 生产端丢消息问题解决
        • 2. 消费端消息丢失问题解决
      • 消息顺序保证
        • 1. RabbitMQ保证消息顺序性
        • 2. Kafka保证消息消息顺序性
      • 消息积压问题
        • 1. 积压原因
        • a. 消息体过大
        • b. 路由规则不合理
        • c. 批量操作引起的连锁反应
        • d. 表过大
        • 2.积压应对办法
        • a. 场景一: 积压大量消息
        • b.场景二: 大量消息积压+TTL
      • 如何设计一个消息中间件架构?
    • kafka 概述及原理
    • python 操作kafka
    • Golang 操作kafka
    • Kafka消费问题记录
    • kafka夺命16问(面试相关)
    • 零拷贝技术
    • kafka分区规则
  • RabbitMQ

  • RocketMQ

  • MQ
  • Kafka
bigox
2022-07-04
目录

消息队列MQ总结

# MQ作用

  1. 解耦
  2. 异步
  3. 削锋

# MQ优缺点

  • 优点

    • 解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

    • 冗余:消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风 险。

    • 扩展性: 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。

    • 灵活性 & 峰值处理能力: 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。 如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列 能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

      • 可恢复性:系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所 以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

      • 顺序保证:在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且 能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)

      • 缓冲:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致 的情况。

      • 异步通信: 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要 的时候再去处理它们。

  • 缺点

    • 系统可用性降低:系统引入的外部依赖越多,越容易挂掉,本来你就是A系统调用BCD三个系统接口就好了,人家ABCD四个系统好好的,没啥问题,这个时候却加入了MQ进来,万一MQ挂了怎么办?MQ挂了整套系统也会崩溃了。

    • 系统复杂性提高:硬生生加个MQ进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?

    • 一致性问题:A系统处理完了直接返回成功了,人都以为你的请求成功了,但是问题是,要在BCD三个系统中,BD两个系统写库成功了,结果C系统写库失败了,这样就会存在数据不一致的问题。

所以说消息队列实际上是一种复杂的架构,引入它有好多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,最后发现系统复杂性提升了一个数量级,也许是复杂10倍,但是关键时刻,用还是得用。

# 常见MQ对比

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,吞吐量比RocketMQ和Kafka要低一个数量级 万级,吞吐量比RocketMQ和Kafka要低一个数量级 10万级,RocketMQ也是可以支撑高吞吐的一种MQ 10万级1这是kafka最大的优点,就是吞吐量高。一般配置和数据类的系统进行实时数据计算、日志采集等场景
时效性 ms级 微妙级,这是RabbitMQ的一大特点,就是延迟最低 ms级 延迟在ms级内
可用性 基于主从架构实现高可用 高,基于主从架构实现高可用 非常高,分布式架构 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机后,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 消息不丢失 经过参数优化配置,可以做到0丢失 经过参数优化配置可以做到0丢失
核心特点 MQ领域的功能及其完备 基于Erlang开发,所以并发能力强,性能及其好,延时很低 MQ功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是实时上的标准。
非常成熟,功能强大,在业内大量公司以及项目都有应用。 但是偶尔消息丢失的概率,并且现在社区以及国内应用都越来越少,官方社区对ActiveMQ5.X维护越来越少,而且确实主要是基于解耦和异步来用的,较少在大规模吞吐场景中使用 erlang语言开发的,性能及其好,延时很低。而且开源的版本,就提供的管理界面非常棒,在国内一些互联网公司近几年用RabbitMQ也是比较多一些,特别适用于中小型的公司 缺点显而易见,就是吞吐量会低一些,这是因为它做的实现机制比较中,因为使用erlang开发,目前没有多少公司使用其开发。所以针对源码界别的定制,非常困难,因此公司的掌控非常弱,只能依赖于开源社区的维护。 接口简单易用,毕竟在阿里大规模应用过,有阿里平台保障,日处理消息上 百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都是OK的,还可以支撑大规模的topic数量,支持复杂MQ业务场景。 仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级别的延迟,极高的可用性以及可靠性,分布式可以任意扩展。 同时kafka最好是支撑较少的topic数量即可,保证其超高的吞吐量。
  • 一般的业务要引入MQ,最早大家都是用ACviceMQ,但是现在大家用的不多了,没有经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了,不太图鉴使用

  • RabbitMQ后面被大量的中小型公司所使用,但是erlang语言阻碍了大量的Java工程师深入研究和掌握它,对公司而言,几乎处于不可控的状态,但是RabbitMQ目前开源稳定,活跃度也表较高。

  • RocketMQ是阿里开源的一套消息中间件,目前也已经经历了天猫双十一,同时底层使用Java进行开发

  • 如果中小型企业技术实力一般,技术挑战不是很高,可以推荐,RabbitMQ。如果公司的基础研发能力很强,想精确到源码级别的掌握,那么推荐使用RocketMQ。同时如果项目是聚焦于大数据领域的实时计算,日志采集等场景,那么Kafka是业内标准。

# MQ高可用性

# 1. RabbitMQ高可用性

RabbitMQ 三种模式:单机模式,普通集群模式,镜像集群模式

  • 普通集群模式

    • 意思就是在多台机器上启动多个RabbitMQ实例,每台机器启动一个,但是创建的Queue,只会放在一个RabbitMQ实例上,但是每个实例都同步queue元数据,在消费的时候,实际上是连接到另外一个实例上,那么这个实例会从queue所在实例上拉取数据过来,这种方式确实很麻烦,也不怎么好,没做到所谓的分布式 ,就是个普通集群。因为这导致你要么消费每次随机连接一个实例,然后拉取数据,要么固定连接那个queue所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。

      • 而且如果那个放queue的实例宕机了,会导致接下来其它实例无法从那个实例拉取,如果 你开启了消息持久化,让rabbitmq落地存储消息的话,消息不一定会丢,得等到这个实例恢复了,然后才可以继续从这个queue拉取数据。

        image-20200420091806944

      • 这里没有什么所谓的高可用性可言,这个方案主要就是为了解决吐吞量,就是集群中的多个节点来服务于某个queue的读写操作。

      • 存在两个缺点

        • 可能会在RabbitMQ中存在大量的数据传输
        • 可用性没有什么保障,如果queue所在的节点宕机,就会导致queue的消息丢失
  • 集群镜像模式

    • RabbitMQ的高可用模式,和普通的集群模式不一样的是,创建的queue无论元数据还是queue里的消息都会存在与多个实例中,然后每次你写消息到queu的时候,都会自动把消息推送到多个实例的queue中进行消息同步。
      • 优点: 任何一个机器宕机了,别的机器都可以用。

      • 缺点性能开销提升,消息同步所有的机器,导致网络带宽压力和消耗增加,第二就是没有什么扩展性科研,如果某个queue负载很重,你加机器,新增的机器也包含了这个queue的所有数据,并没有办法线性扩展你的queue

      • 那么如何开启集群镜像策略呢?就是在RabbitMQ的管理控制台,新增一个策略,这个策略就是镜像集群模式下的策略,指定的时候,可以要求数据同步到所有的节点,也可以要求就 同步到指定数量的节点,然后再次创建queue的时候,应用这个策略,就会自动将数据同步到其它节点上去了。 image-20200420102752707

      • 集群镜像模式下,任何一个节点宕机了都是没问题的,因为其他节点还包含了这个queue的完整的数据,别的consumer可以到其它活着的节点上消费数据。

      • 但是这个模式还存在问题:就是不是分布式的,如果这个queue的数据量很大,大到这个机器上的容量无法容纳的时候,此时应该怎么办呢?

# 2. kafka实现高可用

  • kafka一个最基本的架构认识:多个broker组件,每个broker是一个节点,你创建一个topic,这个topic可以划分成多个partition,每个partition可以存在于不同的broker上,每个partition就放一部分数据。是天然的分布式消息队列,就是说一个topic的数据,是分散在多个机器上的,每个机器上就放一部分数据。

    image-20200420104251328

  • 实际上RabbitMQ之类的,并不是分布式消息队列,他就是传统的消息队列,只不过提供了一些集群、HA的机制而已,因为无论怎么玩,RabbitMQ一个queue的数据都放在一个节点里了,镜像集群下,也是每个节点都放这个queu的完整数据。

  • kafka在0.8版本后,提供了HA机制,就是replica副本机制,每个partition的数据都会同步到其它机器上,形成自己的多个replica副本,然后所有的replica就是follower,写的时候,leader会负责数据都同步到所有的follower上,读的时候就直接读取leader上的数据即可。只能读写leader?很简单,要是你能随意读写每个follower,那么就需要保证数据一致性的问题,系统复杂度太高,很容易出问题,kafka会均匀的将一个partition的所有replica分布在不同的机器上,这样才能够提高容错性

  • 每个副本不会存储节点的全部数据,而是数据可能分布在不同的机器上。

    image-20220622105210161

  • 同时多个副本中,会选取一个作为leader,其它的副本是作为follower,并且只有leader能对外提供读写,同时leader在写入数据后,它还会把全部的数据同步到follower中,保证数据的备份。

  • 此时,高可用的架构就出来了,假设现在某个机器宕机了,比如其中的一个leader宕机了,但是因为每个leader下还有多个follower,并且每个follower都进行了数据的备份,因此kafka会自动感知leader已经宕机,同时将其它的follower给选举出来,作为新的leader,并向外提供服务支持。

# 重复消费问题

  • 以kafka举例

    • kafka消费消息时支持三种模式:

      • at most once模式 最多一次。保证每一条消息commit成功之后,再进行消费处理。消息可能会丢失,但不会重复。

      • at least once模式 至少一次。保证每一条消息处理成功之后,再进行commit。消息不会丢失,但可能会重复。

      • exactly once模式 精确传递一次。将offset作为唯一id与消息同时处理,并且保证处理的原子性。消息只会处理一次,不丢失也不会重复。但这种方式很难做到。

    • 消费者从节点读取到数据后处理完业务再去修改偏移量 这样可以确保数据不会丢失, 但是有可能出现重复消费, 在后续修改偏移量的时候可能失败; consumer在从broker读取消息后等消费完再commit,如果consumer还没来得及消费或消费时crash,导致offset未提交,该consumer下一次读取的开始位置会跟上一次commit之后的开始位置相同,导致重复消费问题

    • kafka默认的模式是at least once,但这种模式可能会产生重复消费的问题,所以我们的业务逻辑必须做幂等设计。

  • 可以通过将每次消费的数据的唯一标识存入Redis中,每次消费前先判断该条消息是否在Redis中,如果有则不再消费,如果没有再消费,消费完再将该条记录的唯一标识存入Redis中,并设置失效时间,防止Redis数据过多、垃圾数据问题。

  • 在偏移量存入redis, 每次消费前判断redis的偏移量(双重保证,看业务接收程度 )

  • 基于数据库唯一键来保证重复数据不会重复插入多条, 业务端保持幂等机制, 而我们的业务场景保存数据时使用了INSERT INTO ...ON DUPLICATE KEY UPDATE语法,不存在时插入,存在时更新,是天然支持幂等性的。

# 数据丢失问题

  1. producer把消息发送给broker,因为网络抖动,消息没有到达broker,且开发人员无感知。

    解决方案:

    producer设置acks参数,消息同步到master之后返回ack信号,否则抛异常使应用程序感知到并在业务中进行重试发送。这种方式一定程度保证了消息的可靠性,producer等待broker确认信号的时延也不高。

  2. producer把消息发送给broker-master,master接收到消息,在未将消息同步给follower之前,挂掉了,且开发人员无感知。

    解决方案:

    producer设置acks参数,消息同步到master且同步到所有follower之后返回ack信号,否则抛异常使应用程序感知到并在业务中进行重试发送。这样设置,在更大程度上保证了消息的可靠性,缺点是producer等待broker确认信号的时延比较高。

  3. producer把消息发送给broker-master,master接收到消息,master未成功将消息同步给每个follower,有消息丢失风险。

    解决方案:

    同2.

  4. 某个broker消息尚未从内存缓冲区持久化到磁盘,就挂掉了,这种情况无法通过ack机制感知。

    解决方案:

    设置参数,加快消息持久化的频率,能在一定程度上减少这种情况发生的概率。但提高频率自然也会影响性能。

  5. consumer成功拉取到了消息,consumer挂了, consumer写入失败等.

    解决方案:

    设置手动sync,消费成功才提交。

  • 意见
    • producer端确认消息是否到达集群,若有异常,进行重发。
    • consumer端保障消费幂等性。

# 1. 生产端丢消息问题解决

producer设置acks参数,消息同步到master之后返回ack信号,否则抛异常使应用程序感知到并在业务中进行重试发送。

  • go 生产者设置

    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll // -1
    
    1
    2

# 2. 消费端消息丢失问题解决

设置手动sync,消费成功才提交。

  • 通常消费端丢消息都是因为Offset自动提交了,但是数据并没有插入到mysql(比如出现BUG或者进程Crash),导致下一次消费者重启后,消息漏掉了,自然数据库中也查不到。这个时候,我们可以通过手动提交解决,甚至在一些复杂场景下,还要使用二阶段提交。

# 消息顺序保证

  • 为什么需要保证消息顺序?
    • 在mysql里增删改一条数据,对应出来的增删改3条binlog,接着这三条binlog发送到MQ里面,到消费出来依次执行,这个时候起码得保证能够顺序执行,不然本来是:增加、修改、删除,然后被换成了:删除、修改、增加,不全错了?

# 1. RabbitMQ保证消息顺序性

  • abbitMQ:拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦,或者就是一个queue,但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理。

# 2. Kafka保证消息消息顺序性

  1. kafka的topic是无序的,但是一个topic包含多个partition,每个partition内部是有序的。

img

  1. 只要保证生产者写消息时,按照一定的规则写到同一个partition,不同的消费者读不同的partition的消息,就能保证生产和消费者消息的顺序。

    我们刚开始就是这么做的,同一个商户编号的消息写到同一个partition,topic中创建了4个partition,然后部署了4个消费者节点,构成消费者组,一个partition对应一个消费者节点。从理论上说,这套方案是能够保证消息顺序的。

    img

# 消息积压问题

消息的数量越来越大,导致消费者处理不过来,经常出现消息积压的情况。对于需要实时的数据来说是致命的.

# 1. 积压原因

# a. 消息体过大

  • 虽说kafka号称支持百万级的TPS,但从producer发送消息到broker需要一次网络IO,broker写数据到磁盘需要一次磁盘IO(写操作),consumer从broker获取消息先经过一次磁盘IO(读操作),再经过一次网络IO。

  • 一次简单的消息从生产到消费过程,需要经过2次网络IO和2次磁盘IO。如果消息体过大,势必会增加IO的耗时,进而影响kafka生产和消费的速度。消费者速度太慢的结果,就会出现消息积压情况。

    image-20220524152608381

解决办法:

尽可能存入关键信息, 优化数据结构, 使消息对象尽可能简洁

# b. 路由规则不合理

  • 路由规则设置不合理会导致消息到parttion不均衡, 有的 partition 消息过多,造成数据积压

    image-20220524163857897
  • 优化路由理想数据分布

    image-20220524163955898

# c. 批量操作引起的连锁反应

批量插入到partition(税务征期或者活动促销等原因)

  1. 不要盲目增加消费并发数, 会导致节点崩溃, 下游系统团队多线程调用接口一定要做压测。
  2. 批量操作一定提前通知下游系统
  3. 对消息积压情况加监控。

# d. 表过大

  • 数据表过大到时数据查询和保存耗时增加;
  • 建议将多余数据归档处理, 数据表保存近期数据;

# 2.积压应对办法

# a. 场景一: 积压大量消息

几千万的消息积压在MQ中七八个小时,这也是一个真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复consumer,让他恢复消费速度,然后傻傻的等待几个小时消费完毕,但是很显然这是一种比较不机智的做法。

假设1个消费者1秒消费1000条,1秒3个消费者能消费3000条,一分钟就是18万条,1000万条也需要花费1小时才能够把消息处理掉,这个时候在设备允许的情况下,如何才能够快速处理积压的消息呢?

  • 一般这个时候,只能够做紧急的扩容操作了,具体操作步骤和思路如下所示:

    • 先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停止

    • 临时建立好原先10倍或者20倍的queue数量

    • 然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue

    • 接着临时征用10倍机器来部署consumer,每一批consumer消费一个临时queue的数据

    • 这种做法相当于临时将queue资源和consumer资源扩大了10倍,以正常的10倍速度

      image-20200420160304030

  • 也就是让消费者把消息,重新写入MQ中,然后在用 10倍的消费者来进行消费。

    image-20200420160319662

# b.场景二: 大量消息积压+TTL

假设你用的是RabbitMQ,RabbitMQ是可以设置过期时间的,就是TTL,如果消息在queue中积压超过一定的时间,就会被RabbitMQ给清理掉,这个数据就没了。这个时候就不是数据被大量积压的问题,而是大量的数据被直接搞丢了。

  • 这种情况下,就不是说要增加consumer消费积压的消息,因为实际上没有啥积压的,而是丢了大量的消息,我们可以采取的一个方案就是,批量重导,这个之前线上也有遇到类似的场景,就是大量的消息积压的时候,然后就直接丢弃了数据,然后等高峰期过了之后,例如在晚上12点以后,就开始写程序,将丢失的那批数据,写个临时程序,一点点查询出来,然后重新 添加MQ里面,把白天丢的数据,全部补回来。

  • 假设1万个订单积压在MQ里面,没有处理,其中1000个订单都丢了,你只能手动写程序把那1000个订单查询出来,然后手动发到MQ里面去再补一次。

# 如何设计一个消息中间件架构?

  • 首先MQ得支持可伸缩性,那就需要快速扩容,就可以增加吞吐量和容量,可以设计一个分布式的系统,参考kafka的设计理念,broker - > topic -> partition,每个partition放一台机器,那就存一部分数据,如果现在资源不够了,可以给topic增加partition,然后做数据迁移,增加机器,不就可以存放更多的数据,提高更高的吞吐量。
  • 其次得考虑一下这个MQ的数据要不要落地磁盘?也就是需不需要保证消息持久化,因为这样可以保证数据的不丢失,那落地盘的时候怎么落?顺序写,这样没有磁盘随机读写的寻址开销,磁盘顺序读的性能是很高的,这就是kafka的思路。
  • 其次需要考虑MQ的可用性?这个可以具体到我们上面提到的消息队列保证高可用,提出了多副本 ,leader 和follower模式,当一个leader宕机的时候,马上选取一个follower作为新的leader对外提供服务。
  • 需不需要支持数据0丢失?可以参考kafka零丢失方案
#消息队列#
上次更新: 2023/04/16, 18:35:33
kafka 概述及原理

kafka 概述及原理→

最近更新
01
go与http代理
05-24
02
自制申威架构k8s-reloader
12-06
03
Docker Buildx 教程
12-01
更多文章>
Theme by Vdoing | Copyright © 2020-2024 小刘扎扎 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式