Skip to content

Commit f8e958f

Browse files
author
Xiong Neng
committed
add rabbitmq rpc sample
1 parent 66c4d7a commit f8e958f

File tree

21 files changed

+933
-0
lines changed

21 files changed

+933
-0
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
The MIT License (MIT)
2+
3+
Copyright (c) 2018 Xiong Neng
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy of
6+
this software and associated documentation files (the "Software"), to deal in
7+
the Software without restriction, including without limitation the rights to
8+
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
9+
the Software, and to permit persons to whom the Software is furnished to do so,
10+
subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
17+
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
18+
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
19+
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
20+
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
## RabbitMQ实现RPC调用客户端
2+
3+
消息队列RabbitMQ的使用例子,演示了RPC调用的客户端例子。
4+
5+
## 许可证
6+
7+
Copyright (c) 2018 Xiong Neng
8+
9+
基于 MIT 协议发布: <http://www.opensource.org/licenses/MIT>
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>com.xncoding</groupId>
8+
<artifactId>springboot-rabbitmq-rpc-client</artifactId>
9+
<version>1.0.0-SNAPSHOT</version>
10+
<packaging>jar</packaging>
11+
12+
<name>springboot-rabbitmq-rpc-client</name>
13+
<description>集成消息队列RabbitMQ RPC调用 - 客户端</description>
14+
15+
<parent>
16+
<groupId>org.springframework.boot</groupId>
17+
<artifactId>spring-boot-starter-parent</artifactId>
18+
<version>1.5.10.RELEASE</version>
19+
<relativePath/>
20+
</parent>
21+
22+
<properties>
23+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
24+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
25+
<java.version>1.8</java.version>
26+
</properties>
27+
28+
<dependencies>
29+
<dependency>
30+
<groupId>org.springframework.boot</groupId>
31+
<artifactId>spring-boot-starter-amqp</artifactId>
32+
</dependency>
33+
34+
<dependency>
35+
<groupId>org.springframework.boot</groupId>
36+
<artifactId>spring-boot-starter-test</artifactId>
37+
<exclusions>
38+
<exclusion>
39+
<groupId>com.vaadin.external.google</groupId>
40+
<artifactId>android-json</artifactId>
41+
</exclusion>
42+
</exclusions>
43+
</dependency>
44+
</dependencies>
45+
46+
<build>
47+
<plugins>
48+
<plugin>
49+
<groupId>org.apache.maven.plugins</groupId>
50+
<artifactId>maven-compiler-plugin</artifactId>
51+
<version>3.6.1</version>
52+
<configuration>
53+
<!--<proc>none</proc>-->
54+
<source>1.8</source>
55+
<target>1.8</target>
56+
</configuration>
57+
</plugin>
58+
<plugin>
59+
<groupId>org.apache.maven.plugins</groupId>
60+
<artifactId>maven-surefire-plugin</artifactId>
61+
<version>2.20</version>
62+
<configuration>
63+
<skip>true</skip>
64+
</configuration>
65+
</plugin>
66+
<plugin>
67+
<groupId>org.springframework.boot</groupId>
68+
<artifactId>spring-boot-maven-plugin</artifactId>
69+
<executions>
70+
</executions>
71+
</plugin>
72+
</plugins>
73+
74+
<resources>
75+
<resource>
76+
<directory>src/main/resources</directory>
77+
</resource>
78+
<resource>
79+
<directory>src/main/java</directory>
80+
<includes>
81+
<include>**/*.xml</include>
82+
</includes>
83+
</resource>
84+
</resources>
85+
</build>
86+
87+
</project>
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.xncoding.pos;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
@SpringBootApplication
7+
public class Application {
8+
public static void main(String[] args) {
9+
SpringApplication.run(Application.class, args);
10+
}
11+
12+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package com.xncoding.pos.config;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.amqp.core.AmqpTemplate;
6+
import org.springframework.amqp.core.AnonymousQueue;
7+
import org.springframework.amqp.core.Queue;
8+
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
9+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
10+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
11+
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
12+
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
13+
import org.springframework.amqp.support.converter.MessageConverter;
14+
import org.springframework.context.annotation.Bean;
15+
import org.springframework.context.annotation.Configuration;
16+
import org.springframework.context.annotation.Primary;
17+
import org.springframework.scheduling.annotation.EnableAsync;
18+
19+
import javax.annotation.Resource;
20+
21+
/**
22+
* RabbitConfig
23+
*
24+
* @author XiongNeng
25+
* @version 1.0
26+
* @since 2018/3/1
27+
*/
28+
@Configuration
29+
public class RabbitConfig {
30+
/**
31+
* 同步RPC队列
32+
*/
33+
public static final String QUEUE_SYNC_RPC = "rpc.sync";
34+
35+
/**
36+
* 异步RPC队列,使用临时回复队列,或者使用“Direct reply-to”特性
37+
*/
38+
public static final String QUEUE_ASYNC_RPC = "rpc.async";
39+
40+
/**
41+
* 异步RPC队列,每个客户端使用不同的固定回复队列,需要额外提供correlationId以关联请求和响应
42+
*/
43+
public static final String QUEUE_ASYNC_RPC_WITH_FIXED_REPLY = "rpc.with.fixed.reply";
44+
45+
@Bean
46+
public Queue syncRPCQueue() {
47+
return new Queue(QUEUE_SYNC_RPC);
48+
}
49+
50+
@Bean
51+
public Queue asyncRPCQueue() {
52+
return new Queue(QUEUE_ASYNC_RPC);
53+
}
54+
55+
@Bean
56+
public Queue fixedReplyRPCQueue() {
57+
return new Queue(QUEUE_ASYNC_RPC_WITH_FIXED_REPLY);
58+
}
59+
60+
@Bean
61+
public Queue repliesQueue() {
62+
return new AnonymousQueue();
63+
}
64+
65+
@Bean
66+
public MessageConverter jsonMessageConverter(){
67+
return new Jackson2JsonMessageConverter();
68+
}
69+
70+
@Bean
71+
@Primary
72+
public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
73+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
74+
container.setQueueNames(repliesQueue().getName());
75+
return container;
76+
}
77+
78+
@Bean
79+
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
80+
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
81+
rabbitTemplate.setMessageConverter(jsonMessageConverter());
82+
return rabbitTemplate;
83+
}
84+
85+
@Bean
86+
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate template, SimpleMessageListenerContainer container) {
87+
return new AsyncRabbitTemplate(template, container);
88+
}
89+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.xncoding.pos.model;
2+
3+
/**
4+
* User
5+
*
6+
* @author XiongNeng
7+
* @version 1.0
8+
* @since 2018/5/17
9+
*/
10+
public class User {
11+
private String name;
12+
private Integer age;
13+
14+
public String getName() {
15+
return name;
16+
}
17+
18+
public void setName(String name) {
19+
this.name = name;
20+
}
21+
22+
public Integer getAge() {
23+
return age;
24+
}
25+
26+
public void setAge(Integer age) {
27+
this.age = age;
28+
}
29+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.xncoding.pos.service;
2+
3+
import com.xncoding.pos.model.User;
4+
import com.xncoding.pos.util.StringUtil;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
import org.springframework.amqp.core.AmqpTemplate;
8+
import org.springframework.amqp.core.MessagePostProcessor;
9+
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
10+
import org.springframework.beans.factory.annotation.Autowired;
11+
import org.springframework.scheduling.annotation.Async;
12+
import org.springframework.scheduling.annotation.AsyncResult;
13+
import org.springframework.stereotype.Service;
14+
import org.springframework.util.concurrent.ListenableFuture;
15+
16+
import java.util.concurrent.Future;
17+
18+
import static com.xncoding.pos.config.RabbitConfig.QUEUE_ASYNC_RPC;
19+
import static com.xncoding.pos.config.RabbitConfig.QUEUE_ASYNC_RPC_WITH_FIXED_REPLY;
20+
21+
/**
22+
* 消息发送服务
23+
*/
24+
@Service
25+
public class SenderService {
26+
private Logger logger = LoggerFactory.getLogger(this.getClass());
27+
@Autowired
28+
AsyncRabbitTemplate asyncRabbitTemplate;
29+
30+
@Autowired
31+
AmqpTemplate amqpTemplate;
32+
33+
@Async
34+
public Future<String> sendAsync(User message) {
35+
// String result = (String) amqpTemplate.convertSendAndReceive(QUEUE_ASYNC_RPC, message, m -> {
36+
// m.getMessageProperties().setCorrelationIdString(StringUtil.generateUUId());
37+
// return m;
38+
// });
39+
String result = (String) amqpTemplate.convertSendAndReceive(QUEUE_ASYNC_RPC, message);
40+
return new AsyncResult<>(result);
41+
}
42+
43+
public Future<String> sendWithFixedReplay(User message) {
44+
// ListenableFuture<String> future = asyncRabbitTemplate.convertSendAndReceive(QUEUE_ASYNC_RPC_WITH_FIXED_REPLY, message, m -> {
45+
// m.getMessageProperties().setCorrelationIdString(StringUtil.generateUUId());
46+
// return m;
47+
// });
48+
return asyncRabbitTemplate.convertSendAndReceive(QUEUE_ASYNC_RPC_WITH_FIXED_REPLY, message);
49+
}
50+
51+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.xncoding.pos.util;
2+
3+
import java.util.UUID;
4+
5+
/**
6+
* StringUtil
7+
*
8+
* @author XiongNeng
9+
* @version 1.0
10+
* @since 2018/5/17
11+
*/
12+
public class StringUtil {
13+
public static String generateUUId() {
14+
return UUID.randomUUID().toString();
15+
}
16+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
##########################################################
2+
################## 所有profile共有的配置 #################
3+
##########################################################
4+
5+
################### spring配置 ###################
6+
spring:
7+
profiles:
8+
active: dev
9+
10+
---
11+
12+
#####################################################################
13+
######################## 开发环境profile ##########################
14+
#####################################################################
15+
spring:
16+
profiles: dev
17+
rabbitmq:
18+
host: 119.29.12.177
19+
port: 5672
20+
username: guest
21+
password: guest
22+
23+
logging:
24+
level:
25+
ROOT: INFO
26+
com:
27+
xncoding: DEBUG
28+
file: D:/logs/rabbitmq-rpc-client.log
29+

0 commit comments

Comments
 (0)