Skip to content

Commit b41940f

Browse files
committed
some more synch work as well as code review
1 parent 8c566bb commit b41940f

File tree

1 file changed

+80
-86
lines changed

1 file changed

+80
-86
lines changed

src/com/danga/MemCached/SockIOPool.java

Lines changed: 80 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.zip.*;
3333
import java.net.*;
3434
import java.io.*;
35+
import java.util.concurrent.locks.ReentrantLock;
3536
import org.apache.log4j.Logger;
3637

3738
/**
@@ -150,6 +151,9 @@ public class SockIOPool {
150151
private boolean nagle = true; // enable/disable Nagle's algorithm
151152
private int hashingAlg = NATIVE_HASH; // default to using the native hash as it is the fastest
152153

154+
// locks
155+
private final ReentrantLock hostDeadLock = new ReentrantLock();
156+
153157
// list of all servers
154158
private String[] servers;
155159
private Integer[] weights;
@@ -455,11 +459,11 @@ public void initialize() {
455459

456460
// initialize empty maps
457461
buckets = new ArrayList<String>();
458-
availPool = new Hashtable<String,Map<SockIO,Long>>( servers.length * initConn );
459-
busyPool = new Hashtable<String,Map<SockIO,Long>>( servers.length * initConn );
460-
hostDeadDur = new Hashtable<String,Long>();
461-
hostDead = new Hashtable<String,Date>();
462-
createShift = new Hashtable<String,Integer>();
462+
availPool = new HashMap<String,Map<SockIO,Long>>( servers.length * initConn );
463+
busyPool = new HashMap<String,Map<SockIO,Long>>( servers.length * initConn );
464+
hostDeadDur = new HashMap<String,Long>();
465+
hostDead = new HashMap<String,Date>();
466+
createShift = new HashMap<String,Integer>();
463467
maxCreate = (poolMultiplier > minConn) ? minConn : minConn / poolMultiplier; // only create up to maxCreate connections at once
464468

465469
log.debug( "++++ initializing pool with following settings:" );
@@ -508,7 +512,7 @@ public void initialize() {
508512
this.initialized = true;
509513

510514
// start maint thread
511-
if (this.maintSleep > 0)
515+
if ( this.maintSleep > 0 )
512516
this.startMaintThread();
513517
}
514518
}
@@ -539,13 +543,20 @@ protected SockIO createSocket( String host ) {
539543
// if host is dead, then we don't need to try again
540544
// until the dead status has expired
541545
// we do not try to put back in if failback is off
542-
if ( failback && hostDead.containsKey( host ) && hostDeadDur.containsKey( host ) ) {
543546

544-
Date store = hostDead.get( host );
545-
long expire = hostDeadDur.get( host ).longValue();
547+
hostDeadLock.lock();
548+
try {
549+
if ( failback && hostDead.containsKey( host ) && hostDeadDur.containsKey( host ) ) {
546550

547-
if ( (store.getTime() + expire) > System.currentTimeMillis() )
548-
return null;
551+
Date store = hostDead.get( host );
552+
long expire = hostDeadDur.get( host ).longValue();
553+
554+
if ( (store.getTime() + expire) > System.currentTimeMillis() )
555+
return null;
556+
}
557+
}
558+
finally {
559+
hostDeadLock.unlock();
549560
}
550561

551562
try {
@@ -571,20 +582,28 @@ protected SockIO createSocket( String host ) {
571582

572583
// if we failed to get socket, then mark
573584
// host dead for a duration which falls off
574-
if ( socket == null ) {
575-
Date now = new Date();
576-
hostDead.put( host, now );
577-
long expire = ( hostDeadDur.containsKey( host ) ) ? (((Long)hostDeadDur.get( host )).longValue() * 2) : 1000;
578-
hostDeadDur.put( host, new Long( expire ) );
579-
log.debug( "++++ ignoring dead host: " + host + " for " + expire + " ms" );
580-
581-
// also clear all entries for this host from availPool
582-
clearHostFromPool( availPool, host );
585+
hostDeadLock.lock();
586+
try {
587+
if ( socket == null ) {
588+
Date now = new Date();
589+
hostDead.put( host, now );
590+
long expire = ( hostDeadDur.containsKey( host ) ) ? (((Long)hostDeadDur.get( host )).longValue() * 2) : 1000;
591+
hostDeadDur.put( host, new Long( expire ) );
592+
log.debug( "++++ ignoring dead host: " + host + " for " + expire + " ms" );
593+
594+
// also clear all entries for this host from availPool
595+
clearHostFromPool( availPool, host );
596+
}
597+
else {
598+
log.debug( "++++ created socket (" + socket.toString() + ") for host: " + host );
599+
if ( hostDead.containsKey( host ) || hostDeadDur.containsKey( host ) ) {
600+
hostDead.remove( host );
601+
hostDeadDur.remove( host );
602+
}
603+
}
583604
}
584-
else {
585-
log.debug( "++++ created socket (" + socket.toString() + ") for host: " + host );
586-
hostDead.remove( host );
587-
hostDeadDur.remove( host );
605+
finally {
606+
hostDeadLock.unlock();
588607
}
589608

590609
return socket;
@@ -686,16 +705,16 @@ public SockIO getSock( String key, Integer hashCode ) {
686705

687706
// keep trying different servers until we find one
688707
int bucketSize = buckets.size();
689-
Integer[] triedBucket = new Integer[ bucketSize ];
708+
boolean[] triedBucket = new boolean[ bucketSize ];
709+
Arrays.fill( triedBucket, false );
690710

691-
while ( tries++ < bucketSize ) {
711+
// get initial bucket
712+
int bucket = hv % bucketSize;
713+
if ( bucket < 0 ) bucket *= -1;
692714

693-
// get bucket using hashcode
694-
// get one from factory
695-
int bucket = hv % bucketSize;
696-
if ( bucket < 0 )
697-
bucket += bucketSize;
715+
while ( tries++ < bucketSize ) {
698716

717+
// try to get socket from bucket
699718
SockIO sock = getConnection( (String)buckets.get( bucket ) );
700719

701720
log.debug( "cache choose " + buckets.get( bucket ) + " for " + key );
@@ -708,13 +727,13 @@ public SockIO getSock( String key, Integer hashCode ) {
708727
return null;
709728

710729
// log that we tried
711-
triedBucket[ bucket ] = new Integer( 1 );
730+
triedBucket[ bucket ] = true;
712731

713732
// if we failed to get a socket from this server
714733
// then we try again by adding an incrementer to the
715734
// current key and then rehashing
716735
int rehashTries = 0;
717-
while ( triedBucket[ hv % bucketSize ] != null ) {
736+
while ( triedBucket[ bucket ] ) {
718737

719738
int keyTry = tries + rehashTries;
720739
String newKey = String.format( "%s%s", keyTry, key );
@@ -741,6 +760,10 @@ public SockIO getSock( String key, Integer hashCode ) {
741760
}
742761

743762
rehashTries++;
763+
764+
// new bucket
765+
bucket = hv % bucketSize;
766+
if ( bucket < 0 ) bucket *= -1;
744767
}
745768
}
746769

@@ -846,6 +869,7 @@ public SockIO getConnection( String host ) {
846869

847870
/**
848871
* Adds a socket to a given pool for the given host.
872+
* THIS METHOD IS NOT THREADSAFE, SO BE CAREFUL WHEN USING!
849873
*
850874
* Internal utility method.
851875
*
@@ -865,14 +889,15 @@ protected void addSocketToPool( Map<String,Map<SockIO,Long>> pool, String host,
865889
}
866890

867891
Map<SockIO,Long> sockets =
868-
new Hashtable<SockIO,Long>();
892+
new HashMap<SockIO,Long>();
869893

870894
sockets.put( socket, new Long( System.currentTimeMillis() ) );
871895
pool.put( host, sockets );
872896
}
873897

874898
/**
875899
* Removes a socket from specified pool for host.
900+
* THIS METHOD IS NOT THREADSAFE, SO BE CAREFUL WHEN USING!
876901
*
877902
* Internal utility method.
878903
*
@@ -890,6 +915,7 @@ protected void removeSocketFromPool( Map<String,Map<SockIO,Long>> pool, String h
890915

891916
/**
892917
* Closes and removes all sockets from specified pool for host.
918+
* THIS METHOD IS NOT THREADSAFE, SO BE CAREFUL WHEN USING!
893919
*
894920
* Internal utility method.
895921
*
@@ -946,32 +972,6 @@ public void checkIn( SockIO socket, boolean addToAvail ) {
946972
}
947973
}
948974

949-
/**
950-
* Freshens a busy socket if it is in the busy pool.
951-
*
952-
* @param socket SockIO object to freshen
953-
*/
954-
public void touchInUseSockIO( SockIO socket ) {
955-
956-
String host = socket.getHost();
957-
log.debug( "++++ freshening busy socket: " + socket.toString() + " for host: " + host );
958-
959-
if ( busyPool.containsKey( host ) ) {
960-
961-
synchronized( busyPool ) {
962-
Map<SockIO,Long> sockets = busyPool.get( host );
963-
964-
if ( sockets != null )
965-
sockets.put( socket, new Long( System.currentTimeMillis() ) );
966-
else
967-
log.error( "++++ failed to freshen socket: " + socket.toString() + " for host: " + host + " as not found in busy pool" );
968-
}
969-
}
970-
else {
971-
log.error( "++++ failed to freshen socket: " + socket.toString() + " for host: " + host + " as not found in busy pool" );
972-
}
973-
}
974-
975975
/**
976976
* Returns a socket to the avail pool.
977977
*
@@ -994,7 +994,7 @@ public void checkIn( SockIO socket ) {
994994
*/
995995
protected void closePool( Map<String,Map<SockIO,Long>> pool ) {
996996

997-
synchronized( pool ) {
997+
synchronized( this ) {
998998
for ( Iterator<String> i = pool.keySet().iterator(); i.hasNext(); ) {
999999
String host = i.next();
10001000
Map<SockIO,Long> sockets = pool.get( host );
@@ -1023,23 +1023,24 @@ protected void closePool( Map<String,Map<SockIO,Long>> pool ) {
10231023
* Stops the maint thread.<br/>
10241024
* Nulls out all internal maps<br/>
10251025
*/
1026-
public synchronized void shutDown() {
1027-
log.debug( "++++ SockIOPool shutting down..." );
1028-
1029-
1030-
if ( maintThread != null && maintThread.isRunning() )
1031-
stopMaintThread();
1032-
1033-
log.debug( "++++ closing all internal pools." );
1034-
closePool( availPool );
1035-
closePool( busyPool );
1036-
availPool = null;
1037-
busyPool = null;
1038-
buckets = null;
1039-
hostDeadDur = null;
1040-
hostDead = null;
1041-
initialized = false;
1042-
log.debug( "++++ SockIOPool finished shutting down." );
1026+
public void shutDown() {
1027+
synchronized( this ) {
1028+
log.debug( "++++ SockIOPool shutting down..." );
1029+
1030+
if ( maintThread != null && maintThread.isRunning() )
1031+
stopMaintThread();
1032+
1033+
log.debug( "++++ closing all internal pools." );
1034+
closePool( availPool );
1035+
closePool( busyPool );
1036+
availPool = null;
1037+
busyPool = null;
1038+
buckets = null;
1039+
hostDeadDur = null;
1040+
hostDead = null;
1041+
initialized = false;
1042+
log.debug( "++++ SockIOPool finished shutting down." );
1043+
}
10431044
}
10441045

10451046
/**
@@ -1089,7 +1090,7 @@ protected void selfMaint() {
10891090
// as needed to maintain pool settings
10901091
for ( Iterator<String> i = availPool.keySet().iterator(); i.hasNext(); ) {
10911092
String host = i.next();
1092-
Map<SockIO,Long> sockets = availPool.get(host);
1093+
Map<SockIO,Long> sockets = availPool.get( host );
10931094
log.debug( "++++ Size of avail pool for host (" + host + ") = " + sockets.size() );
10941095

10951096
// if pool is too small (n < minSpare)
@@ -1434,13 +1435,6 @@ void close() {
14341435
pool.checkIn( this );
14351436
}
14361437

1437-
/**
1438-
* Freshens this socket in the busy pool.
1439-
*/
1440-
void touch() {
1441-
pool.touchInUseSockIO( this );
1442-
}
1443-
14441438
/**
14451439
* checks if the connection is open
14461440
*

0 commit comments

Comments
 (0)