Skip to content

Commit 357368c

Browse files
committed
Adding new MetricSet
1 parent 1492996 commit 357368c

File tree

4 files changed

+220
-2
lines changed

4 files changed

+220
-2
lines changed

external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -634,7 +634,7 @@ private void refreshAssignment() {
634634
boolean partitionChanged = topicAssigner.assignPartitions(consumer, assignedPartitions, rebalanceListener);
635635
if (partitionChanged && canRegisterMetrics()) {
636636
LOG.info("Partitions assignments has changed, updating metrics.");
637-
kafkaOffsetMetricManager.registerMetricsForNewTopicPartitions(assignedPartitions);
637+
kafkaOffsetMetricManager.registerPartitionAndTopicLevelMetrics(assignedPartitions);
638638
}
639639
}
640640

external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ public void registerMetricsForNewTopicPartitions(Set<TopicPartition> newAssignme
7575
}
7676
}
7777

78+
public void registerPartitionAndTopicLevelMetrics(Set<TopicPartition> newAssignment) {
79+
80+
KafkaOffsetPartitionAndTopicMetrics topicPartitionMetricSet
81+
= new KafkaOffsetPartitionAndTopicMetrics<>(offsetManagerSupplier, adminSupplier,newAssignment);
82+
topologyContext.registerMetricSet("kafkaOffset", topicPartitionMetricSet);
83+
}
84+
7885
public Map<TopicPartition, KafkaOffsetPartitionMetrics> getTopicPartitionMetricsMap() {
7986
return topicPartitionMetricsMap;
8087
}
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
package org.apache.storm.kafka.spout.metrics2;
2+
3+
import com.codahale.metrics.Gauge;
4+
import com.codahale.metrics.Metric;
5+
import com.codahale.metrics.MetricSet;
6+
import org.apache.kafka.clients.admin.Admin;
7+
import org.apache.kafka.clients.admin.ListOffsetsResult;
8+
import org.apache.kafka.clients.admin.OffsetSpec;
9+
import org.apache.kafka.common.KafkaFuture;
10+
import org.apache.kafka.common.TopicPartition;
11+
import org.apache.kafka.common.errors.RetriableException;
12+
import org.apache.storm.kafka.spout.internal.OffsetManager;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
import java.util.Collections;
17+
import java.util.HashMap;
18+
import java.util.Map;
19+
import java.util.Set;
20+
import java.util.concurrent.ExecutionException;
21+
import java.util.function.Supplier;
22+
23+
/**
24+
* This class is used compute the partition and topic level offset metrics.
25+
* <p>
26+
* Partition level metrics are:
27+
* topicName/partition_{number}/earliestTimeOffset //gives beginning offset of the partition
28+
* topicName/partition_{number}/latestTimeOffset //gives end offset of the partition
29+
* topicName/partition_{number}/latestEmittedOffset //gives latest emitted offset of the partition from the spout
30+
* topicName/partition_{number}/latestCompletedOffset //gives latest committed offset of the partition from the spout
31+
* topicName/partition_{number}/spoutLag // the delta between the latest Offset and latestCompletedOffset
32+
* topicName/partition_{number}/recordsInPartition // total number of records in the partition
33+
* </p>
34+
* <p>
35+
* Topic level metrics are:
36+
* topicName/totalEarliestTimeOffset //gives the total beginning offset of all the associated partitions of this spout
37+
* topicName/totalLatestTimeOffset //gives the total end offset of all the associated partitions of this spout
38+
* topicName/totalLatestEmittedOffset //gives the total latest emitted offset of all the associated partitions of this spout
39+
* topicName/totalLatestCompletedOffset //gives the total latest committed offset of all the associated partitions of this spout
40+
* topicName/spoutLag // total spout lag of all the associated partitions of this spout
41+
* topicName/totalRecordsInPartitions //total number of records in all the associated partitions of this spout
42+
* </p>
43+
*/
44+
public class KafkaOffsetPartitionAndTopicMetrics <K, V> implements MetricSet {
45+
46+
private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetPartitionAndTopicMetrics.class);
47+
private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
48+
private final Supplier<Admin> adminSupplier;
49+
private final Set<TopicPartition> assignment;
50+
private Map<String, KafkaOffsetTopicMetrics> topicMetricsMap;
51+
52+
53+
public KafkaOffsetPartitionAndTopicMetrics(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, Supplier<Admin> adminSupplier, Set<TopicPartition> assignment) {
54+
this.offsetManagerSupplier = offsetManagerSupplier;
55+
this.adminSupplier = adminSupplier;
56+
this.assignment = assignment;
57+
}
58+
59+
@Override
60+
public Map<String, Metric> getMetrics() {
61+
62+
Map<String, Metric> metrics = new HashMap<>();
63+
64+
for (TopicPartition topicPartition : assignment) {
65+
66+
String topic=topicPartition.topic();
67+
KafkaOffsetTopicMetrics topicMetrics = topicMetricsMap.get(topic);
68+
if (topicMetrics == null) {
69+
topicMetrics = new KafkaOffsetTopicMetrics(topic);
70+
topicMetricsMap.put(topic, topicMetrics);
71+
}
72+
73+
String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition();
74+
KafkaOffsetTopicMetrics finalTopicMetrics = topicMetrics;
75+
Gauge<Long> spoutLagGauge = () -> {
76+
Map<TopicPartition, Long> endOffsets = getEndOffsets(Collections.singleton(topicPartition));
77+
if (endOffsets == null || endOffsets.isEmpty()) {
78+
LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition);
79+
return 0L;
80+
}
81+
// add value to topic level metric
82+
OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition);
83+
Long ret = endOffsets.get(topicPartition) - offsetManager.getCommittedOffset();
84+
finalTopicMetrics.totalSpoutLag += ret;
85+
return ret;
86+
};
87+
88+
Gauge<Long> earliestTimeOffsetGauge = () -> {
89+
Map<TopicPartition, Long> beginningOffsets = getBeginningOffsets(Collections.singleton(topicPartition));
90+
if (beginningOffsets == null || beginningOffsets.isEmpty()) {
91+
LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", topicPartition);
92+
return 0L;
93+
}
94+
// add value to topic level metric
95+
Long ret = beginningOffsets.get(topicPartition);
96+
finalTopicMetrics.totalEarliestTimeOffset += ret;
97+
return ret;
98+
};
99+
100+
Gauge<Long> latestTimeOffsetGauge = () -> {
101+
Map<TopicPartition, Long> endOffsets = getEndOffsets(Collections.singleton(topicPartition));
102+
if (endOffsets == null || endOffsets.isEmpty()) {
103+
LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition);
104+
return 0L;
105+
}
106+
// add value to topic level metric
107+
Long ret = endOffsets.get(topicPartition);
108+
finalTopicMetrics.totalLatestTimeOffset += ret;
109+
return ret;
110+
};
111+
112+
Gauge<Long> latestEmittedOffsetGauge = () -> {
113+
// add value to topic level metric
114+
OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition);
115+
Long ret = offsetManager.getLatestEmittedOffset();
116+
finalTopicMetrics.totalLatestEmittedOffset+=ret;
117+
return ret;
118+
};
119+
120+
Gauge<Long> latestCompletedOffsetGauge = () -> {
121+
// add value to topic level metric
122+
OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition);
123+
Long ret = offsetManager.getCommittedOffset();
124+
finalTopicMetrics.totalLatestCompletedOffset+=ret;
125+
return ret;
126+
};
127+
128+
Gauge<Long> recordsInPartitionGauge = () -> {
129+
Map<TopicPartition, Long> endOffsets = getEndOffsets(Collections.singleton(topicPartition));
130+
if (endOffsets == null || endOffsets.isEmpty()) {
131+
LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition);
132+
return 0L;
133+
}
134+
Map<TopicPartition, Long> beginningOffsets = getBeginningOffsets(Collections.singleton(topicPartition));
135+
if (beginningOffsets == null || beginningOffsets.isEmpty()) {
136+
LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", topicPartition);
137+
return 0L;
138+
}
139+
// add value to topic level metric
140+
Long ret = endOffsets.get(topicPartition) - beginningOffsets.get(topicPartition);
141+
finalTopicMetrics.totalRecordsInPartitions+=ret;
142+
return ret;
143+
};
144+
145+
metrics.put(metricPath + "/" + "spoutLag", spoutLagGauge);
146+
metrics.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffsetGauge);
147+
metrics.put(metricPath + "/" + "latestTimeOffset", latestTimeOffsetGauge);
148+
metrics.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffsetGauge);
149+
metrics.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffsetGauge);
150+
metrics.put(metricPath + "/" + "recordsInPartition", recordsInPartitionGauge);
151+
152+
}
153+
154+
metrics.putAll(topicMetricsMap);
155+
156+
return metrics;
157+
158+
}
159+
160+
private Map<TopicPartition, Long> getBeginningOffsets(Set<TopicPartition> topicPartitions) {
161+
Admin admin = adminSupplier.get();
162+
if (admin == null) {
163+
LOG.error("Kafka admin object is null, returning 0.");
164+
return Collections.EMPTY_MAP;
165+
}
166+
167+
Map<TopicPartition, Long> beginningOffsets;
168+
try {
169+
beginningOffsets = getOffsets(admin, topicPartitions, OffsetSpec.earliest());
170+
} catch (RetriableException | ExecutionException | InterruptedException e) {
171+
LOG.error("Failed to get offset from Kafka for topic partitions: {}.", topicPartitions, e);
172+
return Collections.EMPTY_MAP;
173+
}
174+
return beginningOffsets;
175+
}
176+
177+
private Map<TopicPartition, Long> getEndOffsets(Set<TopicPartition> topicPartitions) {
178+
Admin admin = adminSupplier.get();
179+
if (admin == null) {
180+
LOG.error("Kafka admin object is null, returning 0.");
181+
return Collections.EMPTY_MAP;
182+
}
183+
184+
Map<TopicPartition, Long> endOffsets;
185+
try {
186+
endOffsets = getOffsets(admin, topicPartitions, OffsetSpec.latest());
187+
} catch (RetriableException | ExecutionException | InterruptedException e) {
188+
LOG.error("Failed to get offset from Kafka for topic partitions: {}.", topicPartitions, e);
189+
return Collections.EMPTY_MAP;
190+
}
191+
return endOffsets;
192+
}
193+
194+
private static Map<TopicPartition, Long> getOffsets(Admin admin, Set<TopicPartition> topicPartitions, OffsetSpec offsetSpec)
195+
throws InterruptedException, ExecutionException {
196+
197+
Map<TopicPartition, OffsetSpec> offsetSpecMap = new HashMap<>();
198+
for (TopicPartition topicPartition : topicPartitions) {
199+
offsetSpecMap.put(topicPartition, offsetSpec);
200+
}
201+
Map<TopicPartition, Long> ret = new HashMap<>();
202+
ListOffsetsResult listOffsetsResult = admin.listOffsets(offsetSpecMap);
203+
KafkaFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> all = listOffsetsResult.all();
204+
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionListOffsetsResultInfoMap = all.get();
205+
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> entry :
206+
topicPartitionListOffsetsResultInfoMap.entrySet()) {
207+
ret.put(entry.getKey(), entry.getValue().offset());
208+
}
209+
return ret;
210+
}
211+
}

external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public Long getValue() {
101101
}
102102
// add value to topic level metric
103103
Long ret = beginningOffsets.get(topicPartition);
104-
topicMetrics.totalEarliestTimeOffset += beginningOffsets.get(topicPartition);
104+
topicMetrics.totalEarliestTimeOffset += ret;
105105
return ret;
106106
}
107107
};

0 commit comments

Comments
 (0)