Merged

Changes from 24 of 28 commits

Commits
328c44a  add kafka security module (ekawinataa, Sep 2, 2024)
1f6d09c  Add SASL class callback config for producer and consumer (ekawinataa, Sep 2, 2024)
19574f9  Add config map (ekawinataa, Sep 2, 2024)
28e31de  remove build.gradle (ekawinataa, Sep 2, 2024)
6c33ff4  Add dynamic props (ekawinataa, Sep 3, 2024)
8ef873b  Update regex (ekawinataa, Sep 3, 2024)
c3d4556  rename var (ekawinataa, Sep 3, 2024)
f19b90b  Remove redundant imports (ekawinataa, Sep 3, 2024)
b731a38  Rename prefix (ekawinataa, Sep 3, 2024)
ef3d7e9  Remove unused import (ekawinataa, Sep 3, 2024)
e643dc6  Update test (ekawinataa, Sep 3, 2024)
d9cfc8a  Add implementation for sink dynamic props (ekawinataa, Sep 5, 2024)
fc26377  Add null checking for the additional props (ekawinataa, Sep 5, 2024)
d2c981d  Added validations on source config (ekawinataa, Sep 10, 2024)
11c8da5  Add docs and refactor pattern to enum (ekawinataa, Sep 10, 2024)
fb8695b  Checkstyle (ekawinataa, Sep 10, 2024)
cc2485f  Add readme (ekawinataa, Sep 10, 2024)
d110153  Make the pattern more specific and embedded to the enum (ekawinataa, Sep 11, 2024)
33b0cf9  Add more tests (ekawinataa, Sep 11, 2024)
5f3a80c  bump version (ekawinataa, Sep 11, 2024)
62b4eca  Add unit tests (ekawinataa, Sep 13, 2024)
6c4e8ae  Use expected annotation (ekawinataa, Sep 16, 2024)
1063b25  Assert exception message. Add fail mechanism in case of not throwing … (ekawinataa, Sep 16, 2024)
07f5e53  Use rule for asserting exception (ekawinataa, Sep 16, 2024)
edf2e4d  Add more test cases (ekawinataa, Sep 16, 2024)
0c2d621  add more unit tests (ekawinataa, Sep 16, 2024)
a262868  feat: Enable multiple underscore parsing (ekawinataa, Sep 16, 2024)
d31f834  test: Add test on multiple underscore parsing (ekawinataa, Sep 16, 2024)
@@ -0,0 +1,18 @@
package com.gotocompany.dagger.core.enumeration;

import java.util.regex.Pattern;

public enum KafkaConnectorTypesMetadata {
    SOURCE("SOURCE_KAFKA_CONSUMER_CONFIG_"), SINK("SINK_KAFKA_PRODUCER_CONFIG_");

    KafkaConnectorTypesMetadata(String prefixPattern) {
        this.prefixPattern = prefixPattern;
    }

    private final String prefixPattern;

    public Pattern getConfigurationPattern() {
        return Pattern.compile(String.format("^%s(.*)", prefixPattern), Pattern.CASE_INSENSITIVE);
    }

}
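
For illustration, a minimal sketch (not part of the diff) of how the enum's pattern resolves a prefixed key; the wrapper class PatternSketch is hypothetical:

import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata;

import java.util.regex.Matcher;

class PatternSketch {
    public static void main(String[] args) {
        // The SOURCE pattern is ^SOURCE_KAFKA_CONSUMER_CONFIG_(.*), case-insensitive;
        // group(1) captures the raw suffix that later becomes the Kafka config name.
        Matcher matcher = KafkaConnectorTypesMetadata.SOURCE
                .getConfigurationPattern()
                .matcher("SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM");
        if (matcher.find()) {
            System.out.println(matcher.group(1)); // prints SASL_MECHANISM
        }
    }
}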
@@ -1,14 +1,17 @@
package com.gotocompany.dagger.core.sink;

import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata;
import com.gotocompany.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
import com.gotocompany.dagger.core.metrics.telemetry.TelemetryPublisher;
import com.gotocompany.dagger.core.metrics.telemetry.TelemetryTypes;
import com.gotocompany.dagger.core.sink.bigquery.BigQuerySinkBuilder;
import com.gotocompany.dagger.core.sink.influx.ErrorHandler;
import com.gotocompany.dagger.core.sink.influx.InfluxDBFactoryWrapper;
import com.gotocompany.dagger.core.sink.influx.InfluxDBSink;
import com.gotocompany.dagger.core.utils.KafkaConfigUtil;
import com.gotocompany.dagger.core.utils.Constants;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
@@ -25,6 +28,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

/**
@@ -109,7 +113,10 @@ protected Properties getProducerProperties(Configuration configuration) {
        String lingerMs = configuration.getString(Constants.SINK_KAFKA_LINGER_MS_KEY, Constants.SINK_KAFKA_LINGER_MS_DEFAULT);
        validateLingerMs(lingerMs);
        kafkaProducerConfigs.setProperty(Constants.SINK_KAFKA_LINGER_MS_CONFIG_KEY, lingerMs);

        Properties dynamicProperties = KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SINK, Optional.ofNullable(configuration.getParam())
                .map(ParameterTool::getProperties)
                .orElseGet(Properties::new));
        kafkaProducerConfigs.putAll(dynamicProperties);
        return kafkaProducerConfigs;
    }

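The Optional chain above guards against a Configuration backed by no ParameterTool. A minimal sketch of that fallback, using only the Flink classes shown in the diff (FallbackSketch is hypothetical):

import org.apache.flink.api.java.utils.ParameterTool;

import java.util.Optional;
import java.util.Properties;

class FallbackSketch {
    // Mirrors the new code path: a null ParameterTool degrades to empty
    // Properties, so parseKafkaConfiguration yields no dynamic overrides.
    static Properties paramsOrEmpty(ParameterTool param) {
        return Optional.ofNullable(param)
                .map(ParameterTool::getProperties)
                .orElseGet(Properties::new);
    }
}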
@@ -1,6 +1,8 @@
package com.gotocompany.dagger.core.source.config;

import com.google.gson.annotations.JsonAdapter;
import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata;
import com.gotocompany.dagger.core.source.config.adapter.DaggerKafkaConsumerAdditionalConfigurationsAdaptor;
import com.gotocompany.dagger.core.source.config.adapter.DaggerSASLMechanismAdaptor;
import com.gotocompany.dagger.core.source.config.adapter.DaggerSSLKeyStoreFileTypeAdaptor;
import com.gotocompany.dagger.core.source.config.adapter.DaggerSSLProtocolAdaptor;
@@ -14,6 +16,7 @@
import com.gotocompany.dagger.core.source.config.models.SourceDetails;
import com.gotocompany.dagger.core.source.config.models.SourceName;
import com.gotocompany.dagger.core.source.config.models.TimeRangePool;
import com.gotocompany.dagger.core.utils.KafkaConfigUtil;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import com.google.gson.Gson;
@@ -26,6 +29,7 @@

import java.io.StringReader;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -34,7 +38,6 @@
import static com.gotocompany.dagger.common.core.Constants.STREAM_INPUT_SCHEMA_PROTO_CLASS;
import static com.gotocompany.dagger.common.core.Constants.STREAM_INPUT_SCHEMA_TABLE;
import static com.gotocompany.dagger.core.utils.Constants.*;
import static com.gotocompany.dagger.core.utils.Constants.STREAM_SOURCE_PARQUET_FILE_DATE_RANGE_KEY;

public class StreamConfig {
    private static final Gson GSON = new GsonBuilder()
@@ -154,6 +157,11 @@ public class StreamConfig {
    @Getter
    private SourceParquetSchemaMatchStrategy parquetSchemaMatchStrategy;

    @SerializedName(SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS)
    @JsonAdapter(value = DaggerKafkaConsumerAdditionalConfigurationsAdaptor.class)
    @Getter
    private Map<String, String> additionalConsumerConfigurations;

    @SerializedName(STREAM_SOURCE_PARQUET_FILE_DATE_RANGE_KEY)
    @JsonAdapter(FileDateRangeAdaptor.class)
    @Getter
@@ -208,7 +216,7 @@ public Properties getKafkaProps(Configuration configuration) {
                .stream()
                .filter(e -> e.getKey().toLowerCase().startsWith(KAFKA_PREFIX))
                .forEach(e -> kafkaProps.setProperty(parseVarName(e.getKey(), KAFKA_PREFIX), e.getValue()));
        setAdditionalConfigs(kafkaProps, configuration);
        setAdditionalKafkaConsumerConfigs(kafkaProps, configuration);
        return kafkaProps;
    }

@@ -217,10 +225,15 @@ private String parseVarName(String varName, String kafkaPrefix) {
        return String.join(".", names);
    }

    private void setAdditionalConfigs(Properties kafkaProps, Configuration configuration) {
    private void setAdditionalKafkaConsumerConfigs(Properties kafkaProps, Configuration configuration) {
        if (configuration.getBoolean(SOURCE_KAFKA_CONSUME_LARGE_MESSAGE_ENABLE_KEY, SOURCE_KAFKA_CONSUME_LARGE_MESSAGE_ENABLE_DEFAULT)) {
            kafkaProps.setProperty(SOURCE_KAFKA_MAX_PARTITION_FETCH_BYTES_KEY, SOURCE_KAFKA_MAX_PARTITION_FETCH_BYTES_DEFAULT);
        }
        if (Objects.nonNull(this.additionalConsumerConfigurations)) {
            Properties additionalKafkaProperties = new Properties();
            additionalKafkaProperties.putAll(this.additionalConsumerConfigurations);
            kafkaProps.putAll(KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SOURCE, additionalKafkaProperties));
        }
    }

    public Pattern getTopicPattern() {
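A hedged sketch of what the new branch in setAdditionalKafkaConsumerConfigs does once additionalConsumerConfigurations has been deserialized; the key/value pair and the ConsumerMergeSketch class are hypothetical:

import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata;
import com.gotocompany.dagger.core.utils.KafkaConfigUtil;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class ConsumerMergeSketch {
    public static void main(String[] args) {
        // A validated map, as the Gson adapter would produce it.
        Map<String, String> additional = new HashMap<>();
        additional.put("SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION", "/etc/kafka/keystore.jks");

        // Copy into Properties, then normalize and merge, as in the diff.
        Properties additionalKafkaProperties = new Properties();
        additionalKafkaProperties.putAll(additional);
        Properties kafkaProps = new Properties();
        kafkaProps.putAll(KafkaConfigUtil.parseKafkaConfiguration(
                KafkaConnectorTypesMetadata.SOURCE, additionalKafkaProperties));

        System.out.println(kafkaProps.getProperty("ssl.keystore.location")); // /etc/kafka/keystore.jks
    }
}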
@@ -0,0 +1,37 @@
package com.gotocompany.dagger.core.source.config.adapter;

import com.google.gson.Gson;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DaggerKafkaConsumerAdditionalConfigurationsAdaptor extends TypeAdapter<Map<String, String>> {

    @Override
    public void write(JsonWriter jsonWriter, Map<String, String> stringStringMap) throws IOException {
        Gson gson = new Gson();
        jsonWriter.jsonValue(gson.toJson(stringStringMap));
    }

    @Override
    public Map<String, String> read(JsonReader jsonReader) throws IOException {
        Gson gson = new Gson();
        Map<String, String> map = gson.fromJson(jsonReader, Map.class);
        List<String> invalidProps = map.keySet().stream()
                .filter(key -> !KafkaConnectorTypesMetadata.SOURCE.getConfigurationPattern()
                        .matcher(key)
                        .matches())
                .collect(Collectors.toList());
        if (!invalidProps.isEmpty()) {
            throw new IllegalArgumentException("Invalid additional kafka consumer configuration properties found: " + invalidProps);
        }
        return map;
    }

}
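
To illustrate the validation, a minimal sketch (AdaptorSketch and both JSON payloads are hypothetical): a key carrying the SOURCE_KAFKA_CONSUMER_CONFIG_ prefix passes, while an unprefixed key makes read(...) throw.

import com.google.gson.stream.JsonReader;
import com.gotocompany.dagger.core.source.config.adapter.DaggerKafkaConsumerAdditionalConfigurationsAdaptor;

import java.io.IOException;
import java.io.StringReader;
import java.util.Map;

class AdaptorSketch {
    public static void main(String[] args) throws IOException {
        DaggerKafkaConsumerAdditionalConfigurationsAdaptor adaptor =
                new DaggerKafkaConsumerAdditionalConfigurationsAdaptor();
        // Valid: the key matches the SOURCE configuration pattern.
        Map<String, String> ok = adaptor.read(new JsonReader(new StringReader(
                "{\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM\":\"PLAIN\"}")));
        System.out.println(ok);
        // Invalid: "fetch.min.bytes" lacks the prefix, so this throws
        // IllegalArgumentException listing the offending key.
        adaptor.read(new JsonReader(new StringReader("{\"fetch.min.bytes\":\"1\"}")));
    }
}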
@@ -129,7 +129,7 @@ public class Constants {
    public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL";
    public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM";
    public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG";

    public static final String SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS = "SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS";
    public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEY_PASSWORD_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEY_PASSWORD";
    public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION";
    public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD";
@@ -0,0 +1,28 @@
package com.gotocompany.dagger.core.utils;

import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata;

import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;

public class KafkaConfigUtil {

    public static Properties parseKafkaConfiguration(KafkaConnectorTypesMetadata kafkaConnectorTypesMetadata, Properties properties) {
        Properties kafkaProperties = new Properties();
        Set<Object> configKeys = properties.keySet();

        for (Object key : configKeys) {
            Matcher matcher = kafkaConnectorTypesMetadata.getConfigurationPattern()
                    .matcher(key.toString());
            if (matcher.find()) {
                String kafkaConfigKey = matcher.group(1)
                        .toLowerCase()
                        .replaceAll("_", ".");
                kafkaProperties.setProperty(kafkaConfigKey, properties.get(key).toString());
            }
        }
        return kafkaProperties;
    }

}
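
A short sketch of the key normalization performed by the code as shown (KafkaConfigUtilSketch and the handler class value are hypothetical): the prefix is stripped, the suffix lower-cased, and each underscore becomes a dot, so keys with many underscores map to deeply dotted Kafka config names.

import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata;
import com.gotocompany.dagger.core.utils.KafkaConfigUtil;

import java.util.Properties;

class KafkaConfigUtilSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(
                "SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS",
                "io.example.OAuthCallbackHandler"); // hypothetical value
        Properties parsed = KafkaConfigUtil.parseKafkaConfiguration(
                KafkaConnectorTypesMetadata.SINK, props);
        // Prefix stripped, suffix lower-cased, underscores dotted:
        System.out.println(parsed.getProperty("sasl.login.callback.handler.class"));
    }
}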
@@ -26,6 +26,9 @@

public class SinkOrchestratorTest {

    private static final String SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS = "SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS";
    private static final String SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE = "com.gotocompany.dagger.core.utils.SinkKafkaConfigUtil";

    private Configuration configuration;
    private StencilClientOrchestrator stencilClientOrchestrator;
    private SinkOrchestrator sinkOrchestrator;
@@ -71,14 +74,19 @@ public void shouldGiveInfluxWhenConfiguredToUseNothing() throws Exception {

    @Test
    public void shouldSetKafkaProducerConfigurations() throws Exception {
        Map<String, String> additionalParameters = new HashMap<>();
        additionalParameters.put(SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS, SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE);
        when(configuration.getString(eq(Constants.SINK_KAFKA_BROKERS_KEY), anyString())).thenReturn("10.200.216.87:6668");
        when(configuration.getBoolean(eq(Constants.SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_KEY), anyBoolean())).thenReturn(true);
        when(configuration.getString(eq(Constants.SINK_KAFKA_LINGER_MS_KEY), anyString())).thenReturn("1000");
        when(configuration.getParam()).thenReturn(ParameterTool.fromMap(additionalParameters));
        when(configuration.getString(eq(SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS), anyString())).thenReturn(SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE);
        Properties producerProperties = sinkOrchestrator.getProducerProperties(configuration);

        assertEquals(producerProperties.getProperty("compression.type"), "snappy");
        assertEquals(producerProperties.getProperty("max.request.size"), "20971520");
        assertEquals(producerProperties.getProperty("linger.ms"), "1000");
        assertEquals(producerProperties.getProperty("sasl.login.callback.handler.class"), SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE);
    }

    @Test