分布式事务消息
分布式事务
单体应用被拆为分布式应用,一个借口需要调用多个服务,且操作不同的数据库,数据一致性难以保障。
常见解决方案
- 2PC : 两阶段提交, 基于 XA 协议
- TCC : Try、Confirm、Cancel
框架
RocketMQ4.X 分布式事务消息架构
RocketMQ 事务消息
RocketMQ 提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致。
半消息 Half Message
暂不能投递的消息(暂不能消费),Producer 已经将消息成功发送到了 Broker 端,但是 Broker 端未收到 Produce 对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。
消息回查
由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。
整体交互流程
- Producer 向 broker 端发送消息。
- 服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息
- 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作
RocketMQ 事务消息的状态
- COMMIT_MESSAGE: 提交事务消息,消费者可以消费此消息
- ROLLBACK_MESSAGE:回滚事务消息,消息会在 broker 中删除,消费者不能消费
- UNKNOW:Broker 需要回查确认消息的状态
关于事务消息的消费
事务消息 consumer 端的消费方式和普通消息是一样的,RocketMQ 能保证消息能被 consumer 收到(消息重试等机制,最后也存在 consumer 消费失败的情况,这种情况出现的概率极低)。
RocketMQ 分布式事务消息演练
TransactionProducer 发送器
1@Component
2public class TransactionProducer {
3 //定义ProducerGroup:不能和普通的Producer公用同一个组,必须唯一。
4 private String producerGroup = "pay_group";
5
6 //事务监听器 ,执行本地事务(监听本地事务)
7 private TransactionListener transactionListener = new TransactionListenerImpl();
8
9 //NameServer地址
10 private String nameServerAddr = "192.168.31.220:9876";
11
12 //定义TransactionProducer
13 private TransactionMQProducer producer = null;
14
15 //一般自定义线程池的时候,需要给线程加个名称
16 private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
17 new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
18 @Override
19 public Thread newThread(Runnable r) {
20 Thread thread = new Thread(r);
21 thread.setName("client-transaction-msg-check-thread");
22 return thread;
23 }
24 });
25
26 public TransactionProducer(){
27 //初始化TransactionProducer
28 producer = new TransactionMQProducer(producerGroup);
29
30 //指定NameServer地址,多个地址以 ; 隔开
31 //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
32 producer.setNamesrvAddr(nameServerAddr);
33
34 //设置事务监听器
35 producer.setTransactionListener(transactionListener);
36
37 //设置线程池
38 producer.setExecutorService(executorService);
39
40 start();
41 }
42
43 public TransactionMQProducer getProducer(){
44 return this.producer;
45 }
46
47 /**
48 * 对象在使用之前必须要调用一次,只能初始化一次
49 */
50 public void start(){
51 try {
52 this.producer.start();
53 } catch (MQClientException e) {
54 e.printStackTrace();
55 }
56 }
57
58 /**
59 * 一般在应用上下文,使用上下文监听器,进行关闭
60 */
61 public void shutdown(){
62 this.producer.shutdown();
63 }
64
65}
66
67class TransactionListenerImpl implements TransactionListener{
68
69 @Override
70 //执行本地事务:发送半消息成功之后执行本地事务
71 public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
72
73 System.out.println("=============executeLocalTransaction=============");
74 String body = new String(message.getBody());
75 String key = message.getKeys();
76 String transactionId = message.getTransactionId();
77 System.out.println("transactionId="+transactionId+",key="+key+",body="+body);
78 //执行本地事务begin TODO
79
80 //执行本地事务end TODO
81
82 int status = Integer.parseInt(arg.toString());
83 //status等于1 表示事务执行成功,返回确认半消息commit 提交事务消息,消费者可以消费此消息
84 if(status == 1){
85 return LocalTransactionState.COMMIT_MESSAGE;
86 }
87 // 回滚事务消息,消息会在broker中删除,消费者不能消费
88 if(status == 2){
89 return LocalTransactionState.ROLLBACK_MESSAGE;
90 }
91 // Broker需要回查确认消息的状态:不响应
92 //checkLocalTransaction(MessageExt message)
93 if(status == 3){
94 return LocalTransactionState.UNKNOW;
95 }
96 return null;
97 }
98
99 @Override
100 //回查消息,要么commit要么rollback,reconsumeTimes不生效
101 public LocalTransactionState checkLocalTransaction(MessageExt message) {
102 System.out.println("=============checkLocalTransaction=============");
103 String body = new String(message.getBody());
104 //比如key是订单id号,可以去数据库中查询此订单的状态,根据数据库查询结果决定最终返回commit或是rollback。
105 String key = message.getKeys();
106 String transactionId = message.getTransactionId();
107 System.out.println("transactionId="+transactionId+",key="+key+",body="+body);
108
109 //要么commit 要么rollback
110
111 //可以根据key去检查本地事务消息是否完成
112
113 return LocalTransactionState.COMMIT_MESSAGE;
114 }
115}
PayController 在事务中发送消息
控制台输出
1# http://192.168.31.230:8080/api/v1/pay_cb?tag=soulboy&otherParam=1
2=============executeLocalTransaction=============
3transactionId=A9FEC026399018B4AAC26012E4BA0000,key=soulboy_key,body=soulboy
4发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=A9FEC026399018B4AAC26012E4BA0000, offsetMsgId=null, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=2], queueOffset=0]
5ConsumeMessageThread_1 Receive New Messages: soulboy
6
7# http://192.168.31.230:8080/api/v1/pay_cb?tag=soulboy2&otherParam=3
8=============executeLocalTransaction=============
9transactionId=A9FEC026399018B4AAC26015B81C0001,key=soulboy2_key,body=soulboy2
10发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=A9FEC026399018B4AAC26015B81C0001, offsetMsgId=null, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=1], queueOffset=1]
11
12=============checkLocalTransaction=============
13transactionId=A9FEC026399018B4AAC26015B81C0001,key=soulboy2_key,body=soulboy2
14ConsumeMessageThread_2 Receive New Messages: soulboy2