@@ -304,10 +304,10 @@ private void populateBuckets() {
304304
305305 // Create a socket pool for each host
306306 socketPool.put(servers[i], new ConcurrentLinkedQueue<SchoonerSockIO>());
307- poolCurrentConn.put(servers[i], new AtomicInteger(initConn));
308307
309308 // Create the initial connections
310- for (int j = 0; j < initConn; j++) {
309+ int j;
310+ for (j = 0; j < initConn; j++) {
311311 SchoonerSockIO socket = createSocket(servers[i]);
312312 if (socket == null) {
313313 log.error("++++ failed to create connection to: " + servers[i] + " -- only " + j + " created.");
@@ -317,6 +317,13 @@ private void populateBuckets() {
317317 // Add this new connection to socket pool
318318 addSocketToPool(servers[i], socket);
319319 }
320+
321+ ConcurrentLinkedQueue<SchoonerSockIO> sockets = socketPool.get(servers[i]);
322+ AtomicInteger num = new AtomicInteger(j);
323+ for (SchoonerSockIO schoonerSockIO : sockets) {
324+ schoonerSockIO.setSockNum(num);
325+ }
326+ poolCurrentConn.put(servers[i], num);
320327 }
321328 }
322329
@@ -351,9 +358,9 @@ private void populateConsistentBuckets() {
351358
352359 // Create a socket pool for each host
353360 socketPool.put(servers[i], new ConcurrentLinkedQueue<SchoonerSockIO>());
354- poolCurrentConn.put(servers[i], new AtomicInteger(initConn));
355361
356- for (int j = 0; j < initConn; j++) {
362+ int j;
363+ for (j = 0; j < initConn; j++) {
357364 SchoonerSockIO socket = createSocket(servers[i]);
358365 if (socket == null) {
359366 log.error("++++ failed to create connection to: " + servers[i] + " -- only " + j + " created.");
@@ -363,6 +370,13 @@ private void populateConsistentBuckets() {
363370 // Add this new connection to socket pool
364371 addSocketToPool(servers[i], socket);
365372 }
373+
374+ ConcurrentLinkedQueue<SchoonerSockIO> sockets = socketPool.get(servers[i]);
375+ AtomicInteger num = new AtomicInteger(j);
376+ for (SchoonerSockIO schoonerSockIO : sockets) {
377+ schoonerSockIO.setSockNum(num);
378+ }
379+ poolCurrentConn.put(servers[i], num);
366380 }
367381 }
368382
@@ -409,6 +423,7 @@ protected final SchoonerSockIO createSocketWithAdd(String host) {
409423 log.error("++++ failed to get SockIO obj for: " + host);
410424 // log.error(ex.getMessage(), ex);
411425 socket = null;
426+ poolCurrentConn.get(host).decrementAndGet();
412427 }
413428
414429 return socket;
@@ -539,6 +554,7 @@ public final SchoonerSockIO getConnection(String host) {
539554 // if we have items in the pool then we can return it
540555 ConcurrentLinkedQueue<SchoonerSockIO> sockets = socketPool.get(host);
541556 SchoonerSockIO socket = sockets.poll();
557+ System.out.println(poolCurrentConn.get(host).get());
542558 if (socket == null) {
543559 if (poolCurrentConn.get(host).get() < maxConn) {
544560 socket = createSocketWithAdd(host);
@@ -1049,11 +1065,6 @@ public int getBufferSize() {
10491065
10501066 public static class UDPSockIO extends SchoonerSockIO {
10511067
1052- // logger
1053- // private static Logger log =
1054- // Logger.getLogger(UDPSockIO.class.getName());
1055- private ConcurrentLinkedQueue<SchoonerSockIO> sockets;
1056-
10571068 /**
10581069 *
10591070 * <p>
@@ -1130,8 +1141,6 @@ public synchronized void addLength(int alength) {
11301141
11311142 private Selector selector;
11321143
1133- private AtomicInteger sockNum;
1134-
11351144 @Override
11361145 public void trueClose() throws IOException {
11371146 if (selector != null) {
@@ -1154,8 +1163,9 @@ public UDPSockIO(SchoonerSockIOPool pool, String host, int bufferSize, int timeo
11541163 channel.socket().setSoTimeout(timeout);
11551164 selector = Selector.open();
11561165 ((DatagramChannel) channel).register(selector, SelectionKey.OP_READ);
1157- this.sockets = pool.socketPool.get(host);
1158- this.sockNum = pool.poolCurrentConn.get(host);
1166+ writeBuf = ByteBuffer.allocateDirect(bufferSize);
1167+ sockets = pool.socketPool.get(host);
1168+ sockNum = pool.poolCurrentConn.get(host);
11591169 }
11601170
11611171 @Override
@@ -1341,10 +1351,6 @@ public static class TCPSockIO extends SchoonerSockIO {
13411351
13421352 private int hash = 0;
13431353
1344- private ConcurrentLinkedQueue<SchoonerSockIO> sockets;
1345-
1346- private AtomicInteger sockNum;
1347-
13481354 /**
13491355 * creates a new SockIO object wrapping a socket connection to
13501356 * host:port, and its input and output streams
@@ -1372,6 +1378,8 @@ public TCPSockIO(SchoonerSockIOPool pool, String host, int bufferSize, int timeo
13721378
13731379 // get socket: default is to use non-blocking connect
13741380 sock = getSocket(ip[0], Integer.parseInt(ip[1]), connectTimeout);
1381+
1382+ writeBuf = ByteBuffer.allocateDirect(bufferSize);
13751383
13761384 if (timeout >= 0)
13771385 this.sock.setSoTimeout(timeout);
@@ -1383,8 +1391,8 @@ public TCPSockIO(SchoonerSockIOPool pool, String host, int bufferSize, int timeo
13831391 sockChannel = sock.getChannel();
13841392 hash = sock.hashCode();
13851393 this.host = host;
1386- this. sockets = pool.socketPool.get(host);
1387- this. sockNum = pool.poolCurrentConn.get(host);
1394+ sockets = pool.socketPool.get(host);
1395+ sockNum = pool.poolCurrentConn.get(host);
13881396 }
13891397
13901398 /**
0 commit comments