目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

Springboot整合RocketMQ

Trouble Shooting

多网卡下为 rocketmq 指定 IP

# 修改配置文件
vim /test/rocketmq/distribution/target/apache-rocketmq/conf/broker.conf
brokerIP1=192.168.31.220	#新增内容

# 启动Broker命令命令
nohup sh /test/rocketmq/distribution/target/apache-rocketmq/bin/mqbroker -n 192.168.31.220:9876 -c /test/rocketmq/distribution/target/apache-rocketmq/conf/broker.conf &

控制台查看不了数据,提示连接 10909 错误

原因:Rocket默认开启了VIP通道,VIP通道端口为10911-2=10909
解决:阿里云安全组需要增加一个端口 10909

消息发送

添加 Maven 相关依赖

<dependency>

		<groupId>org.apache.rocketmq</groupId>
		<artifactId>rocketmq-client</artifactId>
		<version>4.4.0</version>
</dependency>

Producer

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

@Component
public class PayProducer {

    //定义ProducerGroup
    private String producerGroup = "pay_group";

    //NameServer地址
    private String nameServerAddr = "192.168.31.220:9876";

    //定义Producer
    private DefaultMQProducer producer;

    public  PayProducer(){
        //初始化Producer
        producer = new DefaultMQProducer(producerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
        producer.setNamesrvAddr(nameServerAddr);
        start();
    }

    public DefaultMQProducer getProducer(){
        return this.producer;
    }

    /**
     * 对象在使用之前必须要调用一次,只能初始化一次
     */
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown(){
        this.producer.shutdown();
    }

}

Controller 使用 PayProducer 生产消息
 发送消息到 Broker,需要判断是否有此 topic。
 本地环境建议开启自动创建 topic,生产环境建议关闭自动化创建 topic
建议先手工创建 Topic
 因为如果靠程序自动创建,然后再投递消息,会出现延迟情况

概念模型: 一个 topic 下面对应多个 queue,可以在创建 Topic 时指定,如订单类 topic

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;

@RestController
public class PayController {

    @Autowired
    //注入Producer
    private PayProducer payProducer;

    //定义Topic主题
    private static  final String topic = "soulboy_pay_test_topic2";

    @RequestMapping("/api/v1/pay_cb")
    public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

        /**
         *  topic: 主题名称
         *  tag: 标签,用于过滤,二级分类。
         *  key: 消息唯一标示,可以是业务字段组合
         *  body: 消息体,字节数组
         */
        Message message = new Message(topic,"taga", ("hello xdclass rocketmq = "+text).getBytes() );

        //拿到Producer之后传入Message进行消息的发送
        SendResult sendResult = payProducer.getProducer().send(message);

        //打印发送结果
        System.out.println(sendResult);

        return new HashMap<>();
    }

}

浏览器访问

http://192.168.31.230:8080/api/v1/pay_cb?text=666
{

}

控制台输出

SendResult [sendStatus=SEND_OK, msgId=A9FEC0262F4418B4AAC2454B1CD70000, offsetMsgId=C0A81FDC00002A9F0000000000057D64, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=2], queueOffset=0]

消息消费

JmsConfig 配置类(抽取 NameServer 和 Topic)

public class JmsConfig {

    public static final String NAME_SERVER = "192.168.31.220:9876";

    public static final String TOPIC = "soulboy_pay_test_topic2";
}

PayConsumer 消费者

控制台输出

consumer start ...
ConsumeMessageThread_1 Receive New Messages: hello xdclass rocketmq = 666 
topic=soulboy_pay_test_topic2, tags=taga, keys=null, msg=hello xdclass rocketmq = 666

作者:Soulboy