目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

分布式事务消息

分布式事务

分布式事务
 单体应用被拆为分布式应用,一个借口需要调用多个服务,且操作不同的数据库,数据一致性难以保障。

常见解决方案

  • 2PC : 两阶段提交, 基于 XA 协议
  • TCC : Try、Confirm、Cancel

框架

RocketMQ4.X 分布式事务消息架构

RocketMQ 事务消息
 RocketMQ 提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致。

半消息 Half Message
 暂不能投递的消息(暂不能消费),Producer 已经将消息成功发送到了 Broker 端,但是Broker 端未收到 Produce 对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息

消息回查
 由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。

整体交互流程

分布式事务消息交互流程

  • Producer 向 broker 端发送消息。
  • 服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
  • 发送方开始执行本地事务逻辑。
  • 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息
  • 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查
  • 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果
  • 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作

RocketMQ 事务消息的状态

  • COMMIT_MESSAGE: 提交事务消息,消费者可以消费此消息
  • ROLLBACK_MESSAGE:回滚事务消息,消息会在 broker 中删除,消费者不能消费
  • UNKNOW:Broker 需要回查确认消息的状态

关于事务消息的消费

 事务消息 consumer 端的消费方式和普通消息是一样的,RocketMQ 能保证消息能被 consumer 收到(消息重试等机制,最后也存在 consumer 消费失败的情况,这种情况出现的概率极低)。

RocketMQ 分布式事务消息演练

TransactionProducer 发送器

@Component
public class TransactionProducer {
    //定义ProducerGroup:不能和普通的Producer公用同一个组,必须唯一。
    private String producerGroup = "pay_group";

    //事务监听器 ,执行本地事务(监听本地事务)
    private TransactionListener transactionListener = new TransactionListenerImpl();

    //NameServer地址
    private String nameServerAddr = "192.168.31.220:9876";

    //定义TransactionProducer
    private TransactionMQProducer producer = null;

    //一般自定义线程池的时候,需要给线程加个名称
    private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }
    });

    public TransactionProducer(){
        //初始化TransactionProducer
        producer = new TransactionMQProducer(producerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
        producer.setNamesrvAddr(nameServerAddr);

        //设置事务监听器
        producer.setTransactionListener(transactionListener);

        //设置线程池
        producer.setExecutorService(executorService);

        start();
    }

    public TransactionMQProducer getProducer(){
        return this.producer;
    }

    /**
     * 对象在使用之前必须要调用一次,只能初始化一次
     */
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown(){
        this.producer.shutdown();
    }

}

class TransactionListenerImpl implements TransactionListener{

    @Override
    //执行本地事务:发送半消息成功之后执行本地事务
    public LocalTransactionState executeLocalTransaction(Message message, Object arg) {

        System.out.println("=============executeLocalTransaction=============");
        String body = new String(message.getBody());
        String key = message.getKeys();
        String transactionId = message.getTransactionId();
        System.out.println("transactionId="+transactionId+",key="+key+",body="+body);
        //执行本地事务begin  TODO

        //执行本地事务end TODO

        int status = Integer.parseInt(arg.toString());
        //status等于1 表示事务执行成功,返回确认半消息commit 提交事务消息,消费者可以消费此消息
        if(status == 1){
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        // 回滚事务消息,消息会在broker中删除,消费者不能消费
        if(status == 2){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // Broker需要回查确认消息的状态:不响应
	//checkLocalTransaction(MessageExt message)
        if(status == 3){
            return LocalTransactionState.UNKNOW;
        }
        return null;
    }

    @Override
    //回查消息,要么commit要么rollback,reconsumeTimes不生效
    public LocalTransactionState checkLocalTransaction(MessageExt message) {
        System.out.println("=============checkLocalTransaction=============");
        String body = new String(message.getBody());
	//比如key是订单id号,可以去数据库中查询此订单的状态,根据数据库查询结果决定最终返回commit或是rollback。
        String key = message.getKeys();
        String transactionId = message.getTransactionId();
        System.out.println("transactionId="+transactionId+",key="+key+",body="+body);

        //要么commit 要么rollback

        //可以根据key去检查本地事务消息是否完成

        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

PayController 在事务中发送消息

控制台输出

# http://192.168.31.230:8080/api/v1/pay_cb?tag=soulboy&otherParam=1
=============executeLocalTransaction=============
transactionId=A9FEC026399018B4AAC26012E4BA0000,key=soulboy_key,body=soulboy
发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=A9FEC026399018B4AAC26012E4BA0000, offsetMsgId=null, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=2], queueOffset=0] 
ConsumeMessageThread_1 Receive New Messages: soulboy 

# http://192.168.31.230:8080/api/v1/pay_cb?tag=soulboy2&otherParam=3
=============executeLocalTransaction=============
transactionId=A9FEC026399018B4AAC26015B81C0001,key=soulboy2_key,body=soulboy2
发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=A9FEC026399018B4AAC26015B81C0001, offsetMsgId=null, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=1], queueOffset=1] 

=============checkLocalTransaction=============
transactionId=A9FEC026399018B4AAC26015B81C0001,key=soulboy2_key,body=soulboy2
ConsumeMessageThread_2 Receive New Messages: soulboy2

作者:Soulboy