Skip to content

Commit 36c04b6

Browse files
committed
tweaks to locking issue
1 parent 3827a91 commit 36c04b6

File tree

3 files changed

+32
-15
lines changed

3 files changed

+32
-15
lines changed

src/com/danga/MemCached/MemCachedClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2025,7 +2025,7 @@ public void loadItemsNIO( boolean asString, Map<String, StringBuilder> sockKeys,
20252025
timeRemaining = timeout;
20262026

20272027
while ( numConns > 0 && timeRemaining > 0 ) {
2028-
int n = selector.select( timeRemaining );
2028+
int n = selector.select( 3000 );
20292029
if ( n > 0 ) {
20302030
// we've got some activity; handle it
20312031
//
@@ -2038,6 +2038,7 @@ public void loadItemsNIO( boolean asString, Map<String, StringBuilder> sockKeys,
20382038
}
20392039
else {
20402040
// timeout likely... better check
2041+
log.error( "selector timed out waiting for activity" );
20412042
}
20422043

20432044
timeRemaining = timeout - (System.currentTimeMillis() - startTime);
@@ -2126,7 +2127,7 @@ public void readResponse( SelectionKey key ) throws IOException {
21262127
InetAddress remote = conn.channel.socket().getInetAddress();
21272128

21282129
ByteBuffer buf = conn.getBuffer();
2129-
int count = conn.channel.read(buf);
2130+
int count = conn.channel.read( buf );
21302131
if ( count > 0 ) {
21312132
if ( log.isDebugEnabled() )
21322133
log.debug( "read " + count + " from " + remote );

src/com/danga/MemCached/SockIOPool.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public class SockIOPool {
167167
// set to hold sockets to close
168168
private Map<String,Map<SockIO,Long>> availPool;
169169
private Map<String,Map<SockIO,Long>> busyPool;
170-
private Set<SockIO> deadPool;;
170+
private Map<SockIO,Integer> deadPool;;
171171

172172
// empty constructor
173173
protected SockIOPool() { }
@@ -498,7 +498,7 @@ public void initialize() {
498498
// pools
499499
availPool = new HashMap<String,Map<SockIO,Long>>( servers.length * initConn );
500500
busyPool = new HashMap<String,Map<SockIO,Long>>( servers.length * initConn );
501-
deadPool = new HashSet<SockIO>();
501+
deadPool = new IdentityHashMap<SockIO,Integer>();
502502

503503
hostDeadDur = new HashMap<String,Long>();
504504
hostDead = new HashMap<String,Date>();
@@ -893,7 +893,7 @@ public SockIO getConnection( String host ) {
893893
}
894894
else {
895895
// add to deadpool for later reaping
896-
deadPool.add( socket );
896+
deadPool.put( socket, new Integer(0) );
897897

898898
// remove from avail pool
899899
i.remove();
@@ -1136,7 +1136,6 @@ protected void stopMaintThread() {
11361136
protected void selfMaint() {
11371137
log.debug( "++++ Starting self maintenance...." );
11381138

1139-
11401139
// go through avail sockets and create sockets
11411140
// as needed to maintain pool settings
11421141
Map<String,Set<SockIO>> newSockets =
@@ -1208,8 +1207,15 @@ protected void selfMaint() {
12081207
if ( (expire + maxIdle) < System.currentTimeMillis() ) {
12091208
log.debug( "+++ removing stale entry from pool as it is past its idle timeout and pool is over max spare" );
12101209

1211-
// add to deadPool for later reaping
1212-
deadPool.add( socket );
1210+
if ( socket.isConnected() ) {
1211+
// add to busy pool
1212+
log.debug( "++++ moving socket for host (" + host + ") to busy pool ... socket: " + socket );
1213+
addSocketToPool( busyPool, host, socket );
1214+
}
1215+
else {
1216+
// add to deadPool for later reaping
1217+
deadPool.put( socket, new Integer( 0 ) );
1218+
}
12131219

12141220
// remove from the availPool
12151221
j.remove();
@@ -1240,8 +1246,15 @@ protected void selfMaint() {
12401246
if ( (hungTime + maxBusyTime) < System.currentTimeMillis() ) {
12411247
log.error( "+++ removing potentially hung connection from busy pool ... socket in pool for " + (System.currentTimeMillis() - hungTime) + "ms" );
12421248

1243-
// add to deadPool for later reaping
1244-
deadPool.add( socket );
1249+
if ( socket.isConnected() ) {
1250+
// add to busy pool
1251+
log.debug( "++++ moving socket for host (" + host + ") to busy pool ... socket: " + socket );
1252+
addSocketToPool( busyPool, host, socket );
1253+
}
1254+
else {
1255+
// add to deadPool for later reaping
1256+
deadPool.put( socket, new Integer( 0 ) );
1257+
}
12451258

12461259
// remove from the busy pool
12471260
j.remove();
@@ -1250,10 +1263,14 @@ protected void selfMaint() {
12501263
}
12511264
}
12521265

1253-
// finally clean out the deadPool -- no need to lock as these are not in any pool
1254-
for ( Iterator<SockIO> i = deadPool.iterator(); i.hasNext(); ) {
1266+
// finally clean out the deadPool
1267+
Set<SockIO> toClose;
1268+
synchronized( deadPool ) {
1269+
toClose = deadPool.keySet();
1270+
deadPool = new IdentityHashMap<SockIO,Integer>();
1271+
}
12551272

1256-
SockIO socket = i.next();
1273+
for ( SockIO socket : toClose ) {
12571274
try {
12581275
socket.trueClose();
12591276
}
@@ -1262,8 +1279,6 @@ protected void selfMaint() {
12621279
log.error( ex.getMessage(), ex );
12631280
socket = null;
12641281
}
1265-
1266-
i.remove();
12671282
}
12681283

12691284
log.debug( "+++ ending self maintenance." );

src/com/danga/MemCached/test/UnitTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ public static void main(String[] args) {
267267
pool.setServers( serverlist );
268268
pool.setMaxConn( 250 );
269269
pool.setNagle( false );
270+
pool.setMaintSleep( 1000 );
270271
pool.initialize();
271272

272273
mc = new MemCachedClient( "test" );

0 commit comments

Comments
 (0)