11package org .apache .storm .kafka .spout .metric2 ;
22
3+
4+ import java .util .Collections ;
5+ import java .util .HashMap ;
6+ import java .util .HashSet ;
7+ import java .util .Map ;
8+ import java .util .Optional ;
9+ import java .util .Set ;
10+ import java .util .concurrent .ExecutionException ;
311import com .codahale .metrics .Gauge ;
412import com .codahale .metrics .Metric ;
513import org .apache .kafka .clients .admin .Admin ;
1321import org .junit .jupiter .api .extension .ExtendWith ;
1422import org .mockito .junit .jupiter .MockitoExtension ;
1523
16- import java .util .*;
17- import java .util .concurrent .ExecutionException ;
18-
1924import static org .junit .jupiter .api .Assertions .assertEquals ;
2025import static org .mockito .Mockito .*;
2126
@@ -25,7 +30,6 @@ public class KafkaOffsetPartitionAndTopicMetricsTest {
2530 private Set <TopicPartition > assignment ;
2631 private Admin admin = mock (Admin .class );
2732 private HashMap <TopicPartition , OffsetManager > offsetManagers ;
28- private ListOffsetsResult listOffsetsResultEarliest ;
2933 private KafkaFuture kafkaFuture = mock (KafkaFuture .class );
3034
3135 @ BeforeEach
@@ -35,7 +39,7 @@ public void initializeTests() {
3539 }
3640
3741 @ Test
38- public void registerMetricsGetSpoutLag () throws ExecutionException , InterruptedException {
42+ public void registerMetricsGetSpoutLagAndPartitionRecords () throws ExecutionException , InterruptedException {
3943
4044 TopicPartition tAp1 = new TopicPartition ("topicA" , 1 );
4145 TopicPartition tAp2 = new TopicPartition ("topicA" , 2 );
@@ -47,21 +51,19 @@ public void registerMetricsGetSpoutLag() throws ExecutionException, InterruptedE
4751 ListOffsetsResult .ListOffsetsResultInfo tBp1LatestListOffsetsResultInfo = new ListOffsetsResult .ListOffsetsResultInfo (300 , System .currentTimeMillis (), Optional .empty ());
4852 ListOffsetsResult .ListOffsetsResultInfo tBp2LatestListOffsetsResultInfo = new ListOffsetsResult .ListOffsetsResultInfo (400 , System .currentTimeMillis (), Optional .empty ());
4953
50- Map <TopicPartition , ListOffsetsResult .ListOffsetsResultInfo > topicPartitionLatestListOffsetsResultInfoMap ;
51-
52- topicPartitionLatestListOffsetsResultInfoMap = new HashMap <>();
54+ Map <TopicPartition , ListOffsetsResult .ListOffsetsResultInfo > topicPartitionLatestListOffsetsResultInfoMap = new HashMap <>();
5355 topicPartitionLatestListOffsetsResultInfoMap .put (tAp1 , tAp1LatestListOffsetsResultInfo );
5456 topicPartitionLatestListOffsetsResultInfoMap .put (tAp2 , tAp2LatestListOffsetsResultInfo );
5557 topicPartitionLatestListOffsetsResultInfoMap .put (tBp1 , tBp1LatestListOffsetsResultInfo );
5658 topicPartitionLatestListOffsetsResultInfoMap .put (tBp2 , tBp2LatestListOffsetsResultInfo );
5759
5860 when (kafkaFuture .get ()).thenReturn (topicPartitionLatestListOffsetsResultInfoMap );
5961
60- listOffsetsResultEarliest = mock (ListOffsetsResult .class );
61- when (listOffsetsResultEarliest .all ()).thenReturn (kafkaFuture );
62+ ListOffsetsResult listOffsetsResult = mock (ListOffsetsResult .class );
63+ when (listOffsetsResult .all ()).thenReturn (kafkaFuture );
6264
6365 admin = mock (Admin .class );
64- when (admin .listOffsets (anyMap ())).thenReturn (listOffsetsResultEarliest );
66+ when (admin .listOffsets (anyMap ())).thenReturn (listOffsetsResult );
6567
6668 OffsetManager offsetManagerTaP1 = mock (OffsetManager .class );
6769 when (offsetManagerTaP1 .getCommittedOffset ()).thenReturn (90L );
@@ -94,15 +96,15 @@ public void registerMetricsGetSpoutLag() throws ExecutionException, InterruptedE
9496 Gauge g2 = (Gauge ) result .get ("topicA/partition_2/spoutLag" );
9597 Gauge g3 = (Gauge ) result .get ("topicB/partition_1/spoutLag" );
9698 Gauge g4 = (Gauge ) result .get ("topicB/partition_2/spoutLag" );
97- assertEquals (g1 .getValue (), 10L );
98- assertEquals (g2 .getValue (), 30L );
99- assertEquals (g3 .getValue (), 100L );
100- assertEquals (g4 .getValue (), 50L );
99+ assertEquals (10L , g1 .getValue ());
100+ assertEquals (30L , g2 .getValue ());
101+ assertEquals (100L , g3 .getValue ());
102+ assertEquals (50L , g4 .getValue ());
101103
102104 Gauge gATotal = (Gauge ) result .get ("topicA/totalSpoutLag" );
103- assertEquals (gATotal .getValue (), 40L );
105+ assertEquals (40L , gATotal .getValue ());
104106 Gauge gBTotal = (Gauge ) result .get ("topicB/totalSpoutLag" );
105- assertEquals (gBTotal .getValue (), 150L );
107+ assertEquals (150L , gBTotal .getValue ());
106108
107109 //get the metrics a second time. Values should be the same. In particular, the total values for the topic should not accumulate. Each call to getMetrics should reset the total values.
108110
@@ -121,6 +123,39 @@ public void registerMetricsGetSpoutLag() throws ExecutionException, InterruptedE
121123 gBTotal = (Gauge ) result .get ("topicB/totalSpoutLag" );
122124 assertEquals (gBTotal .getValue (), 150L );
123125
126+ //get partition records
127+
128+ ListOffsetsResult .ListOffsetsResultInfo tAp1EarliestListOffsetsResultInfo = new ListOffsetsResult .ListOffsetsResultInfo (1 , System .currentTimeMillis (), Optional .empty ());
129+ ListOffsetsResult .ListOffsetsResultInfo tAp2EarliestListOffsetsResultInfo = new ListOffsetsResult .ListOffsetsResultInfo (2 , System .currentTimeMillis (), Optional .empty ());
130+ ListOffsetsResult .ListOffsetsResultInfo tBp1EarliestListOffsetsResultInfo = new ListOffsetsResult .ListOffsetsResultInfo (3 , System .currentTimeMillis (), Optional .empty ());
131+ ListOffsetsResult .ListOffsetsResultInfo tBp2EarliestListOffsetsResultInfo = new ListOffsetsResult .ListOffsetsResultInfo (4 , System .currentTimeMillis (), Optional .empty ());
132+
133+ Map <TopicPartition , ListOffsetsResult .ListOffsetsResultInfo > topicPartitionEarliestListOffsetsResultInfoMap = new HashMap <>();
134+ topicPartitionEarliestListOffsetsResultInfoMap .put (tAp1 , tAp1EarliestListOffsetsResultInfo );
135+ topicPartitionEarliestListOffsetsResultInfoMap .put (tAp2 , tAp2EarliestListOffsetsResultInfo );
136+ topicPartitionEarliestListOffsetsResultInfoMap .put (tBp1 , tBp1EarliestListOffsetsResultInfo );
137+ topicPartitionEarliestListOffsetsResultInfoMap .put (tBp2 , tBp2EarliestListOffsetsResultInfo );
138+
139+ //mock consecutive calls. Each call to the recordsInPartition gauge will call kafkaFuture.get() twice
140+ when (kafkaFuture .get ()).thenReturn (topicPartitionLatestListOffsetsResultInfoMap , topicPartitionEarliestListOffsetsResultInfoMap ,
141+ topicPartitionLatestListOffsetsResultInfoMap , topicPartitionEarliestListOffsetsResultInfoMap ,
142+ topicPartitionLatestListOffsetsResultInfoMap , topicPartitionEarliestListOffsetsResultInfoMap ,
143+ topicPartitionLatestListOffsetsResultInfoMap , topicPartitionEarliestListOffsetsResultInfoMap );
144+
145+ result = kafkaOffsetPartitionAndTopicMetrics .getMetrics ();
146+ g1 = (Gauge ) result .get ("topicA/partition_1/recordsInPartition" );
147+ g2 = (Gauge ) result .get ("topicA/partition_2/recordsInPartition" );
148+ g3 = (Gauge ) result .get ("topicB/partition_1/recordsInPartition" );
149+ g4 = (Gauge ) result .get ("topicB/partition_2/recordsInPartition" );
150+ assertEquals (99L , g1 .getValue ());
151+ assertEquals (198L , g2 .getValue ());
152+ assertEquals (297L , g3 .getValue ());
153+ assertEquals (396L , g4 .getValue ());
154+
155+ gATotal = (Gauge ) result .get ("topicA/totalRecordsInPartitions" );
156+ assertEquals (297L , gATotal .getValue ());
157+ gBTotal = (Gauge ) result .get ("topicB/totalRecordsInPartitions" );
158+ assertEquals (693L , gBTotal .getValue ());
124159
125160 }
126161
@@ -145,11 +180,11 @@ public void registerMetricsGetEarliestAndLatest() throws ExecutionException, Int
145180
146181 when (kafkaFuture .get ()).thenReturn (topicPartitionEarliestListOffsetsResultInfoMap );
147182
148- listOffsetsResultEarliest = mock (ListOffsetsResult .class );
149- when (listOffsetsResultEarliest .all ()).thenReturn (kafkaFuture );
183+ ListOffsetsResult listOffsetsResult = mock (ListOffsetsResult .class );
184+ when (listOffsetsResult .all ()).thenReturn (kafkaFuture );
150185
151186 admin = mock (Admin .class );
152- when (admin .listOffsets (anyMap ())).thenReturn (listOffsetsResultEarliest );
187+ when (admin .listOffsets (anyMap ())).thenReturn (listOffsetsResult );
153188
154189 OffsetManager offsetManagerTaP1 = mock (OffsetManager .class );
155190 when (offsetManagerTaP1 .getLatestEmittedOffset ()).thenReturn (50L );
0 commit comments