Skip to content

Commit 03c1f11

Browse files
authored
[ISSUE apache#2378] FIx NullPointerException when Consumer shutdown in the ClientRemotingProcessor.
1 parent f5a119f commit 03c1f11

File tree

2 files changed

+32
-0
lines changed

2 files changed

+32
-0
lines changed

client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,6 +1214,9 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg,
12141214

12151215
public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup) {
12161216
MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
1217+
if (mqConsumerInner == null) {
1218+
return null;
1219+
}
12171220

12181221
ConsumerRunningInfo consumerRunningInfo = mqConsumerInner.consumerRunningInfo();
12191222

client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.ArrayList;
2020
import java.util.HashMap;
2121
import java.util.List;
22+
import java.util.Properties;
2223
import java.util.concurrent.ConcurrentHashMap;
2324
import java.util.concurrent.ConcurrentMap;
2425
import org.apache.rocketmq.client.ClientConfig;
@@ -29,6 +30,8 @@
2930
import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
3031
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
3132
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
33+
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
34+
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
3235
import org.apache.rocketmq.common.protocol.route.BrokerData;
3336
import org.apache.rocketmq.common.protocol.route.QueueData;
3437
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -41,6 +44,7 @@
4144

4245
import static org.assertj.core.api.Assertions.assertThat;
4346
import static org.mockito.Mockito.mock;
47+
import static org.mockito.Mockito.when;
4448

4549
@RunWith(MockitoJUnitRunner.class)
4650
public class MQClientInstanceTest {
@@ -139,6 +143,31 @@ public void testRegisterConsumer() throws RemotingException, InterruptedExceptio
139143
assertThat(flag).isTrue();
140144
}
141145

146+
147+
@Test
148+
public void testConsumerRunningInfoWhenConsumersIsEmptyOrNot() throws RemotingException, InterruptedException, MQBrokerException {
149+
MQConsumerInner mockConsumerInner = mock(MQConsumerInner.class);
150+
ConsumerRunningInfo mockConsumerRunningInfo = mock(ConsumerRunningInfo.class);
151+
when(mockConsumerInner.consumerRunningInfo()).thenReturn(mockConsumerRunningInfo);
152+
when(mockConsumerInner.consumeType()).thenReturn(ConsumeType.CONSUME_PASSIVELY);
153+
Properties properties = new Properties();
154+
when(mockConsumerRunningInfo.getProperties()).thenReturn(properties);
155+
mqClientInstance.unregisterConsumer(group);
156+
157+
ConsumerRunningInfo runningInfo = mqClientInstance.consumerRunningInfo(group);
158+
assertThat(runningInfo).isNull();
159+
boolean flag = mqClientInstance.registerConsumer(group, mockConsumerInner);
160+
assertThat(flag).isTrue();
161+
162+
runningInfo = mqClientInstance.consumerRunningInfo(group);
163+
assertThat(runningInfo).isNotNull();
164+
assertThat(mockConsumerInner.consumerRunningInfo().getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE));
165+
166+
mqClientInstance.unregisterConsumer(group);
167+
flag = mqClientInstance.registerConsumer(group, mock(MQConsumerInner.class));
168+
assertThat(flag).isTrue();
169+
}
170+
142171
@Test
143172
public void testRegisterAdminExt() {
144173
boolean flag = mqClientInstance.registerAdminExt(group, mock(MQAdminExtInner.class));

0 commit comments

Comments
 (0)