Springboot整合RocketMQ
Trouble Shooting
多网卡下为 rocketmq 指定 IP
1# 修改配置文件
2vim /test/rocketmq/distribution/target/apache-rocketmq/conf/broker.conf
3brokerIP1=192.168.31.220 #新增内容
4
5# 启动Broker命令命令
6nohup sh /test/rocketmq/distribution/target/apache-rocketmq/bin/mqbroker -n 192.168.31.220:9876 -c /test/rocketmq/distribution/target/apache-rocketmq/conf/broker.conf &
控制台查看不了数据,提示连接 10909 错误
1原因:Rocket默认开启了VIP通道,VIP通道端口为10911-2=10909
2解决:阿里云安全组需要增加一个端口 10909
消息发送
添加 Maven 相关依赖
1<dependency>
2
3 <groupId>org.apache.rocketmq</groupId>
4 <artifactId>rocketmq-client</artifactId>
5 <version>4.4.0</version>
6</dependency>
Producer
1import org.apache.rocketmq.client.exception.MQClientException;
2import org.apache.rocketmq.client.producer.DefaultMQProducer;
3import org.springframework.stereotype.Component;
4
5@Component
6public class PayProducer {
7
8 //定义ProducerGroup
9 private String producerGroup = "pay_group";
10
11 //NameServer地址
12 private String nameServerAddr = "192.168.31.220:9876";
13
14 //定义Producer
15 private DefaultMQProducer producer;
16
17 public PayProducer(){
18 //初始化Producer
19 producer = new DefaultMQProducer(producerGroup);
20
21 //指定NameServer地址,多个地址以 ; 隔开
22 //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
23 producer.setNamesrvAddr(nameServerAddr);
24 start();
25 }
26
27 public DefaultMQProducer getProducer(){
28 return this.producer;
29 }
30
31 /**
32 * 对象在使用之前必须要调用一次,只能初始化一次
33 */
34 public void start(){
35 try {
36 this.producer.start();
37 } catch (MQClientException e) {
38 e.printStackTrace();
39 }
40 }
41
42 /**
43 * 一般在应用上下文,使用上下文监听器,进行关闭
44 */
45 public void shutdown(){
46 this.producer.shutdown();
47 }
48
49}
Controller 使用 PayProducer 生产消息
发送消息到 Broker,需要判断是否有此 topic。
本地环境建议开启自动创建 topic,生产环境建议关闭自动化创建 topic
建议先手工创建 Topic。
因为如果靠程序自动创建,然后再投递消息,会出现延迟情况
。
概念模型: 一个 topic 下面对应多个 queue,可以在创建 Topic 时指定,如订单类 topic
1import org.apache.rocketmq.client.exception.MQBrokerException;
2import org.apache.rocketmq.client.exception.MQClientException;
3import org.apache.rocketmq.client.producer.SendResult;
4import org.apache.rocketmq.common.message.Message;
5import org.apache.rocketmq.remoting.exception.RemotingException;
6import org.springframework.beans.factory.annotation.Autowired;
7import org.springframework.web.bind.annotation.RequestMapping;
8import org.springframework.web.bind.annotation.RestController;
9import java.util.HashMap;
10
11@RestController
12public class PayController {
13
14 @Autowired
15 //注入Producer
16 private PayProducer payProducer;
17
18 //定义Topic主题
19 private static final String topic = "soulboy_pay_test_topic2";
20
21 @RequestMapping("/api/v1/pay_cb")
22 public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
23
24 /**
25 * topic: 主题名称
26 * tag: 标签,用于过滤,二级分类。
27 * key: 消息唯一标示,可以是业务字段组合
28 * body: 消息体,字节数组
29 */
30 Message message = new Message(topic,"taga", ("hello xdclass rocketmq = "+text).getBytes() );
31
32 //拿到Producer之后传入Message进行消息的发送
33 SendResult sendResult = payProducer.getProducer().send(message);
34
35 //打印发送结果
36 System.out.println(sendResult);
37
38 return new HashMap<>();
39 }
40
41}
浏览器访问
1http://192.168.31.230:8080/api/v1/pay_cb?text=666
2{
3
4}
控制台输出
1SendResult [sendStatus=SEND_OK, msgId=A9FEC0262F4418B4AAC2454B1CD70000, offsetMsgId=C0A81FDC00002A9F0000000000057D64, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=2], queueOffset=0]
消息消费
JmsConfig 配置类(抽取 NameServer 和 Topic)
1public class JmsConfig {
2
3 public static final String NAME_SERVER = "192.168.31.220:9876";
4
5 public static final String TOPIC = "soulboy_pay_test_topic2";
6}
PayConsumer 消费者
控制台输出
1consumer start ...
2ConsumeMessageThread_1 Receive New Messages: hello xdclass rocketmq = 666
3topic=soulboy_pay_test_topic2, tags=taga, keys=null, msg=hello xdclass rocketmq = 666