RocketMQ生产者核心概念
常见核心配置
VIM /test/rocketmq/distribution/target/apache-rocketmq/conf/2m-2s-sync/broker-a.properties
- compressMsgBodyOverHowmuch :消息超过默认字节 4096 后进行压缩
- retryTimesWhenSendFailed : 失败重发次数
- maxMessageSize : 最大消息配置,默认 128k
- topicQueueNums : 主题下面的队列数量,默认是 4
- autoCreateTopicEnable : 是否自动创建主题 Topic, 开发建议为 true,生产要为 false
- defaultTopicQueueNums : 自动创建服务器不存在的 topic,默认创建的队列数
- autoCreateSubscriptionGroup: 是否允许 Broker 自动创建订阅组,建议线下开发开启,线上关闭
- brokerClusterName : 集群名称
- brokerId : 0 表示 Master 主节点 大于 0 表示从节点
- brokerIP1 : Broker 服务地址
- brokerRole : broker 角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE
- deleteWhen : 每天执行删除过期文件的时间,默认每天凌晨 4 点
- flushDiskType :刷盘策略, 默认为 ASYNC_FLUSH(异步刷盘), 另外是 SYNC_FLUSH(同步刷盘)
- listenPort : Broker 监听的端口号
- mapedFileSizeCommitLog : 单个 conmmitlog 文件大小,默认是 1GB
- mapedFileSizeConsumeQueue:ConsumeQueue 每个文件默认存 30W 条,可以根据项目调整
- storePathRootDir : 存储消息以及一些配置信息的根目录 默认为用户的 ${HOME}/store
- storePathCommitLog:commitlog 存储目录默认为 ${storePathRootDir}/commitlog
- storePathIndex: 消息索引存储路径
- syncFlushTimeout : 同步刷盘超时时间
- diskMaxUsedSpaceRatio : 检测可用的磁盘空间大小,超过后会写入报错
消息发送状态
消息发送有同步和异步。
Broker 消息投递状态
- FLUSH_DISK_TIMEOUT:没有在规定时间内完成刷盘 (刷盘策略需要为 SYNC_FLUSH 才会出这个错误)
- FLUSH_SLAVE_TIMEOUT:主从模式下,broker 是 SYNC_MASTER, 没有在规定时间内完成主从复制。
- SLAVE_NOT_AVAILABLE:从模式下,broker 是 SYNC_MASTER, 但是没有找到被配置成 Slave 的 Broker。
- SEND_OK:发送成功,没有发生上面的三种问题
生产和消费消息重试及处理
生产者 Producer 重试(异步和 SendOneWay 下配置无效)
- 消息重投(保证数据的高可靠性),本身内部支持重试,默认次数是 2。如果网络情况比较差,或者跨集群则建改多几次。
Producer
//初始化Producer
producer = new DefaultMQProducer(producerGroup);
//Producer投递消息重试次数
producer.setRetryTimesWhenSendFailed(3)
Message
/**
* topic: 主题名称
* tag: 标签,用于过滤,二级分类。
* key: 消息唯一标示,可以是业务字段组合 一条消息无论重试多少次,这些重试消息的 Message ID,key 不会改变。
* body: 消息体,字节数组
*/
Message message = new Message(topic,"taga","9527", ("hello xdclass rocketmq = "+text).getBytes() );
消费端 Consumer 重试
原因:消息处理异常、broker 端到 consumer 端各种问题,如网络原因闪断,消费处理失败,ACK 返回失败等等问题。
重试间隔时间配置 ,默认每条消息最多重试 16 次
超过重试次数人工补偿。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
消费端重试会导致重复消费,因此消费端需要去重(使用去重表)。基于一条消息无论重试多少次,这些重试消息的 Message ID,key 不会改变。
消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息,
Consumer 端限定重试次数,如果超过指定重试次数,记录消息到数据库,并发短信通知,人工介入
控制台输出
重试次数0
消费异常
重试次数1
消费异常
重试次数2
消费异常
重试次数大于2,记录数据库,发短信通知开发人员或运维人员
异步发送消息和回调
异步传输通常用于响应时间敏感的业务场景。
异步发送:不会重试,发送总次数等于 1
OneWay 发送消息
SYNC
应用场景:重要通知邮件、报名短信通知、营销短信系统等。
ASYNC
应用场景:对 RT 时间敏感,可以支持更高的并发,回调成功触发相对应的业务,比如注册成功后通知积分系统发放优惠券。
ONEWAY
无需要等待响应。
- 官方文档:https://rocketmq.apache.org/docs/simple-example/
- 使用场景:主要是日志收集,适用于某些耗时非常短,但对可靠性要求并不高的场景, 也就是 LogServer, LogServer 配合 MQ 的 OneWay 方式应对高峰。只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。
@RestController
public class PayController {
@Autowired
private PayProducer payProducer;
@RequestMapping("/api/v1/pay_cb")
public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message message = new Message(JmsConfig.TOPIC,"taga" ,"66881" , ("hello xdclass rocketmq = "+text).getBytes() );
//采用Oneway模式发送消息
payProducer.getProducer().sendOneway(message);
return new HashMap<>();
}
}
汇总对比
发送方式 | 发送 TPS | 发送结果反馈 | 可靠性 |
---|---|---|---|
同步发送 | 快 | 有 | 不丢失 |
异步发送 | 快 | 有 | 不丢失 |
单向发送 | 最快 | 有 | 可能丢失 |
延迟消息
Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息,目前支持固定精度的消息。
定时消息:目前 rocketmq 开源版本还不支持,商业版本则有,两者使用场景类似。
使用场景
- 通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息
- 消息生产和消费有时间窗口要求:比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条 延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略
延迟的时间级别
代码:rocketmq-store > MessageStoreConfig.java 属性 messageDelayLevel
使用 message.setDelayTimeLevel(xxx) //xxx 是级别,1 表示配置里面的第一个级别,2 表示第二个级别
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
延迟消息代码示例
Message message = new Message(topic,"taga","9527", ("hello xdclass rocketmq = "+text).getBytes() );
//设置延迟消息级别: 第一个级别就是1,从1开始,不是从0开始。
message.setDelayTimeLevel(2);
生产者之 MessageQueueSelector
使用 MessageQueueSelector 投递到 Topic 下指定的 queue。
默认 topic 下的 queue 数量是 4,可以配置。
- 应用场景:顺序消息,分摊负载,这样如果消息特别多,只会堆积到一个 queue 中,不会影响其他的 queue。一个类目只占用一个 queue,这样可以避免因某一类目的某项产品负载大,从而导致所有 queue 堆积大量的消息。
- 可以根据订单号的 key 或者 id 根据一定规则运算(取模等)得出要投递的 queueId。
MessageQueueSelector 同步发送示例代码
MessageQueueSelector 异步发送示例代码
控制台输出
发送多条消息,发现所有消息的都是 queueId=0
发送结果=SEND_OK, msg=SendResult [sendStatus=SEND_OK, msgId=A9FEC026046818B4AAC250223AA40000, offsetMsgId=C0A81FDC00002A9F0000000000059504, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=0], queueOffset=1] 重试次数0
ConsumeMessageThread_1 Receive New Messages: hello xdclass rocketmq = aaa
topic=soulboy_pay_test_topic2, tags=taga, keys=9527, msg=hello xdclass rocketmq = aaa
发送结果=SEND_OK, msg=SendResult [sendStatus=SEND_OK, msgId=A9FEC026046818B4AAC250228C120001, offsetMsgId=C0A81FDC00002A9F00000000000595DA, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=0], queueOffset=2] 重试次数0
ConsumeMessageThread_2 Receive New Messages: hello xdclass rocketmq = bbb
topic=soulboy_pay_test_topic2, tags=taga, keys=9527, msg=hello xdclass rocketmq = bbb
顺序消息的应用场景
消息的生产和消费顺序一致。顺序消息的实现需要生产者和消费一起配合,譬如:在指定 Topic 的指定 queueID 下进行小的生产和消费。
- 顺序消息暂不支持广播模式。
- 顺序消息不支持异步发送方式,否则将无法严格保证顺序。
全局顺序
topic 下面全部消息都要有序(少用)。
- 性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景,并行度成为消息系统的瓶颈, 吞吐量不够。
- 在证券处理中,以人民币兑换美元为例子,在价格相同的情况下,先出价者优先处理,则可以通过全局顺序的方式按照 FIFO 的方式进行发布和消费
局部顺序
只要保证一组消息被顺序消费即可(RocketMQ 使用)。
- 性能要求高
- 电商的订单创建,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息、订单交易成功消息 都会按照先后顺序来发布和消费。(阿里巴巴集团内部电商系统均使用局部顺序消息,既保证业务的顺序,同时又能保证业务的高性能)
顺序发布
对于指定的一个 Topic,客户端将按照一定的先后顺序发送消息。
顺序消费
对于指定的一个 Topic,按照一定的先后顺序接收消息,即先发送的消息一定会先被客户端接收到。
顺序消息的使用
生产端
生产端保证发送消息有序,**且发送到同一个 Topic 的同个 queue 里面,**RocketMQ 的确是能保证 FIFO 的。例子:订单的顺序流程是:创建、付款、物流、完成,订单号相同的消息会被先后发送到同一个队列中,根据 MessageQueueSelector 里面自定义策略,根据同个业务 id 放置到同个 queue 里面,如订单号取模运算再放到 selector 中,同一个模的值都会投递到同一条 queue。
POJO 订单类
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class ProductOrder implements Serializable {
//订单id
private long orderID;
//订单类型
private String type;
public long getOrderID() {
return orderID;
}
public void setOrderID(long orderID) {
this.orderID = orderID;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public ProductOrder() {}
public ProductOrder(long orderID, String type){
this.orderID = orderID;
this.type = type;
}
public static List<ProductOrder> getOrderList(){
ArrayList<ProductOrder> list = new ArrayList<>();
list.add(new ProductOrder(111L, "创建订单"));
list.add(new ProductOrder(222L, "创建订单"));
list.add(new ProductOrder(111L, "支付订单"));
list.add(new ProductOrder(222L, "支付订单"));
list.add(new ProductOrder(111L, "完成订单"));
list.add(new ProductOrder(333L, "创建订单"));
list.add(new ProductOrder(222L, "完成订单"));
list.add(new ProductOrder(333L, "支付订单"));
list.add(new ProductOrder(333L, "完成订单"));
return list;
}
@Override
public String toString() {
return "ProductOrder{" +
"orderID=" + orderID +
", type='" + type + '\'' +
'}';
}
}
Producer 顺序消息示例代码
控制台输出
发送结果=SEND_OK, msg=SendResult [sendStatus=SEND_OK, msgId=A9FEC0263AE818B4AAC250A6DA950000, offsetMsgId=C0A81FDC00002A9F000000000005A8BC, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=3], queueOffset=7], orderid=111, type=创建订单
发送结果=SEND_OK, msg=SendResult [sendStatus=SEND_OK, msgId=A9FEC0263AE818B4AAC250A6DA9F0001, offsetMsgId=C0A81FDC00002A9F000000000005A999, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=2], queueOffset=8], orderid=222, type=创建订单
发送结果=SEND_OK, msg=SendResult [sendStatus=SEND_OK, msgId=A9FEC0263AE818B4AAC250A6DAA20002, offsetMsgId=C0A81FDC00002A9F000000000005AA76, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=3], queueOffset=8], orderid=111, type=支付订单
发送结果=SEND_OK, msg=SendResult [sendStatus=SEND_OK, msgId=A9FEC0263AE818B4AAC250A6DAA40003, offsetMsgId=C0A81FDC00002A9F000000000005AB53, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=2], queueOffset=9], orderid=222, type=支付订单
发送结果=SEND_OK, msg=SendResult [sendStatus=SEND_OK, msgId=A9FEC0263AE818B4AAC250A6DAA80004, offsetMsgId=C0A81FDC00002A9F000000000005AC30, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=3], queueOffset=9], orderid=111, type=完成订单
发送结果=SEND_OK, msg=SendResult [sendStatus=SEND_OK, msgId=A9FEC0263AE818B4AAC250A6DAAB0005, offsetMsgId=C0A81FDC00002A9F000000000005AD0D, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=1], queueOffset=7], orderid=333, type=创建订单
发送结果=SEND_OK, msg=SendResult [sendStatus=SEND_OK, msgId=A9FEC0263AE818B4AAC250A6DAAE0006, offsetMsgId=C0A81FDC00002A9F000000000005ADEA, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=2], queueOffset=10], orderid=222, type=完成订单
发送结果=SEND_OK, msg=SendResult [sendStatus=SEND_OK, msgId=A9FEC0263AE818B4AAC250A6DAB30007, offsetMsgId=C0A81FDC00002A9F000000000005AEC7, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=1], queueOffset=8], orderid=333, type=支付订单
发送结果=SEND_OK, msg=SendResult [sendStatus=SEND_OK, msgId=A9FEC0263AE818B4AAC250A6DAB60008, offsetMsgId=C0A81FDC00002A9F000000000005AFA4, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=1], queueOffset=9], orderid=333, type=完成订单
消费端
消费端要在保证消费同个 topic 里的同个队列,不应该 MessageListenerConcurrently。
应该使用 MessageListenerOrderly,自带单线程消费消息,不能再 Consumer 端再使用多线程去消费,消费端分配到的 queue 数量是固定的,集群消费的时候会锁住当前正在消费的队列集合的消息,所以会保证顺序消费。
- Consumer 会平均分配 queue 的数量,queue 的数量要大于 Consumer。
- 并不是简单禁止并发处理,而是为每个 Consumer Quene 加个锁,消费每个消息之前,需要获得这个消息所在的 Queue 的锁,这样同个时间,同个 Queue 的消息不被并发消费,但是不同 Queue 的消息可以并发处理。局部锁可以大幅度提高并发性能。 类似于 ConcurrentHashMap 的分段锁 Segment 的思想。
Consumer 顺序消息示例代码
控制台输出
ConsumeMessageThread_1 Receive New Messages: ProductOrder{orderID=333, type='创建订单'}
ConsumeMessageThread_2 Receive New Messages: ProductOrder{orderID=222, type='创建订单'}
ConsumeMessageThread_3 Receive New Messages: ProductOrder{orderID=111, type='创建订单'}
ConsumeMessageThread_3 Receive New Messages: ProductOrder{orderID=111, type='支付订单'}
ConsumeMessageThread_1 Receive New Messages: ProductOrder{orderID=333, type='支付订单'}
ConsumeMessageThread_2 Receive New Messages: ProductOrder{orderID=222, type='支付订单'}
ConsumeMessageThread_1 Receive New Messages: ProductOrder{orderID=333, type='完成订单'}
ConsumeMessageThread_3 Receive New Messages: ProductOrder{orderID=111, type='完成订单'}
ConsumeMessageThread_2 Receive New Messages: ProductOrder{orderID=222, type='完成订单'}