2020import java .io .ByteArrayOutputStream ;
2121import java .lang .reflect .Field ;
2222import java .net .InetSocketAddress ;
23- import java .util .Collections ;
24- import java .util .HashSet ;
25- import java .util .List ;
26- import java .util .Set ;
23+ import java .util .*;
24+
2725import org .apache .rocketmq .client .ClientConfig ;
2826import org .apache .rocketmq .client .consumer .store .OffsetStore ;
2927import org .apache .rocketmq .client .consumer .store .ReadOffsetType ;
@@ -515,11 +513,14 @@ public void testComputePullByTimeStamp() throws Exception{
515513
516514 @ Test
517515 public void testConsumerAfterShutdown () throws Exception {
518- DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer ();
519- defaultLitePullConsumer .setNamesrvAddr ("127.0.0.1:9876" );
520- defaultLitePullConsumer .subscribe (topic , "*" );
516+ DefaultLitePullConsumer defaultLitePullConsumer = createSubscribeLitePullConsumer ();
517+
518+ DefaultLitePullConsumer mockConsumer = spy (defaultLitePullConsumer );
519+ when (mockConsumer .poll (anyLong ())).thenReturn (new ArrayList <>());
520+
521521 new AsyncConsumer ().executeAsync (defaultLitePullConsumer );
522- Thread .sleep (10 * 1000 );
522+
523+ Thread .sleep (100 );
523524 defaultLitePullConsumer .shutdown ();
524525 assertThat (defaultLitePullConsumer .isRunning ()).isFalse ();
525526 }
@@ -531,9 +532,7 @@ public void executeAsync(final DefaultLitePullConsumer consumer) {
531532 public void run () {
532533 while (consumer .isRunning ()) {
533534 List <MessageExt > poll = consumer .poll (2 * 1000 );
534- System .out .println ("consumer is still running" );
535535 }
536- System .out .println ("consumer shutdown" );
537536 }
538537 }).start ();
539538 }
0 commit comments