Skip to content

Commit 7113e3f

Browse files
author
Maik Toepfer
committed
use lambdas
1 parent faad4fd commit 7113e3f

File tree

5 files changed

+38
-55
lines changed

5 files changed

+38
-55
lines changed

src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -152,16 +152,13 @@ record = _baseConsumer.receive();
152152
int partition = record.partition();
153153
/* Commit availability and commit latency service */
154154
/* Call commitAsync, wait for a NON-NULL return value (see https://issues.apache.org/jira/browse/KAFKA-6183) */
155-
OffsetCommitCallback commitCallback = new OffsetCommitCallback() {
156-
@Override
157-
public void onComplete(Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap, Exception kafkaException) {
158-
if (kafkaException != null) {
159-
LOG.error("Exception while trying to perform an asynchronous commit.", kafkaException);
160-
_commitAvailabilityMetrics._failedCommitOffsets.record();
161-
} else {
162-
_commitAvailabilityMetrics._offsetsCommitted.record();
163-
_commitLatencyMetrics.recordCommitComplete();
164-
}
155+
OffsetCommitCallback commitCallback = (topicPartitionOffsetAndMetadataMap, kafkaException) -> {
156+
if (kafkaException != null) {
157+
LOG.error("Exception while trying to perform an asynchronous commit.", kafkaException);
158+
_commitAvailabilityMetrics._failedCommitOffsets.record();
159+
} else {
160+
_commitAvailabilityMetrics._offsetsCommitted.record();
161+
_commitLatencyMetrics.recordCommitComplete();
165162
}
166163
};
167164

src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,17 +52,14 @@ public GraphiteMetricsReporterService(Map<String, Object> props, String name)
5252

5353
@Override
5454
public synchronized void start() {
55-
_executor.scheduleAtFixedRate(new Runnable() {
56-
@Override
57-
public void run() {
58-
try {
59-
GraphiteMetricsReporterService.this.reportMetrics();
60-
} catch (Exception e) {
61-
LOG.error(_name + "/GraphiteMetricsReporterService failed to report metrics",
62-
e);
63-
}
55+
_executor.scheduleAtFixedRate(() -> {
56+
try {
57+
GraphiteMetricsReporterService.this.reportMetrics();
58+
} catch (Exception e) {
59+
LOG.error(_name + "/GraphiteMetricsReporterService failed to report metrics",
60+
e);
6461
}
65-
}, _reportIntervalSec, _reportIntervalSec, TimeUnit.SECONDS
62+
}, _reportIntervalSec, _reportIntervalSec, TimeUnit.SECONDS
6663
);
6764
LOG.info("{}/GraphiteMetricsReporterService started", _name);
6865
}

src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,7 @@ public class OffsetCommitService implements Service {
134134
_consumerNetworkClient = new ConsumerNetworkClient(logContext, kafkaClient, metadata, _time, retryBackoffMs,
135135
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), heartbeatIntervalMs);
136136

137-
ThreadFactory threadFactory = new ThreadFactory() {
138-
@Override
139-
public Thread newThread(Runnable runnable) {
140-
return new Thread(runnable, serviceName + SERVICE_SUFFIX);
141-
}
142-
};
137+
ThreadFactory threadFactory = runnable -> new Thread(runnable, serviceName + SERVICE_SUFFIX);
143138
_scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
144139

145140
LOGGER.info("OffsetCommitService's ConsumerConfig - {}", Utils.prettyPrint(config.values()));

src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -59,27 +59,24 @@ public OffsetCommitServiceMetrics(final Metrics metrics, final Map<String, Strin
5959
"The total count of group coordinator unsuccessfully receiving consumer offset commit requests.", tags),
6060
new CumulativeSum());
6161

62-
Measurable measurable = new Measurable() {
63-
@Override
64-
public double measure(MetricConfig config, long now) {
65-
double offsetCommitSuccessRate = (double) metrics.metrics()
66-
.get(metrics.metricName(SUCCESS_RATE_METRIC, METRIC_GROUP_NAME, tags))
67-
.metricValue();
68-
double offsetCommitFailureRate = (double) metrics.metrics()
69-
.get(metrics.metricName(FAILURE_RATE_METRIC, METRIC_GROUP_NAME, tags))
70-
.metricValue();
71-
72-
if (new Double(offsetCommitSuccessRate).isNaN()) {
73-
offsetCommitSuccessRate = 0;
74-
}
75-
76-
if (new Double(offsetCommitFailureRate).isNaN()) {
77-
offsetCommitFailureRate = 0;
78-
}
79-
80-
return offsetCommitSuccessRate + offsetCommitFailureRate > 0 ? offsetCommitSuccessRate / (
81-
offsetCommitSuccessRate + offsetCommitFailureRate) : 0;
62+
Measurable measurable = (config, now) -> {
63+
double offsetCommitSuccessRate = (double) metrics.metrics()
64+
.get(metrics.metricName(SUCCESS_RATE_METRIC, METRIC_GROUP_NAME, tags))
65+
.metricValue();
66+
double offsetCommitFailureRate = (double) metrics.metrics()
67+
.get(metrics.metricName(FAILURE_RATE_METRIC, METRIC_GROUP_NAME, tags))
68+
.metricValue();
69+
70+
if (new Double(offsetCommitSuccessRate).isNaN()) {
71+
offsetCommitSuccessRate = 0;
8272
}
73+
74+
if (new Double(offsetCommitFailureRate).isNaN()) {
75+
offsetCommitFailureRate = 0;
76+
}
77+
78+
return offsetCommitSuccessRate + offsetCommitFailureRate > 0 ? offsetCommitSuccessRate / (
79+
offsetCommitSuccessRate + offsetCommitFailureRate) : 0;
8380
};
8481

8582
metrics.addMetric(new MetricName("offset-commit-availability-avg", METRIC_GROUP_NAME,

src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -144,16 +144,13 @@ private ConsumeService consumeService() throws Exception {
144144
/* define return value */
145145
Mockito.when(kmBaseConsumer.lastCommitted()).thenReturn(MOCK_LAST_COMMITTED_OFFSET);
146146
Mockito.when(kmBaseConsumer.committed(Mockito.any())).thenReturn(new OffsetAndMetadata(FIRST_OFFSET));
147-
Mockito.doAnswer(new Answer<Void>() {
148-
@Override
149-
public Void answer(InvocationOnMock invocationOnMock) {
150-
OffsetCommitCallback callback = invocationOnMock.getArgument(0);
151-
Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
152-
committedOffsets.put(new TopicPartition(TOPIC, PARTITION), new OffsetAndMetadata(FIRST_OFFSET));
153-
callback.onComplete(committedOffsets, null);
147+
Mockito.doAnswer((Answer<Void>) invocationOnMock -> {
148+
OffsetCommitCallback callback = invocationOnMock.getArgument(0);
149+
Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
150+
committedOffsets.put(new TopicPartition(TOPIC, PARTITION), new OffsetAndMetadata(FIRST_OFFSET));
151+
callback.onComplete(committedOffsets, null);
154152

155-
return null;
156-
}
153+
return null;
157154
}).when(kmBaseConsumer).commitAsync(Mockito.any(OffsetCommitCallback.class));
158155

159156

0 commit comments

Comments
 (0)