Springboot整合RocketMQ
Trouble Shooting
多网卡下为 rocketmq 指定 IP
# 修改配置文件
vim /test/rocketmq/distribution/target/apache-rocketmq/conf/broker.conf
brokerIP1=192.168.31.220 #新增内容
# 启动Broker命令命令
nohup sh /test/rocketmq/distribution/target/apache-rocketmq/bin/mqbroker -n 192.168.31.220:9876 -c /test/rocketmq/distribution/target/apache-rocketmq/conf/broker.conf &
控制台查看不了数据,提示连接 10909 错误
原因:Rocket默认开启了VIP通道,VIP通道端口为10911-2=10909
解决:阿里云安全组需要增加一个端口 10909
消息发送
添加 Maven 相关依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
Producer
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;
@Component
public class PayProducer {
//定义ProducerGroup
private String producerGroup = "pay_group";
//NameServer地址
private String nameServerAddr = "192.168.31.220:9876";
//定义Producer
private DefaultMQProducer producer;
public PayProducer(){
//初始化Producer
producer = new DefaultMQProducer(producerGroup);
//指定NameServer地址,多个地址以 ; 隔开
//如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
producer.setNamesrvAddr(nameServerAddr);
start();
}
public DefaultMQProducer getProducer(){
return this.producer;
}
/**
* 对象在使用之前必须要调用一次,只能初始化一次
*/
public void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
/**
* 一般在应用上下文,使用上下文监听器,进行关闭
*/
public void shutdown(){
this.producer.shutdown();
}
}
Controller 使用 PayProducer 生产消息
发送消息到 Broker,需要判断是否有此 topic。
本地环境建议开启自动创建 topic,生产环境建议关闭自动化创建 topic
建议先手工创建 Topic。
因为如果靠程序自动创建,然后再投递消息,会出现延迟情况
。
概念模型: 一个 topic 下面对应多个 queue,可以在创建 Topic 时指定,如订单类 topic
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
@RestController
public class PayController {
@Autowired
//注入Producer
private PayProducer payProducer;
//定义Topic主题
private static final String topic = "soulboy_pay_test_topic2";
@RequestMapping("/api/v1/pay_cb")
public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
/**
* topic: 主题名称
* tag: 标签,用于过滤,二级分类。
* key: 消息唯一标示,可以是业务字段组合
* body: 消息体,字节数组
*/
Message message = new Message(topic,"taga", ("hello xdclass rocketmq = "+text).getBytes() );
//拿到Producer之后传入Message进行消息的发送
SendResult sendResult = payProducer.getProducer().send(message);
//打印发送结果
System.out.println(sendResult);
return new HashMap<>();
}
}
浏览器访问
http://192.168.31.230:8080/api/v1/pay_cb?text=666
{
}
控制台输出
SendResult [sendStatus=SEND_OK, msgId=A9FEC0262F4418B4AAC2454B1CD70000, offsetMsgId=C0A81FDC00002A9F0000000000057D64, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=2], queueOffset=0]
消息消费
JmsConfig 配置类(抽取 NameServer 和 Topic)
public class JmsConfig {
public static final String NAME_SERVER = "192.168.31.220:9876";
public static final String TOPIC = "soulboy_pay_test_topic2";
}
PayConsumer 消费者
控制台输出
consumer start ...
ConsumeMessageThread_1 Receive New Messages: hello xdclass rocketmq = 666
topic=soulboy_pay_test_topic2, tags=taga, keys=null, msg=hello xdclass rocketmq = 666