刘沙河 刘沙河
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希

花开半夏,半夏花开
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希
  • Kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • python 操作kafka
      • 基本操作
        • 0. Kafka 命令行
        • 1.连接kafka
        • 2.查看topic
        • 3.consumer
        • 3.1 重要参数
        • 3.2 查看kafka堆积剩余量
        • 4.producer
    • Golang 操作kafka
    • Kafka消费问题记录
    • kafka夺命16问(面试相关)
    • 零拷贝技术
    • kafka分区规则
  • RabbitMQ

  • RocketMQ

  • MQ
  • Kafka
bigox
2022-06-16
目录

python 操作kafka

  • 环境依赖
    • kafka-python producer线程安全, consumer线程不安全
    • pip install kafka-python
    • pykafka 文档: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# 基本操作

# 0. Kafka 命令行


# 启动
zkServer.sh start

# 生产者
./kafka-console-producer.sh --broker-list 10.0.23.106:9092 --topic HX_DJ.DJ_NSRXX_20210919

# 消费者
./kafka-console-consumer.sh --bootstrap-server 10.0.23.106:9092 --topic liussj_test --from-beginning # 从头开始消费

# 新消费者列表查询(支持0.9版本+)
./kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

# 创建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# 查看所有topic
./kafka-topics.sh --zookeeper localhost:2181 --list

# 查看一个tpoic 详情

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic liussj_test
  
# 删除topic 
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic liussj_test


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 1.连接kafka

from kafka import KafkaClient

client = KafkaClient(
    bootstrap_servers=["10.0.23.106:9092"],
    client_id="test",
)
1
2
3
4
5
6

# 2.查看topic

  • 查看所有topic
from kafka import KafkaConsumer

client = KafkaConsumer(
    bootstrap_servers=["10.0.23.106:9092"],
    client_id="test",
)
print(client.topics()) // set 
1
2
3
4
5
6
7

# 3.consumer

线程不安全:

  1. 多个groupid 相同的consumer 不能同时消费同一个partition.
  2. 多个groupid 不同的consumer 同时消费同一个partition的时候拿到的消息是完全相同的.
  • 简单消费
from kafka import KafkaProducer, KafkaConsumer
import json


def consumer_demo():
    consumer = KafkaConsumer(
        'liussj_test',
        bootstrap_servers='10.0.23.106:9092',
        group_id='test'
    )
    for message in consumer:
        print("receive, key: {}, value: {}".format(
            json.loads(message.key.decode()),
            json.loads(message.value.decode())
        )
    )

if __name__ == '__main__':
    consumer_demo()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 3.1 重要参数

  • group_id

    • 高并发量,则需要有多个消费者协作,消费进度,则由group_id统一。例如消费者A与消费者B,在初始化时使用同一个group_id。在进行消费时,一条消息被消费者A消费后,在kafka中会被标记,这条消息不会再被B消费(前提是A消费后正确commit)。
  • key_deserializer, value_deserializer

    • 与生产者中的参数一致,自动解析。
  • auto_offset_reset

    • 消费者启动的时刻,消息队列中或许已经有堆积的未消费消息,有时候需求是从上一次未消费的位置开始读(则该参数设置为earliest),有时候的需求为从当前时刻开始读之后产生的,之前产生的数据不再消费(则该参数设置为latest),其他值报错。
  • enable_auto_commit, auto_commit_interval_ms

    • 是否自动commit,当前消费者消费完该数据后,需要commit,才可以将消费完的信息传回消息队列的控制中心。enable_auto_commit默认设置为True后,消费者将自动commit,并且两次commit的时间间隔为auto_commit_interval_ms。

    • 手动commit:

      consumer.commit()

      def consumer_demo():
          consumer = KafkaConsumer(
              'kafka_demo', 
              bootstrap_servers=':9092',
              group_id='test',
              enable_auto_commit=False
          )
          for message in consumer:
              print("receive, key: {}, value: {}".format(
                  json.loads(message.key.decode()),
                  json.loads(message.value.decode())
                  )
              )
              consumer.commit()
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14

# 3.2 查看kafka堆积剩余量

  • 在线环境中,需要保证消费者的消费速度大于生产者的生产速度,所以需要检测kafka中的剩余堆积量是在增加还是减小。可以用如下代码,观测队列消息剩余量:

    from kafka import KafkaConsumer, TopicPartition
    
    consumer = KafkaConsumer(
        "liussj_test",
        bootstrap_servers='10.0.23.106:9092',
        group_id='test'
    )
    partitions = [TopicPartition("liussj_test", p) for p in consumer.partitions_for_topic('liussj_test')]
    
    print("start to cal offset:")
    
    # total
    toff = consumer.end_offsets(partitions)
    toff = [(key.partition, toff[key]) for key in toff.keys()]
    toff.sort()
    print("total offset: {}".format(str(toff)))
    
    # current
    coff = [(x.partition, consumer.committed(x)) for x in partitions]
    coff.sort()
    print("current offset: {}".format(str(coff)))
    
    # cal sum and left
    toff_sum = sum([x[1] for x in toff])
    cur_sum = sum([x[1] for x in coff if x[1] is not None])
    left_sum = toff_sum - cur_sum
    print("kafka left: {}".format(left_sum))
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27

# 4.producer

线程安全:

  1. 可以多个producer同时往同一个topic中的同一个分区发送消息
  2. 同一个key值,会被送至同一个分区
    • 第一种分区策略:给定了分区号,直接将数据发送到指定的分区里面去
    • 第二种分区策略:没有给定分区号,给定数据的key值,通过key取上hashCode进行分区
    • 第三种分区策略:既没有给定分区号,也没有给定key值,直接轮循进行分区
  • demo
import time

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json


def producer_demo():
    # 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json
    producer = KafkaProducer(
        bootstrap_servers=['10.0.23.106:9092'],
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode()
    )
    # 发送三条消息
    for i in range(0, 10000000):
        time.sleep(2)
        future = producer.send(
            'liussj_test',
            key='count_num',  # 同一个key值,会被送至同一个分区
            value=str(i))  # 向分区1发送消息
        print("send {}".format(str(i)))
        try:
            future.get(timeout=10)  # 监控是否发送成功
        except kafka_errors:  # 发送失败抛出kafka_errors
            traceback.format_exc()


if __name__ == '__main__':
    producer_demo()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#消息队列#
上次更新: 2023/04/16, 18:35:33
kafka 概述及原理
Golang 操作kafka

← kafka 概述及原理 Golang 操作kafka→

最近更新
01
go与http代理
05-24
02
自制申威架构k8s-reloader
12-06
03
Docker Buildx 教程
12-01
更多文章>
Theme by Vdoing | Copyright © 2020-2024 小刘扎扎 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式