可靠消息队列-rocketMQ
阿里提供云端的rocketmq商业版
# rocketMQ基础
# 1. 基本概念
- Producer:消息的发送者;举例:发信者
- Consumer:消息接收者;举例:收信者
- Broker:暂存和传输消息;举例:邮局
- NameServer:管理Broker;举例:各个邮局的管理机构
- Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
- Message Queue:相当于是Topic的分区;用于并行发送和接收消息
# 2. 消息类型
- 普通消息
- 同步发送
- 异步发送
- 单向发送
- 顺序消息
- 普通消息(订阅,无序)
- 普通消息是我们在业务开发中用到的最多的消息类型,生产者需要关注消息发送成功即可,消费者 消费到消息即可,不需要保证消息的顺序,所以消息可以大规模并发地发送和消费,吞吐量很高,适合 大部分场景。
- 全局顺序消息
- 分区顺序消息
- 普通消息(订阅,无序)
- 延时消息
- 事务消息
# 2.1 同步发送
同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。
a. 同步发送,线程阻塞,投递 completes阻塞结束
b. 如果发送失败,会在默认的超时时间3秒内进行重试,最多重试2次
c. 投递 completes.不代表投递成功,要 check Send Result. sendstatus来判断是否投递成功
d. Sendresult里面有发送状态的枚举: Sendstatus,同步的消息投递有一个状态返回值的
e. retry的实现原理只有ack的 Sendstatus=SEND_OKオ会停I止 retry
注意事项:发送同步消息且Ack为SEND_OK,只代表该消息成功的写入了MQ当中,并不代表该消息成功的被Consumer消费了
- 应用场景:
- 这种可靠性同步地发送方式应用场景非常广泛,例如重要通知邮件、报名短信 (opens new window)通知、营销短信系统等。
# 2.2 异步发送
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。RocketMQ异步发送,需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
a.异步调用的话,当前线程一定要等待异步线程回调结束再关闭 producer啊,因为是异步的,不会阻塞,提前关 闭 producer会导致未回调链接就断开了 b.异步消息不 retry,投递失败回调 onexception0方法,只有同步消息才会 retry
应用场景:
异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
# 2.3 单向发送
- 生产者消费者模型,理解为普通给的消息队列
# 2.4 全局顺序消息
- 概念:对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
- 适用场景:适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。
- 示例:在证券处理中,以人民币兑换美元为Topic,在价格相同的情况下,先出价者优先处理,则可以按照FIFO的方式发布和消费全局顺序消息。
# 2.5 分区顺序消息
- 概念:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。
- 适用场景:适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照FIFO原则进行消息发布和消费的场景。
- 示例:
- 用户注册需要发送发验证码,以用户ID作为Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
- 电商的订单创建,以订单ID作为Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
# 2.6 延时消息
概念:
- Producer将消息发送到消息队列RocketMQ服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
适用场景:
- 消息生产和消费有时间窗口要求,例如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。
- 延迟的机制是在服务端实现的,也就是 Broker收到了消息,但是经过一段时间以后才发送,
- 服务器按照1-N定义了如下级别:“1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h”; 若 要发送定时消息,在应用层初始化 Message消息对象之后,调用 Message. setdelay Level( int level)方法来设置 延迟级别, 按照序列取相应的延迟级别, 例如level=2,则延迟为5s
- 实现原理 a. 发送消息的时候如果消息设置了 Delaytime Level,那么该消息会被去到 SchedulemessageService. SCHEDULE_TOPIC 这个 Topic 里面 b. 根据 DelayTimelevel选择对应的 queue c. 再把真实的 topic和 queue信息封装起来, set到msg里面 d.然后每个 SCHEDULE TOPIC_XXXX的每个 DelaytimeLevelqueue, 有定时任务去刷新, 是否有待投递的消息 e. 每10s定时持久化发送进度
# 2.7 事务消息
概念:
事务消息:消息队列RocketMQ提供类似X/Open XA的分布式事务功能,通过消息队列RocketMQ事务消息能达到分布式事务的最终一致。
半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列RocketMQ服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。
分布式事务消息的优势:
- 消息队列RocketMQ分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。
典型场景:
- 在电商购物车下单时,涉及到购物车系统和交易系统,这两个系统之间的数据最终一致性可以通过分布式事务消息的异步处理实现。在这种场景下,交易系统是最为核心的系统,需要最大限度地保证下单成功。而购物车系统只需要订阅消息队列RocketMQ的交易订单消息,做相应的业务处理,即可保证最终的数据一致性。
事务消息交互流程如下图所示:
事务消息发送步骤如下:
发送方将半事务消息发送至消息队列RocketMQ服务端。
消息队列RocketMQ服务端将消息持久化成功之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
发送方开始执行本地事务逻辑。
发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息。
事务消息回查步骤如下:
- 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。
# python操作 RocketMQ
阿里提供云端的rocketmq商业版
阿里文档: https://help.aliyun.com/document_detail/29532.html
一库: https://github.com/apache/rocketmq-client-python
# 1. 发送普通消息
producer
from rocketmq.client import Producer, Message producer = Producer('PID-XXX') producer.set_name_server_address('127.0.0.1:9876') producer.start() msg = Message('TEST-EVAN-TOPIC') msg.set_keys('EVAN2') msg.set_tags('EVAN2') msg.set_body('{test:1231}}') msg.set_property("name","test") ret = producer.send_sync(msg) print(ret.status, ret.msg_id, ret.offset) producer.shutdown()
1
2
3
4
5
6
7
8
9
10
11
12
13
14consumer
import time from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(msg.id, msg.body) return ConsumeStatus.CONSUME_SUCCESS consumer = PushConsumer('CID_XXX') consumer.set_name_server_address('127.0.0.1:9876') consumer.subscribe('TEST-EVAN-TOPIC', callback) consumer.start() while True: time.sleep(3600) consumer.shutdown()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 2. 发送延迟消息
producer
from rocketmq.client import Producer, Message producer = Producer('PID-XXX') producer.set_name_server_address('127.0.0.1:9876') producer.start() msg = Message('TEST-EVAN-TOPIC') msg.set_keys('EVAN2') """ 1: "1s", 2: "5s", 3: "10s", 4: "30s", 5: "1m", 6: "2m", 7: "3m", 8: "4m", 9: "5m", 10: "6m", 11: "7m", 12: "8m",13: "9m", 14: "10m", 15: "20m", 16: "30m", 17: "1h", 18: "2h", """ # 不同时间对应的不同编号 msg.set_delay_time_level(1) msg.set_tags('EVAN22') msg.set_property("name", "test") msg.set_body('{test2:12314}}') ret = producer.send_sync(msg) print(ret.status, ret.msg_id, ret.offset) producer.shutdown()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 3.发送事务消息
producer
#!/usr/bin/env python # -*- coding: utf-8 -*- # @time : 2021/11/10 # @desc : ... import time from rocketmq.client import TransactionMQProducer, Message, TransactionStatus def check_callback(msg): # todo mac 电脑测试回查事件并未触发 """ roocketmq服务消息回查, 自己的本地逻辑 回查的回调函数只会在 local_execute() 返回UNKNOWN,或者没有返回的时候执行 :param msg: :return: """ # TransactionStatus.COMMIT,TransactionStatus.UNKNOWN,TransactionStatus.ROLLBACK print("服务消息回查:", msg.body.decode(), ) return TransactionStatus.COMMIT def local_execute(msg, user_args): """ 发送消息的时候,执行的回调; 执行自己的本地逻辑 :param msg: :param user_args: :return: """ # TransactionStatus.COMMIT,TransactionStatus.UNKNOWN,TransactionStatus.ROLLBACK print("发送消息回调,执行本地逻辑:", msg.body.decode(), ) return TransactionStatus.UNKNOWN producer = TransactionMQProducer('PID-XXX', checker_callback=check_callback) producer.set_name_server_address('127.0.0.1:9876') producer.start() msg = Message('TEST-EVAN-TOPIC') msg.set_keys('EVAN2') msg.set_tags('EVAN22') msg.set_property("name", "test") msg.set_body('-事务消息--') ret = producer.send_message_in_transaction(msg, local_execute, None) print(ret.status, ret.msg_id, ret.offset) print("事务消息发送成功") while True: time.sleep(3600) producer.shutdown()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# go 操作 rocketMQ
https://help.aliyun.com/document_detail/255805.html