1616import com .linkedin .kmf .consumer .BaseConsumerRecord ;
1717import com .linkedin .kmf .consumer .NewConsumer ;
1818import com .linkedin .kmf .consumer .OldConsumer ;
19+ import com .linkedin .kmf .services .configs .ProduceServiceConfig ;
1920import org .apache .avro .generic .GenericRecord ;
2021import org .apache .kafka .clients .consumer .ConsumerConfig ;
2122import org .apache .kafka .common .MetricName ;
23+ import org .apache .kafka .common .config .ConfigException ;
2224import org .apache .kafka .common .metrics .JmxReporter ;
2325import org .apache .kafka .common .metrics .Measurable ;
2426import org .apache .kafka .common .metrics .MetricConfig ;
4749public class ConsumeService implements Service {
4850 private static final Logger LOG = LoggerFactory .getLogger (ConsumeService .class );
4951 private static final String METRIC_GROUP_NAME = "consume-service" ;
52+ private static final String [] NONOVERRIDABLE_PROPERTIES = new String []{ ConsumeServiceConfig .BOOTSTRAP_SERVERS_CONFIG ,
53+ ConsumeServiceConfig .ZOOKEEPER_CONNECT_CONFIG };
5054
5155 private final String _name ;
5256 private final ConsumeMetrics _sensors ;
@@ -59,7 +63,8 @@ public class ConsumeService implements Service {
5963
6064 public ConsumeService (Map <String , Object > props , String name ) throws Exception {
6165 _name = name ;
62- Map consumerPropsOverride = (Map ) props .get (ConsumeServiceConfig .CONSUMER_PROPS_CONFIG );
66+ Map consumerPropsOverride = props .containsKey (ConsumeServiceConfig .CONSUMER_PROPS_CONFIG ) ?
67+ (Map ) props .get (ConsumeServiceConfig .CONSUMER_PROPS_CONFIG ) : new HashMap <>();
6368 ConsumeServiceConfig config = new ConsumeServiceConfig (props );
6469 String topic = config .getString (ConsumeServiceConfig .TOPIC_CONFIG );
6570 String zkConnect = config .getString (ConsumeServiceConfig .ZOOKEEPER_CONNECT_CONFIG );
@@ -70,6 +75,12 @@ public ConsumeService(Map<String, Object> props, String name) throws Exception {
7075 _latencyPercentileGranularityMs = config .getInt (ConsumeServiceConfig .LATENCY_PERCENTILE_GRANULARITY_MS_CONFIG );
7176 _running = new AtomicBoolean (false );
7277
78+ for (String property : NONOVERRIDABLE_PROPERTIES ) {
79+ if (consumerPropsOverride .containsKey (property )) {
80+ throw new ConfigException ("Override must not contain " + property + " config." );
81+ }
82+ }
83+
7384 Properties consumerProps = new Properties ();
7485
7586 // Assign default config. This has the lowest priority.
@@ -94,8 +105,7 @@ public ConsumeService(Map<String, Object> props, String name) throws Exception {
94105 consumerProps .put ("zookeeper.connect" , zkConnect );
95106
96107 // Assign config specified for consumer. This has the highest priority.
97- if (consumerPropsOverride != null )
98- consumerProps .putAll (consumerPropsOverride );
108+ consumerProps .putAll (consumerPropsOverride );
99109
100110 _consumer = (KMBaseConsumer ) Class .forName (consumerClassName ).getConstructor (String .class , Properties .class ).newInstance (topic , consumerProps );
101111
0 commit comments