1414import com .linkedin .kmf .producer .KMBaseProducer ;
1515import com .linkedin .kmf .producer .BaseProducerRecord ;
1616import com .linkedin .kmf .producer .NewProducer ;
17+ import java .util .concurrent .ConcurrentHashMap ;
18+ import java .util .concurrent .ConcurrentMap ;
19+ import java .util .concurrent .atomic .AtomicInteger ;
1720import org .apache .kafka .clients .producer .ProducerConfig ;
1821import org .apache .kafka .clients .producer .RecordMetadata ;
1922import org .apache .kafka .common .MetricName ;
3942import java .util .concurrent .atomic .AtomicBoolean ;
4043import java .util .concurrent .atomic .AtomicLong ;
4144
45+
46+ /**
47+ * This sets up the producers used by Kafka Monitoring and some of the reported metrics.
48+ */
4249public class ProduceService implements Service {
4350 private static final Logger LOG = LoggerFactory .getLogger (ProduceService .class );
4451 private static final String METRIC_GROUP_NAME = "produce-service" ;
@@ -49,31 +56,57 @@ public class ProduceService implements Service {
4956 private final ScheduledExecutorService _executor ;
5057 private final int _produceDelayMs ;
5158 private final boolean _sync ;
52- private final Map <Integer , AtomicLong > _nextIndexPerPartition ;
53- private final int _partitionNum ;
59+ /** This can be updated while running when new partitions are added to the monitored topic. */
60+ private final ConcurrentMap <Integer , AtomicLong > _nextIndexPerPartition ;
61+ /** This is the last thing that should be updated after adding new partitions. */
62+ private final AtomicInteger _partitionNum ;
5463 private final int _recordSize ;
5564 private final String _topic ;
5665 private final String _producerId ;
5766 private final AtomicBoolean _running ;
67+ private final String _zkConnect ;
68+ private final String _brokerList ;
69+ private final double _rebalanceThreshold ;
5870
5971 public ProduceService (Map <String , Object > props , String name ) throws Exception {
6072 _name = name ;
6173 Map producerPropsOverride = (Map ) props .get (ProduceServiceConfig .PRODUCER_PROPS_CONFIG );
6274 ProduceServiceConfig config = new ProduceServiceConfig (props );
63- String zkConnect = config .getString (ProduceServiceConfig .ZOOKEEPER_CONNECT_CONFIG );
64- String brokerList = config .getString (ProduceServiceConfig .BOOTSTRAP_SERVERS_CONFIG );
75+ _zkConnect = config .getString (ProduceServiceConfig .ZOOKEEPER_CONNECT_CONFIG );
76+ _brokerList = config .getString (ProduceServiceConfig .BOOTSTRAP_SERVERS_CONFIG );
77+ _rebalanceThreshold = config .getDouble (ProduceServiceConfig .REBALANCE_THRESHOLD_CONFIG );
6578 String producerClass = config .getString (ProduceServiceConfig .PRODUCER_CLASS_CONFIG );
6679 int threadsNum = config .getInt (ProduceServiceConfig .PRODUCE_THREAD_NUM_CONFIG );
6780 _topic = config .getString (ProduceServiceConfig .TOPIC_CONFIG );
6881 _producerId = config .getString (ProduceServiceConfig .PRODUCER_ID_CONFIG );
6982 _produceDelayMs = config .getInt (ProduceServiceConfig .PRODUCE_RECORD_DELAY_MS_CONFIG );
7083 _recordSize = config .getInt (ProduceServiceConfig .PRODUCE_RECORD_SIZE_BYTE_CONFIG );
7184 _sync = config .getBoolean (ProduceServiceConfig .PRODUCE_SYNC_CONFIG );
72- _partitionNum = Utils .getPartitionNumForTopic (zkConnect , _topic );
85+ _partitionNum = new AtomicInteger (0 );
86+
// A threshold of exactly 1 passes this check, so the message states the bound that is
// actually enforced (>= 1) rather than "greater than one".
if (_rebalanceThreshold < 1) {
  throw new IllegalArgumentException("Rebalance threshold must be at least one but is set to " + _rebalanceThreshold + ".");
}
90+
91+ int existingPartitionCount = Utils .getPartitionNumForTopic (_zkConnect , _topic );
7392
74- if (_partitionNum <= 0 )
75- throw new RuntimeException ("Can not find valid partition number for topic " + _topic +
76- ". Please verify that the topic has been created. Ideally the partition number should be a multiple of number of brokers in the cluster." );
93+ if (existingPartitionCount <= 0 ) {
94+ if (config .getBoolean (ProduceServiceConfig .AUTO_TOPIC_CREATION_ENABLED_CONFIG )) {
95+ int autoTopicReplicationFactor = config .getInt (ProduceServiceConfig .AUTO_TOPIC_REPLICATION_FACTOR_CONFIG );
96+ int autoTopicPartitionFactor = config .getInt (ProduceServiceConfig .REBALANCE_PARTITION_MULTIPLE_CONFIG );
97+ _partitionNum .set (
98+ Utils .createMonitoringTopicIfNotExists (_zkConnect , _topic , autoTopicReplicationFactor ,
99+ autoTopicPartitionFactor ));
100+ } else {
101+ throw new RuntimeException ("Can not find valid partition number for topic " + _topic +
102+ ". Please verify that the topic \" " + _topic + "\" has been created. Ideally the partition number should be" +
103+ " a multiple of number" +
104+ " of brokers in the cluster. Or else configure " + ProduceServiceConfig .AUTO_TOPIC_CREATION_ENABLED_CONFIG +
105+ " to be true." );
106+ }
107+ } else {
108+ _partitionNum .set (existingPartitionCount );
109+ }
77110
78111 Properties producerProps = new Properties ();
79112
@@ -92,7 +125,7 @@ public ProduceService(Map<String, Object> props, String name) throws Exception {
92125
93126 // Assign config specified for ProduceService.
94127 producerProps .put (ProducerConfig .CLIENT_ID_CONFIG , _producerId );
95- producerProps .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , brokerList );
128+ producerProps .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , _brokerList );
96129
97130 // Assign config specified for producer. This has the highest priority.
98131 if (producerPropsOverride != null )
@@ -101,7 +134,7 @@ public ProduceService(Map<String, Object> props, String name) throws Exception {
101134 _producer = (KMBaseProducer ) Class .forName (producerClass ).getConstructor (Properties .class ).newInstance (producerProps );
102135
103136 _running = new AtomicBoolean (false );
104- _nextIndexPerPartition = new HashMap <>();
137+ _nextIndexPerPartition = new ConcurrentHashMap <>();
105138 _executor = Executors .newScheduledThreadPool (threadsNum );
106139
107140 MetricConfig metricConfig = new MetricConfig ().samples (60 ).timeWindow (1000 , TimeUnit .MILLISECONDS );
@@ -111,20 +144,26 @@ public ProduceService(Map<String, Object> props, String name) throws Exception {
111144 Map <String , String > tags = new HashMap <>();
112145 tags .put ("name" , _name );
113146 _sensors = new ProduceMetrics (metrics , tags );
147+
114148 }
115149
116150 @ Override
117151 public void start () {
118152 if (_running .compareAndSet (false , true )) {
119- for (int partition = 0 ; partition < _partitionNum ; partition ++) {
120- _nextIndexPerPartition .put (partition , new AtomicLong (0 ));
121- _executor .scheduleWithFixedDelay (new ProduceRunnable (partition ),
122- _produceDelayMs , _produceDelayMs , TimeUnit .MILLISECONDS );
153+ int partitionNum = _partitionNum .get ();
154+ for (int partition = 0 ; partition < partitionNum ; partition ++) {
155+ scheduleProduceRunnable (partition );
123156 }
124157 LOG .info (_name + "/ProduceService started" );
125158 }
126159 }
127160
161+ private void scheduleProduceRunnable (int partition ) {
162+ _nextIndexPerPartition .put (partition , new AtomicLong (0 ));
163+ _executor .scheduleWithFixedDelay (new ProduceRunnable (partition ),
164+ _produceDelayMs , _produceDelayMs , TimeUnit .MILLISECONDS );
165+ }
166+
128167 @ Override
129168 public void stop () {
130169 if (_running .compareAndSet (true , false )) {
@@ -153,24 +192,20 @@ private class ProduceMetrics {
153192 public final Metrics metrics ;
154193 private final Sensor _recordsProduced ;
155194 private final Sensor _produceError ;
156- private final Map <Integer , Sensor > _recordsProducedPerPartition ;
157- private final Map <Integer , Sensor > _produceErrorPerPartition ;
195+ private final ConcurrentMap <Integer , Sensor > _recordsProducedPerPartition ;
196+ private final ConcurrentMap <Integer , Sensor > _produceErrorPerPartition ;
197+ private final Map <String , String > _tags ;
158198
159199 public ProduceMetrics (Metrics metrics , final Map <String , String > tags ) {
160200 this .metrics = metrics ;
201+ this ._tags = tags ;
161202
162- _recordsProducedPerPartition = new HashMap <>();
163- for (int partition = 0 ; partition < _partitionNum ; partition ++) {
164- Sensor sensor = metrics .sensor ("records-produced-partition-" + partition );
165- sensor .add (new MetricName ("records-produced-rate-partition-" + partition , METRIC_GROUP_NAME , "The average number of records per second that are produced to this partition" , tags ), new Rate ());
166- _recordsProducedPerPartition .put (partition , sensor );
167- }
203+ _recordsProducedPerPartition = new ConcurrentHashMap <>();
204+ _produceErrorPerPartition = new ConcurrentHashMap <>();
168205
169- _produceErrorPerPartition = new HashMap <>();
170- for (int partition = 0 ; partition < _partitionNum ; partition ++) {
171- Sensor sensor = metrics .sensor ("produce-error-partition-" + partition );
172- sensor .add (new MetricName ("produce-error-rate-partition-" + partition , METRIC_GROUP_NAME , "The average number of errors per second when producing to this partition" , tags ), new Rate ());
173- _produceErrorPerPartition .put (partition , sensor );
206+ int partitionNum = _partitionNum .get ();
207+ for (int partition = 0 ; partition < partitionNum ; partition ++) {
208+ addPartitionSensors (partition );
174209 }
175210
176211 _recordsProduced = metrics .sensor ("records-produced" );
@@ -186,11 +221,12 @@ public ProduceMetrics(Metrics metrics, final Map<String, String> tags) {
186221 @ Override
187222 public double measure (MetricConfig config , long now ) {
188223 double availabilitySum = 0.0 ;
189- for (int partition = 0 ; partition < _partitionNum ; partition ++) {
224+ int partitionNum = _partitionNum .get ();
225+ for (int partition = 0 ; partition < partitionNum ; partition ++) {
190226 double recordsProduced = _sensors .metrics .metrics ().get (new MetricName ("records-produced-rate-partition-" + partition , METRIC_GROUP_NAME , tags )).value ();
191227 double produceError = _sensors .metrics .metrics ().get (new MetricName ("produce-error-rate-partition-" + partition , METRIC_GROUP_NAME , tags )).value ();
192228 // If there is no error, error rate sensor may expire and the value may be NaN. Treat NaN as 0 for error rate.
193- if (new Double (produceError ). isNaN ( )) {
229+ if (Double . isNaN (produceError ) || Double . isInfinite ( produceError )) {
194230 produceError = 0 ;
195231 }
196232 // If there is either succeeded or failed produce to a partition, consider its availability as 0.
@@ -199,11 +235,23 @@ public double measure(MetricConfig config, long now) {
199235 }
200236 }
201237 // Assign equal weight to per-partition availability when calculating overall availability
202- return availabilitySum / _partitionNum ;
238+ return availabilitySum / partitionNum ;
203239 }
204240 }
205241 );
206242 }
243+
244+ void addPartitionSensors (int partition ) {
245+ Sensor recordsProducedSensor = metrics .sensor ("records-produced-partition-" + partition );
246+ recordsProducedSensor .add (new MetricName ("records-produced-rate-partition-" + partition , METRIC_GROUP_NAME ,
247+ "The average number of records per second that are produced to this partition" , _tags ), new Rate ());
248+ _recordsProducedPerPartition .put (partition , recordsProducedSensor );
249+
250+ Sensor errorsSensor = metrics .sensor ("produce-error-partition-" + partition );
251+ errorsSensor .add (new MetricName ("produce-error-rate-partition-" + partition , METRIC_GROUP_NAME ,
252+ "The average number of errors per second when producing to this partition" , _tags ), new Rate ());
253+ _produceErrorPerPartition .put (partition , errorsSensor );
254+ }
207255 }
208256
209257 private class ProduceRunnable implements Runnable {
0 commit comments