Skip to content

Commit 62b2cf4

Browse files
committed
feature: code cleanup, plus set max time for a socket to live in busy pool (hung connection detection)
1 parent 03ee796 commit 62b2cf4

File tree

1 file changed

+102
-57
lines changed

1 file changed

+102
-57
lines changed

src/com/danga/MemCached/SockIOPool.java

Lines changed: 102 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -73,24 +73,26 @@
7373
* int minSpareConnections = 5;
7474
* int maxSpareConnections = 50;
7575
* long maxIdleTime = 1000 * 60 * 30; // 30 minutes
76+
* long maxBusyTime = 1000 * 60 * 5; // 5 minutes
7677
* long maintThreadSleep = 1000 * 5; // 5 seconds
7778
* int socketTimeOut = 1000 * 3; // 3 seconds to block on reads
7879
* int socketConnectTO = 1000 * 3; // 3 seconds to block on initial connections. If 0, then will use blocking connect (default)
7980
* boolean failover = false; // turn off auto-failover in event of server down
8081
* boolean nagleAlg = false; // turn off Nagle's algorithm on all sockets in pool
8182
*
8283
* SockIOPool pool = SockIOPool.getInstance();
83-
* pool.setServers(serverlist);
84-
* pool.setWeights(weights);
85-
* pool.setInitConn(initialConnections);
86-
* pool.setMinConn(minSpareConnections);
87-
* pool.setMaxConn(maxSpareConnections);
88-
* pool.setMaxIdle(maxIdleTime);
89-
* pool.setMaintSleep(maintThreadSleep);
90-
* pool.setSocketTO(socketTimeOut);
91-
* pool.setSocketConnectTO(socketConnectTO);
92-
* pool.setNagle(nagleAlg);
93-
* pool.setHashingAlg(SockIOPool.NEW_COMPAT_HASH);
84+
* pool.setServers( serverlist );
85+
* pool.setWeights( weights );
86+
* pool.setInitConn( initialConnections );
87+
* pool.setMinConn( minSpareConnections );
88+
* pool.setMaxConn( maxSpareConnections );
89+
* pool.setMaxIdle( maxIdleTime );
90+
* pool.setMaxBusyTime( maxBusyTime );
91+
* pool.setMaintSleep( maintThreadSleep );
92+
* pool.setSocketTO( socketTimeOut );
93+
* pool.setSocketConnectTO( socketConnectTO );
94+
* pool.setNagle( nagleAlg );
95+
* pool.setHashingAlg( SockIOPool.NEW_COMPAT_HASH );
9496
* pool.initialize();
9597
* }
9698
* </pre>
@@ -99,13 +101,13 @@
99101
* The client must always close the SockIO object when finished, which will return the connection back to the pool.<br/>
100102
* <h3>An example of retrieving a SockIO object:</h3>
101103
* <pre>
102-
* SockIOPool.SockIO sock = SockIOPool.getInstance().getSock(key);
104+
* SockIOPool.SockIO sock = SockIOPool.getInstance().getSock( key );
103105
* try {
104-
* sock.write("version\r\n");
106+
* sock.write( "version\r\n" );
105107
* sock.flush();
106-
* System.out.println("Version: " + sock.readLine());
108+
* System.out.println( "Version: " + sock.readLine() );
107109
* }
108-
* catch (IOException ioe) { System.out.println("io exception thrown") };
110+
* catch (IOException ioe) { System.out.println( "io exception thrown" ) };
109111
*
110112
* sock.close();
111113
* </pre>
@@ -139,6 +141,7 @@ public class SockIOPool {
139141
private int minConn = 3;
140142
private int maxConn = 10;
141143
private long maxIdle = 1000 * 60 * 3; // max idle time for avail sockets
144+
private long maxBusyTime = 1000 * 60 * 5; // max idle time for avail sockets
142145
private long maintSleep = 1000 * 5; // maintenance thread sleep time
143146
private int socketTO = 1000 * 10; // default timeout of socket reads
144147
private int socketConnectTO = 0; // default timeout of socket connections
@@ -278,6 +281,20 @@ public static synchronized SockIOPool getInstance() {
278281
*/
279282
public long getMaxIdle() { return this.maxIdle; }
280283

284+
/**
285+
* Sets the max busy time for threads in the busy pool.
286+
*
287+
* @param maxBusyTime idle time in ms
288+
*/
289+
public void setMaxBusyTime( long maxBusyTime ) { this.maxBusyTime = maxBusyTime; }
290+
291+
/**
292+
* Returns the current max busy setting.
293+
*
294+
* @return max busy setting in ms
295+
*/
296+
public long getMaxBusy() { return this.maxBusyTime; }
297+
281298
/**
282299
* Set the sleep time between runs of the pool maintenance thread.
283300
* If set to 0, then the maint thread will not be started.
@@ -988,9 +1005,7 @@ protected synchronized void selfMaint() {
9881005
for ( Iterator i = availPool.keySet().iterator(); i.hasNext(); ) {
9891006
String host = (String)i.next();
9901007
Map sockets = (Map)availPool.get(host);
991-
Map bSockets = (Map)busyPool.get(host);
992-
log.debug( "++++ Size of avail pool for host (" + host + ") = " + sockets.size() );
993-
log.debug( "++++ Size of busy pool for host (" + host + ") = " + bSockets.size() );
1008+
log.error( "++++ Size of avail pool for host (" + host + ") = " + sockets.size() );
9941009

9951010
// if pool is too small (n < minSpare)
9961011
if ( sockets.size() < minConn ) {
@@ -1006,7 +1021,6 @@ protected synchronized void selfMaint() {
10061021

10071022
addSocketToPool( availPool, host, socket );
10081023
}
1009-
10101024
}
10111025
else if ( sockets.size() > maxConn ) {
10121026
// need to close down some sockets
@@ -1022,7 +1036,7 @@ else if ( sockets.size() > maxConn ) {
10221036

10231037
// remove stale entries
10241038
SockIO socket = (SockIO)j.next();
1025-
long expire = ((Long)sockets.get(socket)).longValue();
1039+
long expire = ((Long)sockets.get( socket )).longValue();
10261040

10271041
// if past idle time
10281042
// then close socket
@@ -1048,6 +1062,38 @@ else if ( sockets.size() > maxConn ) {
10481062
createShift.put( host, new Integer( 0 ) );
10491063
}
10501064

1065+
// go through busy sockets and destroy sockets
1066+
// as needed to maintain pool settings
1067+
for ( Iterator i = busyPool.keySet().iterator(); i.hasNext(); ) {
1068+
String host = (String)i.next();
1069+
Map sockets = (Map)busyPool.get( host );
1070+
log.error( "++++ Size of busy pool for host (" + host + ") = " + sockets.size() );
1071+
1072+
// loop through all connections and check to see if we have any hung connections
1073+
for ( Iterator j = sockets.keySet().iterator(); j.hasNext(); ) {
1074+
// remove stale entries
1075+
SockIO socket = (SockIO)j.next();
1076+
long hungTime = ((Long)sockets.get( socket )).longValue();
1077+
1078+
// if past max busy time
1079+
// then close socket
1080+
// and remove from pool
1081+
if ( (hungTime + maxBusyTime) < System.currentTimeMillis() ) {
1082+
log.error( "+++ removing potentially hung connection from busy pool ... socket in pool for " + (System.currentTimeMillis() - hungTime) + "ms" );
1083+
try {
1084+
socket.trueClose();
1085+
}
1086+
catch ( IOException ioe ) {
1087+
log.error( "failed to close socket" );
1088+
log.error( ioe.getMessage(), ioe );
1089+
}
1090+
1091+
j.remove();
1092+
socket = null;
1093+
}
1094+
}
1095+
}
1096+
10511097
log.debug( "+++ ending self maintenance." );
10521098
}
10531099

@@ -1189,15 +1235,15 @@ public SockIO( SockIOPool pool, String host, int timeout, int connectTimeout, bo
11891235
? getSocket( ip[ 0 ], Integer.parseInt( ip[ 1 ] ), connectTimeout )
11901236
: new Socket( ip[ 0 ], Integer.parseInt( ip[ 1 ] ) );
11911237

1192-
if (timeout >= 0)
1193-
sock.setSoTimeout(timeout);
1238+
if ( timeout >= 0 )
1239+
sock.setSoTimeout( timeout );
11941240

11951241
// testing only
1196-
sock.setTcpNoDelay(noDelay);
1242+
sock.setTcpNoDelay( noDelay );
11971243

11981244
// wrap streams
1199-
in = new DataInputStream(sock.getInputStream());
1200-
out = new BufferedOutputStream(sock.getOutputStream());
1245+
in = new DataInputStream( sock.getInputStream() );
1246+
out = new BufferedOutputStream( sock.getOutputStream() );
12011247
this.host = host;
12021248
}
12031249

@@ -1216,38 +1262,38 @@ public SockIO( SockIOPool pool, String host, int timeout, int connectTimeout, bo
12161262
protected static Socket getSocket( String host, int port, int timeout ) throws IOException {
12171263

12181264
// Create a new thread which will attempt to connect to host:port, and start it running
1219-
ConnectThread thread = new ConnectThread(host, port);
1265+
ConnectThread thread = new ConnectThread( host, port );
12201266
thread.start();
12211267

12221268
Socket socket = null;
12231269
int timer = 0;
12241270
int sleep = 25;
12251271

1226-
while (timer < timeout) {
1272+
while ( timer < timeout ) {
12271273

12281274
// if the thread has a connected socket
12291275
// then return it
1230-
if (thread.isConnected())
1276+
if ( thread.isConnected() )
12311277
return thread.getSocket();
12321278

12331279
// if the thread had an error
12341280
// then throw a new IOException
1235-
if (thread.isError())
1281+
if ( thread.isError() )
12361282
throw new IOException();
12371283

12381284
try {
12391285
// sleep for short time before polling again
1240-
Thread.sleep(sleep);
1286+
Thread.sleep( sleep );
12411287
}
1242-
catch (InterruptedException ie) { }
1288+
catch ( InterruptedException ie ) { }
12431289

12441290
// Increment timer
12451291
timer += sleep;
12461292
}
12471293

12481294
// made it through loop without getting connection
12491295
// the connection thread will timeout on its own at OS timeout
1250-
throw new IOException("Could not connect for " + timeout + " milliseconds");
1296+
throw new IOException( "Could not connect for " + timeout + " milliseconds" );
12511297
}
12521298

12531299
/**
@@ -1263,51 +1309,51 @@ protected static Socket getSocket( String host, int port, int timeout ) throws I
12631309
* @throws IOException if fails to close streams or socket
12641310
*/
12651311
void trueClose() throws IOException {
1266-
log.debug("++++ Closing socket for real: " + toString());
1312+
log.debug( "++++ Closing socket for real: " + toString() );
12671313

12681314
boolean err = false;
12691315
StringBuffer errMsg = new StringBuffer();
12701316

1271-
if (in == null || out == null || sock == null) {
1317+
if ( in == null || out == null || sock == null ) {
12721318
err = true;
1273-
errMsg.append("++++ socket or its streams already null in trueClose call");
1319+
errMsg.append( "++++ socket or its streams already null in trueClose call" );
12741320
}
12751321

1276-
if (in != null) {
1322+
if ( in != null ) {
12771323
try {
12781324
in.close();
12791325
}
1280-
catch(IOException ioe) {
1281-
log.error("++++ error closing input stream for socket: " + toString() + " for host: " + getHost());
1282-
log.error(ioe.getMessage(), ioe);
1283-
errMsg.append("++++ error closing input stream for socket: " + toString() + " for host: " + getHost() + "\n");
1284-
errMsg.append(ioe.getMessage());
1326+
catch( IOException ioe ) {
1327+
log.error( "++++ error closing input stream for socket: " + toString() + " for host: " + getHost() );
1328+
log.error( ioe.getMessage(), ioe );
1329+
errMsg.append( "++++ error closing input stream for socket: " + toString() + " for host: " + getHost() + "\n" );
1330+
errMsg.append( ioe.getMessage() );
12851331
err = true;
12861332
}
12871333
}
12881334

1289-
if (out != null) {
1335+
if ( out != null ) {
12901336
try {
12911337
out.close();
12921338
}
1293-
catch (IOException ioe) {
1294-
log.error("++++ error closing output stream for socket: " + toString() + " for host: " + getHost());
1295-
log.error(ioe.getMessage(), ioe);
1296-
errMsg.append("++++ error closing output stream for socket: " + toString() + " for host: " + getHost() + "\n");
1297-
errMsg.append(ioe.getMessage());
1339+
catch ( IOException ioe ) {
1340+
log.error( "++++ error closing output stream for socket: " + toString() + " for host: " + getHost() );
1341+
log.error( ioe.getMessage(), ioe );
1342+
errMsg.append( "++++ error closing output stream for socket: " + toString() + " for host: " + getHost() + "\n" );
1343+
errMsg.append( ioe.getMessage() );
12981344
err = true;
12991345
}
13001346
}
13011347

1302-
if (sock != null) {
1348+
if ( sock != null ) {
13031349
try {
13041350
sock.close();
13051351
}
1306-
catch (IOException ioe) {
1307-
log.error("++++ error closing socket: " + toString() + " for host: " + getHost());
1308-
log.error(ioe.getMessage(), ioe);
1309-
errMsg.append("++++ error closing socket: " + toString() + " for host: " + getHost() + "\n");
1310-
errMsg.append(ioe.getMessage());
1352+
catch ( IOException ioe ) {
1353+
log.error( "++++ error closing socket: " + toString() + " for host: " + getHost() );
1354+
log.error( ioe.getMessage(), ioe );
1355+
errMsg.append( "++++ error closing socket: " + toString() + " for host: " + getHost() + "\n" );
1356+
errMsg.append( ioe.getMessage() );
13111357
err = true;
13121358
}
13131359
}
@@ -1320,18 +1366,17 @@ void trueClose() throws IOException {
13201366
out = null;
13211367
sock = null;
13221368

1323-
if (err)
1324-
throw new IOException(errMsg.toString());
1369+
if ( err )
1370+
throw new IOException( errMsg.toString() );
13251371
}
13261372

13271373
/**
13281374
* sets closed flag and checks in to connection pool
13291375
* but does not close connections
13301376
*/
13311377
void close() {
1332-
log.debug("++++ marking socket (" + this.toString() + ") as closed and available to return to avail pool");
1333-
13341378
// check in to pool
1379+
log.debug("++++ marking socket (" + this.toString() + ") as closed and available to return to avail pool");
13351380
pool.checkIn( this );
13361381
}
13371382

0 commit comments

Comments
 (0)