刘沙河 刘沙河
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希

花开半夏,半夏花开
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希
  • Kafka

  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
      • docker安装 rabbitMQ
      • Golang操作rabbitMq
        • 1. productor 生产者
        • 2. consumer 消费者
  • RocketMQ

  • MQ
  • RabbitMQ
bigox
2022-06-16
目录

Go操作rabbitmq

# docker安装 rabbitMQ

  • docker 安装

    docker pull rabbitmq:management-alpine
    docker run -d --hostname my-rabbit --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=1234abcd -p15672:15672 -p5672:5672 rabbitmq:management-alpine
    
    1
    2
  • web页面: 127.0.0.1:15672

  • 创建direct模式的exchange: testExchange.testRouter

    image-20220507164701020
  • 创建queue : testExchange.testRouter.testBinding

    image-20220507164914493
  • 绑定queue和exchange

    image-20220507165602708

# Golang操作rabbitMq

go get github.com/streadway/amqp

# 1. productor 生产者

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

const (
	// RabbitURL : rabbitmq服务的入口url
	RabbitURL = "amqp://admin:1234abcd@127.0.0.1:5672/"
	// TransExchangeName : 用于文件transfer的交换机
	TransExchangeName = "testExchange.testRouter"
	// TransOSSQueueName : oss转移队列名
	TransOSSQueueName = "testExchange.testRouter.testBinding"
	// TransOSSErrQueueName : oss转移失败后写入另一个队列的队列名
	TransOSSErrQueueName = "testExchange.testRouter.testBinding.err"
	// TransOSSRoutingKey : routingkey
	TransOSSRoutingKey = "testBinding"
)

var conn *amqp.Connection
var rChannel *amqp.Channel

// 如果异常关闭,会接收通知
var notifyClose chan *amqp.Error

func init() {
	if initAMQPChan() {
		rChannel.NotifyClose(notifyClose)
	}
	// 断线自动重连
	go func() {
		for {
			select {
			case msg := <-notifyClose:
				conn = nil
				rChannel = nil
				log.Printf("onNotifyChannelClosed: %+v\n", msg)
				initAMQPChan()
			}
		}
	}()
}

func initAMQPChan() bool {
	// 判断 rChannel 是否创建
	if rChannel != nil {
		return true
	}
	// 获取连接
	var err error
	conn, err = amqp.Dial(RabbitURL)
	if err != nil {
		log.Println(err.Error())
		return false
	}
	// 创建 rChannel
	rChannel, err = conn.Channel()
	if err != nil {
		log.Println(err.Error())
		return false
	}
	return true
}

// Publish : 发布消息
func Publish(exchange, routingKey string, msg []byte) bool {
	// 判断rChannel 是否正常
	if !initAMQPChan() {
		return false
	}
	err := rChannel.Publish(
		exchange, // 交换机
		routingKey,
		false, // 如果没有对应的queue, 就会丢弃这条小心
		false, // 立即
		amqp.Publishing{ // 发布内容
			ContentType: "text/plain", // 明文格式  application/json .
			Body:        msg})
	if err == nil {
		return true
	}
	return false
}

func main() {
	for i := 0; i < 10; i++ {
		Publish(TransExchangeName, TransOSSRoutingKey, []byte(fmt.Sprintf("%d", i)))
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92

# 2. consumer 消费者


package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

var done chan bool

const (
	// RabbitURL : rabbitmq服务的入口url
	RabbitURL = "amqp://admin:1234abcd@127.0.0.1:5672/"
	// TransExchangeName : 用于文件transfer的交换机
	TransExchangeName = "testExchange.testRouter"
	// TransOSSQueueName : oss转移队列名
	TransOSSQueueName = "testExchange.testRouter.testBinding"
	// TransOSSErrQueueName : oss转移失败后写入另一个队列的队列名
	TransOSSErrQueueName = "testExchange.testRouter.testBinding.err"
	// TransOSSRoutingKey : routingkey
	TransOSSRoutingKey = "testBinding"
)

var conn *amqp.Connection
var rChannel *amqp.Channel

// 如果异常关闭,会接收通知
var notifyClose chan *amqp.Error

func init() {
	if initAMQPChan() {
		rChannel.NotifyClose(notifyClose)
	}
	// 断线自动重连
	go func() {
		for {
			select {
			case msg := <-notifyClose:
				conn = nil
				rChannel = nil
				log.Printf("onNotifyChannelClosed: %+v\n", msg)
				initAMQPChan()
			}
		}
	}()
}

func initAMQPChan() bool {
	// 判断 rChannel 是否创建
	if rChannel != nil {
		return true
	}
	// 获取连接
	var err error
	conn, err = amqp.Dial(RabbitURL)
	if err != nil {
		log.Println(err.Error())
		return false
	}
	// 创建 rChannel
	rChannel, err = conn.Channel()
	if err != nil {
		log.Println(err.Error())
		return false
	}
	return true
}

//
// StartConsume 消费者
// @Description:
// @param qName: 队列名
// @param cName: 消费折者名
// @param callback: 回调函数
//
func StartConsume(qName, cName string, callback func(msg []byte) bool) {
	// 获取一个信道
	msgs, err := rChannel.Consume(
		qName,
		cName,
		true,  // 自动应答
		false, // 非唯一的消费者, 如果为true,会根据竞争机制派发消息
		false, // rabbitMQ只能设置为false
		false, // noWait, false表示会阻塞直到有消息过来
		nil,
	)
	if err != nil {
		log.Fatal(err)
		return
	}
	done = make(chan bool)
	go func() {
		// 循环读取channel的数据
		for d := range msgs {
			processErr := callback(d.Body)
			if processErr {
				// TODO: 将任务写入错误队列,待后续处理,或者用户异常重试
			}
		}
	}()

	// 接收done的信号, 没有信息过来则会一直阻塞,避免该函数退出
	<-done
	// 关闭通道
	rChannel.Close()
}

// StopConsume : 停止监听队列
func StopConsume() {
	done <- true
}

// MsgHandler 处理消息的逻辑
func MsgHandler(msg []byte) bool {
	fmt.Println(string(msg))
	return true
}

func main() {
	StartConsume(TransOSSQueueName, "consumer00", MsgHandler)
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#消息队列#
上次更新: 2023/04/16, 18:35:33
rabbitMQ基础
可靠消息队列-rocketMQ

← rabbitMQ基础 可靠消息队列-rocketMQ→

最近更新
01
go与http代理
05-24
02
自制申威架构k8s-reloader
12-06
03
Docker Buildx 教程
12-01
更多文章>
Theme by Vdoing | Copyright © 2020-2024 小刘扎扎 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式