88import  org .apache .kafka .common .TopicPartition ;
99import  org .apache .storm .kafka .spout .internal .OffsetManager ;
1010import  org .apache .storm .kafka .spout .metrics2 .KafkaOffsetPartitionAndTopicMetrics ;
11+ import  org .junit .jupiter .api .BeforeEach ;
1112import  org .junit .jupiter .api .Test ;
1213import  org .junit .jupiter .api .extension .ExtendWith ;
1314import  org .mockito .junit .jupiter .MockitoExtension ;
1617import  java .util .concurrent .ExecutionException ;
1718
1819import  static  org .junit .jupiter .api .Assertions .assertEquals ;
19- import  static  org .mockito .ArgumentMatchers .anyMap ;
20- import  static  org .mockito .Mockito .mock ;
21- import  static  org .mockito .Mockito .when ;
20+ import  static  org .mockito .Mockito .*;
2221
2322@ ExtendWith (MockitoExtension .class )
2423public  class  KafkaOffsetPartitionAndTopicMetricsTest  {
2524
2625    private  Set <TopicPartition > assignment ;
27-     private  Admin  admin ;
26+     private  Admin  admin  =  mock ( Admin . class ) ;
2827    private  HashMap <TopicPartition , OffsetManager > offsetManagers ;
29-     private  ListOffsetsResult  listOffsetsResult ;
30-     private  KafkaFuture <Map <TopicPartition , ListOffsetsResult .ListOffsetsResultInfo >> kafkaFuture ;
31-     private  Map <TopicPartition , ListOffsetsResult .ListOffsetsResultInfo > topicPartitionListOffsetsResultInfoMap ;
28+     private  ListOffsetsResult  listOffsetsResultEarliest ;
29+     private  KafkaFuture  kafkaFuture  = mock (KafkaFuture .class );
30+ 
31+     @ BeforeEach 
32+     public  void  initializeTests () {
33+         reset (admin , kafkaFuture );
34+ 
35+     }
3236
3337    @ Test 
34-     public  void  registerPartitionAndTopicMetrics () throws  ExecutionException , InterruptedException  {
38+     public  void  registerMetricsGetLatest () throws  ExecutionException , InterruptedException  {
3539
3640        TopicPartition  tAp1  = new  TopicPartition ("topicA" ,1 );
3741        TopicPartition  tAp2  = new  TopicPartition ("topicA" ,2 );
3842        TopicPartition  tBp1  = new  TopicPartition ("topicB" ,1 );
3943        TopicPartition  tBp2  = new  TopicPartition ("topicB" ,2 );
4044
41-         ListOffsetsResult .ListOffsetsResultInfo  tAp1ListOffsetsResultInfo  = new  ListOffsetsResult .ListOffsetsResultInfo (100 ,System .currentTimeMillis (),Optional .empty ());
42-         ListOffsetsResult .ListOffsetsResultInfo  tAp2ListOffsetsResultInfo  = new  ListOffsetsResult .ListOffsetsResultInfo (200 ,System .currentTimeMillis (),Optional .empty ());
43-         ListOffsetsResult .ListOffsetsResultInfo  tBp1ListOffsetsResultInfo  = new  ListOffsetsResult .ListOffsetsResultInfo (300 ,System .currentTimeMillis (),Optional .empty ());
44-         ListOffsetsResult .ListOffsetsResultInfo  tBp2ListOffsetsResultInfo  = new  ListOffsetsResult .ListOffsetsResultInfo (400 ,System .currentTimeMillis (),Optional .empty ());
45+         ListOffsetsResult .ListOffsetsResultInfo  tAp1LatestListOffsetsResultInfo  = new  ListOffsetsResult .ListOffsetsResultInfo (100 ,System .currentTimeMillis (),Optional .empty ());
46+         ListOffsetsResult .ListOffsetsResultInfo  tAp2LatestListOffsetsResultInfo  = new  ListOffsetsResult .ListOffsetsResultInfo (200 ,System .currentTimeMillis (),Optional .empty ());
47+         ListOffsetsResult .ListOffsetsResultInfo  tBp1LatestListOffsetsResultInfo  = new  ListOffsetsResult .ListOffsetsResultInfo (300 ,System .currentTimeMillis (),Optional .empty ());
48+         ListOffsetsResult .ListOffsetsResultInfo  tBp2LatestListOffsetsResultInfo  = new  ListOffsetsResult .ListOffsetsResultInfo (400 ,System .currentTimeMillis (),Optional .empty ());
4549
46-         topicPartitionListOffsetsResultInfoMap  =  new   HashMap <>() ;
50+           Map < TopicPartition ,  ListOffsetsResult . ListOffsetsResultInfo >  topicPartitionLatestListOffsetsResultInfoMap ;
4751
48-         topicPartitionListOffsetsResultInfoMap .put (tAp1 ,tAp1ListOffsetsResultInfo );
49-         topicPartitionListOffsetsResultInfoMap .put (tAp2 ,tAp2ListOffsetsResultInfo );
50-         topicPartitionListOffsetsResultInfoMap .put (tBp1 ,tBp1ListOffsetsResultInfo );
51-         topicPartitionListOffsetsResultInfoMap .put (tBp2 ,tBp2ListOffsetsResultInfo );
52+         topicPartitionLatestListOffsetsResultInfoMap  = new  HashMap <>();
53+         topicPartitionLatestListOffsetsResultInfoMap .put (tAp1 ,tAp1LatestListOffsetsResultInfo );
54+         topicPartitionLatestListOffsetsResultInfoMap .put (tAp2 ,tAp2LatestListOffsetsResultInfo );
55+         topicPartitionLatestListOffsetsResultInfoMap .put (tBp1 ,tBp1LatestListOffsetsResultInfo );
56+         topicPartitionLatestListOffsetsResultInfoMap .put (tBp2 ,tBp2LatestListOffsetsResultInfo );
5257
53-         kafkaFuture  = mock (KafkaFuture .class );
54-         when (kafkaFuture .get ()).thenReturn (topicPartitionListOffsetsResultInfoMap );
58+         when (kafkaFuture .get ()).thenReturn (topicPartitionLatestListOffsetsResultInfoMap );
5559
56-         listOffsetsResult  = mock (ListOffsetsResult .class );
57-         when (listOffsetsResult .all ()).thenReturn (kafkaFuture );
60+         listOffsetsResultEarliest  = mock (ListOffsetsResult .class );
61+         when (listOffsetsResultEarliest .all ()).thenReturn (kafkaFuture );
5862
5963        admin =mock (Admin .class );
60-         when (admin .listOffsets (anyMap ())).thenReturn (listOffsetsResult );
61- 
62-         offsetManagers = new  HashMap <>();
64+         when (admin .listOffsets (anyMap ())).thenReturn (listOffsetsResultEarliest );
6365
6466        OffsetManager  offsetManagerTaP1  = mock (OffsetManager .class );
6567        when (offsetManagerTaP1 .getCommittedOffset ()).thenReturn (90L );
@@ -73,6 +75,7 @@ public void registerPartitionAndTopicMetrics() throws ExecutionException, Interr
7375        OffsetManager  offsetManagerTbP2  = mock (OffsetManager .class );
7476        when (offsetManagerTbP2 .getCommittedOffset ()).thenReturn (350L );
7577
78+         offsetManagers = new  HashMap <>();
7679        offsetManagers .put (tAp1 ,offsetManagerTaP1 );
7780        offsetManagers .put (tAp2 ,offsetManagerTaP2 );
7881        offsetManagers .put (tBp1 ,offsetManagerTbP1 );
@@ -104,10 +107,10 @@ public void registerPartitionAndTopicMetrics() throws ExecutionException, Interr
104107        //get the metrics a second time. Values should be the same. In particular, the total values for the topic should not accumulate. Each call to getMetrics should reset the total values. 
105108
106109        result  = kafkaOffsetPartitionAndTopicMetrics .getMetrics ();
107-           g1 = (Gauge ) result .get ("topicA/partition_1/spoutLag" );
108-           g2 = (Gauge ) result .get ("topicA/partition_2/spoutLag" );
109-           g3 = (Gauge ) result .get ("topicB/partition_1/spoutLag" );
110-           g4 = (Gauge ) result .get ("topicB/partition_2/spoutLag" );
110+         g1 = (Gauge ) result .get ("topicA/partition_1/spoutLag" );
111+         g2 = (Gauge ) result .get ("topicA/partition_2/spoutLag" );
112+         g3 = (Gauge ) result .get ("topicB/partition_1/spoutLag" );
113+         g4 = (Gauge ) result .get ("topicB/partition_2/spoutLag" );
111114        assertEquals (g1 .getValue (),10L );
112115        assertEquals (g2 .getValue (),30L );
113116        assertEquals (g3 .getValue (),100L );
@@ -119,5 +122,79 @@ public void registerPartitionAndTopicMetrics() throws ExecutionException, Interr
119122        assertEquals (gBTotal .getValue (),150L );
120123
121124
125+     }
126+ 
127+     @ Test 
128+     public  void  registerMetricsGetEarliest () throws  ExecutionException , InterruptedException  {
129+ 
130+         TopicPartition  tAp1  = new  TopicPartition ("topicA" ,1 );
131+         TopicPartition  tAp2  = new  TopicPartition ("topicA" ,2 );
132+         TopicPartition  tBp1  = new  TopicPartition ("topicB" ,1 );
133+         TopicPartition  tBp2  = new  TopicPartition ("topicB" ,2 );
134+ 
135+         ListOffsetsResult .ListOffsetsResultInfo  tAp1EarliestListOffsetsResultInfo  = new  ListOffsetsResult .ListOffsetsResultInfo (1 ,System .currentTimeMillis (),Optional .empty ());
136+         ListOffsetsResult .ListOffsetsResultInfo  tAp2EarliestListOffsetsResultInfo  = new  ListOffsetsResult .ListOffsetsResultInfo (1 ,System .currentTimeMillis (),Optional .empty ());
137+         ListOffsetsResult .ListOffsetsResultInfo  tBp1EarliestListOffsetsResultInfo  = new  ListOffsetsResult .ListOffsetsResultInfo (1 ,System .currentTimeMillis (),Optional .empty ());
138+         ListOffsetsResult .ListOffsetsResultInfo  tBp2EarliestListOffsetsResultInfo  = new  ListOffsetsResult .ListOffsetsResultInfo (1 ,System .currentTimeMillis (),Optional .empty ());
139+ 
140+          Map <TopicPartition , ListOffsetsResult .ListOffsetsResultInfo > topicPartitionEarliestListOffsetsResultInfoMap ;
141+ 
142+         topicPartitionEarliestListOffsetsResultInfoMap  = new  HashMap <>();
143+         topicPartitionEarliestListOffsetsResultInfoMap .put (tAp1 ,tAp1EarliestListOffsetsResultInfo );
144+         topicPartitionEarliestListOffsetsResultInfoMap .put (tAp2 ,tAp2EarliestListOffsetsResultInfo );
145+         topicPartitionEarliestListOffsetsResultInfoMap .put (tBp1 ,tBp1EarliestListOffsetsResultInfo );
146+         topicPartitionEarliestListOffsetsResultInfoMap .put (tBp2 ,tBp2EarliestListOffsetsResultInfo );
147+ 
148+         when (kafkaFuture .get ()).thenReturn (topicPartitionEarliestListOffsetsResultInfoMap );
149+ 
150+         listOffsetsResultEarliest  = mock (ListOffsetsResult .class );
151+         when (listOffsetsResultEarliest .all ()).thenReturn (kafkaFuture );
152+ 
153+         admin =mock (Admin .class );
154+         when (admin .listOffsets (anyMap ())).thenReturn (listOffsetsResultEarliest );
155+ 
156+         offsetManagers = new  HashMap <>();
157+ 
158+         assignment =new  HashSet <>();
159+         assignment .add (tAp1 );
160+         assignment .add (tAp2 );
161+         assignment .add (tBp1 );
162+         assignment .add (tBp2 );
163+ 
164+         KafkaOffsetPartitionAndTopicMetrics  kafkaOffsetPartitionAndTopicMetrics  = new  KafkaOffsetPartitionAndTopicMetrics (() -> Collections .unmodifiableMap (offsetManagers ),() -> admin ,assignment );
165+         Map <String , Metric >  result  = kafkaOffsetPartitionAndTopicMetrics .getMetrics ();
166+         Gauge  g1 = (Gauge ) result .get ("topicA/partition_1/earliestTimeOffset" );
167+         Gauge  g2 = (Gauge ) result .get ("topicA/partition_2/earliestTimeOffset" );
168+         Gauge  g3 = (Gauge ) result .get ("topicB/partition_1/earliestTimeOffset" );
169+         Gauge  g4 = (Gauge ) result .get ("topicB/partition_2/earliestTimeOffset" );
170+         assertEquals (g1 .getValue (),1L );
171+         assertEquals (g2 .getValue (),1L );
172+         assertEquals (g3 .getValue (),1L );
173+         assertEquals (g4 .getValue (),1L );
174+ 
175+         Gauge  gATotal = (Gauge ) result .get ("topicA/totalEarliestTimeOffset" );
176+         assertEquals (2L ,gATotal .getValue ());
177+         Gauge  gBTotal = (Gauge ) result .get ("topicB/totalEarliestTimeOffset" );
178+         assertEquals (2L ,gBTotal .getValue ());
179+ 
180+         //get the metrics a second time. Values should be the same. In particular, the total values for the topic should not accumulate. Each call to getMetrics should reset the total values. 
181+ 
182+         result  = kafkaOffsetPartitionAndTopicMetrics .getMetrics ();
183+ 
184+         g1 =(Gauge ) result .get ("topicA/partition_1/earliestTimeOffset" );
185+         g2 =(Gauge ) result .get ("topicA/partition_2/earliestTimeOffset" );
186+         g3 =(Gauge ) result .get ("topicB/partition_1/earliestTimeOffset" );
187+         g4 =(Gauge ) result .get ("topicB/partition_2/earliestTimeOffset" );
188+         assertEquals (g1 .getValue (),1L );
189+         assertEquals (g2 .getValue (),1L );
190+         assertEquals (g3 .getValue (),1L );
191+         assertEquals (g4 .getValue (),1L );
192+ 
193+         gATotal = (Gauge ) result .get ("topicA/totalEarliestTimeOffset" );
194+         assertEquals (2L ,gATotal .getValue ());
195+         gBTotal = (Gauge ) result .get ("topicB/totalEarliestTimeOffset" );
196+         assertEquals (2L ,gBTotal .getValue ());
197+ 
198+ 
122199    }
123200}
0 commit comments