Operating Kafka with Golang
Older Go Kafka client library (legacy import path): github.com/Shopify/sarama
Newer Go Kafka client library (the project moved to IBM): github.com/IBM/sarama
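The examples below import the newer module path. To pull it into a project the usual way would be (a minimal sketch, assuming Go modules are in use):

go get github.com/IBM/sarama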
# 0. Kafka command line
# Start ZooKeeper
zkServer.sh start
# Console producer
./kafka-console-producer.sh --broker-list 10.0.23.106:9092 --topic HX_DJ.DJ_NSRXX_20210919
# Console consumer
./kafka-console-consumer.sh --bootstrap-server 10.0.23.106:9092 --topic HX_DJ.DJ_NSRXX_20210919 --from-beginning # consume from the beginning
# List consumer groups (new consumer API, Kafka 0.9+)
./kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.0.23.106:9092 --list
# Create a topic
./kafka-topics.sh --create --bootstrap-server 10.0.23.106:9092 --replication-factor 1 --partitions 1 --topic test
# List all topics
./kafka-topics.sh --bootstrap-server 10.0.23.106:9092 --list
# Describe a topic
./kafka-topics.sh --describe --bootstrap-server 10.0.23.106:9092 --topic liussj_test
# Delete a topic
./kafka-topics.sh --bootstrap-server 10.0.23.106:9092 --delete --topic liussj_test
# Check a consumer group's offsets
./kafka-consumer-groups.sh --bootstrap-server 192.168.50.120:9092 --group test_group --describe
# 1. Consumer
package main

import (
    "fmt"

    "github.com/IBM/sarama"
)

// kafka consumer
func main() {
    consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
    if err != nil {
        fmt.Printf("fail to start consumer, err:%v\n", err)
        return
    }
    defer consumer.Close()

    partitionList, err := consumer.Partitions("web_log") // get all partitions of the topic
    if err != nil {
        fmt.Printf("fail to get list of partitions, err:%v\n", err)
        return
    }
    fmt.Println(partitionList)

    for _, partition := range partitionList { // iterate over all partitions
        // create a partition consumer for each partition
        pc, err := consumer.ConsumePartition("web_log", partition, sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d, err:%v\n", partition, err)
            return
        }
        defer pc.AsyncClose()
        // consume messages from each partition asynchronously
        go func(pc sarama.PartitionConsumer) {
            for msg := range pc.Messages() {
                fmt.Printf("Partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
            }
        }(pc)
    }
    select {} // block forever so the consumer goroutines keep running (Ctrl+C to stop)
}
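By default sarama only logs consumption errors. A minimal sketch of how they could be handled in code by enabling the partition consumer's Errors() channel (broker address, topic and partition 0 are the same assumptions as above):

package main

import (
    "fmt"

    "github.com/IBM/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true // deliver consume errors on Errors() instead of only logging them

    consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        fmt.Printf("fail to start consumer, err:%v\n", err)
        return
    }
    defer consumer.Close()

    pc, err := consumer.ConsumePartition("web_log", 0, sarama.OffsetOldest)
    if err != nil {
        fmt.Printf("fail to start partition consumer, err:%v\n", err)
        return
    }
    defer pc.AsyncClose()

    for { // loop forever; Ctrl+C to stop
        select {
        case msg := <-pc.Messages():
            fmt.Printf("Partition:%d Offset:%d Value:%s\n", msg.Partition, msg.Offset, msg.Value)
        case cerr := <-pc.Errors():
            fmt.Printf("consume error: %v\n", cerr)
        }
    }
}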
# 2. Producer
package main

import (
    "fmt"

    "github.com/IBM/sarama"
)

// kafka producer client built on the sarama library
func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll          // wait for the leader and all in-sync followers to acknowledge
    config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message
    config.Producer.Return.Successes = true                   // successfully delivered messages are returned on the Successes channel

    // build a message
    msg := &sarama.ProducerMessage{}
    msg.Topic = "web_log"
    msg.Value = sarama.StringEncoder("this is a test log")

    // connect to kafka
    client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        fmt.Println("producer closed, err:", err)
        return
    }
    defer client.Close()

    // send the message
    pid, offset, err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send msg failed, err:", err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
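The example above uses the synchronous producer, which blocks on every SendMessage until the broker acknowledges it. sarama also provides an asynchronous producer for higher throughput; a minimal sketch (broker address and topic are the same assumptions as above):

package main

import (
    "fmt"

    "github.com/IBM/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true // report delivered messages on Successes()
    config.Producer.Return.Errors = true    // report failed messages on Errors()

    producer, err := sarama.NewAsyncProducer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        fmt.Println("start async producer failed, err:", err)
        return
    }
    defer producer.AsyncClose()

    // messages are sent by writing to the Input() channel
    producer.Input() <- &sarama.ProducerMessage{
        Topic: "web_log",
        Value: sarama.StringEncoder("this is an async test log"),
    }

    // read back one result; real code would drain both channels in a separate goroutine
    select {
    case msg := <-producer.Successes():
        fmt.Printf("delivered: partition:%d offset:%d\n", msg.Partition, msg.Offset)
    case perr := <-producer.Errors():
        fmt.Println("delivery failed, err:", perr)
    }
}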
# 3. Consumer group
package main

import (
    "context"
    "fmt"
    "log"
    "strings"
    "time"

    "github.com/IBM/sarama"
)

const addr = "192.168.50.120:9092"
const topics = "liuss_test"
const groupId = "test_group"

func main() {
    // endpoint := "cn-hangzhou-intranet.log.aliyuncs.com"
    // port := "10011"
    version := "2.1.0"
    // project := "test-project"      // sls project
    // topics := "your sls logstore"  // sls logstore
    // accessId := os.Getenv("SLS_ACCESS_KEY_ID")      // aliyun accessId
    // accessKey := os.Getenv("SLS_ACCESS_KEY_SECRET") // aliyun accessKeySecret
    // group := "test-groupId"        // consume group name
    log.Println("Starting a new Sarama consumer")
    vs, err := sarama.ParseKafkaVersion(version)
    if err != nil {
        log.Panicf("Error parsing Kafka version: %v", err)
    }

    /**
     * Construct a new Sarama configuration.
     * The Kafka cluster version has to be defined before the consumer/producer is initialized.
     */
    brokers := []string{addr}
    conf := sarama.NewConfig()
    conf.Version = vs
    // conf.Net.TLS.Enable = true
    // conf.Net.SASL.Enable = true
    // conf.Net.SASL.User = project
    // conf.Net.SASL.Password = fmt.Sprintf("%s#%s", accessId, accessKey)
    // conf.Net.SASL.Mechanism = "PLAIN"
    conf.Consumer.Fetch.Min = 1
    conf.Consumer.Fetch.Default = 1024 * 1024
    conf.Consumer.Retry.Backoff = 2 * time.Second
    conf.Consumer.MaxWaitTime = 250 * time.Millisecond
    conf.Consumer.MaxProcessingTime = 100 * time.Millisecond
    conf.Consumer.Return.Errors = false
    conf.Consumer.Offsets.AutoCommit.Enable = false // disable auto-commit; offsets are committed manually in ConsumeClaim
    conf.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
    conf.Consumer.Offsets.Initial = sarama.OffsetOldest
    conf.Consumer.Offsets.Retry.Max = 3

    /**
     * Setup a new Sarama consumer group
     */
    consumer := Consumer{
        ready: make(chan bool),
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    client, err := sarama.NewConsumerGroup(brokers, groupId, conf)
    if err != nil {
        log.Panicf("Error creating consumer group client: %v", err)
    }

    go func() {
        for {
            fmt.Println("start consume")
            // `Consume` should be called inside an infinite loop; when a
            // server-side rebalance happens, the consumer session will need to be
            // recreated to get the new claims
            if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
                log.Panicf("Error from consumer: %v", err)
            }
            // check if the context was cancelled, signaling that the consumer should stop
            if ctx.Err() != nil {
                return
            }
            consumer.ready = make(chan bool)
        }
    }()

    <-consumer.ready // Await till the consumer has been set up
    log.Println("Sarama consumer up and running!...")
    select {}
}

// toggleConsumptionFlow pauses/resumes consumption. In the upstream sarama example it is
// toggled with SIGUSR1; it is not wired up in this example.
func toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) {
    if *isPaused {
        client.ResumeAll()
        log.Println("Resuming consumption")
    } else {
        client.PauseAll()
        log.Println("Pausing consumption")
    }
    *isPaused = !*isPaused
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
    ready chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
    // Mark the consumer as ready
    close(consumer.ready)
    return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    // NOTE:
    // Do not move the code below to a goroutine.
    // The `ConsumeClaim` itself is called within a goroutine, see:
    // https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
    for {
        select {
        case message, ok := <-claim.Messages():
            if !ok {
                log.Printf("message channel was closed")
                return nil
            }
            realUnixTimeSeconds := message.Timestamp.Unix()
            if realUnixTimeSeconds < 2000000 {
                realUnixTimeSeconds = message.Timestamp.UnixMicro() / 1000
            }
            log.Printf("Message claimed: value = %s, timestamp = %d, topic = %s\n",
                string(message.Value), realUnixTimeSeconds, message.Topic)
            log.Printf("Message claimed: offset = %d", message.Offset)
            session.MarkMessage(message, "")
            session.Commit() // manually commit the offset
            // Should return when `session.Context()` is done.
            // If not, it will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout`
            // when kafka rebalances. See:
            // https://github.com/IBM/sarama/issues/1192
        case <-session.Context().Done():
            return nil
        }
    }
}
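The main function above simply blocks on `select {}`, so the program can only be stopped by killing it. A sketch of how shutdown is commonly wired up with OS signals, following the pattern of the upstream sarama consumer-group example; this is a fragment meant to replace the trailing `select {}` in main above, and assumes `os`, `os/signal` and `syscall` are added to the imports:

// replace the trailing `select {}` in main with signal-aware shutdown
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
    log.Println("terminating: context cancelled")
case <-sigterm:
    log.Println("terminating: via signal")
}
cancel() // stop the Consume loop; ctx.Err() != nil makes the consuming goroutine return
if err := client.Close(); err != nil {
    log.Panicf("Error closing client: %v", err)
}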
# 4. Advanced usage
- https://zhuanlan.zhihu.com/p/412869212
- https://www.kancloud.cn/goodday/rizhi/2003291