Skip to content

Commit e884e3f

Browse files
vivekkr12pivovarit
authored andcommitted
update spring-kafka project with support for multiple partitions and JSON serializer (eugenp#1472)
1 parent b01c2c5 commit e884e3f

File tree

7 files changed

+227
-15
lines changed

7 files changed

+227
-15
lines changed

spring-kafka/README.md

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,28 @@
22

33
This is a simple Spring Boot app to demonstrate sending and receiving of messages in Kafka using spring-kafka.
44

5-
As Kafka topics are not created automatically by default, this application requires that a topic named 'baeldung' is created manually.
5+
As Kafka topics are not created automatically by default, this application requires that you create the following topics manually.
66

7-
`$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic baeldung`
7+
`$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic baeldung`<br>
8+
`$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic partitioned`<br>
9+
`$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic filtered`<br>
10+
`$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic greeting`<br>
811

9-
Two listeners with group Ids **foo** and **bar** are configured. When run successfully, the *Hello World!* message will be received by both the listeners and logged on console.
12+
When the application runs successfully, following output is logged on to console (along with spring logs):
13+
14+
#### Message received from the 'baeldung' topic by the basic listeners with groups foo and bar
15+
>Received Messasge in group 'foo': Hello, World!<br>
16+
Received Messasge in group 'bar': Hello, World!
17+
18+
#### Message received from the 'baeldung' topic, with the partition info
19+
>Received Messasge: Hello, World! from partition: 0
20+
21+
#### Message received from the 'partitioned' topic, only from specific partitions
22+
>Received Message: Hello To Partioned Topic! from partition: 0<br>
23+
Received Message: Hello To Partioned Topic! from partition: 3
24+
25+
#### Message received from the 'filtered' topic after filtering
26+
>Recieved Message in filtered listener: Hello Baeldung!
27+
28+
#### Message (Serialized Java Object) received from the 'greeting' topic
29+
>Recieved greeting message: Greetings, World!!

spring-kafka/pom.xml

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
<properties>
1313
<java.version>1.8</java.version>
1414
<spring-kafka.version>1.1.3.RELEASE</spring-kafka.version>
15+
<jackson.version>2.6.7</jackson.version>
1516
</properties>
1617

1718
<parent>
@@ -21,17 +22,22 @@
2122
</parent>
2223

2324
<dependencies>
24-
25+
2526
<dependency>
2627
<groupId>org.springframework.boot</groupId>
2728
<artifactId>spring-boot-starter</artifactId>
2829
</dependency>
29-
30+
3031
<dependency>
3132
<groupId>org.springframework.kafka</groupId>
3233
<artifactId>spring-kafka</artifactId>
3334
</dependency>
34-
35+
36+
<dependency>
37+
<groupId>com.fasterxml.jackson.core</groupId>
38+
<artifactId>jackson-databind</artifactId>
39+
</dependency>
40+
3541
</dependencies>
3642

3743
<build>
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.baeldung.spring.kafka;
2+
3+
public class Greeting {
4+
5+
private String msg;
6+
private String name;
7+
8+
public Greeting() {
9+
10+
}
11+
12+
public Greeting(String msg, String name) {
13+
this.msg = msg;
14+
this.name = name;
15+
}
16+
17+
public String getMsg() {
18+
return msg;
19+
}
20+
21+
public void setMsg(String msg) {
22+
this.msg = msg;
23+
}
24+
25+
public String getName() {
26+
return name;
27+
}
28+
29+
public void setName(String name) {
30+
this.name = name;
31+
}
32+
33+
@Override
34+
public String toString() {
35+
return msg + ", " + name + "!";
36+
}
37+
}

spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java

Lines changed: 98 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,61 @@
1010
import org.springframework.context.ConfigurableApplicationContext;
1111
import org.springframework.context.annotation.Bean;
1212
import org.springframework.kafka.annotation.KafkaListener;
13+
import org.springframework.kafka.annotation.TopicPartition;
1314
import org.springframework.kafka.core.KafkaTemplate;
15+
import org.springframework.kafka.support.KafkaHeaders;
16+
import org.springframework.messaging.handler.annotation.Header;
17+
import org.springframework.messaging.handler.annotation.Payload;
1418

1519
@SpringBootApplication
1620
public class KafkaApplication {
1721

1822
public static void main(String[] args) throws Exception {
23+
1924
ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);
25+
2026
MessageProducer producer = context.getBean(MessageProducer.class);
27+
MessageListener listener = context.getBean(MessageListener.class);
28+
/*
29+
* Sending a Hello World message to topic 'baeldung'.
30+
* Must be recieved by both listeners with group foo
31+
* and bar with containerFactory fooKafkaListenerContainerFactory
32+
* and barKafkaListenerContainerFactory respectively.
33+
* It will also be recieved by the listener with
34+
* headersKafkaListenerContainerFactory as container factory
35+
*/
2136
producer.sendMessage("Hello, World!");
37+
listener.latch.await(10, TimeUnit.SECONDS);
38+
39+
/*
40+
* Sending message to a topic with 5 partition,
41+
* each message to a different partition. But as per
42+
* listener configuration, only the messages from
43+
* partition 0 and 3 will be consumed.
44+
*/
45+
for (int i = 0; i < 5; i++) {
46+
producer.sendMessageToPartion("Hello To Partioned Topic!", i);
47+
}
48+
listener.partitionLatch.await(10, TimeUnit.SECONDS);
49+
50+
/*
51+
* Sending message to 'filtered' topic. As per listener
52+
* configuration, all messages with char sequence
53+
* 'World' will be discarded.
54+
*/
55+
producer.sendMessageToFiltered("Hello Baeldung!");
56+
producer.sendMessageToFiltered("Hello World!");
57+
listener.filterLatch.await(10, TimeUnit.SECONDS);
58+
59+
/*
60+
* Sending message to 'greeting' topic. This will send
61+
* and recieved a java object with the help of
62+
* greetingKafkaListenerContainerFactory.
63+
*/
64+
producer.sendGreetingMessage(new Greeting("Greetings", "World!"));
65+
listener.greetingLatch.await(10, TimeUnit.SECONDS);
2266

23-
MessageListener listener = context.getBean(MessageListener.class);
24-
listener.latch.await(20, TimeUnit.SECONDS);
25-
Thread.sleep(60000);
2667
context.close();
27-
2868
}
2969

3070
@Bean
@@ -42,18 +82,47 @@ public static class MessageProducer {
4282
@Autowired
4383
private KafkaTemplate<String, String> kafkaTemplate;
4484

85+
@Autowired
86+
private KafkaTemplate<String, Greeting> greetingKafkaTemplate;
87+
4588
@Value(value = "${message.topic.name}")
4689
private String topicName;
4790

91+
@Value(value = "${partitioned.topic.name}")
92+
private String partionedTopicName;
93+
94+
@Value(value = "${filtered.topic.name}")
95+
private String filteredTopicName;
96+
97+
@Value(value = "${greeting.topic.name}")
98+
private String greetingTopicName;
99+
48100
public void sendMessage(String message) {
49101
kafkaTemplate.send(topicName, message);
50102
}
51103

104+
public void sendMessageToPartion(String message, int partition) {
105+
kafkaTemplate.send(partionedTopicName, partition, message);
106+
}
107+
108+
public void sendMessageToFiltered(String message) {
109+
kafkaTemplate.send(filteredTopicName, message);
110+
}
111+
112+
public void sendGreetingMessage(Greeting greeting) {
113+
greetingKafkaTemplate.send(greetingTopicName, greeting);
114+
}
52115
}
53116

54117
public static class MessageListener {
55118

56-
private CountDownLatch latch = new CountDownLatch(2);
119+
private CountDownLatch latch = new CountDownLatch(3);
120+
121+
private CountDownLatch partitionLatch = new CountDownLatch(2);
122+
123+
private CountDownLatch filterLatch = new CountDownLatch(2);
124+
125+
private CountDownLatch greetingLatch = new CountDownLatch(1);
57126

58127
@KafkaListener(topics = "${message.topic.name}", group = "foo", containerFactory = "fooKafkaListenerContainerFactory")
59128
public void listenGroupFoo(String message) {
@@ -67,6 +136,30 @@ public void listenGroupBar(String message) {
67136
latch.countDown();
68137
}
69138

139+
@KafkaListener(topics = "${message.topic.name}", containerFactory = "headersKafkaListenerContainerFactory")
140+
public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
141+
System.out.println("Received Messasge: " + message + " from partition: " + partition);
142+
latch.countDown();
143+
}
144+
145+
@KafkaListener(topicPartitions = @TopicPartition(topic = "${partitioned.topic.name}", partitions = { "0", "3" }))
146+
public void listenToParition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
147+
System.out.println("Received Message: " + message + " from partition: " + partition);
148+
this.partitionLatch.countDown();
149+
}
150+
151+
@KafkaListener(topics = "${filtered.topic.name}", containerFactory = "filterKafkaListenerContainerFactory")
152+
public void listenWithFilter(String message) {
153+
System.out.println("Recieved Message in filtered listener: " + message);
154+
this.filterLatch.countDown();
155+
}
156+
157+
@KafkaListener(topics = "${greeting.topic.name}", containerFactory = "greetingKafkaListenerContainerFactory")
158+
public void greetingListener(Greeting greeting) {
159+
System.out.println("Recieved greeting message: " + greeting);
160+
this.greetingLatch.countDown();
161+
}
162+
70163
}
71164

72165
}

spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
1313
import org.springframework.kafka.core.ConsumerFactory;
1414
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
15+
import org.springframework.kafka.support.serializer.JsonDeserializer;
1516

1617
@EnableKafka
1718
@Configuration
@@ -35,11 +36,49 @@ public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerC
3536
factory.setConsumerFactory(consumerFactory("foo"));
3637
return factory;
3738
}
38-
39+
3940
@Bean
4041
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
4142
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
4243
factory.setConsumerFactory(consumerFactory("bar"));
4344
return factory;
4445
}
46+
47+
@Bean
48+
public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() {
49+
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
50+
factory.setConsumerFactory(consumerFactory("headers"));
51+
return factory;
52+
}
53+
54+
@Bean
55+
public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() {
56+
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
57+
factory.setConsumerFactory(consumerFactory("partitions"));
58+
return factory;
59+
}
60+
61+
@Bean
62+
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
63+
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
64+
factory.setConsumerFactory(consumerFactory("filter"));
65+
factory.setRecordFilterStrategy(record -> record.value()
66+
.contains("World"));
67+
return factory;
68+
}
69+
70+
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
71+
Map<String, Object> props = new HashMap<>();
72+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
73+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting");
74+
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class));
75+
}
76+
77+
@Bean
78+
public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
79+
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
80+
factory.setConsumerFactory(greetingConsumerFactory());
81+
return factory;
82+
}
83+
4584
}

spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
1212
import org.springframework.kafka.core.KafkaTemplate;
1313
import org.springframework.kafka.core.ProducerFactory;
14+
import org.springframework.kafka.support.serializer.JsonSerializer;
1415

1516
@Configuration
1617
public class KafkaProducerConfig {
@@ -29,8 +30,21 @@ public ProducerFactory<String, String> producerFactory() {
2930

3031
@Bean
3132
public KafkaTemplate<String, String> kafkaTemplate() {
32-
KafkaTemplate<String, String> template =
33-
new KafkaTemplate<String, String>(producerFactory());
34-
return template;
33+
return new KafkaTemplate<String, String>(producerFactory());
3534
}
35+
36+
@Bean
37+
public ProducerFactory<String, Greeting> greetingProducerFactory() {
38+
Map<String, Object> configProps = new HashMap<String, Object>();
39+
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
40+
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
41+
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
42+
return new DefaultKafkaProducerFactory<String, Greeting>(configProps);
43+
}
44+
45+
@Bean
46+
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
47+
return new KafkaTemplate<String, Greeting>(greetingProducerFactory());
48+
}
49+
3650
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
11
kafka.bootstrapAddress=localhost:9092
22
message.topic.name=baeldung
3+
greeting.topic.name=greeting
4+
filtered.topic.name=filtered
5+
partitioned.topic.name=partitioned

0 commit comments

Comments
 (0)