Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,21 @@ protected Properties getProducerProperties(Configuration configuration) {
kafkaProducerConfigs.setProperty(Constants.SINK_KAFKA_COMPRESSION_TYPE_KEY, Constants.SINK_KAFKA_COMPRESSION_TYPE_DEFAULT);
kafkaProducerConfigs.setProperty(Constants.SINK_KAFKA_MAX_REQUEST_SIZE_KEY, Constants.SINK_KAFKA_MAX_REQUEST_SIZE_DEFAULT);
}
String lingerMs = configuration.getString(Constants.SINK_KAFKA_LINGER_MS_KEY, Constants.SINK_KAFKA_LINGER_MS_DEFAULT);
validateLingerMs(lingerMs);
kafkaProducerConfigs.setProperty(Constants.SINK_KAFKA_LINGER_MS_CONFIG_KEY, lingerMs);

return kafkaProducerConfigs;
}

/**
 * Validates that the configured linger.ms value is a parseable integer.
 *
 * @param lingerMs the raw string value read from configuration
 * @throws IllegalArgumentException if {@code lingerMs} is not a valid integer;
 *         the underlying {@link NumberFormatException} is preserved as the cause
 */
private void validateLingerMs(String lingerMs) {
    try {
        Integer.parseInt(lingerMs);
    } catch (NumberFormatException e) {
        // Catch the specific parse failure (not the broader IllegalArgumentException)
        // and rethrow with the original exception chained as the cause.
        throw new IllegalArgumentException(
                "Provided value for Linger Ms: " + lingerMs + " is not a valid integer", e);
    }
}

@Override
public Map<String, List<String>> getTelemetry() {
return metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,14 @@ public class Constants {
public static final String SINK_KAFKA_JSON_SCHEMA_KEY = "SINK_KAFKA_JSON_SCHEMA";
public static final String SINK_KAFKA_DATA_TYPE = "SINK_KAFKA_DATA_TYPE";
public static final String SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_KEY = "SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE";
public static final String SINK_KAFKA_LINGER_MS_KEY = "SINK_KAFKA_LINGER_MS";
public static final boolean SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_DEFAULT = false;
public static final String SINK_KAFKA_COMPRESSION_TYPE_KEY = "compression.type";
public static final String SINK_KAFKA_LINGER_MS_CONFIG_KEY = "linger.ms";
public static final String SINK_KAFKA_COMPRESSION_TYPE_DEFAULT = "snappy";
public static final String SINK_KAFKA_MAX_REQUEST_SIZE_KEY = "max.request.size";
public static final String SINK_KAFKA_MAX_REQUEST_SIZE_DEFAULT = "20971520";
public static final String SINK_KAFKA_LINGER_MS_DEFAULT = "0";

public static final String ES_TYPE = "ES";
public static final String HTTP_TYPE = "HTTP";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.gotocompany.dagger.common.configuration.Configuration;
import com.gotocompany.dagger.common.core.StencilClientOrchestrator;
import com.gotocompany.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
Expand All @@ -22,6 +23,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;

Expand Down Expand Up @@ -74,23 +76,21 @@ public void shouldGiveInfluxWhenConfiguredToUseNothing() throws Exception {
public void shouldSetKafkaProducerConfigurations() throws Exception {
    // Given: broker address, large-message production enabled, and a custom linger.ms.
    when(configuration.getString(eq(Constants.SINK_KAFKA_BROKERS_KEY), anyString())).thenReturn("10.200.216.87:6668");
    when(configuration.getBoolean(eq(Constants.SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_KEY), anyBoolean())).thenReturn(true);
    when(configuration.getString(eq(Constants.SINK_KAFKA_LINGER_MS_KEY), anyString())).thenReturn("1000");

    Properties producerProperties = sinkOrchestrator.getProducerProperties(configuration);

    // JUnit's assertEquals contract is (expected, actual); the original call sites had
    // the arguments reversed, which yields misleading failure messages on mismatch.
    assertEquals("snappy", producerProperties.getProperty("compression.type"));
    assertEquals("20971520", producerProperties.getProperty("max.request.size"));
    assertEquals("1000", producerProperties.getProperty("linger.ms"));
}

@Test
public void shouldGiveKafkaProducerWhenConfiguredToUseKafkaSink() throws Exception {
when(configuration.getString(eq("SINK_TYPE"), anyString())).thenReturn("kafka");
when(configuration.getString(eq("SINK_KAFKA_PROTO_MESSAGE"), anyString())).thenReturn("output_proto");
when(configuration.getString(eq("SINK_KAFKA_BROKERS"), anyString())).thenReturn("output_broker:2667");
when(configuration.getString(eq("SINK_KAFKA_TOPIC"), anyString())).thenReturn("output_topic");
when(configuration.getString(eq("SINK_KAFKA_DATA_TYPE"), anyString())).thenReturn("PROTO");

Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter);

assertThat(sinkFunction, instanceOf(KafkaSink.class));
public void shouldThrowIllegalArgumentExceptionForInvalidLingerMs() throws Exception {
    // Given: a non-numeric linger.ms value in the configuration.
    when(configuration.getString(eq(Constants.SINK_KAFKA_BROKERS_KEY), anyString())).thenReturn("10.200.216.87:6668");
    when(configuration.getBoolean(eq(Constants.SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_KEY), anyBoolean())).thenReturn(true);
    when(configuration.getString(eq(Constants.SINK_KAFKA_LINGER_MS_KEY), anyString())).thenReturn("abc");

    // Use the statically imported assertThrows (already imported in this file)
    // rather than the qualified Assert.assertThrows, for consistency.
    assertThrows(IllegalArgumentException.class,
            () -> sinkOrchestrator.getProducerProperties(configuration));
}

@Test
Expand Down
8 changes: 8 additions & 0 deletions docs/docs/reference/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,14 @@ Enable/Disable to produce large messages to Kafka. by default, it's configuratio
* Type: `optional`
* Default value: `false`

#### `SINK_KAFKA_LINGER_MS`

Defines the maximum time, in milliseconds, that the producer will wait for its send buffer to fill before dispatching a batch (maps to the Kafka producer's `linger.ms` setting).

* Example value: `1000`
* Type: `optional`
* Default value: `0`

### BigQuery Sink

A BigQuery sink Dagger (`SINK_TYPE=bigquery`) requires following env variables to be set along with the Generic Dagger env variables, as well as the
Expand Down
2 changes: 1 addition & 1 deletion version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.9.2
0.9.3