Skip to content

Commit dd0f3f1

Browse files
author
mengli
committed
Fix OutofMemory bug when the client can't connect to the server.
1 parent 84c4698 commit dd0f3f1

File tree

3 files changed

+47
-25
lines changed

3 files changed

+47
-25
lines changed

src/main/com/schooner/MemCached/AscIIClient.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,7 @@ private boolean set(String cmdname, String key, Object value, Date expiry, Integ
442442
*/
443443
return true;
444444
}
445-
} catch (IOException e) {
445+
} catch (Exception e) {
446446
// if we have an errorHandler, use its hook
447447
if (errorHandler != null)
448448
errorHandler.handleErrorOnSet(this, e, key);
@@ -599,7 +599,7 @@ private long incrdecr(String cmdname, String key, long inc, Integer hashCode) {
599599
log.error(new StringBuffer().append("++++ error incr/decr key: ").append(key).toString());
600600
log.error(new StringBuffer().append("++++ server response: ").append(line).toString());
601601
}
602-
} catch (IOException e) {
602+
} catch (Exception e) {
603603

604604
// if we have an errorHandler, use its hook
605605
if (errorHandler != null)
@@ -789,7 +789,7 @@ private Object get(String cmd, String key, Integer hashCode, boolean asString) {
789789
}
790790
sock.readBuf.clear();
791791
return o;
792-
} catch (IOException ce) {
792+
} catch (Exception ce) {
793793
// if we have an errorHandler, use its hook
794794
if (errorHandler != null)
795795
errorHandler.handleErrorOnGet(this, ce, key);
@@ -930,7 +930,7 @@ public MemcachedItem gets(String cmd, String key, Integer hashCode, boolean asSt
930930
item.value = o;
931931
return item;
932932

933-
} catch (IOException ce) {
933+
} catch (Exception ce) {
934934
// if we have an errorHandler, use its hook
935935
if (errorHandler != null)
936936
errorHandler.handleErrorOnGet(this, ce, key);
@@ -1417,7 +1417,7 @@ private Map<String, Map<String, String>> stats(String[] servers, String command,
14171417

14181418
statsMaps.put(servers[i], stats);
14191419
}
1420-
} catch (IOException e) {
1420+
} catch (Exception e) {
14211421

14221422
// if we have an errorHandler, use its hook
14231423
if (errorHandler != null)

src/main/com/schooner/MemCached/SchoonerSockIO.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import java.net.UnknownHostException;
3333
import java.nio.ByteBuffer;
3434
import java.nio.channels.ByteChannel;
35+
import java.util.concurrent.ConcurrentLinkedQueue;
36+
import java.util.concurrent.atomic.AtomicInteger;
3537

3638
import com.danga.MemCached.SockIOPool;
3739

@@ -48,7 +50,6 @@ public abstract class SchoonerSockIO extends SockIOPool.SockIO {
4850
public SchoonerSockIO(int bufferSize) throws UnknownHostException, IOException {
4951
super(null, null, 0, 0, false);
5052
this.bufferSize = bufferSize;
51-
writeBuf = ByteBuffer.allocateDirect(bufferSize);
5253
}
5354

5455
private int bufferSize = 1024 * 1025;
@@ -58,6 +59,10 @@ public SchoonerSockIO(int bufferSize) throws UnknownHostException, IOException {
5859
public ByteBuffer writeBuf;
5960

6061
protected boolean isPooled = true;
62+
63+
protected ConcurrentLinkedQueue<SchoonerSockIO> sockets;
64+
65+
protected AtomicInteger sockNum;
6166

6267
public abstract short preWrite();
6368

@@ -86,4 +91,13 @@ public void setPooled(boolean isPooled) {
8691
public boolean isPooled() {
8792
return isPooled;
8893
}
94+
95+
public AtomicInteger getSockNum() {
96+
return sockNum;
97+
}
98+
99+
public void setSockNum(AtomicInteger sockNum) {
100+
this.sockNum = sockNum;
101+
}
102+
89103
}

src/main/com/schooner/MemCached/SchoonerSockIOPool.java

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -304,10 +304,10 @@ private void populateBuckets() {
304304

305305
// Create a socket pool for each host
306306
socketPool.put(servers[i], new ConcurrentLinkedQueue<SchoonerSockIO>());
307-
poolCurrentConn.put(servers[i], new AtomicInteger(initConn));
308307

309308
// Create the initial connections
310-
for (int j = 0; j < initConn; j++) {
309+
int j;
310+
for (j = 0; j < initConn; j++) {
311311
SchoonerSockIO socket = createSocket(servers[i]);
312312
if (socket == null) {
313313
log.error("++++ failed to create connection to: " + servers[i] + " -- only " + j + " created.");
@@ -317,6 +317,13 @@ private void populateBuckets() {
317317
// Add this new connection to socket pool
318318
addSocketToPool(servers[i], socket);
319319
}
320+
321+
ConcurrentLinkedQueue<SchoonerSockIO> sockets = socketPool.get(servers[i]);
322+
AtomicInteger num = new AtomicInteger(j);
323+
for (SchoonerSockIO schoonerSockIO : sockets) {
324+
schoonerSockIO.setSockNum(num);
325+
}
326+
poolCurrentConn.put(servers[i], num);
320327
}
321328
}
322329

@@ -351,9 +358,9 @@ private void populateConsistentBuckets() {
351358

352359
// Create a socket pool for each host
353360
socketPool.put(servers[i], new ConcurrentLinkedQueue<SchoonerSockIO>());
354-
poolCurrentConn.put(servers[i], new AtomicInteger(initConn));
355361

356-
for (int j = 0; j < initConn; j++) {
362+
int j;
363+
for (j = 0; j < initConn; j++) {
357364
SchoonerSockIO socket = createSocket(servers[i]);
358365
if (socket == null) {
359366
log.error("++++ failed to create connection to: " + servers[i] + " -- only " + j + " created.");
@@ -363,6 +370,13 @@ private void populateConsistentBuckets() {
363370
// Add this new connection to socket pool
364371
addSocketToPool(servers[i], socket);
365372
}
373+
374+
ConcurrentLinkedQueue<SchoonerSockIO> sockets = socketPool.get(servers[i]);
375+
AtomicInteger num = new AtomicInteger(j);
376+
for (SchoonerSockIO schoonerSockIO : sockets) {
377+
schoonerSockIO.setSockNum(num);
378+
}
379+
poolCurrentConn.put(servers[i], num);
366380
}
367381
}
368382

@@ -409,6 +423,7 @@ protected final SchoonerSockIO createSocketWithAdd(String host) {
409423
log.error("++++ failed to get SockIO obj for: " + host);
410424
// log.error(ex.getMessage(), ex);
411425
socket = null;
426+
poolCurrentConn.get(host).decrementAndGet();
412427
}
413428

414429
return socket;
@@ -539,6 +554,7 @@ public final SchoonerSockIO getConnection(String host) {
539554
// if we have items in the pool then we can return it
540555
ConcurrentLinkedQueue<SchoonerSockIO> sockets = socketPool.get(host);
541556
SchoonerSockIO socket = sockets.poll();
557+
System.out.println(poolCurrentConn.get(host).get());
542558
if (socket == null) {
543559
if (poolCurrentConn.get(host).get() < maxConn) {
544560
socket = createSocketWithAdd(host);
@@ -1049,11 +1065,6 @@ public int getBufferSize() {
10491065

10501066
public static class UDPSockIO extends SchoonerSockIO {
10511067

1052-
// logger
1053-
// private static Logger log =
1054-
// Logger.getLogger(UDPSockIO.class.getName());
1055-
private ConcurrentLinkedQueue<SchoonerSockIO> sockets;
1056-
10571068
/**
10581069
*
10591070
* <p>
@@ -1130,8 +1141,6 @@ public synchronized void addLength(int alength) {
11301141

11311142
private Selector selector;
11321143

1133-
private AtomicInteger sockNum;
1134-
11351144
@Override
11361145
public void trueClose() throws IOException {
11371146
if (selector != null) {
@@ -1154,8 +1163,9 @@ public UDPSockIO(SchoonerSockIOPool pool, String host, int bufferSize, int timeo
11541163
channel.socket().setSoTimeout(timeout);
11551164
selector = Selector.open();
11561165
((DatagramChannel) channel).register(selector, SelectionKey.OP_READ);
1157-
this.sockets = pool.socketPool.get(host);
1158-
this.sockNum = pool.poolCurrentConn.get(host);
1166+
writeBuf = ByteBuffer.allocateDirect(bufferSize);
1167+
sockets = pool.socketPool.get(host);
1168+
sockNum = pool.poolCurrentConn.get(host);
11591169
}
11601170

11611171
@Override
@@ -1341,10 +1351,6 @@ public static class TCPSockIO extends SchoonerSockIO {
13411351

13421352
private int hash = 0;
13431353

1344-
private ConcurrentLinkedQueue<SchoonerSockIO> sockets;
1345-
1346-
private AtomicInteger sockNum;
1347-
13481354
/**
13491355
* creates a new SockIO object wrapping a socket connection to
13501356
* host:port, and its input and output streams
@@ -1372,6 +1378,8 @@ public TCPSockIO(SchoonerSockIOPool pool, String host, int bufferSize, int timeo
13721378

13731379
// get socket: default is to use non-blocking connect
13741380
sock = getSocket(ip[0], Integer.parseInt(ip[1]), connectTimeout);
1381+
1382+
writeBuf = ByteBuffer.allocateDirect(bufferSize);
13751383

13761384
if (timeout >= 0)
13771385
this.sock.setSoTimeout(timeout);
@@ -1383,8 +1391,8 @@ public TCPSockIO(SchoonerSockIOPool pool, String host, int bufferSize, int timeo
13831391
sockChannel = sock.getChannel();
13841392
hash = sock.hashCode();
13851393
this.host = host;
1386-
this.sockets = pool.socketPool.get(host);
1387-
this.sockNum = pool.poolCurrentConn.get(host);
1394+
sockets = pool.socketPool.get(host);
1395+
sockNum = pool.poolCurrentConn.get(host);
13881396
}
13891397

13901398
/**

0 commit comments

Comments
 (0)