裸泳的猪

沾沾自喜其实最可悲

0%

消息队列_Rocketmq

主流消息队列选型

维度 Kafka RocketMQ RabbitMQ ActiveMQ
单机吞吐量 10万级 10万级 万级 万级
开发语言 Scala Java Erlang Java
高可用 分布式架构 分布式架构 主从架构 主从架构
性能 ms级 ms级 us级 ms级
功能 只支持主要的MQ功能 顺序消息、事务消息等功能完善 并发强、性能好、延时低 成熟的社区产品、文档丰富
  1. rocketmq主要为java开发,语言适配性好,
  2. 性能好,社区成熟。
  3. 支持顺序消息,事务消息。
  4. 支持分布式架构

ROCKETMQ特点

  1. 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
  2. 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
  3. 支持拉(pull)和推(push)两种消息模式(其实push也是用轮询形式的pull实现)
  4. 单一队列百万消息的堆积能力
  5. 支持多种消息协议,如 JMS、MQTT 等
  6. 分布式高可用的部署架构,满足至少一次消息传递语义
  7. 提供 docker 镜像用于隔离测试和云集群部署
  8. 提供配置、指标和监控等功能丰富的 Dashboard

集群分布式原理

RocketMQ由NameServer注册中心集群、Producer生产者集群、Consumer消费者集群和若干Broker(RocketMQ进程)组成,它的架构原理是这样的:

  1. Broker在启动的时候去向所有的NameServer注册,并保持长连接,每30s发送一次心跳
  2. Producer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择一台服务器来发送消息
  3. Conusmer消费消息的时候同样从NameServer获取Broker地址,然后主动拉取消息来消费

消息丢失问题

消息丢失分为3个点

  1. 生产者丢失,即消息没发送出去,也没有重试机制来确保发送

    解决方案:

    1. 同步发送(几乎不会用)
    2. 异步发送时采用异步有回调的方式 (可以再加个本地消息表,MQ回调通知消息发送结果,对应更新数据库MQ发送状态,JOB轮询超过一定时间(时间根据业务配置)还未发送成功的消息去重试)
  2. mq丢失,即mq未及时刷盘,然后宕机了,导致内存中未处理的消息丢了

    RocketMQ分为同步刷盘和异步刷盘两种方式,可以通过设置为同步刷盘的方式来保证消息可靠性,这样即使MQ挂了,恢复的时候也可以从磁盘中去恢复消息。(对性能有损耗,需要根据具体业务逻辑设置)

  3. 消费者丢失(消费者刚收到消息,此时服务器宕机,MQ认为消费者已经消费,不会重复发送消息,消息丢失。)

RocketMQ默认是需要消费者回复ack确认,而kafka需要手动开启配置关闭自动offset。

消费方不返回ack确认,重发的机制根据MQ类型的不同发送时间间隔、次数都不尽相同,如果重试超过次数之后会进入死信队列,需要手工来处理了。(Kafka没有这些) 重发的机制可能需要业务代码支持幂等操作。

顺序消息问题

RocketMQ发送消息的时候,消息发送默认是会采用轮询的方式发送到不通的queue(分区)。

而消费端消费的时候,是会分配到多个queue的,多个queue是同时拉取提交消费。

但是同一条queue里面,RocketMQ的确是能保证FIFO的。那么要做到顺序消息,应该怎么实现呢——把消息确保投递到同一条queue。

可以在生产者投递时指定MessageQueueSelector,比如使用取模运算,让相同模的投递到同一条queue。

接下来就要保证消费是顺序的就可以了,可以使用MessageListenerOrderly

rocketmq的顺序消息需要满足2点:
1.Producer端保证发送消息有序,且发送到同一个队列。
2.consumer端保证消费同一个队列。

如何保证消息不被重复消费

分析:这个问题其实换一种问法就是,如何保证消息队列的幂等性?这个问题可以认为是消息队列领域的基本问题。换句话来说,是在考察你的设计能力,这个问题的回答可以根据具体的业务场景来答,没有固定的答案。

回答:先来说一下为什么会造成重复消费?
其实无论是哪种消息队列,造成重复消费原因其实都是类似的。正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。RocketMQ返回一个CONSUME_SUCCESS成功标志,简单说一下,就是每一个消息都有一个offset,让消息队列知道自己已经消费过了。

那造成重复消费的原因?,就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。

如何解决?这个问题针对业务场景来答,分以下三种情况:

(1)比如,你拿到这个消息做数据库的insert操作,那就容易了,给这个消息做一个唯一的主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

(2)再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。

(3)如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis.那消费者开始消费前,先去redis中查询有没有消费记录即可。

Master和Slave之间是怎么同步数据的呢?

而消息在master和slave之间的同步是根据raft协议来进行的:

  1. 在broker收到消息后,会被标记为uncommitted状态
  2. 然后会把消息发送给所有的slave
  3. slave在收到消息之后返回ack响应给master
  4. master在收到超过半数的ack之后,把消息标记为committed
  5. 发送committed消息给所有slave,slave也修改状态为committed
-------------本文结束感谢您的阅读-------------