python 操作kafka
- 环境依赖
- kafka-python producer线程安全, consumer线程不安全
pip install kafka-python
- pykafka 文档: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
# 基本操作
# 0. Kafka 命令行
# 启动
zkServer.sh start
# 生产者
./kafka-console-producer.sh --broker-list 10.0.23.106:9092 --topic HX_DJ.DJ_NSRXX_20210919
# 消费者
./kafka-console-consumer.sh --bootstrap-server 10.0.23.106:9092 --topic liussj_test --from-beginning # 从头开始消费
# 新消费者列表查询(支持0.9版本+)
./kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
# 创建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 查看所有topic
./kafka-topics.sh --zookeeper localhost:2181 --list
# 查看一个tpoic 详情
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic liussj_test
# 删除topic
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic liussj_test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 1.连接kafka
from kafka import KafkaClient
client = KafkaClient(
bootstrap_servers=["10.0.23.106:9092"],
client_id="test",
)
1
2
3
4
5
6
2
3
4
5
6
# 2.查看topic
- 查看所有topic
from kafka import KafkaConsumer
client = KafkaConsumer(
bootstrap_servers=["10.0.23.106:9092"],
client_id="test",
)
print(client.topics()) // set
1
2
3
4
5
6
7
2
3
4
5
6
7
# 3.consumer
线程不安全:
- 多个groupid 相同的consumer 不能同时消费同一个partition.
- 多个groupid 不同的consumer 同时消费同一个partition的时候拿到的消息是完全相同的.
- 简单消费
from kafka import KafkaProducer, KafkaConsumer
import json
def consumer_demo():
consumer = KafkaConsumer(
'liussj_test',
bootstrap_servers='10.0.23.106:9092',
group_id='test'
)
for message in consumer:
print("receive, key: {}, value: {}".format(
json.loads(message.key.decode()),
json.loads(message.value.decode())
)
)
if __name__ == '__main__':
consumer_demo()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 3.1 重要参数
group_id
- 高并发量,则需要有多个消费者协作,消费进度,则由group_id统一。例如消费者A与消费者B,在初始化时使用同一个group_id。在进行消费时,一条消息被消费者A消费后,在kafka中会被标记,这条消息不会再被B消费(前提是A消费后正确commit)。
key_deserializer, value_deserializer
- 与生产者中的参数一致,自动解析。
auto_offset_reset
- 消费者启动的时刻,消息队列中或许已经有堆积的未消费消息,有时候需求是从上一次未消费的位置开始读(则该参数设置为earliest),有时候的需求为从当前时刻开始读之后产生的,之前产生的数据不再消费(则该参数设置为latest),其他值报错。
enable_auto_commit, auto_commit_interval_ms
是否自动commit,当前消费者消费完该数据后,需要commit,才可以将消费完的信息传回消息队列的控制中心。enable_auto_commit默认设置为True后,消费者将自动commit,并且两次commit的时间间隔为auto_commit_interval_ms。
手动commit:
consumer.commit()
def consumer_demo(): consumer = KafkaConsumer( 'kafka_demo', bootstrap_servers=':9092', group_id='test', enable_auto_commit=False ) for message in consumer: print("receive, key: {}, value: {}".format( json.loads(message.key.decode()), json.loads(message.value.decode()) ) ) consumer.commit()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 3.2 查看kafka堆积剩余量
在线环境中,需要保证消费者的消费速度大于生产者的生产速度,所以需要检测kafka中的剩余堆积量是在增加还是减小。可以用如下代码,观测队列消息剩余量:
from kafka import KafkaConsumer, TopicPartition consumer = KafkaConsumer( "liussj_test", bootstrap_servers='10.0.23.106:9092', group_id='test' ) partitions = [TopicPartition("liussj_test", p) for p in consumer.partitions_for_topic('liussj_test')] print("start to cal offset:") # total toff = consumer.end_offsets(partitions) toff = [(key.partition, toff[key]) for key in toff.keys()] toff.sort() print("total offset: {}".format(str(toff))) # current coff = [(x.partition, consumer.committed(x)) for x in partitions] coff.sort() print("current offset: {}".format(str(coff))) # cal sum and left toff_sum = sum([x[1] for x in toff]) cur_sum = sum([x[1] for x in coff if x[1] is not None]) left_sum = toff_sum - cur_sum print("kafka left: {}".format(left_sum))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 4.producer
线程安全:
- 可以多个producer同时往同一个topic中的同一个分区发送消息
- 同一个key值,会被送至同一个分区
- 第一种分区策略:给定了分区号,直接将数据发送到指定的分区里面去
- 第二种分区策略:没有给定分区号,给定数据的key值,通过key取上hashCode进行分区
- 第三种分区策略:既没有给定分区号,也没有给定key值,直接轮循进行分区
- demo
import time
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json
def producer_demo():
# 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json
producer = KafkaProducer(
bootstrap_servers=['10.0.23.106:9092'],
key_serializer=lambda k: json.dumps(k).encode(),
value_serializer=lambda v: json.dumps(v).encode()
)
# 发送三条消息
for i in range(0, 10000000):
time.sleep(2)
future = producer.send(
'liussj_test',
key='count_num', # 同一个key值,会被送至同一个分区
value=str(i)) # 向分区1发送消息
print("send {}".format(str(i)))
try:
future.get(timeout=10) # 监控是否发送成功
except kafka_errors: # 发送失败抛出kafka_errors
traceback.format_exc()
if __name__ == '__main__':
producer_demo()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
上次更新: 2023/04/16, 18:35:33