5353 * This service periodically checks and rebalances the monitor topics across a pipeline of Kafka clusters so that
5454 * leadership of the partitions of the monitor topic in each cluster is distributed evenly across brokers in the cluster.
5555 *
56- * It also makes sure that the number of partitions of the monitor topics is same across the monitored clusters.
56+ * More specifically, this service may do some or all of the following tasks depending on the config:
57+ *
58+ * - Create the monitor topic using the user-specified replication factor and partition number
59+ * - Increase partition number of the monitor topic if either partitionsToBrokersRatio or minPartitionNum is not satisfied
60+ * - Increase replication factor of the monitor topic if the user-specified replicationFactor is not satisfied
61+ * - Reassign partition across brokers to make sure each broker acts as preferred leader of at least one partition of the monitor topic
62+ * - Trigger preferred leader election to make sure each broker acts as leader of at least one partition of the monitor topic.
63+ * - Make sure the number of partitions of the monitor topic is same across all monitored custers.
64+ *
5765 */
5866public class MultiClusterTopicManagementService implements Service {
5967 private static final Logger LOG = LoggerFactory .getLogger (MultiClusterTopicManagementService .class );
@@ -134,6 +142,13 @@ public void run() {
134142 helper .maybeCreateTopic ();
135143 }
136144
145+ /*
146+ * The partition number of the monitor topics should be the minimum partition number that satisifies the following conditions:
147+ * - partition number of the monitor topics across all monitored clusters should be the same
148+ * - partitionNum / brokerNum >= user-configured partitionsToBrokersRatio.
149+ * - partitionNum >= user-configured minPartitionNum
150+ */
151+
137152 int minPartitionNum = 0 ;
138153 for (TopicManagementHelper helper : _topicManagementByCluster .values ()) {
139154 minPartitionNum = Math .max (minPartitionNum , helper .minPartitionNum ());
@@ -164,7 +179,8 @@ static class TopicManagementHelper {
164179 private final String _topic ;
165180 private final String _zkConnect ;
166181 private final int _replicationFactor ;
167- private final double _partitionsToBrokerRatio ;
182+ private final double _minPartitionsToBrokersRatio ;
183+ private final int _minPartitionNum ;
168184 private final TopicFactory _topicFactory ;
169185
170186 TopicManagementHelper (Map <String , Object > props ) throws Exception {
@@ -173,7 +189,8 @@ static class TopicManagementHelper {
173189 _topic = config .getString (TopicManagementServiceConfig .TOPIC_CONFIG );
174190 _zkConnect = config .getString (TopicManagementServiceConfig .ZOOKEEPER_CONNECT_CONFIG );
175191 _replicationFactor = config .getInt (TopicManagementServiceConfig .TOPIC_REPLICATION_FACTOR_CONFIG );
176- _partitionsToBrokerRatio = config .getDouble (TopicManagementServiceConfig .PARTITIONS_TO_BROKERS_RATIO_CONFIG );
192+ _minPartitionsToBrokersRatio = config .getDouble (TopicManagementServiceConfig .PARTITIONS_TO_BROKERS_RATIO_CONFIG );
193+ _minPartitionNum = config .getInt (TopicManagementServiceConfig .MIN_PARTITION_NUM_CONFIG );
177194 String topicFactoryClassName = config .getString (TopicManagementServiceConfig .TOPIC_FACTORY_CLASS_CONFIG );
178195 Map topicFactoryConfig = props .containsKey (TopicManagementServiceConfig .TOPIC_FACTORY_PROPS_CONFIG ) ?
179196 (Map ) props .get (TopicManagementServiceConfig .TOPIC_FACTORY_PROPS_CONFIG ) : new HashMap ();
@@ -182,13 +199,13 @@ static class TopicManagementHelper {
182199
183200 void maybeCreateTopic () throws Exception {
184201 if (_topicCreationEnabled ) {
185- _topicFactory .createTopicIfNotExist (_zkConnect , _topic , _replicationFactor , _partitionsToBrokerRatio , new Properties ());
202+ _topicFactory .createTopicIfNotExist (_zkConnect , _topic , _replicationFactor , _minPartitionsToBrokersRatio , new Properties ());
186203 }
187204 }
188205
189206 int minPartitionNum () {
190207 int brokerCount = Utils .getBrokerCount (_zkConnect );
191- return ( int ) Math .ceil (_partitionsToBrokerRatio * brokerCount );
208+ return Math . max (( int ) Math .ceil (_minPartitionsToBrokersRatio * brokerCount ), _minPartitionNum );
192209 }
193210
194211 void maybeAddPartitions (int minPartitionNum ) {
0 commit comments