Skip to content

Commit 7dafe29

Browse files
committed
[change] refactor and move netty implementations to tars-netty.
1 parent c149f4a commit 7dafe29

35 files changed

+1197
-191
lines changed

Contributing.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ If you contributed but cannot find your ID here, please submit PR and add your G
1313
- woodwind
1414
- XenoAmess
1515
- yukkiball
16+
- Kongyuanyuan

core/pom.xml

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,6 @@
2121
</properties>
2222

2323
<dependencies>
24-
<dependency>
25-
<groupId>io.netty</groupId>
26-
<artifactId>netty-tcnative</artifactId>
27-
<version>2.0.20.Final</version> <!-- see table for correct version -->
28-
<scope>runtime</scope>
29-
</dependency>
3024
<dependency>
3125
<groupId>junit</groupId>
3226
<artifactId>junit</artifactId>
@@ -36,7 +30,6 @@
3630
<dependency>
3731
<groupId>org.slf4j</groupId>
3832
<artifactId>slf4j-api</artifactId>
39-
<version>1.7.30</version>
4033
</dependency>
4134
<dependency>
4235
<groupId>com.google.guava</groupId>
@@ -52,7 +45,6 @@
5245
<dependency>
5346
<groupId>io.netty</groupId>
5447
<artifactId>netty-all</artifactId>
55-
<version>4.1.56.Final</version>
5648
</dependency>
5749
<dependency>
5850
<groupId>javax.servlet</groupId>
@@ -96,11 +88,5 @@
9688
<version>4.3.2</version>
9789
<scope>compile</scope>
9890
</dependency>
99-
<dependency>
100-
<groupId>org.apache.httpcomponents</groupId>
101-
<artifactId>httpcore</artifactId>
102-
<version>4.3.2</version>
103-
<scope>compile</scope>
104-
</dependency>
10591
</dependencies>
10692
</project>

core/src/main/java/com/qq/tars/client/rpc/NettyTransporter.java

Lines changed: 0 additions & 17 deletions
This file was deleted.

core/src/main/java/com/qq/tars/client/rpc/ServantProtocolInvoker.java

Lines changed: 4 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,10 @@
2525
import com.qq.tars.rpc.common.Url;
2626
import com.qq.tars.rpc.common.util.concurrent.ConcurrentHashSet;
2727
import com.qq.tars.rpc.exc.ClientException;
28-
import com.qq.tars.rpc.protocol.tars.TarsServantResponse;
2928
import com.qq.tars.support.log.LoggerFactory;
30-
import io.netty.channel.Channel;
3129
import org.slf4j.Logger;
3230

33-
import java.io.IOException;
34-
import java.util.ArrayList;
35-
import java.util.Collection;
36-
import java.util.Collections;
37-
import java.util.HashMap;
38-
import java.util.HashSet;
39-
import java.util.List;
40-
import java.util.Map;
41-
import java.util.Set;
31+
import java.util.*;
4232
import java.util.concurrent.TimeUnit;
4333
import java.util.stream.Collectors;
4434

@@ -89,23 +79,21 @@ public void refresh() {
8979
ScheduledExecutorManager.getInstance().schedule(() -> destroy(brokenInvokers), Math.max(servantProxyConfig.getAsyncTimeout(), servantProxyConfig.getSyncTimeout()), TimeUnit.MILLISECONDS);
9080
}
9181

92-
protected RPCClient[] getClients(Url url) throws IOException {
82+
protected RPCClient[] getClients(Url url) {
9383
int connections = url.getParameter(Constants.TARS_CLIENT_CONNECTIONS, Constants.DEFAULT_CONNECTION);
94-
RPCClient[] clients = new NettyServantClient[connections];
84+
RPCClient[] clients = new RPCClient[connections];
9585
for (int i = 0; i < clients.length; i++) {
9686
clients[i] = initClient(url);
9787
}
9888
return clients;
9989
}
10090

10191
protected RPCClient initClient(Url url) {
102-
RPCClient client = null;
10392
try {
104-
client = NettyTransporter.connect(url, servantProxyConfig, new InnerDefaultHandler());
93+
return TransporterAbstractFactory.getInstance().getTransporterFactory().connect(url, servantProxyConfig);
10594
} catch (Throwable e) {
10695
throw new ClientException(servantProxyConfig.getSimpleObjectName(), "Fail to create client|" + url.toIdentityString() + "|" + e.getLocalizedMessage(), e);
10796
}
108-
return client;
10997
}
11098

11199
protected ConcurrentHashSet<Invoker<T>> initInvoker() {
@@ -158,43 +146,4 @@ private void destroy(Collection<Invoker<T>> invokers) {
158146
}
159147
}
160148
}
161-
162-
private static class InnerDefaultHandler implements ChannelHandler {
163-
164-
@Override
165-
public void connected(Channel channel) {
166-
167-
}
168-
169-
@Override
170-
public void disconnected(Channel channel) {
171-
172-
}
173-
174-
@Override
175-
public void send(Channel channel, Object message) {
176-
177-
}
178-
179-
@Override
180-
public void received(Channel channel, Object message) {
181-
TarsServantResponse response = (TarsServantResponse) message;
182-
if (logger.isDebugEnabled()) {
183-
System.out.println();
184-
logger.debug("[tars]netty receive message id is " + response.getRequestId());
185-
}
186-
TicketFeature.getFeature(response.getRequestId()).complete(response);
187-
}
188-
189-
@Override
190-
public void caught(Channel channel, Throwable exception) {
191-
192-
}
193-
194-
@Override
195-
public void destroy() {
196-
197-
}
198-
}
199-
200149
}

core/src/main/java/com/qq/tars/client/rpc/TarsDecoder.java

100755100644
File mode changed.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package com.qq.tars.client.rpc;
2+
3+
import com.google.common.collect.ImmutableList;
4+
import com.qq.tars.support.log.LoggerFactory;
5+
import org.slf4j.Logger;
6+
7+
import java.util.List;
8+
import java.util.ServiceLoader;
9+
10+
/**
11+
* Abstract factory for transporter.
12+
* Used to load different transporter implementation(such as netty, reactor, restTemplate, etc) through SPI:{@code com.qq.tars.client.rpc.TransporterFactory}
13+
*
14+
* @author kongyuanyuan
15+
*/
16+
public class TransporterAbstractFactory {
17+
private static final Logger log = LoggerFactory.getTransporterLogger();
18+
19+
private static final TransporterAbstractFactory INSTANCE = new TransporterAbstractFactory();
20+
private final TransporterFactory defaultTransporterFactory;
21+
22+
private TransporterAbstractFactory() {
23+
ServiceLoader<TransporterFactory> transporterFactories = ServiceLoader.load(TransporterFactory.class, this.getClass().getClassLoader());
24+
List<TransporterFactory> transporterFactoryList = ImmutableList.copyOf(transporterFactories);
25+
if (transporterFactoryList.isEmpty()) {
26+
throw new IllegalStateException("No TransporterFactory implementation found on the classpath through SPI. Try to add tars-netty in pom.xml.");
27+
}
28+
defaultTransporterFactory = transporterFactoryList.get(0);
29+
if (transporterFactoryList.size() > 1) {
30+
log.warn("More than one transporter factory found. {} will be used.", defaultTransporterFactory.getClass().getCanonicalName());
31+
}
32+
}
33+
34+
public static TransporterAbstractFactory getInstance() {
35+
return INSTANCE;
36+
}
37+
38+
public TransporterFactory getTransporterFactory() {
39+
return defaultTransporterFactory;
40+
}
41+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.qq.tars.client.rpc;
2+
3+
import com.qq.tars.client.ServantProxyConfig;
4+
import com.qq.tars.rpc.common.Url;
5+
import com.qq.tars.server.config.ServantAdapterConfig;
6+
import com.qq.tars.server.core.Processor;
7+
8+
/**
9+
* Abstract factory for creating servers and clients in transportation。
10+
*
11+
* @author kongyuanyuan
12+
*/
13+
public interface TransporterFactory {
14+
15+
/**
16+
* Connect with a server node using the given servant config.
17+
*
18+
* @param url server node url
19+
* @param servantProxyConfig servant config
20+
* @return rpc client instance
21+
*/
22+
RPCClient connect(Url url, ServantProxyConfig servantProxyConfig);
23+
24+
/**
25+
* Connect with a server node using the given servant config and channel handler.
26+
*
27+
* @param url server node url
28+
* @param servantProxyConfig servant config
29+
* @param channelHandler custom channel handler. Can be used in unit tests.
30+
* @return rpc client instance
31+
*/
32+
RPCClient connect(Url url, ServantProxyConfig servantProxyConfig, ChannelHandler channelHandler);
33+
34+
/**
35+
* Get the server instance for transportation which can be used to bind ip/port with {@link TransporterServer#bind()} method.
36+
*
37+
* @param servantAdapterConfig the servant adapter config which is provided by the platform.
38+
* @param processor the request processor.
39+
* @return transporter server instance
40+
* @see com.qq.tars.server.core.TarsServantProcessor
41+
*/
42+
TransporterServer getTransporterServer(ServantAdapterConfig servantAdapterConfig, Processor processor);
43+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.qq.tars.client.rpc;
2+
3+
/**
4+
* 传输层服务器。需要实现{@code bind}方法来绑定服务地址与端口号。
5+
*
6+
* @author kongyuanyuan
7+
*/
8+
public interface TransporterServer {
9+
/**
10+
* 执行绑定动作。
11+
*/
12+
void bind();
13+
}

core/src/main/java/com/qq/tars/client/support/ClientPoolManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package com.qq.tars.client.support;
1818

1919
import com.qq.tars.client.CommunicatorConfig;
20-
import com.qq.tars.client.rpc.NettyServantClient;
20+
import com.qq.tars.client.rpc.RPCClient;
2121
import com.qq.tars.common.util.concurrent.TaskQueue;
2222
import com.qq.tars.common.util.concurrent.TaskThreadFactory;
2323
import com.qq.tars.common.util.concurrent.TaskThreadPoolExecutor;
@@ -35,7 +35,7 @@ public class ClientPoolManager {
3535
public static ThreadPoolExecutor getClientThreadPoolExecutor(CommunicatorConfig communicatorConfig) {
3636
ThreadPoolExecutor clientPoolExecutor = clientThreadPoolMap.get(communicatorConfig);
3737
if (clientPoolExecutor == null) {
38-
synchronized (NettyServantClient.class) {
38+
synchronized (RPCClient.class) {
3939
clientPoolExecutor = clientThreadPoolMap.get(communicatorConfig);
4040
if (clientPoolExecutor == null) {
4141
clientThreadPoolMap.put(communicatorConfig, createThreadPool(communicatorConfig));

core/src/main/java/com/qq/tars/client/support/ClientThreadPoolManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package com.qq.tars.client.support;
1818

1919
import com.qq.tars.client.CommunicatorConfig;
20-
import com.qq.tars.client.rpc.NettyServantClient;
20+
import com.qq.tars.client.rpc.RPCClient;
2121
import com.qq.tars.common.util.concurrent.TaskQueue;
2222
import com.qq.tars.common.util.concurrent.TaskThreadFactory;
2323
import com.qq.tars.common.util.concurrent.TaskThreadPoolExecutor;
@@ -34,7 +34,7 @@ public static ThreadPoolExecutor getClientThreadPoolExecutor(CommunicatorConfig
3434
String contextIdentity = resolveCurrentContextIdentity();
3535
ThreadPoolExecutor clientPoolExecutor = clientThreadPoolMap.get(contextIdentity);
3636
if (clientPoolExecutor == null) {
37-
synchronized (NettyServantClient.class) {
37+
synchronized (RPCClient.class) {
3838
clientPoolExecutor = clientThreadPoolMap.get(contextIdentity);
3939
if (clientPoolExecutor == null) {
4040
clientThreadPoolMap.put(contextIdentity, createThreadPool(communicatorConfig));

0 commit comments

Comments
 (0)