Skip to content

Commit 3beb0c8

Browse files
committed
More work on cloud
1 parent 05bf729 commit 3beb0c8

File tree

6 files changed

+110
-165
lines changed

6 files changed

+110
-165
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package org.encog.cloud;
2+
3+
public interface CloudListener {
4+
void notifyPacket();
5+
void notifyConnections();
6+
}

src/main/java/org/encog/cloud/basic/CloudPacket.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,31 +23,28 @@
2323
*/
2424
package org.encog.cloud.basic;
2525

26+
import java.util.List;
27+
2628
public class CloudPacket {
2729

28-
public static final int PACKET_STATUS = 0;
29-
public static final int PACKET_LOGIN = 1;
30-
public static final int PACKET_LOGOUT = 2;
31-
public static final int PACKET_IDENTIFY = 3;
32-
33-
private final int command;
30+
private final String command;
3431
private final String[] args;
3532

36-
public CloudPacket(int c, String[] a) {
37-
this.command = c;
33+
public CloudPacket(List<String> list) {
34+
this.command = list.get(0).toUpperCase();
3835

39-
if( a==null ) {
36+
if( list.size() == 1 ) {
4037
this.args = new String[0];
4138
} else {
42-
this.args = new String[a.length];
39+
this.args = new String[list.size()-1];
4340
}
4441

45-
for(int i=0;i<this.args.length;i++) {
46-
this.args[i] = a[i];
42+
for(int i=0;i<list.size()-1;i++) {
43+
this.args[i] = list.get(i+1);
4744
}
4845
}
4946

50-
public int getCommand() {
47+
public String getCommand() {
5148
return command;
5249
}
5350

src/main/java/org/encog/cloud/basic/CommunicationLink.java

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@
3030
import java.io.InputStreamReader;
3131
import java.io.OutputStream;
3232
import java.net.Socket;
33-
import java.net.SocketTimeoutException;
34-
import java.util.ArrayList;
3533
import java.util.List;
3634

3735
import org.encog.EncogError;
36+
import org.encog.cloud.node.CloudNode;
37+
import org.encog.util.csv.CSVFormat;
38+
import org.encog.util.csv.ParseCSVLine;
3839
import org.encog.util.logging.EncogLogging;
3940

4041
public class CommunicationLink {
@@ -44,6 +45,9 @@ public class CommunicationLink {
4445
private DataOutputStream outputToRemote;
4546
private BufferedReader inputFromRemote;
4647
private OutputStream socketOut;
48+
private ParseCSVLine parseLine = new ParseCSVLine(CSVFormat.EG_FORMAT);
49+
private int packets;
50+
private CloudNode parentNode;
4751

4852
public static String simpleHash(String str) {
4953
int result = 0;
@@ -56,8 +60,9 @@ public static String simpleHash(String str) {
5660
return Integer.toHexString(result).toLowerCase();
5761
}
5862

59-
public CommunicationLink(Socket s) {
63+
public CommunicationLink(CloudNode node, Socket s) {
6064
try {
65+
this.parentNode = node;
6166
this.socket = s;
6267
this.socket.setSoTimeout(SOCKET_TIMEOUT);
6368

@@ -71,15 +76,6 @@ public CommunicationLink(Socket s) {
7176

7277
}
7378

74-
public void writePacket(int command) {
75-
String[] args = new String[0];
76-
writePacket(command,args);
77-
}
78-
79-
public void writePacketLogin(String uid, String pwd) {
80-
String[] args = { uid, pwd };
81-
writePacket(CloudPacket.PACKET_LOGIN,args);
82-
}
8379

8480
public void writePacket(int command, String[] args) {
8581
try {
@@ -123,24 +119,19 @@ public void writePacket(int command, String[] args) {
123119
}
124120
}
125121

126-
public CloudPacket readPacket() throws IOException {
122+
public CloudPacket readPacket() {
127123

128-
String[] args = null;
129-
String str = this.inputFromRemote.readLine();
130-
EncogLogging.log(EncogLogging.LEVEL_DEBUG, "Received Packet: " + str);
131-
return null;
132-
}
133-
134-
public void writeStatus(boolean s, String message) {
135-
String[] args = new String[2];
136-
args[0] = s ? "1":"0";
137-
args[1] = message;
138-
writePacket(CloudPacket.PACKET_STATUS,args);
139-
}
140-
141-
public void writePacketIdentify(String name) {
142-
String[] args = { name };
143-
writePacket(CloudPacket.PACKET_IDENTIFY,args);
124+
try {
125+
String str = this.inputFromRemote.readLine();
126+
List<String> list = parseLine.parse(str);
127+
this.packets++;
128+
this.parentNode.notifyListenersPacket();
129+
130+
EncogLogging.log(EncogLogging.LEVEL_DEBUG, "Received Packet: " + str);
131+
return new CloudPacket(list);
132+
} catch(IOException ex) {
133+
throw new CloudError(ex);
134+
}
144135

145136
}
146137

@@ -153,4 +144,12 @@ private void flushOutput() {
153144
}
154145

155146
}
147+
148+
public Socket getSocket() {
149+
return this.socket;
150+
}
151+
152+
public int getPackets() {
153+
return this.packets;
154+
}
156155
}

src/main/java/org/encog/cloud/client/CloudClient.java

Lines changed: 1 addition & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public CloudClient(String n, String h, int p) {
4343
this.port = p;
4444
this.host = h;
4545
Socket clientSocket = new Socket(h, port);
46-
this.link = new CommunicationLink(clientSocket);
46+
//this.link = new CommunicationLink(clientSocket);
4747
} catch (IOException e) {
4848
throw new CloudError(e);
4949
}
@@ -56,61 +56,4 @@ public CloudClient(String n, String h) {
5656
public CloudClient(String n) {
5757
this(n, "localhost");
5858
}
59-
60-
private void expectSuccessPacket() throws IOException {
61-
CloudPacket packet = this.link.readPacket();
62-
63-
if( packet==null ) {
64-
throw new CloudError("Timeout waiting for a response packet.");
65-
}
66-
67-
if (packet.getCommand() != CloudPacket.PACKET_STATUS) {
68-
throw new CloudError("Wrong packet type: " + packet.getCommand());
69-
}
70-
71-
String[] args = packet.getArgs();
72-
73-
if (args.length < 2) {
74-
throw new CloudError("Wrong number of string arguments: "
75-
+ args.length);
76-
}
77-
78-
if (!args[0].equals("1")) {
79-
throw new CloudError("Error: " + args[1]);
80-
}
81-
82-
83-
}
84-
85-
public void login(String uid, String pwd) {
86-
try {
87-
this.link.writePacketLogin(uid, CommunicationLink.simpleHash(pwd));
88-
expectSuccessPacket();
89-
} catch (IOException ex) {
90-
throw new CloudError(ex);
91-
}
92-
93-
}
94-
95-
public void logout() {
96-
try {
97-
this.link.writePacket(CloudPacket.PACKET_LOGOUT);
98-
this.link.readPacket();
99-
} catch (IOException ex) {
100-
throw new CloudError(ex);
101-
}
102-
}
103-
104-
public static void main(String argv[]) throws Exception {
105-
CloudClient client = new CloudClient("local");
106-
client.login("test", "test");
107-
client.identify();
108-
client.logout();
109-
}
110-
111-
public void identify() throws IOException {
112-
this.link.writePacketIdentify(this.name);
113-
expectSuccessPacket();
114-
115-
}
11659
}

src/main/java/org/encog/cloud/node/CloudNode.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@
3131
import java.util.List;
3232
import java.util.Map;
3333

34-
import org.encog.cloud.basic.CommunicationLink;
34+
import org.encog.cloud.CloudListener;
3535
import org.encog.cloud.basic.CloudError;
36+
import org.encog.cloud.basic.CommunicationLink;
3637
import org.encog.util.logging.EncogLogging;
3738

3839
public class CloudNode implements Runnable {
@@ -43,6 +44,8 @@ public class CloudNode implements Runnable {
4344
private Map<String, String> accounts = new HashMap<String, String>();
4445
private Map<String, List<CloudSignal>> signals = new HashMap<String, List<CloudSignal>>();
4546
private boolean running;
47+
private final List<HandleClient> connections = new ArrayList<HandleClient>();
48+
private final List<CloudListener> listeners = new ArrayList<CloudListener>();
4649

4750
public CloudNode(int p) {
4851
this.port = p;
@@ -68,8 +71,10 @@ public void run() {
6871
EncogLogging.log(EncogLogging.LEVEL_DEBUG, "Begin listen");
6972
Socket connectionSocket = listenSocket.accept();
7073
EncogLogging.log(EncogLogging.LEVEL_DEBUG, "Connection from: " + connectionSocket.getRemoteSocketAddress().toString());
71-
CommunicationLink link = new CommunicationLink(connectionSocket);
74+
CommunicationLink link = new CommunicationLink(this,connectionSocket);
75+
notifyListenersConnections();
7276
HandleClient hc = new HandleClient(this, link);
77+
this.connections.add(hc);
7378
Thread t = new Thread(hc);
7479
t.start();
7580
} catch (IOException ex) {
@@ -115,4 +120,40 @@ public void shutdown() {
115120
public int getPort() {
116121
return this.port;
117122
}
123+
124+
public List<HandleClient> getConnections() {
125+
return this.connections;
126+
}
127+
128+
/**
129+
* @return the listeners
130+
*/
131+
public List<CloudListener> getListeners() {
132+
return listeners;
133+
}
134+
135+
public void addListener(CloudListener listener) {
136+
this.listeners.add(listener);
137+
}
138+
139+
public void removeListener(CloudListener listener) {
140+
this.listeners.remove(listener);
141+
}
142+
143+
public void clearListeners() {
144+
this.listeners.clear();
145+
}
146+
147+
public void notifyListenersConnections() {
148+
for( CloudListener listener : this.listeners ) {
149+
listener.notifyConnections();
150+
}
151+
}
152+
153+
public void notifyListenersPacket() {
154+
for( CloudListener listener : this.listeners ) {
155+
listener.notifyPacket();
156+
}
157+
}
158+
118159
}

0 commit comments

Comments
 (0)