Skip to content

Commit 3e0d7f6

Browse files
committed
interface add
1 parent 3f8d7f0 commit 3e0d7f6

26 files changed

+2427
-33
lines changed

pom.xml

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@
3939
<artifactId>log4j</artifactId>
4040
<version>${log4j_version}</version>
4141
</dependency>
42+
<dependency>
43+
<groupId>org.slf4j</groupId>
44+
<artifactId>slf4j-api</artifactId>
45+
<version>${slf4j_version}</version>
46+
</dependency>
4247
<dependency>
4348
<groupId>junit</groupId>
4449
<artifactId>junit</artifactId>
@@ -67,11 +72,6 @@
6772
</dependencies>
6873
</dependencyManagement>
6974
<dependencies>
70-
<dependency>
71-
<groupId>log4j</groupId>
72-
<artifactId>log4j</artifactId>
73-
<scope>test</scope>
74-
</dependency>
7575
<dependency>
7676
<groupId>junit</groupId>
7777
<artifactId>junit</artifactId>
@@ -91,6 +91,10 @@
9191
<groupId>log4j</groupId>
9292
<artifactId>log4j</artifactId>
9393
</dependency>
94+
<dependency>
95+
<groupId>org.slf4j</groupId>
96+
<artifactId>slf4j-api</artifactId>
97+
</dependency>
9498
<dependency>
9599
<groupId>org.javassist</groupId>
96100
<artifactId>javassist</artifactId>

src/main/java/com/redcreen/rpcplus/channel/ChannelHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,37 +32,37 @@ public interface ChannelHandler {
3232
*
3333
* @param channel channel.
3434
*/
35-
void connected(Attributeable channel) throws ChannelException;
35+
void connected(Channel channel) throws ChannelException;
3636

3737
/**
3838
* on channel disconnected.
3939
*
4040
* @param channel channel.
4141
*/
42-
void disconnected(Attributeable channel) throws ChannelException;
42+
void disconnected(Channel channel) throws ChannelException;
4343

4444
/**
4545
* on message sent.
4646
*
4747
* @param channel channel.
4848
* @param message message.
4949
*/
50-
void sent(Attributeable channel, Object message) throws ChannelException;
50+
void sent(Channel channel, Object message) throws ChannelException;
5151

5252
/**
5353
* on message received.
5454
*
5555
* @param channel channel.
5656
* @param message message.
5757
*/
58-
void received(Attributeable channel, Object message) throws ChannelException;
58+
void received(Channel channel, Object message) throws ChannelException;
5959

6060
/**
6161
* on exception caught.
6262
*
6363
* @param channel channel.
6464
* @param exception exception.
6565
*/
66-
void caught(Attributeable channel, Throwable exception) throws ChannelException;
66+
void caught(Channel channel, Throwable exception) throws ChannelException;
6767

6868
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.redcreen.rpcplus.handler;
2+
3+
import com.redcreen.rpcplus.channel.Channel;
4+
import com.redcreen.rpcplus.channel.ChannelException;
5+
import com.redcreen.rpcplus.channel.ChannelHandler;
6+
7+
public class AbstractChannelHandler implements ChannelHandler {
8+
9+
@Override
10+
public void connected(Channel channel) throws ChannelException {
11+
//do nothing
12+
}
13+
14+
@Override
15+
public void disconnected(Channel channel) throws ChannelException {
16+
//do nothing
17+
}
18+
19+
@Override
20+
public void sent(Channel channel, Object message) throws ChannelException {
21+
//do nothing
22+
}
23+
24+
@Override
25+
public void received(Channel channel, Object message) throws ChannelException {
26+
//do nothing
27+
}
28+
29+
@Override
30+
public void caught(Channel channel, Throwable exception) throws ChannelException {
31+
//do nothing
32+
}
33+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.redcreen.rpcplus.handler;
2+
3+
import com.redcreen.rpcplus.channel.Channel;
4+
import com.redcreen.rpcplus.channel.ChannelException;
5+
import com.redcreen.rpcplus.channel.ChannelHandler;
6+
7+
public class AbstractChannelHandlerWrapper implements ChannelHandler {
8+
9+
protected ChannelHandler handler ;
10+
11+
12+
public AbstractChannelHandlerWrapper(ChannelHandler handler) {
13+
super();
14+
this.handler = handler;
15+
}
16+
17+
@Override
18+
public void connected(Channel channel) throws ChannelException {
19+
handler.connected(channel);
20+
}
21+
22+
@Override
23+
public void disconnected(Channel channel) throws ChannelException {
24+
handler.disconnected(channel);
25+
}
26+
27+
@Override
28+
public void sent(Channel channel, Object message) throws ChannelException {
29+
handler.sent(channel, message);
30+
}
31+
32+
@Override
33+
public void received(Channel channel, Object message) throws ChannelException {
34+
handler.received(channel, message);
35+
}
36+
37+
@Override
38+
public void caught(Channel channel, Throwable exception) throws ChannelException {
39+
handler.caught(channel, exception);
40+
}
41+
42+
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package com.redcreen.rpcplus.handler.codec;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
6+
import com.redcreen.rpcplus.channel.Channel;
7+
import com.redcreen.rpcplus.channel.ChannelException;
8+
import com.redcreen.rpcplus.channel.ChannelHandler;
9+
import com.redcreen.rpcplus.handler.AbstractChannelHandlerWrapper;
10+
import com.redcreen.rpcplus.support.Constants.ChannelConstants;
11+
import com.redcreen.rpcplus.support.URL;
12+
import com.redcreen.rpcplus.util.io.Bytes;
13+
import com.redcreen.rpcplus.util.io.UnsafeByteArrayInputStream;
14+
15+
public abstract class AbstractCodecHandlerWrapper extends AbstractChannelHandlerWrapper {
16+
17+
private final int bufferSize;
18+
19+
private int mOffset = 0, mLimit = 0;
20+
21+
private byte[] mBuffer = null;
22+
23+
public AbstractCodecHandlerWrapper(URL url, ChannelHandler handler) {
24+
super(handler);
25+
//TODO
26+
this.bufferSize = 8096;
27+
}
28+
29+
public void sent(Channel channel, Object message) throws ChannelException {
30+
handler.sent(channel, encode(channel, message));
31+
}
32+
33+
public void received(Channel channel, Object message) throws ChannelException {
34+
if (!(message instanceof InputStream)) {
35+
handler.received(channel, message);
36+
return ;
37+
}
38+
try{
39+
doReceived(channel, (InputStream)message);
40+
}catch (IOException e) {
41+
throw new ChannelException(channel,e);
42+
}
43+
}
44+
45+
public void doReceived(Channel channel, InputStream input) throws ChannelException, IOException {
46+
int readable = input.read();
47+
if (readable <= 0) {
48+
return;
49+
}
50+
51+
int off, limit;
52+
byte[] buf = mBuffer;
53+
if (buf == null) {
54+
buf = new byte[bufferSize];
55+
off = limit = 0;
56+
} else {
57+
off = mOffset;
58+
limit = mLimit;
59+
}
60+
61+
boolean remaining = true;
62+
Object msg;
63+
UnsafeByteArrayInputStream bis;
64+
try {
65+
do {
66+
// read data into buffer.
67+
int read = Math.min(readable, buf.length - limit);
68+
// input.readBytes(buf, limit, read);
69+
input.read(buf, limit, read);
70+
limit += read;
71+
readable -= read;
72+
bis = new UnsafeByteArrayInputStream(buf, off, limit - off); // 不需要关闭
73+
// decode object.
74+
do {
75+
try {
76+
msg = decode(channel, bis);
77+
} catch (IOException t) {
78+
remaining = false;
79+
throw t;
80+
}
81+
if (msg == null) {
82+
if (off == 0) {
83+
if (readable > 0) {
84+
buf = Bytes.copyOf(buf, buf.length << 1);
85+
}
86+
} else {
87+
int len = limit - off;
88+
System.arraycopy(buf, off, buf, 0, len); // adjust buffer.
89+
off = 0;
90+
limit = len;
91+
}
92+
break;
93+
} else {
94+
int pos = bis.position();
95+
if (off == pos) {
96+
remaining = false;
97+
throw new IOException("Decode without read data.");
98+
}
99+
// null represent need more .do nothing waitfor next data package.
100+
if (msg != null) {
101+
handler.received(channel, msg);
102+
}
103+
off = pos;
104+
}
105+
} while (bis.available() > 0);
106+
} while (readable > 0);
107+
} finally {
108+
if (remaining) {
109+
int len = limit - off;
110+
if (len < buf.length / 2) {
111+
System.arraycopy(buf, off, buf, 0, len);
112+
off = 0;
113+
limit = len;
114+
}
115+
mBuffer = buf;
116+
mOffset = off;
117+
mLimit = limit;
118+
} else {
119+
mBuffer = null;
120+
mOffset = mLimit = 0;
121+
}
122+
}
123+
124+
125+
}
126+
127+
protected void checkPayload(Channel channel, long size) throws IOException {
128+
int payload = ChannelConstants.PAYLOAD_DEFAULT;
129+
if (channel != null && channel.getUrl() != null) {
130+
payload = channel.getUrl().getPositiveParameter(ChannelConstants.PAYLOAD_KEY, ChannelConstants.PAYLOAD_DEFAULT);
131+
}
132+
if (size > payload) {
133+
throw new IOException("Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel);
134+
}
135+
}
136+
137+
// TODO IOEXCEPTION
138+
protected Object decode(Channel channel, InputStream message) throws IOException {
139+
return message;
140+
}
141+
142+
protected Object encode(Channel channel, Object message) throws ChannelException{
143+
return message;
144+
}
145+
146+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.redcreen.rpcplus.handler.codec;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
6+
import com.redcreen.rpcplus.channel.Channel;
7+
import com.redcreen.rpcplus.channel.ChannelHandler;
8+
import com.redcreen.rpcplus.codec.Codec;
9+
import com.redcreen.rpcplus.codec.Frame;
10+
import com.redcreen.rpcplus.support.Constants.ChannelConstants;
11+
import com.redcreen.rpcplus.support.ServiceContext;
12+
import com.redcreen.rpcplus.support.URL;
13+
14+
public class PortunificationHandler extends AbstractCodecHandlerWrapper{
15+
private final Codec codec ;
16+
17+
/**
18+
* @param url
19+
* @param handler
20+
*/
21+
public PortunificationHandler(URL url, ChannelHandler handler) {
22+
super(url, handler);
23+
codec = ServiceContext.getExtensionLoader(Codec.class).getExtension(
24+
url.getParameter(ChannelConstants.CODEC_KEY, ChannelConstants.CODEC_DEFAULT));
25+
}
26+
27+
@Override
28+
public Object decode(Channel channel, InputStream is) throws IOException{
29+
Frame frame = codec.decode(channel, is);
30+
return frame;
31+
}
32+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.redcreen.rpcplus.handler.execution;
2+
3+
import com.redcreen.rpcplus.channel.ChannelHandler;
4+
import com.redcreen.rpcplus.handler.AbstractChannelHandlerWrapper;
5+
6+
public class AbstractExecutionChannelHandlerWrapper extends AbstractChannelHandlerWrapper {
7+
8+
public AbstractExecutionChannelHandlerWrapper(ChannelHandler handler) {
9+
super(handler);
10+
}
11+
12+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.redcreen.rpcplus.handler.serialize;
2+
3+
import com.redcreen.rpcplus.channel.ChannelHandler;
4+
import com.redcreen.rpcplus.handler.AbstractChannelHandlerWrapper;
5+
6+
public class AbstractSerializeChannelHandlerWrapper extends AbstractChannelHandlerWrapper {
7+
8+
public AbstractSerializeChannelHandlerWrapper(ChannelHandler handler) {
9+
super(handler);
10+
}
11+
12+
}

0 commit comments

Comments
 (0)