Skip to content

Commit c1455c0

Browse files
bysphbysph
andauthored
fix: support to set kafka version (resmoio#146)
Co-authored-by: sph <[email protected]>
1 parent ff7498c commit c1455c0

File tree

1 file changed

+10
-1
lines changed

1 file changed

+10
-1
lines changed

pkg/sinks/kafka.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type KafkaConfig struct {
1919
Layout map[string]interface{} `yaml:"layout"`
2020
ClientId string `yaml:"clientId"`
2121
CompressionCodec string `yaml:"compressionCodec" default:"none"`
22+
Version string `yaml:"version"`
2223
TLS struct {
2324
Enable bool `yaml:"enable"`
2425
CaFile string `yaml:"caFile"`
@@ -126,7 +127,15 @@ func (k *KafkaSink) Close() {
126127
func createSaramaProducer(cfg *KafkaConfig) (sarama.SyncProducer, error) {
127128
// Default Sarama config
128129
saramaConfig := sarama.NewConfig()
129-
saramaConfig.Version = sarama.MaxVersion
130+
if cfg.Version != "" {
131+
version, err := sarama.ParseKafkaVersion(cfg.Version)
132+
if err != nil {
133+
return nil, err
134+
}
135+
saramaConfig.Version = version
136+
} else {
137+
saramaConfig.Version = sarama.MaxVersion
138+
}
130139
saramaConfig.Metadata.Full = true
131140
saramaConfig.ClientID = cfg.ClientId
132141

0 commit comments

Comments
 (0)