Skip to content

Commit 709857b

Browse files
mateuszmrozewskiadamd1985
authored andcommitted
BAEL-427 Examples for topic and fanout exchanges. (eugenp#1648)
* BAEL-427 Examples for topic and fanout exchanges. * BAEL-427 Separating code for the new article from the old one
1 parent f674f12 commit 709857b

File tree

8 files changed

+209
-6
lines changed

8 files changed

+209
-6
lines changed

spring-amqp-simple/src/main/java/com/baeldung/springamqpsimple/MessageConsumer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22

33
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
5-
5+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
66
import org.springframework.stereotype.Component;
77

88
@Component
99
public class MessageConsumer {
1010

1111
private static final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
1212

13+
@RabbitListener(queues = {SpringAmqpConfig.queueName})
1314
public void receiveMessage(String message) {
1415
logger.info("Received Message: " + message);
1516
}

spring-amqp-simple/src/main/java/com/baeldung/springamqpsimple/SpringAmqpConfig.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package com.baeldung.springamqpsimple;
22

3-
import org.springframework.amqp.core.*;
3+
import org.springframework.amqp.core.Binding;
4+
import org.springframework.amqp.core.BindingBuilder;
5+
import org.springframework.amqp.core.DirectExchange;
6+
import org.springframework.amqp.core.Exchange;
7+
import org.springframework.amqp.core.Queue;
48
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
59
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
610
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
@@ -32,7 +36,7 @@ Binding binding(Queue queue, DirectExchange exchange) {
3236

3337
@Bean
3438
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
35-
MessageListenerAdapter listenerAdapter) {
39+
MessageListenerAdapter listenerAdapter) {
3640
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
3741
container.setConnectionFactory(connectionFactory);
3842
container.setQueueNames(queueName);
@@ -44,5 +48,4 @@ SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
4448
MessageListenerAdapter listenerAdapter(MessageConsumer messageReceiver) {
4549
return new MessageListenerAdapter(messageReceiver, "receiveMessage");
4650
}
47-
4851
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.baeldung.springamqpsimple.broadcast;
2+
3+
import org.springframework.amqp.core.BindingBuilder;
4+
import org.springframework.amqp.core.Declarable;
5+
import org.springframework.amqp.core.DirectExchange;
6+
import org.springframework.amqp.core.FanoutExchange;
7+
import org.springframework.amqp.core.Queue;
8+
import org.springframework.amqp.core.TopicExchange;
9+
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
10+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
11+
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
12+
import org.springframework.context.annotation.Bean;
13+
import org.springframework.context.annotation.Configuration;
14+
import org.springframework.context.annotation.Profile;
15+
16+
import java.util.Arrays;
17+
import java.util.List;
18+
19+
@Configuration
20+
@Profile("!test")
21+
public class BroadcastConfig {
22+
23+
public final static String fanoutQueue1Name = "com.baeldung.spring-amqp-simple.fanout.queue1";
24+
public final static String fanoutQueue2Name = "com.baeldung.spring-amqp-simple.fanout.queue2";
25+
public final static String fanoutExchangeName = "com.baeldung.spring-amqp-simple.fanout.exchange";
26+
27+
public final static String topicQueue1Name = "com.baeldung.spring-amqp-simple.topic.queue1";
28+
public final static String topicQueue2Name = "com.baeldung.spring-amqp-simple.topic.queue2";
29+
public final static String topicExchangeName = "com.baeldung.spring-amql-simple.topic.exchange";
30+
31+
@Bean
32+
public List<Declarable> topicBindings() {
33+
Queue topicQueue1 = new Queue(topicQueue1Name, false);
34+
Queue topicQueue2 = new Queue(topicQueue2Name, false);
35+
36+
TopicExchange topicExchange = new TopicExchange(topicExchangeName);
37+
38+
return Arrays.asList(
39+
topicQueue1,
40+
topicQueue2,
41+
topicExchange,
42+
BindingBuilder.bind(topicQueue1).to(topicExchange).with("*.important.*"),
43+
BindingBuilder.bind(topicQueue2).to(topicExchange).with("user.#")
44+
);
45+
}
46+
47+
@Bean
48+
public List<Declarable> fanoutBindings() {
49+
Queue fanoutQueue1 = new Queue(fanoutQueue1Name, false);
50+
Queue fanoutQueue2 = new Queue(fanoutQueue2Name, false);
51+
52+
FanoutExchange fanoutExchange = new FanoutExchange(fanoutExchangeName);
53+
54+
return Arrays.asList(
55+
fanoutQueue1,
56+
fanoutQueue2,
57+
fanoutExchange,
58+
BindingBuilder.bind(fanoutQueue1).to(fanoutExchange),
59+
BindingBuilder.bind(fanoutQueue2).to(fanoutExchange)
60+
);
61+
}
62+
63+
@Bean
64+
public SimpleRabbitListenerContainerFactory container(ConnectionFactory connectionFactory, SimpleRabbitListenerContainerFactoryConfigurer configurer) {
65+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
66+
configurer.configure(factory, connectionFactory);
67+
return factory;
68+
}
69+
70+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.baeldung.springamqpsimple.broadcast;
2+
3+
import com.baeldung.springamqpsimple.MessageConsumer;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
7+
import org.springframework.stereotype.Component;
8+
9+
@Component
10+
public class BroadcastMessageConsumers {
11+
private static final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
12+
13+
@RabbitListener(queues = {BroadcastConfig.fanoutQueue1Name})
14+
public void receiveMessageFromFanout1(String message) {
15+
logger.info("Received fanout 1 message: " + message);
16+
}
17+
18+
@RabbitListener(queues = {BroadcastConfig.fanoutQueue2Name})
19+
public void receiveMessageFromFanout2(String message) {
20+
logger.info("Received fanout 2 message: " + message);
21+
}
22+
23+
@RabbitListener(queues = {BroadcastConfig.topicQueue1Name})
24+
public void receiveMessageFromTopic1(String message) {
25+
logger.info("Received topic 1 message: " + message);
26+
}
27+
28+
@RabbitListener(queues = {BroadcastConfig.topicQueue2Name})
29+
public void receiveMessageFromTopic2(String message) {
30+
logger.info("Received topic 2 message: " + message);
31+
}
32+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.baeldung.springamqpsimple.broadcast;
2+
3+
import org.springframework.beans.factory.annotation.Autowired;
4+
import org.springframework.http.HttpStatus;
5+
import org.springframework.stereotype.Controller;
6+
import org.springframework.web.bind.annotation.RequestBody;
7+
import org.springframework.web.bind.annotation.RequestMapping;
8+
import org.springframework.web.bind.annotation.RequestMethod;
9+
import org.springframework.web.bind.annotation.ResponseStatus;
10+
11+
@Controller
12+
public class BroadcastMessageController {
13+
14+
private final BroadcastMessageProducer messageProducer;
15+
16+
@Autowired
17+
public BroadcastMessageController(BroadcastMessageProducer messageProducer) {
18+
this.messageProducer = messageProducer;
19+
}
20+
21+
@RequestMapping(value="/broadcast", method= RequestMethod.POST)
22+
@ResponseStatus(value= HttpStatus.CREATED)
23+
public void sendMessage(@RequestBody String message) {
24+
messageProducer.sendMessages(message);
25+
}
26+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.baeldung.springamqpsimple.broadcast;
2+
3+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
4+
import org.springframework.beans.factory.annotation.Autowired;
5+
import org.springframework.stereotype.Component;
6+
7+
@Component
8+
public class BroadcastMessageProducer {
9+
10+
private final RabbitTemplate rabbitTemplate;
11+
12+
@Autowired
13+
public BroadcastMessageProducer(RabbitTemplate rabbitTemplate) {
14+
this.rabbitTemplate = rabbitTemplate;
15+
}
16+
17+
public void sendMessages(String message) {
18+
rabbitTemplate.convertAndSend(BroadcastConfig.fanoutExchangeName, "", message);
19+
rabbitTemplate.convertAndSend(BroadcastConfig.topicExchangeName, "user.not-important.info", message);
20+
rabbitTemplate.convertAndSend(BroadcastConfig.topicExchangeName, "user.important.error", message);
21+
}
22+
}
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
spring:
22
rabbitmq:
3-
username: baeldung
4-
password: baeldung
3+
username: guest
4+
password: guest
5+
host: 10.10.10.105
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package broadcast;
2+
3+
import com.baeldung.springamqpsimple.broadcast.BroadcastConfig;
4+
import org.junit.Test;
5+
import org.junit.runner.RunWith;
6+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
import org.springframework.boot.test.context.SpringBootTest;
9+
import org.springframework.boot.test.mock.mockito.MockBean;
10+
import org.springframework.boot.test.web.client.TestRestTemplate;
11+
import org.springframework.http.HttpStatus;
12+
import org.springframework.http.ResponseEntity;
13+
import org.springframework.test.context.ActiveProfiles;
14+
import org.springframework.test.context.junit4.SpringRunner;
15+
16+
import static org.junit.Assert.*;
17+
import static org.mockito.Mockito.*;
18+
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
19+
20+
@RunWith(SpringRunner.class)
21+
@ActiveProfiles("test")
22+
@SpringBootTest(webEnvironment = RANDOM_PORT)
23+
public class BroadcastMessageControllerIntegrationTest {
24+
25+
@Autowired
26+
private TestRestTemplate restTemplate;
27+
28+
@MockBean
29+
private RabbitTemplate rabbitTemplate;
30+
31+
@Test
32+
public void whenPostingMessage_thenMessageIsCreated() {
33+
final String message = "Hello World!";
34+
ResponseEntity<Void> responseEntity = restTemplate.postForEntity("/broadcast", message, Void.class);
35+
36+
assertEquals(HttpStatus.CREATED, responseEntity.getStatusCode());
37+
}
38+
39+
@Test
40+
public void whenPostingMessage_thenMessageIsSentToBroker() {
41+
final String message = "Hello World!";
42+
restTemplate.postForEntity("/broadcast", message, Void.class);
43+
44+
verify(rabbitTemplate).convertAndSend(BroadcastConfig.fanoutExchangeName, "", message);
45+
verify(rabbitTemplate).convertAndSend(BroadcastConfig.topicExchangeName, "user.not-important.info", message);
46+
verify(rabbitTemplate).convertAndSend(BroadcastConfig.topicExchangeName, "user.important.error", message);
47+
}
48+
}

0 commit comments

Comments
 (0)