刘沙河 刘沙河
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希

花开半夏,半夏花开
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希
  • Kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • python 操作kafka
    • Golang 操作kafka
      • 0. Kafka 命令行
      • 1. 消费者
      • 2. 生产者
      • 3. 消费者组
      • 4. 高阶使用
    • Kafka消费问题记录
    • kafka夺命16问(面试相关)
    • 零拷贝技术
    • kafka分区规则
  • RabbitMQ

  • RocketMQ

  • MQ
  • Kafka
bigox
2022-06-16
目录

Golang 操作kafka

旧版本go操作kafka库:github.com/Shopify/sarama

新版本go操作kafka库:github.com/IBM/sarama

# 0. Kafka 命令行

# 启动
zkServer.sh start

# 生产者
./kafka-console-producer.sh --broker-list 10.0.23.106:9092 --topic HX_DJ.DJ_NSRXX_20210919

# 消费者
./kafka-console-consumer.sh --bootstrap-server 10.0.23.106:9092 --topic HX_DJ.DJ_NSRXX_20210919 --from-beginning # 从头开始消费

# 新消费者列表查询(支持0.9版本+)
./kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.0.23.106:9092 --list

# 创建topic
./kafka-topics.sh --create --bootstrap-server 10.0.23.106:9092  --replication-factor 1 --partitions 1 --topic test

# 查看所有topic
./kafka-topics.sh --bootstrap-server 10.0.23.106:9092 --list

# 查看一个tpoic 详情

./kafka-topics.sh --describe --bootstrap-server 10.0.23.106:9092 --topic liussj_test
  
# 删除topic 
./kafka-topics.sh --bootstrap-server 10.0.23.106:9092  --delete --topic liussj_test

# 查看group offset
./kafka-consumer-groups.sh --bootstrap-server 192.168.50.120:9092 --group test_group --describe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 1. 消费者

package main

import (
    "fmt"

    "github.com/IBM/sarama"
)

// kafka consumer

func main() {
    consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
    if err != nil {
        fmt.Printf("fail to start consumer, err:%v\n", err)
        return
    }
    partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
    if err != nil {
        fmt.Printf("fail to get list of partition:err%v\n", err)
        return
    }
    fmt.Println(partitionList)
    for partition := range partitionList { // 遍历所有的分区
        // 针对每个分区创建一个对应的分区消费者
        pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
            return
        }
        defer pc.AsyncClose()
        // 异步从每个分区消费信息
        go func(sarama.PartitionConsumer) {
            for msg := range pc.Messages() {
                fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
            }
        }(pc)
    }
}	
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# 2. 生产者

package main

import (
    "fmt"

    "github.com/IBM/sarama"
)

// 基于sarama第三方库开发的kafka client

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
    config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
    config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

    // 构造一个消息
    msg := &sarama.ProducerMessage{}
    msg.Topic = "web_log"
    msg.Value = sarama.StringEncoder("this is a test log")
    // 连接kafka
    client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        fmt.Println("producer closed, err:", err)
        return
    }
    defer client.Close()
    // 发送消息
    pid, offset, err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send msg failed, err:", err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 3. 消费者组

  • code

    package main
    
    // SIGUSR1 toggle the pause/resume consumption
    import (
    	"context"
    	"fmt"
    	"log"
    	"strings"
    	"time"
    
    	"github.com/IBM/sarama"
    )
    
    const addr = "192.168.50.120:9092"
    const topics = "liuss_test"
    const groupId = "test_group"
    
    func main() {
    
    	// endpoint := "cn-hangzhou-intranet.log.aliyuncs.com"
    	// port    := "10011"
    	version := "2.1.0"
    	// project   := "test-project"                   // sls project
    	// topics    := "your sls logstore"              // sls logstore
    	// accessId  := os.Getenv("SLS_ACCESS_KEY_ID")                     // aliyun accessId
    	// accessKey := os.Getenv("SLS_ACCESS_KEY_SECRET")                 // aliyun accessKeySecret
    	// group     := "test-groupId"                   // consume group name
    
    	log.Println("Starting a new Sarama consumer")
    
    	vs, err := sarama.ParseKafkaVersion(version)
    	if err != nil {
    		log.Panicf("Error parsing Kafka version: %v", err)
    	}
    
    	/**
    	 * Construct a new Sarama confuration.
    	 * The Kafka cluster version has to be defined before the consumer/producer is initialized.
    	 */
    	brokers := []string{addr}
    
    	conf := sarama.NewConfig()
    	conf.Version = vs
    
    	// conf.Net.TLS.Enable = true
    	// conf.Net.SASL.Enable = true
    	// conf.Net.SASL.User = project
    	// conf.Net.SASL.Password = fmt.Sprintf("%s#%s", accessId, accessKey)
    	// conf.Net.SASL.Mechanism = "PLAIN"
    
    	conf.Consumer.Fetch.Min = 1
    	conf.Consumer.Fetch.Default = 1024 * 1024
    	conf.Consumer.Retry.Backoff = 2 * time.Second
    	conf.Consumer.MaxWaitTime = 250 * time.Millisecond
    	conf.Consumer.MaxProcessingTime = 100 * time.Millisecond
    	conf.Consumer.Return.Errors = false
    	conf.Consumer.Offsets.AutoCommit.Enable = false // 是否自动提交offset
    	conf.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
    	conf.Consumer.Offsets.Initial = sarama.OffsetOldest
    	conf.Consumer.Offsets.Retry.Max = 3
    
    	/**
    	 * Setup a new Sarama consumer group
    	 */
    	consumer := Consumer{
    		ready: make(chan bool),
    	}
    
    	ctx, _ := context.WithCancel(context.Background())
    	client, err := sarama.NewConsumerGroup(brokers, groupId, conf)
    	if err != nil {
    		log.Panicf("Error creating consumer group client: %v", err)
    	}
    
    	go func() {
    		for {
    			fmt.Println("start consume")
    			// `Consume` should be called inside an infinite loop, when a
    			// server-side rebalance happens, the consumer session will need to be
    			// recreated to get the new claims
    			if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
    				log.Panicf("Error from consumer: %v", err)
    			}
    			// check if context was cancelled, signaling that the consumer should stop
    			if ctx.Err() != nil {
    				return
    			}
    			consumer.ready = make(chan bool)
    		}
    	}()
    
    	<-consumer.ready // Await till the consumer has been set up
    	log.Println("Sarama consumer up and running!...")
    	select {}
    }
    
    func toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) {
    	if *isPaused {
    		client.ResumeAll()
    		log.Println("Resuming consumption")
    	} else {
    		client.PauseAll()
    		log.Println("Pausing consumption")
    	}
    
    	*isPaused = !*isPaused
    }
    
    // Consumer represents a Sarama consumer group consumer
    type Consumer struct {
    	ready chan bool
    }
    
    // Setup is run at the beginning of a new session, before ConsumeClaim
    func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
    	// Mark the consumer as ready
    	close(consumer.ready)
    	return nil
    }
    
    // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
    func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
    	return nil
    }
    
    // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
    func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    	// NOTE:
    	// Do not move the code below to a goroutine.
    	// The `ConsumeClaim` itself is called within a goroutine, see:
    	// https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
    	for {
    		select {
    		case message := <-claim.Messages():
    			realUnixTimeSeconds := message.Timestamp.Unix()
    			if realUnixTimeSeconds < 2000000 {
    				realUnixTimeSeconds = message.Timestamp.UnixMicro() / 1000
    			}
    			// session.Commit()
    
    			log.Printf("Message claimed: value = %s, timestamp = %d, topic = %s\n", string(message.Value), realUnixTimeSeconds, message.Topic)
    			log.Printf("Message claimed: offset =  %s", message.Offset)
    			session.MarkMessage(message, "")
    			session.Commit() // 手动提交offset
    
    		// Should return when `session.Context()` is done.
    		// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
    		// https://github.com/IBM/sarama/issues/1192
    		case <-session.Context().Done():
    			return nil
    		}
    	}
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154

# 4. 高阶使用

  • https://zhuanlan.zhihu.com/p/412869212
  • https://www.kancloud.cn/goodday/rizhi/2003291
#消息队列#
上次更新: 2023/08/27, 21:33:49
python 操作kafka
Kafka消费问题记录

← python 操作kafka Kafka消费问题记录→

最近更新
01
go与http代理
05-24
02
自制申威架构k8s-reloader
12-06
03
Docker Buildx 教程
12-01
更多文章>
Theme by Vdoing | Copyright © 2020-2024 小刘扎扎 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式