RocketMq消息流转流程分析

概述

本文分析rocketmq发送消息的流程,从客户端怎么启动,到启动后怎么发,发到哪里,消费者又是怎么接收消息的对整个流程做一个简要分析。

消息流转流程

在说明流程之前先对几个概念做一个简要说明:

  • 生产者:消息的发送方,消息通过生产者发送
  • 消费者:消息的接收方
  • NameServer:负责rocketmq中broker的服务注册和发现
  • broker:消息发送和接收以及存储等等都是由broker处理

流程

流程图.jpg

  1. 当NameServer和Broker启动的时候,Broker会定期上报信息,NameServer会维护Broker信息以及及时删除离线的Broker,这个过程中不需要NameServer中的数据是强一致性的,原因就是最终生产者和消费者都只需要知道任意一个可用的MessageQueue就行,所以NameServer中多一个Broker少一个Broker并不会影响功能。
  2. 客户端(包括生产者和消费者)会从NameServer列表中去定期(默认30秒)获取到Broker信息保存到本地。同时客户端(包括生产者和消费者)会去和Broker保持心跳,如果Broker离线,那么本地会删除这个Broker。这也能保证发送消息的时候只要任意有一个Broker可以工作就行不需要服务器是强一致性的。
  3. 发送的时候会根据是否启用故障延迟机制(启用:选择延迟最小的;不启用:轮询的方式并且重试3次)来选择MessageQueue,因为前面本地保存了Broker的所有信息包括MessageQuque序号,所所以可以选择到MessageQueue。
  4. 发消息可以同步、异步、OneWay(只管发不管发送是否成功)或批量发送消息。注意发送的时候不会校验Broker是否可用这个是统一定时任务去处理的,发送的时候只会默认有3次重发的机会。这个设计可以让检查Broker和发送逻辑分开,整个逻辑会变简单。
  5. 不管是推和拉,本质上都是拉消息。用的长轮询的方式,“长轮询”的核心是,Broker端HOLD住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给Consumer。“长轮询”的主动权还是掌握在Consumer手中,Broker即使有大量消息积压,也不会主动推送给Consumer。需要注意的是如果是广播就是由Consumer自己维护消息的Offset,如果是集群的方式就是由Broker维护offset。

为什么会有消息重复?

在一些极端情况下Rocketmq会有消息重复,这个是用Rocketmq一定要幂等消费的原因。从上面的流程可以看到,有很多种情况会导致消息重复,这里就简单列几个:

  • 如果消费者处理不好offset或者这中间系统出现问题会导致从之前的offset再拉信息处理(自己拉消息的时候最容易出现,如果不是很熟悉就会导致很多bug)
  • 如果Broker还没维护好offset但是消费者已经处理完了,这时候没有把信息反馈给Broker,Broker就会认为消息没处理就会导致重复
  • 发送的时候如果是异步或者OneWay的方式,如果Broker存储了一条消息,但是没有反馈给客户端,客户端就会重新发也会导致消息重复

为什么会有消息丢失?

Rocketmq是可以保证消息不丢失的,不管是同步还是异步都要等待结果,但是它一定会出现消息重复。即使是这样,但是还是有一些情况会导致消息丢失:

  • 如果consumerqueue文件被删掉(默认72小时后会删除没有修改的consumerqueue文件),rocketmq是不会管这个文件中是否有消息还没有被消费的会直接删掉。这个文件是mq在写入commitlog文件(消息都是保存到这个文件的)的时候会也给consumerqueue写一份(主要是提高消费者的查询消息的速度的可以简单理解成是commitlog文件的索引文件),消费者具体是从consumerqueue文件中查询消息然后去commitlog中取数据来给消费者的。
  • 使用OneWay方式发消息。当使用OneWay方式的时候,Broker不用存储好消息就直接返回,这时候如果Broker没有存储好消息就会导致这条消息丢失了。

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

Captcha Code