Skip to content

Commit 0693dc4

Browse files
author
Andrew Choi
authored
Kafka 2.4 KIP-455 - Update TopicFactory Interface and DefaultTopicFactory Implementation to use AdminClient
Kafka 2.4 KIP-455 - Update TopicFactory Interface and DefaultTopicFactory Implementation to use AdminClient Signed-off-by: Andrew Choi <[email protected]>
1 parent f9194e8 commit 0693dc4

File tree

4 files changed

+6
-6
lines changed

4 files changed

+6
-6
lines changed

src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ private void createDeleteClusterTopic() {
163163
try {
164164
int brokerCount = _adminClient.describeCluster().nodes().get().size();
165165

166-
Set<Integer> blackListedBrokers = _topicFactory.getBlackListedBrokers(_zkConnect);
166+
Set<Integer> blackListedBrokers = _topicFactory.getBlackListedBrokers(_adminClient);
167167
Set<BrokerMetadata> brokers = new HashSet<>();
168168
for (Node broker : _adminClient.describeCluster().nodes().get()) {
169169
BrokerMetadata brokerMetadata = new BrokerMetadata(broker.id(), null);

src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ void maybeAddPartitions(int minPartitionNum) throws ExecutionException, Interrup
313313
if (partitionNum < minPartitionNum) {
314314
LOGGER.info("{} will increase partition of the topic {} in the cluster from {}" + " to {}.",
315315
this.getClass().toString(), _topic, partitionNum, minPartitionNum);
316-
Set<Integer> blackListedBrokers = _topicFactory.getBlackListedBrokers(_zkConnect);
316+
Set<Integer> blackListedBrokers = _topicFactory.getBlackListedBrokers(_adminClient);
317317
Set<BrokerMetadata> brokers = new HashSet<>();
318318
for (Node broker : _adminClient.describeCluster().nodes().get()) {
319319
BrokerMetadata brokerMetadata = new BrokerMetadata(broker.id(), null);
@@ -409,7 +409,7 @@ int numPartitions() throws InterruptedException, ExecutionException {
409409

410410
private Set<Node> getAvailableBrokers() throws ExecutionException, InterruptedException {
411411
Set<Node> brokers = new HashSet<>(_adminClient.describeCluster().nodes().get());
412-
Set<Integer> blackListedBrokers = _topicFactory.getBlackListedBrokers(_zkConnect);
412+
Set<Integer> blackListedBrokers = _topicFactory.getBlackListedBrokers(_adminClient);
413413
brokers.removeIf(broker -> blackListedBrokers.contains(broker.id()));
414414
return brokers;
415415
}

src/main/java/com/linkedin/xinfra/monitor/topicfactory/DefaultTopicFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public int createTopicIfNotExist(String topic, short replicationFactor, double p
3232
}
3333

3434
@Override
35-
public Set<Integer> getBlackListedBrokers(String zkUrl) {
35+
public Set<Integer> getBlackListedBrokers(AdminClient adminClient) {
3636
return Collections.emptySet();
3737
}
3838
}

src/main/java/com/linkedin/xinfra/monitor/topicfactory/TopicFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ int createTopicIfNotExist(String topic, short replicationFactor, double partitio
3939
throws ExecutionException, InterruptedException;
4040

4141
/**
42-
* @param zkUrl zookeeper connection url
42+
* @param adminClient AdminClient object
4343
* @return A set of brokers that don't take new partitions or reassigned partitions for topics.
4444
*/
45-
Set<Integer> getBlackListedBrokers(String zkUrl);
45+
Set<Integer> getBlackListedBrokers(AdminClient adminClient);
4646

4747
}

0 commit comments

Comments
 (0)