Skip to content

Commit d5cb67f

Browse files
maixiaohaizhangxu16
andauthored
[ISSUE apache#2165] Slave read enable not work sometimes When cluster deployed on DLedger mode (apache#2167)
* [Client] Fix slaveReadEnable=true not work sometimes When cluster deployed on DLedger mode * [Client] Add unit test for findBrokerAddressInSubscribe Co-authored-by: zhangxu16 <[email protected]>
1 parent c932941 commit d5cb67f

File tree

2 files changed

+44
-0
lines changed

2 files changed

+44
-0
lines changed

client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,6 +1043,11 @@ public FindBrokerResult findBrokerAddressInSubscribe(
10431043
slave = brokerId != MixAll.MASTER_ID;
10441044
found = brokerAddr != null;
10451045

1046+
if (!found && slave) {
1047+
brokerAddr = map.get(brokerId + 1);
1048+
found = brokerAddr != null;
1049+
}
1050+
10461051
if (!found && !onlyThisBroker) {
10471052
Entry<Long, String> entry = map.entrySet().iterator().next();
10481053
brokerAddr = entry.getValue();

client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919
import java.util.ArrayList;
2020
import java.util.HashMap;
2121
import java.util.List;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.ConcurrentMap;
2224
import org.apache.rocketmq.client.ClientConfig;
2325
import org.apache.rocketmq.client.admin.MQAdminExtInner;
2426
import org.apache.rocketmq.client.exception.MQBrokerException;
27+
import org.apache.rocketmq.client.impl.FindBrokerResult;
2528
import org.apache.rocketmq.client.impl.MQClientManager;
2629
import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
2730
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
@@ -30,8 +33,10 @@
3033
import org.apache.rocketmq.common.protocol.route.QueueData;
3134
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
3235
import org.apache.rocketmq.remoting.exception.RemotingException;
36+
import org.junit.Before;
3337
import org.junit.Test;
3438
import org.junit.runner.RunWith;
39+
import org.mockito.internal.util.reflection.FieldSetter;
3540
import org.mockito.junit.MockitoJUnitRunner;
3641

3742
import static org.assertj.core.api.Assertions.assertThat;
@@ -42,6 +47,12 @@ public class MQClientInstanceTest {
4247
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
4348
private String topic = "FooBar";
4449
private String group = "FooBarGroup";
50+
private ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = new ConcurrentHashMap<String, HashMap<Long, String>>();
51+
52+
@Before
53+
public void init() throws Exception {
54+
FieldSetter.setField(mqClientInstance, MQClientInstance.class.getDeclaredField("brokerAddrTable"), brokerAddrTable);
55+
}
4556

4657
@Test
4758
public void testTopicRouteData2TopicPublishInfo() {
@@ -74,6 +85,34 @@ public void testTopicRouteData2TopicPublishInfo() {
7485
assertThat(topicPublishInfo.getMessageQueueList().size()).isEqualTo(4);
7586
}
7687

88+
@Test
89+
public void testFindBrokerAddressInSubscribe() {
90+
// dledger normal case
91+
String brokerName = "BrokerA";
92+
HashMap<Long, String> addrMap = new HashMap<Long, String>();
93+
addrMap.put(0L, "127.0.0.1:10911");
94+
addrMap.put(1L, "127.0.0.1:10912");
95+
addrMap.put(2L, "127.0.0.1:10913");
96+
brokerAddrTable.put(brokerName, addrMap);
97+
long brokerId = 1;
98+
FindBrokerResult brokerResult = mqClientInstance.findBrokerAddressInSubscribe(brokerName, brokerId, false);
99+
assertThat(brokerResult).isNotNull();
100+
assertThat(brokerResult.getBrokerAddr()).isEqualTo("127.0.0.1:10912");
101+
assertThat(brokerResult.isSlave()).isTrue();
102+
103+
// dledger case, when node n0 was voted as the leader
104+
brokerName = "BrokerB";
105+
HashMap<Long, String> addrMapNew = new HashMap<Long, String>();
106+
addrMapNew.put(0L, "127.0.0.1:10911");
107+
addrMapNew.put(2L, "127.0.0.1:10912");
108+
addrMapNew.put(3L, "127.0.0.1:10913");
109+
brokerAddrTable.put(brokerName, addrMapNew);
110+
brokerResult = mqClientInstance.findBrokerAddressInSubscribe(brokerName, brokerId, false);
111+
assertThat(brokerResult).isNotNull();
112+
assertThat(brokerResult.getBrokerAddr()).isEqualTo("127.0.0.1:10912");
113+
assertThat(brokerResult.isSlave()).isTrue();
114+
}
115+
77116
@Test
78117
public void testRegisterProducer() {
79118
boolean flag = mqClientInstance.registerProducer(group, mock(DefaultMQProducerImpl.class));

0 commit comments

Comments
 (0)