Skip to content

Commit 9eb542f

Browse files
authored
feat(rocketmq): add sendMessage process (doocs#110)
1 parent 49eeffe commit 9eb542f

File tree

2 files changed

+364
-0
lines changed

2 files changed

+364
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@
275275

276276
- [RocketMQ NameServer 与 Broker 的通信](docs/rocketmq/rocketmq-nameserver-broker.md)
277277
- [RocketMQ 生产者启动流程](docs/rocketmq/rocketmq-producer-start.md)
278+
- [RocketMQ 消息发送流程](docs/rocketmq/rocketmq-send-message.md)
278279

279280
## 番外篇(JDK 1.8)
280281

Lines changed: 363 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,363 @@
1+
# RocketMQ 消息发送流程
2+
3+
这里以同步发送为示例讲解:
4+
5+
入口:
6+
7+
org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
8+
9+
消息发送 默认超时时间 3 秒
10+
11+
第一步:验证
12+
13+
主题的长度不能大于 127,消息的大小不能大于 4M
14+
15+
```java
16+
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
17+
if (null == msg) {
18+
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
19+
}
20+
// topic
21+
Validators.checkTopic(msg.getTopic());
22+
Validators.isNotAllowedSendTopic(msg.getTopic());
23+
24+
// body
25+
if (null == msg.getBody()) {
26+
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
27+
}
28+
29+
if (0 == msg.getBody().length) {
30+
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
31+
}
32+
33+
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
34+
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
35+
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
36+
}
37+
}
38+
```
39+
40+
第二步:查找路由信息
41+
42+
如果缓存中存在路由信息,并且队列信息不为空直接返回路由信息,如果缓存不存在,根据当前主题从 NameServer 中获取 路由信息,如果路由信息没有找到,根据默认主题查询路由信息,如果没有找到抛出异常
43+
44+
```java
45+
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
46+
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
47+
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
48+
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
49+
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
50+
topicPublishInfo = this.topicPublishInfoTable.get(topic);
51+
}
52+
53+
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
54+
return topicPublishInfo;
55+
} else {
56+
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
57+
topicPublishInfo = this.topicPublishInfoTable.get(topic);
58+
return topicPublishInfo;
59+
}
60+
}
61+
62+
```
63+
64+
从 NameServer 查询路由信息方法:
65+
66+
org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)
67+
68+
1、如果是默认的主题查询路由信息,返回成功,更新读队列和写队列的个数为默认的队列个数
69+
70+
```java
71+
if (isDefault && defaultMQProducer != null) {
72+
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
73+
clientConfig.getMqClientApiTimeout());
74+
if (topicRouteData != null) {
75+
for (QueueData data : topicRouteData.getQueueDatas()) {
76+
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
77+
data.setReadQueueNums(queueNums);
78+
data.setWriteQueueNums(queueNums);
79+
}
80+
}
81+
}
82+
```
83+
84+
2、返回路由信息之后,与本地缓存的路由信息比对,判断路由信息是否发生变化,如果发生变化更新 broker 地址缓存,更新`topicPublishInfoTable`,更新 topic 路由信息缓存`topicRouteTable`
85+
86+
```java
87+
if (changed) {
88+
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
89+
90+
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
91+
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
92+
}
93+
94+
// Update Pub info
95+
if (!producerTable.isEmpty()) {
96+
TopicPublishInfo publishInfo =topicRouteData2TopicPublishInfo(topic, topicRouteData);
97+
publishInfo.setHaveTopicRouterInfo(true);
98+
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
99+
while (it.hasNext()) {
100+
Entry<String, MQProducerInner> entry = it.next();
101+
MQProducerInner impl = entry.getValue();
102+
if (impl != null) {
103+
impl.updateTopicPublishInfo(topic, publishInfo);
104+
}
105+
}
106+
}
107+
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
108+
this.topicRouteTable.put(topic, cloneTopicRouteData);
109+
return true;
110+
}
111+
```
112+
113+
第三步:选择消息 队列
114+
115+
设置消息发送失败重试次数
116+
117+
`int timesTotal = communicationMode == CommunicationMode.*SYNC* ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;`
118+
119+
`MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);`
120+
121+
首先判断是否启用故障延迟机制 ,默认不启用,第一次查询 lastBrokerName 为空,`sendWhichQueue`自增然后对队列个数取模获取队列,如果消息发送失败,下一次`sendWhichQueue`仍然自增然后对队列个数取模,可以规避掉上次失败的 broker
122+
123+
```java
124+
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
125+
if (lastBrokerName == null) {
126+
return selectOneMessageQueue();
127+
} else {
128+
for (int i = 0; i < this.messageQueueList.size(); i++) {
129+
int index = this.sendWhichQueue.incrementAndGet();
130+
int pos = Math.abs(index) % this.messageQueueList.size();
131+
if (pos < 0)
132+
pos = 0;
133+
MessageQueue mq = this.messageQueueList.get(pos);
134+
if (!mq.getBrokerName().equals(lastBrokerName)) {
135+
return mq;
136+
}
137+
}
138+
return selectOneMessageQueue();
139+
}
140+
}
141+
```
142+
143+
如果启用故障延迟机制:
144+
145+
轮询获取队列 ,如果可用直接返回
146+
147+
```java
148+
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
149+
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
150+
if (pos < 0)
151+
pos = 0;
152+
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
153+
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
154+
return mq;
155+
}
156+
```
157+
158+
判断是否可用逻辑:先从要规避的 broker 集合`faultItemTable`中获取该 broker 是否存在,如果存在判断是否可用,可用的标准是当前时间的时间戳大于上次该 broker 失败的时间 + 规避的时间,如果该 broker 在规避的 broker 集合中不存在,直接返回可用
159+
160+
```java
161+
public boolean isAvailable(final String name) {
162+
final FaultItem faultItem = this.faultItemTable.get(name);
163+
if (faultItem != null) {
164+
return faultItem.isAvailable();
165+
}
166+
return true;
167+
}
168+
```
169+
170+
如果没有可用的 broker,尝试从 规避的 broker 集合中选择一个可用的 broker,如果选择的 broker 没有写队列,则从规避的 broker 列表中移除该 broker
171+
172+
```java
173+
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
174+
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
175+
if (writeQueueNums > 0) {
176+
final MessageQueue mq = tpInfo.selectOneMessageQueue();
177+
if (notBestBroker != null) {
178+
mq.setBrokerName(notBestBroker);
179+
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
180+
}
181+
return mq;
182+
} else {
183+
latencyFaultTolerance.remove(notBestBroker);
184+
}
185+
```
186+
187+
P.S. :
188+
189+
要规避的 broker 集合在同步发送的时候不会 更新,在异步发送的时候会更新
190+
191+
```java
192+
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
193+
if (this.sendLatencyFaultEnable) {
194+
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
195+
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
196+
}
197+
}
198+
```
199+
200+
主要更新消息发送故障的延迟时间`currentLatency`和故障规避的 开始时间`startTimestamp`
201+
202+
```java
203+
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
204+
FaultItem old = this.faultItemTable.get(name);
205+
if (null == old) {
206+
final FaultItem faultItem = new FaultItem(name);
207+
faultItem.setCurrentLatency(currentLatency);
208+
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
209+
210+
old = this.faultItemTable.putIfAbsent(name, faultItem);
211+
if (old != null) {
212+
old.setCurrentLatency(currentLatency);
213+
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
214+
}
215+
} else {
216+
old.setCurrentLatency(currentLatency);
217+
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
218+
}
219+
}
220+
```
221+
222+
总结:
223+
224+
不管开不开启故障延迟机制,都可以规避故障的 broker,只是开启故障延迟机制,会在一段时间内都不会访问到该 broker,而不开启只是下一次不会访问到该 broker
225+
226+
第四步:消息发送
227+
228+
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
229+
230+
1、为消息分配全局唯一 id
231+
232+
```java
233+
if (!(msg instanceof MessageBatch)) {
234+
MessageClientIDSetter.setUniqID(msg);
235+
}
236+
```
237+
238+
2、消息体大于 4k 启用压缩
239+
240+
```java
241+
boolean msgBodyCompressed = false;
242+
if (this.tryToCompressMessage(msg)) {
243+
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
244+
msgBodyCompressed = true;
245+
}
246+
```
247+
248+
3、如果是事务消息,设置消息类型为事务消息
249+
250+
```java
251+
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
252+
if (Boolean.parseBoolean(tranMsg)) {
253+
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
254+
}
255+
```
256+
257+
4、校验是否超时
258+
259+
```java
260+
long costTimeSync = System.currentTimeMillis() - beginStartTime;
261+
if (timeout < costTimeSync) {
262+
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
263+
}
264+
```
265+
266+
5、组装请求头
267+
268+
```java
269+
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
270+
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
271+
requestHeader.setTopic(msg.getTopic());
272+
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
273+
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
274+
requestHeader.setQueueId(mq.getQueueId());
275+
requestHeader.setSysFlag(sysFlag);
276+
requestHeader.setBornTimestamp(System.currentTimeMillis());
277+
requestHeader.setFlag(msg.getFlag());
278+
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
279+
requestHeader.setReconsumeTimes(0);
280+
requestHeader.setUnitMode(this.isUnitMode());
281+
requestHeader.setBatch(msg instanceof MessageBatch);
282+
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
283+
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
284+
if (reconsumeTimes != null) {
285+
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
286+
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
287+
}
288+
289+
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
290+
if (maxReconsumeTimes != null) {
291+
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
292+
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
293+
}
294+
}
295+
```
296+
297+
6、发送请求
298+
299+
```java
300+
caseSYNC:
301+
long costTimeSync = System.currentTimeMillis() - beginStartTime;
302+
if (timeout < costTimeSync) {
303+
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
304+
}
305+
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
306+
brokerAddr,
307+
mq.getBrokerName(),
308+
msg,
309+
requestHeader,
310+
timeout - costTimeSync,
311+
communicationMode,
312+
context,
313+
this);
314+
break;
315+
```
316+
317+
第五步:处理响应结果
318+
319+
1、处理状态码
320+
321+
```java
322+
switch (response.getCode()) {
323+
case ResponseCode.FLUSH_DISK_TIMEOUT: {
324+
sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
325+
break;
326+
}
327+
case ResponseCode.FLUSH_SLAVE_TIMEOUT: {
328+
sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
329+
break;
330+
}
331+
case ResponseCode.SLAVE_NOT_AVAILABLE: {
332+
sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
333+
break;
334+
}
335+
case ResponseCode.SUCCESS: {
336+
sendStatus = SendStatus.SEND_OK;
337+
break;
338+
}
339+
default: {
340+
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
341+
}
342+
}
343+
```
344+
345+
2、构造 SendResult
346+
347+
```java
348+
SendResult sendResult = new SendResult(sendStatus,
349+
uniqMsgId,
350+
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
351+
sendResult.setTransactionId(responseHeader.getTransactionId());
352+
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
353+
String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
354+
if (regionId == null || regionId.isEmpty()) {
355+
regionId = MixAll.DEFAULT_TRACE_REGION_ID;
356+
}
357+
if (traceOn != null && traceOn.equals("false")) {
358+
sendResult.setTraceOn(false);
359+
} else {
360+
sendResult.setTraceOn(true);
361+
}
362+
sendResult.setRegionId(regionId);
363+
```

0 commit comments

Comments
 (0)