kafka 概述及原理
# 消息队列概述
- 消息队列内部实现原理
消息队列模式
点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除) 点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,
而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者 接收处理,即使有多个消息监听者也是如此。
发布/订阅模式(一对多,数据生产后,推送给所有订阅者)
发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅 者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即 使当前订阅者不可用,处于离线状态。
# 1. 消息队列的优点
解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
冗余:消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风 险。
扩展性: 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
灵活性 & 峰值处理能力: 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。 如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列 能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
可恢复性:系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所 以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
顺序保证:在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且 能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
缓冲:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致 的情况。
异步通信: 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要 的时候再去处理它们。
# 2. kafka 架构
概念:
Kafka 是一个分布式消息队列。
Kafka 对消息保存时根据 Topic 进行归类,发送消息 者称为 Producer,消息接受者称为 Consumer,此外 kafka 集群有多个 kafka 实例组成,每个 实例(server)称为 broker。
无论是 kafka 集群,还是 consumer 都依赖于 zookeeper 集群保存一些 meta 信息, 来保证系统可用性。
架构:
Producer :消息生产者,就是向 kafka broker 发消息的客户端;
Consumer :消息消费者,向 kafka broker 取消息的客户端;
Topic :可以理解为一个队列;
Consumer Group(CG):
- 这是 kafka 用来实现一个 topic 消息的广播(发给所有的 consumer) 和单播(发给任意一个 consumer)的手段。
- 一个 topic 可以有多个 CG。topic 的消息会复制 (不是真的复制,是概念上的)到所有的 CG,但每个 partion 只会把消息发给该 CG 中的一 个 consumer。如果需要实现广播,只要每个 consumer 有一个独立的 CG 就可以了。
- 要实现 单播只要所有的 consumer 在同一个 CG。
- 用 CG 还可以将 consumer 进行自由的分组而不需 要多次发送消息到不同的 topic;
Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic;
Partition:(隔断)为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上, 一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息 都会被分配一个有序的 id(offset)。kafka 只保证按一个 partition 中的顺序将消息发给 consumer,不保证一个 topic 的整体(多个 partition 间)的顺序;
第一种分区策略:给定了分区号,直接将数据发送到指定的分区里面去 第二种分区策略:没有给定分区号,给定数据的key值,通过key取上hashCode进行分区 第三种分区策略:既没有给定分区号,也没有给定key值,直接轮循进行分区
1
2
3Offset:kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查 找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。当然 the first offset 就 是 00000000000.kafka。
# Kafka 生产过程分析
# 1. 写入方式
producer 采用推(push)模式将消息发布到 broker,每条消息都被追加(append)到分 区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。
# 2. 分区(Partition)
分区的原因
方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic 又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;
可以提高并发,因为可以以 Partition 为单位读写了。
分区的原则
- 指定了 patition,则直接使用;
- 未指定 patition 但指定 key,通过对 key 的 value 进行 hash 出一个 patition;
- patition 和 key 都未指定,使用轮询选出一个 patition。
Partition
- 对于主题来说每一个Partition都是一个队列
# 3. 写入流程
- producer 先从 zookeeper 的 "/brokers/.../state"节点找到该 partition 的 leader
- producer 将消息发送给该 leader
- leader 将消息写入本地 log
- followers 从 leader pull 消息,写入本地 log 后向 leader 发送 ACK
- leader 收到所有 ISR 中的 replication 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向 producer 发送 ACK
# Broker 保存消息
存储方式
物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配 置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文 件)
# 1. 存储策略
- 无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:
- 基于时间:log.retention.hours=168
- 基于大小:log.retention.bytes=1073741824
- 需要注意的是,因为 Kafka 读取特定消息的时间复杂度为 O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。
# 2. Zookeeper 存储结构
- 注意:producer 不在 zk 中注册,消费者在 zk 中注册。
# Kafka 消费过程分析
- kafka 提供了两套 consumer API:高级 Consumer API 和低级 Consumer API。
# 1. 高级 API
高级 API 优点
- 高级 API 写起来简单
- 不需要自行去管理 offset,系统通过 zookeeper 自行管理。 不需要管理分区,副本等情况,.系统自动管理。
- 消费者断线会自动根据上一次记录在 zookeeper 中的 offset 去接着获取数据(默认设置1 分钟更新一下 zookeeper 中存的 offset)
- 可以使用 group 来区分对同一个 topic 的不同程序访问分离开来(不同的 group 记录不同的 offset,这样不同程序读取同一个 topic 才不会因为 offset 互相影响)
高级 API 缺点
- 不能自行控制 offset(对于某些特殊需求来说)
- 不能细化控制如分区、副本、zk 等
# 2. 低级 API
低级 API 优点
能够让开发者自己控制 offset,想从哪里读取就从哪里读取。
自行控制连接分区,对分区自定义进行负载均衡
对 zookeeper 的依赖性降低(如:offset 不一定非要靠 zk 存储,自行存储 offset 即可,
比如存在文件或者内存中)
低级 API 缺点
- 太过复杂,需要自行控制 offset,连接哪个分区,找到分区 leader 等。
# 3.消费者组
- 消费者是以 consumer group 消费者组的方式工作,由一个或者多个消费者组成一个组, 共同消费一个 topic。每个分区在同一时间只能由 group 中的一个消费者读取,但是多个 group 可以同时消费这个 partition。
- 在图中,有一个由三个消费者组成的 group,有一个消费者读 取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做 某个消费者是某个分区的拥有者。
- 消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个 消费者失败了,那么其他的 group 成员会自动负载均衡读取之前失败的消费者读取的分区。
- 多个groupid 相同的consumer 不能同时消费同一个partition.
- 多个groupid 不同的consumer 同时消费同一个partition的时候拿到的消息是完全相同的.
# 4.消费方式
- consumer 采用 pull(拉)模式从 broker 中读取数据。
- push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。
- pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直等待数据 到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达 的“长轮询”中进行阻塞(并且可选地等待到给定的字节数,以确保大的传输大小)。
# Kafka Streams
- Kafka Streams。Apache Kafka 开源项目的一个组成部分。是一个功能强大,易于使用的 库。用于在 Kafka 上构建高可分布式、拓展性,容错的应用程序。
# 1. Kafka Streams 特点
功能强大 高扩展性,弹性,容错
轻量级
无需专门的集群
一个库,而不是框架
完全集成
100%的 Kafka 0.10.0 版本兼容
易于集成到现有的应用程序
实时性
- 毫秒级延迟
- 并非微批处理
- 窗口允许乱序数据
- 允许迟到数据
# 2. 为什么要有 Kafka Stream
Spark 和 Storm 都是流式处理框架,而 Kafka Stream 提供的是一个基于 Kafka 的流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而 Kafka Stream 作为流式 处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便 使用和调试。
虽然 Cloudera 与 Hortonworks 方便了 Storm 和 Spark 的部署,但是这些框架的部 署仍然相对复杂。而 Kafka Stream 作为类库,可以非常方便的嵌入应用程序中,它对应用的 打包和部署基本没有任何要求。
就流式处理系统而言,基本都支持 Kafka 作为数据源。大部分流式系统中都已部署了 Kafka,此时使用 Kafka Stream 的成本非常低。
使用 Storm 或 Spark Streaming 时,需要为框架本身的进程预留资源,如 Storm 的 supervisor 和 Spark on YARN 的 node manager。即使对于应用实例而言,框架本身也会占 用部分资源,如 Spark Streaming 需要为 shuffle 和 storage 预留内存。但是 Kafka 作为类库不 占用系统资源。
由于 Kafka 本身提供数据持久化,因此 Kafka Stream 提供滚动部署和滚动升级以及重新计算的能力。
Kafka Stream 可以在线动态调整并行度。
# kafka 扩展
# 1. Kafka 与 Flume 比较
flume:cloudera 公司研发:
- 适合多个生产者;
- 适合下游数据消费者不多的情况;
- 适合数据安全性要求不高的操作;
- 适合与 Hadoop 生态圈对接的操作。
kafka:linkedin 公司研发:
- 适合数据下游消费众多的情况;
- 适合数据安全性要求较高的操作,支持 replication(复制)。
因此我们常用的一种模型是:数据 --> flume --> kafka --> flume(根据情景增删该流程) --> HDFS
# 2. Kafka 配置信息
auto.offset.reset
earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
# kafka常用命令
1、启动kafka服务
bin/kafka-server-start.sh -daemon config/server.properties
2、停止kafka服务
./kafka-server-stop.sh
3、查看所有的话题
./kafka-topics.sh --list --zookeeper localhost:9092
或者
./kafka-topics.sh --list --bootstrap-server localhost:9092
2
3
4、查看所有话题的详细信息
./kafka-topics.sh --zookeeper localhost:2181 --describe
5、列出指定话题的详细信息
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic demo
6、删除一个话题
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
7、创建一个叫test的话题,有两个分区,每个分区3个副本
./kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 2
8、测试kafka发送和接收消息(启动两个终端)
#发送消息(注意端口号为配置文件里面的端口号)
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
#消费消息(可能端口号与配置文件保持一致,或与发送端口保持一致)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning #加了--from-beginning 重头消费所有的消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test #不加--from-beginning 从最新的一条消息开始消费
2
3
4
5
9、查看某个topic对应的消息数量
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1
10、显示所有消费者
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
11、获取正在消费的topic(console-consumer-63307)的group的offset
./kafka-consumer-groups.sh --describe --group console-consumer-63307 --bootstrap-server localhost:9092
11、显示消费者
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list