Skip to content

Commit eef79aa

Browse files
reiabreurzo1
authored andcommitted
Adding unit tests
(cherry picked from commit 4a5c383)
1 parent 4d70aec commit eef79aa

File tree

2 files changed

+20
-3
lines changed

2 files changed

+20
-3
lines changed

external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,19 +47,19 @@ public class KafkaOffsetPartitionAndTopicMetrics implements MetricSet {
4747
private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
4848
private final Supplier<Admin> adminSupplier;
4949
private final Set<TopicPartition> assignment;
50-
private final Map<String, KafkaOffsetTopicMetrics> topicMetricsMap;
51-
5250

5351
public KafkaOffsetPartitionAndTopicMetrics(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, Supplier<Admin> adminSupplier, Set<TopicPartition> assignment) {
5452
this.offsetManagerSupplier = offsetManagerSupplier;
5553
this.adminSupplier = adminSupplier;
5654
this.assignment = assignment;
57-
this.topicMetricsMap=new HashMap<>();
55+
5856
}
5957

6058
@Override
6159
public Map<String, Metric> getMetrics() {
6260

61+
Map<String, KafkaOffsetTopicMetrics> topicMetricsMap=new HashMap<>();
62+
6363
Map<String, Metric> metrics = new HashMap<>();
6464

6565
for (TopicPartition topicPartition : assignment) {

external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,23 @@ public void registerPartitionAndTopicMetrics() throws ExecutionException, Interr
101101
Gauge gBTotal= (Gauge) result.get("topicB/totalSpoutLag");
102102
assertEquals(gBTotal.getValue(),150L);
103103

104+
//get the metrics a second time. Values should be the same. In particular, the total values for the topic should not accumulate. Each call to getMetrics should reset the total values.
105+
106+
result = kafkaOffsetPartitionAndTopicMetrics.getMetrics();
107+
g1= (Gauge) result.get("topicA/partition_1/spoutLag");
108+
g2= (Gauge) result.get("topicA/partition_2/spoutLag");
109+
g3= (Gauge) result.get("topicB/partition_1/spoutLag");
110+
g4= (Gauge) result.get("topicB/partition_2/spoutLag");
111+
assertEquals(g1.getValue(),10L);
112+
assertEquals(g2.getValue(),30L);
113+
assertEquals(g3.getValue(),100L);
114+
assertEquals(g4.getValue(),50L);
115+
116+
gATotal= (Gauge) result.get("topicA/totalSpoutLag");
117+
assertEquals(gATotal.getValue(),40L);
118+
gBTotal= (Gauge) result.get("topicB/totalSpoutLag");
119+
assertEquals(gBTotal.getValue(),150L);
120+
104121

105122
}
106123
}

0 commit comments

Comments
 (0)