分布式事务消息
分布式事务
单体应用被拆为分布式应用,一个借口需要调用多个服务,且操作不同的数据库,数据一致性难以保障。
常见解决方案
- 2PC : 两阶段提交, 基于 XA 协议
- TCC : Try、Confirm、Cancel
框架
RocketMQ4.X 分布式事务消息架构
RocketMQ 事务消息
RocketMQ 提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致。
半消息 Half Message
暂不能投递的消息(暂不能消费),Producer 已经将消息成功发送到了 Broker 端,但是Broker 端未收到 Produce 对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。
消息回查
由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。
整体交互流程
- Producer 向 broker 端发送消息。
- 服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息
- 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作
RocketMQ 事务消息的状态
- COMMIT_MESSAGE: 提交事务消息,消费者可以消费此消息
- ROLLBACK_MESSAGE:回滚事务消息,消息会在 broker 中删除,消费者不能消费
- UNKNOW:Broker 需要回查确认消息的状态
关于事务消息的消费
事务消息 consumer 端的消费方式和普通消息是一样的,RocketMQ 能保证消息能被 consumer 收到(消息重试等机制,最后也存在 consumer 消费失败的情况,这种情况出现的概率极低)。
RocketMQ 分布式事务消息演练
TransactionProducer 发送器
@Component
public class TransactionProducer {
//定义ProducerGroup:不能和普通的Producer公用同一个组,必须唯一。
private String producerGroup = "pay_group";
//事务监听器 ,执行本地事务(监听本地事务)
private TransactionListener transactionListener = new TransactionListenerImpl();
//NameServer地址
private String nameServerAddr = "192.168.31.220:9876";
//定义TransactionProducer
private TransactionMQProducer producer = null;
//一般自定义线程池的时候,需要给线程加个名称
private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
public TransactionProducer(){
//初始化TransactionProducer
producer = new TransactionMQProducer(producerGroup);
//指定NameServer地址,多个地址以 ; 隔开
//如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
producer.setNamesrvAddr(nameServerAddr);
//设置事务监听器
producer.setTransactionListener(transactionListener);
//设置线程池
producer.setExecutorService(executorService);
start();
}
public TransactionMQProducer getProducer(){
return this.producer;
}
/**
* 对象在使用之前必须要调用一次,只能初始化一次
*/
public void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
/**
* 一般在应用上下文,使用上下文监听器,进行关闭
*/
public void shutdown(){
this.producer.shutdown();
}
}
class TransactionListenerImpl implements TransactionListener{
@Override
//执行本地事务:发送半消息成功之后执行本地事务
public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
System.out.println("=============executeLocalTransaction=============");
String body = new String(message.getBody());
String key = message.getKeys();
String transactionId = message.getTransactionId();
System.out.println("transactionId="+transactionId+",key="+key+",body="+body);
//执行本地事务begin TODO
//执行本地事务end TODO
int status = Integer.parseInt(arg.toString());
//status等于1 表示事务执行成功,返回确认半消息commit 提交事务消息,消费者可以消费此消息
if(status == 1){
return LocalTransactionState.COMMIT_MESSAGE;
}
// 回滚事务消息,消息会在broker中删除,消费者不能消费
if(status == 2){
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// Broker需要回查确认消息的状态:不响应
//checkLocalTransaction(MessageExt message)
if(status == 3){
return LocalTransactionState.UNKNOW;
}
return null;
}
@Override
//回查消息,要么commit要么rollback,reconsumeTimes不生效
public LocalTransactionState checkLocalTransaction(MessageExt message) {
System.out.println("=============checkLocalTransaction=============");
String body = new String(message.getBody());
//比如key是订单id号,可以去数据库中查询此订单的状态,根据数据库查询结果决定最终返回commit或是rollback。
String key = message.getKeys();
String transactionId = message.getTransactionId();
System.out.println("transactionId="+transactionId+",key="+key+",body="+body);
//要么commit 要么rollback
//可以根据key去检查本地事务消息是否完成
return LocalTransactionState.COMMIT_MESSAGE;
}
}
PayController 在事务中发送消息
控制台输出
# http://192.168.31.230:8080/api/v1/pay_cb?tag=soulboy&otherParam=1
=============executeLocalTransaction=============
transactionId=A9FEC026399018B4AAC26012E4BA0000,key=soulboy_key,body=soulboy
发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=A9FEC026399018B4AAC26012E4BA0000, offsetMsgId=null, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=2], queueOffset=0]
ConsumeMessageThread_1 Receive New Messages: soulboy
# http://192.168.31.230:8080/api/v1/pay_cb?tag=soulboy2&otherParam=3
=============executeLocalTransaction=============
transactionId=A9FEC026399018B4AAC26015B81C0001,key=soulboy2_key,body=soulboy2
发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=A9FEC026399018B4AAC26015B81C0001, offsetMsgId=null, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=1], queueOffset=1]
=============checkLocalTransaction=============
transactionId=A9FEC026399018B4AAC26015B81C0001,key=soulboy2_key,body=soulboy2
ConsumeMessageThread_2 Receive New Messages: soulboy2