Go操作rabbitmq
# docker安装 rabbitMQ
docker 安装
docker pull rabbitmq:management-alpine docker run -d --hostname my-rabbit --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=1234abcd -p15672:15672 -p5672:5672 rabbitmq:management-alpine
1
2web页面: 127.0.0.1:15672
创建direct模式的exchange: testExchange.testRouter
创建queue : testExchange.testRouter.testBinding
绑定queue和exchange
# Golang操作rabbitMq
go get github.com/streadway/amqp
# 1. productor 生产者
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
const (
// RabbitURL : rabbitmq服务的入口url
RabbitURL = "amqp://admin:1234abcd@127.0.0.1:5672/"
// TransExchangeName : 用于文件transfer的交换机
TransExchangeName = "testExchange.testRouter"
// TransOSSQueueName : oss转移队列名
TransOSSQueueName = "testExchange.testRouter.testBinding"
// TransOSSErrQueueName : oss转移失败后写入另一个队列的队列名
TransOSSErrQueueName = "testExchange.testRouter.testBinding.err"
// TransOSSRoutingKey : routingkey
TransOSSRoutingKey = "testBinding"
)
var conn *amqp.Connection
var rChannel *amqp.Channel
// 如果异常关闭,会接收通知
var notifyClose chan *amqp.Error
func init() {
if initAMQPChan() {
rChannel.NotifyClose(notifyClose)
}
// 断线自动重连
go func() {
for {
select {
case msg := <-notifyClose:
conn = nil
rChannel = nil
log.Printf("onNotifyChannelClosed: %+v\n", msg)
initAMQPChan()
}
}
}()
}
func initAMQPChan() bool {
// 判断 rChannel 是否创建
if rChannel != nil {
return true
}
// 获取连接
var err error
conn, err = amqp.Dial(RabbitURL)
if err != nil {
log.Println(err.Error())
return false
}
// 创建 rChannel
rChannel, err = conn.Channel()
if err != nil {
log.Println(err.Error())
return false
}
return true
}
// Publish : 发布消息
func Publish(exchange, routingKey string, msg []byte) bool {
// 判断rChannel 是否正常
if !initAMQPChan() {
return false
}
err := rChannel.Publish(
exchange, // 交换机
routingKey,
false, // 如果没有对应的queue, 就会丢弃这条小心
false, // 立即
amqp.Publishing{ // 发布内容
ContentType: "text/plain", // 明文格式 application/json .
Body: msg})
if err == nil {
return true
}
return false
}
func main() {
for i := 0; i < 10; i++ {
Publish(TransExchangeName, TransOSSRoutingKey, []byte(fmt.Sprintf("%d", i)))
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# 2. consumer 消费者
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
var done chan bool
const (
// RabbitURL : rabbitmq服务的入口url
RabbitURL = "amqp://admin:1234abcd@127.0.0.1:5672/"
// TransExchangeName : 用于文件transfer的交换机
TransExchangeName = "testExchange.testRouter"
// TransOSSQueueName : oss转移队列名
TransOSSQueueName = "testExchange.testRouter.testBinding"
// TransOSSErrQueueName : oss转移失败后写入另一个队列的队列名
TransOSSErrQueueName = "testExchange.testRouter.testBinding.err"
// TransOSSRoutingKey : routingkey
TransOSSRoutingKey = "testBinding"
)
var conn *amqp.Connection
var rChannel *amqp.Channel
// 如果异常关闭,会接收通知
var notifyClose chan *amqp.Error
func init() {
if initAMQPChan() {
rChannel.NotifyClose(notifyClose)
}
// 断线自动重连
go func() {
for {
select {
case msg := <-notifyClose:
conn = nil
rChannel = nil
log.Printf("onNotifyChannelClosed: %+v\n", msg)
initAMQPChan()
}
}
}()
}
func initAMQPChan() bool {
// 判断 rChannel 是否创建
if rChannel != nil {
return true
}
// 获取连接
var err error
conn, err = amqp.Dial(RabbitURL)
if err != nil {
log.Println(err.Error())
return false
}
// 创建 rChannel
rChannel, err = conn.Channel()
if err != nil {
log.Println(err.Error())
return false
}
return true
}
//
// StartConsume 消费者
// @Description:
// @param qName: 队列名
// @param cName: 消费折者名
// @param callback: 回调函数
//
func StartConsume(qName, cName string, callback func(msg []byte) bool) {
// 获取一个信道
msgs, err := rChannel.Consume(
qName,
cName,
true, // 自动应答
false, // 非唯一的消费者, 如果为true,会根据竞争机制派发消息
false, // rabbitMQ只能设置为false
false, // noWait, false表示会阻塞直到有消息过来
nil,
)
if err != nil {
log.Fatal(err)
return
}
done = make(chan bool)
go func() {
// 循环读取channel的数据
for d := range msgs {
processErr := callback(d.Body)
if processErr {
// TODO: 将任务写入错误队列,待后续处理,或者用户异常重试
}
}
}()
// 接收done的信号, 没有信息过来则会一直阻塞,避免该函数退出
<-done
// 关闭通道
rChannel.Close()
}
// StopConsume : 停止监听队列
func StopConsume() {
done <- true
}
// MsgHandler 处理消息的逻辑
func MsgHandler(msg []byte) bool {
fmt.Println(string(msg))
return true
}
func main() {
StartConsume(TransOSSQueueName, "consumer00", MsgHandler)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
上次更新: 2023/04/16, 18:35:33