Skip to content

Commit f1fdc42

Browse files
author
Andrew Choi
authored
[ kafka-monitor ] close ZK connections cleanly (linkedin#223)
Removes a regression due to which ZK connections opened by MultiClusterTopicManagementService aren't recycled. This commit attempts to fix that issue by wrapping auto-closable KafkaZkClient with try-with-resources statement.
1 parent 9f1e4bf commit f1fdc42

File tree

1 file changed

+70
-59
lines changed

1 file changed

+70
-59
lines changed

src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java

Lines changed: 70 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -322,62 +322,71 @@ private Set<Node> getAvailableBrokers() throws ExecutionException, InterruptedEx
322322
}
323323

324324
void maybeReassignPartitionAndElectLeader() throws Exception {
325-
KafkaZkClient zkClient = KafkaZkClient.apply(_zkConnect, JaasUtils.isZkSecurityEnabled(), com.linkedin.kmf.common.Utils.ZK_SESSION_TIMEOUT_MS,
326-
com.linkedin.kmf.common.Utils.ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, METRIC_GROUP_NAME, "SessionExpireListener", null);
327-
328-
List<TopicPartitionInfo> partitionInfoList = _adminClient.describeTopics(Collections.singleton(_topic)).all().get().get(_topic).partitions();
329-
Collection<Node> brokers = this.getAvailableBrokers();
330-
boolean partitionReassigned = false;
331-
if (partitionInfoList.size() == 0) {
332-
throw new IllegalStateException("Topic " + _topic + " does not exist in cluster.");
333-
}
325+
try (KafkaZkClient zkClient = KafkaZkClient.apply(_zkConnect, JaasUtils.isZkSecurityEnabled(), com.linkedin.kmf.common.Utils.ZK_SESSION_TIMEOUT_MS,
326+
com.linkedin.kmf.common.Utils.ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, METRIC_GROUP_NAME, "SessionExpireListener", null)) {
327+
328+
List<TopicPartitionInfo> partitionInfoList = _adminClient
329+
.describeTopics(Collections.singleton(_topic)).all().get().get(_topic).partitions();
330+
Collection<Node> brokers = this.getAvailableBrokers();
331+
boolean partitionReassigned = false;
332+
if (partitionInfoList.size() == 0) {
333+
throw new IllegalStateException("Topic " + _topic + " does not exist in cluster.");
334+
}
334335

335-
int currentReplicationFactor = getReplicationFactor(partitionInfoList);
336-
int expectedReplicationFactor = Math.max(currentReplicationFactor, _replicationFactor);
336+
int currentReplicationFactor = getReplicationFactor(partitionInfoList);
337+
int expectedReplicationFactor = Math.max(currentReplicationFactor, _replicationFactor);
337338

338-
if (_replicationFactor < currentReplicationFactor)
339-
LOG.debug("Configured replication factor {} is smaller than the current replication factor {} of the topic {} in cluster.",
340-
_replicationFactor, currentReplicationFactor, _topic);
339+
if (_replicationFactor < currentReplicationFactor)
340+
LOG.debug(
341+
"Configured replication factor {} is smaller than the current replication factor {} of the topic {} in cluster.",
342+
_replicationFactor, currentReplicationFactor, _topic);
341343

342-
if (expectedReplicationFactor > currentReplicationFactor && !zkClient.reassignPartitionsInProgress()) {
343-
LOG.info("MultiClusterTopicManagementService will increase the replication factor of the topic {} in cluster"
344-
+ "from {} to {}", _topic, currentReplicationFactor, expectedReplicationFactor);
345-
reassignPartitions(zkClient, brokers, _topic, partitionInfoList.size(), expectedReplicationFactor);
346-
partitionReassigned = true;
347-
}
344+
if (expectedReplicationFactor > currentReplicationFactor && !zkClient
345+
.reassignPartitionsInProgress()) {
346+
LOG.info(
347+
"MultiClusterTopicManagementService will increase the replication factor of the topic {} in cluster"
348+
+ "from {} to {}", _topic, currentReplicationFactor, expectedReplicationFactor);
349+
reassignPartitions(zkClient, brokers, _topic, partitionInfoList.size(),
350+
expectedReplicationFactor);
351+
partitionReassigned = true;
352+
}
348353

349-
// Update the properties of the monitor topic if any config is different from the user-specified config
350-
Properties currentProperties = zkClient.getEntityConfigs(ConfigType.Topic(), _topic);
351-
Properties expectedProperties = new Properties();
352-
for (Object key: currentProperties.keySet())
353-
expectedProperties.put(key, currentProperties.get(key));
354-
for (Object key: _topicProperties.keySet())
355-
expectedProperties.put(key, _topicProperties.get(key));
356-
357-
if (!currentProperties.equals(expectedProperties)) {
358-
LOG.info("MultiClusterTopicManagementService will overwrite properties of the topic {} "
359-
+ "in cluster from {} to {}.", _topic, currentProperties, expectedProperties);
360-
zkClient.setOrCreateEntityConfigs(ConfigType.Topic(), _topic, expectedProperties);
361-
}
354+
// Update the properties of the monitor topic if any config is different from the user-specified config
355+
Properties currentProperties = zkClient.getEntityConfigs(ConfigType.Topic(), _topic);
356+
Properties expectedProperties = new Properties();
357+
for (Object key : currentProperties.keySet())
358+
expectedProperties.put(key, currentProperties.get(key));
359+
for (Object key : _topicProperties.keySet())
360+
expectedProperties.put(key, _topicProperties.get(key));
361+
362+
if (!currentProperties.equals(expectedProperties)) {
363+
LOG.info("MultiClusterTopicManagementService will overwrite properties of the topic {} "
364+
+ "in cluster from {} to {}.", _topic, currentProperties, expectedProperties);
365+
zkClient.setOrCreateEntityConfigs(ConfigType.Topic(), _topic, expectedProperties);
366+
}
362367

363-
if (partitionInfoList.size() >= brokers.size() &&
364-
someBrokerNotPreferredLeader(partitionInfoList, brokers) && !zkClient.reassignPartitionsInProgress()) {
365-
LOG.info("{} will reassign partitions of the topic {} in cluster.", this.getClass().toString(), _topic);
366-
reassignPartitions(zkClient, brokers, _topic, partitionInfoList.size(), expectedReplicationFactor);
367-
partitionReassigned = true;
368-
}
368+
if (partitionInfoList.size() >= brokers.size() &&
369+
someBrokerNotPreferredLeader(partitionInfoList, brokers) && !zkClient
370+
.reassignPartitionsInProgress()) {
371+
LOG.info("{} will reassign partitions of the topic {} in cluster.",
372+
this.getClass().toString(), _topic);
373+
reassignPartitions(zkClient, brokers, _topic, partitionInfoList.size(),
374+
expectedReplicationFactor);
375+
partitionReassigned = true;
376+
}
369377

370-
if (partitionInfoList.size() >= brokers.size() &&
371-
someBrokerNotElectedLeader(partitionInfoList, brokers)) {
372-
if (!partitionReassigned || !zkClient.reassignPartitionsInProgress()) {
373-
LOG.info(
374-
"MultiClusterTopicManagementService will trigger preferred leader election for the topic {} in "
375-
+ "cluster.", _topic
376-
);
377-
triggerPreferredLeaderElection(partitionInfoList, _topic);
378-
_preferredLeaderElectionRequested = false;
379-
} else {
380-
_preferredLeaderElectionRequested = true;
378+
if (partitionInfoList.size() >= brokers.size() &&
379+
someBrokerNotElectedLeader(partitionInfoList, brokers)) {
380+
if (!partitionReassigned || !zkClient.reassignPartitionsInProgress()) {
381+
LOG.info(
382+
"MultiClusterTopicManagementService will trigger preferred leader election for the topic {} in "
383+
+ "cluster.", _topic
384+
);
385+
triggerPreferredLeaderElection(partitionInfoList, _topic);
386+
_preferredLeaderElectionRequested = false;
387+
} else {
388+
_preferredLeaderElectionRequested = true;
389+
}
381390
}
382391
}
383392
}
@@ -387,15 +396,17 @@ void maybeElectLeader() throws Exception {
387396
return;
388397
}
389398

390-
KafkaZkClient zkClient = KafkaZkClient.apply(_zkConnect, JaasUtils.isZkSecurityEnabled(), com.linkedin.kmf.common.Utils.ZK_SESSION_TIMEOUT_MS,
391-
com.linkedin.kmf.common.Utils.ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, METRIC_GROUP_NAME, "SessionExpireListener", null);
392-
393-
if (!zkClient.reassignPartitionsInProgress()) {
394-
List<TopicPartitionInfo> partitionInfoList = _adminClient.describeTopics(Collections.singleton(_topic)).all().get().get(_topic).partitions();
395-
LOG.info("MultiClusterTopicManagementService will trigger requested preferred leader election for the"
396-
+ " topic {} in cluster.", _topic);
397-
triggerPreferredLeaderElection(partitionInfoList, _topic);
398-
_preferredLeaderElectionRequested = false;
399+
try (KafkaZkClient zkClient = KafkaZkClient.apply(_zkConnect, JaasUtils.isZkSecurityEnabled(), com.linkedin.kmf.common.Utils.ZK_SESSION_TIMEOUT_MS,
400+
com.linkedin.kmf.common.Utils.ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, METRIC_GROUP_NAME, "SessionExpireListener", null)) {
401+
if (!zkClient.reassignPartitionsInProgress()) {
402+
List<TopicPartitionInfo> partitionInfoList = _adminClient
403+
.describeTopics(Collections.singleton(_topic)).all().get().get(_topic).partitions();
404+
LOG.info(
405+
"MultiClusterTopicManagementService will trigger requested preferred leader election for the"
406+
+ " topic {} in cluster.", _topic);
407+
triggerPreferredLeaderElection(partitionInfoList, _topic);
408+
_preferredLeaderElectionRequested = false;
409+
}
399410
}
400411
}
401412

0 commit comments

Comments
 (0)