目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

RocketMQ消费者核心概念

常见核心配置

  • consumeFromWhere 配置 参考
CONSUME_FROM_FIRST_OFFSET: 初次从消息队列头部开始消费,即历史消息(还储存在broker的)全部消费一遍,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_LAST_OFFSET: 默认策略,初次从该队列最尾开始消费,即跳过历史消息,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_TIMESTAMP : 从某个时间点开始消费,默认是半个小时以前,后续再启动接着上次消费的进度开始消费
  • allocateMessageQueueStrategy:负载均衡策略算法,即消费者分配到 queue 的算法,默认值是 AllocateMessageQueueAveragely 即取模平均分配。
  • offsetStore:消息消费进度存储器
offsetStore 有两个策略:
* LocalFileOffsetStore   : Consumer记录消息的消费进度。
* RemoteBrokerOffsetStor : Broker记录消息的消费进度。
广播模式默认使用LocalFileOffsetStore 集群模式默认使用RemoteBrokerOffsetStore
  • consumeThreadMin 最小消费线程池数量
  • consumeThreadMax 最大消费线程池数量
  • pullBatchSize: 消费者去 broker 拉取消息时,一次拉取多少条。可选配置
  • consumeMessageBatchMaxSize: 单次消费时一次性消费多少条消息,批量消费接口才有用,可选配置
  • messageModel : 消费者消费模式, CLUSTERING——默认是集群模式 CLUSTERING BROADCASTING——广播模式

集群和广播模式下 RocketMQ 消费端处理

  • Topic 下队列的奇偶数会影响 Customer 个数里面的消费数量
* 如果是4个队列,8个消息,4个节点则会各消费2条,如果不对等,则负载均衡会分配不均,
* 如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用,所以需要控制让queue的总数量大于等于consumer的数量
  • 集群模式(默认):
* Consumer实例平均分摊消费生产者发送的消息
* 例子:订单消息,一般是只被消费一次
  • 广播模式:
* 广播模式下消费消息:投递到Broker的消息会被每个Consumer进行消费,一条消息被多个Consumer消费,广播消费中ConsumerGroup暂时无用
* 例子:群公告,每个人都需要消费这个消息
  • 怎么切换模式:通过 setMessageModel()

消费者模式切换示例

//默认是集群模式,可以更改为广播模式,但是广播模式不支持重试
        //consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setMessageModel(MessageModel.CLUSTERING);

标签 Tag、消息过滤

  • 一个 Message 只有一个 Tag,tag 是二级分类
*   订单:数码类订单、食品类订单
  • 过滤分为 Broker 端和 Consumer 端过滤
*   Broker端过滤,减少了无用的消息的进行网络传输,增加了broker的负担
*   Consumer端过滤,完全可以根据业务需求进行实习,但是增加了很多无用的消息传输
  • 一般是监听 * ,或者指定 tag,|| 运算 , SLQ92 , FilterServer 等;
* tag性能高,逻辑简单
* SQL92 性能差点,支持复杂逻辑(只支持PushConsumer中使用) MessageSelector.bySql
* 语法:> , < = ,IS NULL, AND, OR, NOT 等,sql where后续的语法即可(大部分)
  • 注意:消费者订阅关系要一致,不然会消费混乱,甚至消息丢失
* 订阅关系一致:订阅关系由 Topic和 Tag 组成,同一个 group name,订阅的 topic和tag 必须是一样的
  • 在 Broker 端进行 MessageTag 过滤,遍历 message queue 存储的 message tag 和 订阅传递的 tag 的 hashcode 不一样则跳过,符合的则传输给 Consumer,在 consumer queue 存储的是对应的 hashcode, 对比也是通过 hashcode 对比;Consumer 收到过滤消息后也会进行匹配操作(为了防止 hash 碰撞,使用原生 Tag 比对),但是是对比真实的 message tag 而不是 hashcode。
* Consume Queue存储使用hashcode定长,节约空间
* 使用hashcode过滤可以不访问commit log(日志文件,真正存储消息的文件),可以高效过滤
* 如果存在hash冲突,Consumer端可以进行再次确认(防止hash碰撞)

Consume Queue

  • 如果想使用多个 Tag,可以使用 SQL 表达式,但是不建议,推荐:单一职责,多个队列。

生产端

消费端示例

控制台输出

http://192.168.31.230:8080/api/v1/pay_cb?tag=order_pay
http://192.168.31.230:8080/api/v1/pay_cb?tag=xxx


发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=A9FEC026146418B4AAC254961CCE0000, offsetMsgId=C0A81FDC00002A9F000000000005B081, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=1], queueOffset=10] 
ConsumeMessageThread_1 Receive New Messages: order_pay 
发送结果=SEND_OK, sendResult=SendResult [sendStatus=SEND_OK, msgId=A9FEC026146418B4AAC2549633C10001, offsetMsgId=C0A81FDC00002A9F000000000005B13F, messageQueue=MessageQueue [topic=soulboy_pay_test_topic2, brokerName=broker-a, queueId=3], queueOffset=10]

开启 Broker 支持 SLQ92

The broker does not support consumer to filter message by SQL92

# 解决:broker.conf 里面配置如下
master节点配置:vim conf/2m-2s-async/broker-a.properties
slave节点配置:vim conf/2m-2s-async/broker-a-s.properties
enablePropertyFilter=true

#修改之后要重启Broker
jps 
kill -9
nohup ....

PushConsumer/PullConsumer 消费消息模式分析

Push 和 Pull 优缺点分析

  • Push:实时性高;但增加服务端负载,消费端能力不同,如果 Push 推送过快,消费端会出现很多问题。(消息丢失、阻塞等……)
  • Pull:消费者从 Server 端拉取消息,主动权在消费者端,可控性好;但 间隔时间不好设置,间隔太短,则空请求,浪费资源;间隔时间太长,则消息不能及时处理。
  • 长轮询: Consumer 请求 Broker 的时候,如果没有消息,Broker 会保持当前连接一段时间 默认是 15s,如果这段时间内有消息到达,则立刻返回给 Consumer。 如果没消息的并超过 15s,则返回空,客户端若干时间再进行重新请求;主动权在 Consumer 中,Broker 即使有大量的消息 也不会主动提送 Consumer。
缺点:服务端需要保持Consumer的请求,会占用资源,需要客户端连接数可控 否则会一堆连接。

PushConsumer 本质是长轮训

  • 系统收到消息后自动处理消息和 offset,如果有新的 Consumer 加入会自动做负载均衡,
  • 在 broker 端可以通过 longPollingEnable=true 来开启长轮询(1 秒)
* 消费端代码:DefaultMQPushConsumerImpl->pullMessage->PullCallback
* 服务端代码:broker.longpolling
  • 虽然是 push,但是代码里面大量使用了 pull,是因为使用长轮训方式达到 Push 效果,既有 pull 有的,又有 Push 的实时性。(需要 Broker 端和 Consumer 端配合才能完成)
  • 优雅关闭:主要是释放资源和保存 Offset(不需要自己维护消息和 offset,由 broker 自动维护), 调用 shutdown()即可 ,参考 @PostConstruct、@PreDestroy

PullConsumer 需要自己维护 Offset

  • 官方例子路径:org.apache.rocketmq.example.simple.PullConsumer
  • 获取 MessageQueue 遍历:遍历 Topic 下面所有 Queue。
  • Consumer 需要自己维护 Offset,需用用户本地存储 Offset,存储内存、磁盘、数据库等
  • 处理不同状态的消息 FOUND、NO_NEW_MSG、OFFSET_ILLRGL、NO_MATCHED_MSG、4 种状态
  • 灵活性高可控性强,但是编码复杂度会高
  • 优雅关闭:主要是释放资源和保存 Offset,需用程序自己保存好 Offset,特别是异常处理的时候

作者:Soulboy