目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

分布式事务消息

分布式事务

分布式事务
 单体应用被拆为分布式应用,一个借口需要调用多个服务,且操作不同的数据库,数据一致性难以保障。

常见解决方案

  • 2PC : 两阶段提交, 基于 XA 协议
  • TCC : Try、Confirm、Cancel

框架

RocketMQ4.X 分布式事务消息架构

RocketMQ 事务消息
 RocketMQ 提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致。

半消息 Half Message
 暂不能投递的消息(暂不能消费),Producer 已经将消息成功发送到了 Broker 端,但是 Broker 端未收到 Produce 对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息

消息回查
 由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。

整体交互流程

分布式事务消息交互流程

  • Producer 向 broker 端发送消息。
  • 服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
  • 发送方开始执行本地事务逻辑。
  • 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息
  • 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查
  • 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果
  • 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作

RocketMQ 事务消息的状态

  • COMMIT_MESSAGE: 提交事务消息,消费者可以消费此消息
  • ROLLBACK_MESSAGE:回滚事务消息,消息会在 broker 中删除,消费者不能消费
  • UNKNOW:Broker 需要回查确认消息的状态

关于事务消息的消费

 事务消息 consumer 端的消费方式和普通消息是一样的,RocketMQ 能保证消息能被 consumer 收到(消息重试等机制,最后也存在 consumer 消费失败的情况,这种情况出现的概率极低)。

RocketMQ 分布式事务消息演练

TransactionProducer 发送器

  1@Component
  2public class TransactionProducer {
  3    //定义ProducerGroup:不能和普通的Producer公用同一个组,必须唯一。
  4    private String producerGroup = "pay_group";
  5
  6    //事务监听器 ,执行本地事务(监听本地事务)
  7    private TransactionListener transactionListener = new TransactionListenerImpl();
  8
  9    //NameServer地址
 10    private String nameServerAddr = "192.168.31.220:9876";
 11
 12    //定义TransactionProducer
 13    private TransactionMQProducer producer = null;
 14
 15    //一般自定义线程池的时候,需要给线程加个名称
 16    private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
 17            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
 18        @Override
 19        public Thread newThread(Runnable r) {
 20            Thread thread = new Thread(r);
 21            thread.setName("client-transaction-msg-check-thread");
 22            return thread;
 23        }
 24    });
 25
 26    public TransactionProducer(){
 27        //初始化TransactionProducer
 28        producer = new TransactionMQProducer(producerGroup);
 29
 30        //指定NameServer地址,多个地址以 ; 隔开
 31        //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
 32        producer.setNamesrvAddr(nameServerAddr);
 33
 34        //设置事务监听器
 35        producer.setTransactionListener(transactionListener);
 36
 37        //设置线程池
 38        producer.setExecutorService(executorService);
 39
 40        start();
 41    }
 42
 43    public TransactionMQProducer getProducer(){
 44        return this.producer;
 45    }
 46
 47    /**
 48     * 对象在使用之前必须要调用一次,只能初始化一次
 49     */
 50    public void start(){
 51        try {
 52            this.producer.start();
 53        } catch (MQClientException e) {
 54            e.printStackTrace();
 55        }
 56    }
 57
 58    /**
 59     * 一般在应用上下文,使用上下文监听器,进行关闭
 60     */
 61    public void shutdown(){
 62        this.producer.shutdown();
 63    }
 64
 65}
 66
 67class TransactionListenerImpl implements TransactionListener{
 68
 69    @Override
 70    //执行本地事务:发送半消息成功之后执行本地事务
 71    public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
 72
 73        System.out.println("=============executeLocalTransaction=============");
 74        String body = new String(message.getBody());
 75        String key = message.getKeys();
 76        String transactionId = message.getTransactionId();
 77        System.out.println("transactionId="+transactionId+",key="+key+",body="+body);
 78        //执行本地事务begin  TODO
 79
 80        //执行本地事务end TODO
 81
 82        int status = Integer.parseInt(arg.toString());
 83        //status等于1 表示事务执行成功,返回确认半消息commit 提交事务消息,消费者可以消费此消息
 84        if(status == 1){
 85            return LocalTransactionState.COMMIT_MESSAGE;
 86        }
 87        // 回滚事务消息,消息会在broker中删除,消费者不能消费
 88        if(status == 2){
 89            return LocalTransactionState.ROLLBACK_MESSAGE;
 90        }
 91        // Broker需要回查确认消息的状态:不响应
 92	//checkLocalTransaction(MessageExt message)
 93        if(status == 3){
 94            return LocalTransactionState.UNKNOW;
 95        }
 96        return null;
 97    }
 98
 99    @Override
100    //回查消息,要么commit要么rollback,reconsumeTimes不生效
101    public LocalTransactionState checkLocalTransaction(MessageExt message) {
102        System.out.println("=============checkLocalTransaction=============");
103        String body = new String(message.getBody());
104	//比如key是订单id号,可以去数据库中查询此订单的状态,根据数据库查询结果决定最终返回commit或是rollback。
105        String key = message.getKeys();
106        String transactionId = message.getTransactionId();
107        System.out.println("transactionId="+transactionId+",key="+key+",body="+body);
108
109        //要么commit 要么rollback
110
111        //可以根据key去检查本地事务消息是否完成
112
113        return LocalTransactionState.COMMIT_MESSAGE;
114    }
115}

PayController 在事务中发送消息

控制台输出

 1# http://192.168.31.230:8080/api/v1/pay_cb?tag=soulboy&otherParam=1
 2=============executeLocalTransaction=============
 3transactionId=A9FEC026399018B4AAC26012E4BA0000,key=soulboy_key,body=soulboy
 4发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=A9FEC026399018B4AAC26012E4BA0000, offsetMsgId=null, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=2], queueOffset=0] 
 5ConsumeMessageThread_1 Receive New Messages: soulboy 
 6
 7# http://192.168.31.230:8080/api/v1/pay_cb?tag=soulboy2&otherParam=3
 8=============executeLocalTransaction=============
 9transactionId=A9FEC026399018B4AAC26015B81C0001,key=soulboy2_key,body=soulboy2
10发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=A9FEC026399018B4AAC26015B81C0001, offsetMsgId=null, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=1], queueOffset=1] 
11
12=============checkLocalTransaction=============
13transactionId=A9FEC026399018B4AAC26015B81C0001,key=soulboy2_key,body=soulboy2
14ConsumeMessageThread_2 Receive New Messages: soulboy2

作者:Soulboy