-
Notifications
You must be signed in to change notification settings - Fork 2
Add support for kafka producer config linger.ms in kafka sink #7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
lavkesh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
Description: Add three constants to support linger ms. The default value set is 5 which is the default for the kafka producer client.
Description: Integrate configuration support for linger ms in the kafka producer initialisation.
Description: The default of 5 ms is for librd kafka clients. Java client has default as 0. Hence, updating it.
Description: Import reference now use the static constant class.
09c259f to
e1b0079
Compare
dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java
Show resolved
Hide resolved
dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java
Show resolved
Hide resolved
docs/docs/reference/configuration.md
Outdated
|
|
||
| #### `SINK_KAFKA_LINGER_MS` | ||
|
|
||
| Defines the max interval the sink should we wait for the sink/producer buffer to fill |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just add "in milliseconds" after "max interval"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
sumitaich1998
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also, shouldn't we also add the batch.size config for the kafka producer along with linger.ms?
The default batch.size works well for most cases. Reason for custom batch.size config would be ? |
just so that we can tune it later for a particular dagger if we want to, we can just keep the default value in odin, we wont expose it in UI |
That can be out of the scope of the PR and can be picked independently. The scope of this PR is to expose this certain config and this helps with efficient produce of messages. There are a multiple other producer configs present as well which are currently not exposed. And can be optionally exposed in a future PR. WDYT ? |
yeah ok should be fine |
dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java
Outdated
Show resolved
Hide resolved
sumitaich1998
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
linger.ms defines the maximum interval for buffering messages in the client before being produced to kafka. This improves efficiency of the producer and also greatly reduces the load on kafka.
The interval and buffering also ensures better compression when enabled.
More Reading: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#performance
Open Questions:
Todo: