Skip to content

Commit 01491da

Browse files
committed
checkin some fixes and add health check on checkout of socket
1 parent b41940f commit 01491da

File tree

4 files changed

+171
-31
lines changed

4 files changed

+171
-31
lines changed

src/com/danga/MemCached/SockIOPool.java

Lines changed: 107 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
* int socketConnectTO = 1000 * 3; // 3 seconds to block on initial connections. If 0, then will use blocking connect (default)
8181
* boolean failover = false; // turn off auto-failover in event of server down
8282
* boolean nagleAlg = false; // turn off Nagle's algorithm on all sockets in pool
83+
* boolean aliveCheck = false; // disable health check of socket on checkout
8384
*
8485
* SockIOPool pool = SockIOPool.getInstance();
8586
* pool.setServers( serverlist );
@@ -92,7 +93,8 @@
9293
* pool.setMaintSleep( maintThreadSleep );
9394
* pool.setSocketTO( socketTimeOut );
9495
* pool.setNagle( nagleAlg );
95-
* pool.setHashingAlg( SockIOPool.NEW_COMPAT_HASH );
96+
* pool.setHashingAlg( SockIOPool.NEW_COMPAT_HASH );
97+
* pool.setAliveCheck( true );
9698
* pool.initialize();
9799
* }
98100
* </pre>
@@ -146,6 +148,7 @@ public class SockIOPool {
146148
private long maintSleep = 1000 * 5; // maintenance thread sleep time
147149
private int socketTO = 1000 * 10; // default timeout of socket reads
148150
private int socketConnectTO = 1000 * 3; // default timeout of socket connections
151+
private boolean aliveCheck = false; // default to not check each connection for being alive
149152
private boolean failover = true; // default to failover in event of cache server dead
150153
private boolean failback = true; // only used if failover is also set ... controls putting a dead server back into rotation
151154
private boolean nagle = true; // enable/disable Nagle's algorithm
@@ -366,6 +369,27 @@ public static SockIOPool getInstance() {
366369
*/
367370
public boolean getFailback() { return this.failback; }
368371

372+
/**
373+
* Sets the aliveCheck flag for the pool.
374+
*
375+
* When true, this will attempt to talk to the server on
376+
* every connection checkout to make sure the connection is
377+
* still valid. This adds extra network chatter and thus is
378+
* defaulted off. May be useful if you want to ensure you do
379+
* not have any problems talking to the server on a dead connection.
380+
*
381+
* @param aliveCheck true/false
382+
*/
383+
public void setAliveCheck( boolean aliveCheck ) { this.aliveCheck = aliveCheck; }
384+
385+
386+
/**
387+
* Returns the current status of the aliveCheck flag.
388+
*
389+
* @return true / false
390+
*/
391+
public boolean getAliveCheck() { return this.aliveCheck; }
392+
369393
/**
370394
* Sets the Nagle alg flag for the pool.
371395
*
@@ -667,8 +691,31 @@ public SockIO getSock( String key, Integer hashCode ) {
667691
return null;
668692

669693
// if only one server, return it
670-
if ( buckets.size() == 1 )
671-
return getConnection( (String) buckets.get( 0 ) );
694+
if ( buckets.size() == 1 ) {
695+
SockIO sock = getConnection( (String) buckets.get( 0 ) );
696+
if ( sock != null && sock.isConnected() ) {
697+
698+
if ( aliveCheck ) {
699+
if ( sock.isAlive() ) {
700+
return sock;
701+
}
702+
else {
703+
sock.close();
704+
try { sock.trueClose(); } catch ( IOException ioe ) { log.error( "failed to close dead socket" ); }
705+
sock = null;
706+
}
707+
}
708+
else {
709+
return sock;
710+
}
711+
}
712+
else {
713+
sock = null;
714+
}
715+
716+
if ( !failover )
717+
return null;
718+
}
672719

673720
int tries = 0;
674721

@@ -719,8 +766,24 @@ public SockIO getSock( String key, Integer hashCode ) {
719766

720767
log.debug( "cache choose " + buckets.get( bucket ) + " for " + key );
721768

722-
if ( sock != null )
723-
return sock;
769+
if ( sock != null && sock.isConnected() ) {
770+
if ( aliveCheck ) {
771+
if ( sock.isAlive() ) {
772+
return sock;
773+
}
774+
else {
775+
sock.close();
776+
try { sock.trueClose(); } catch ( IOException ioe ) { log.error( "failed to close dead socket" ); }
777+
sock = null;
778+
}
779+
}
780+
else {
781+
return sock;
782+
}
783+
}
784+
else {
785+
sock = null;
786+
}
724787

725788
// if we do not want to failover, then bail here
726789
if ( !failover )
@@ -993,27 +1056,24 @@ public void checkIn( SockIO socket ) {
9931056
* @param pool pool to close
9941057
*/
9951058
protected void closePool( Map<String,Map<SockIO,Long>> pool ) {
1059+
for ( Iterator<String> i = pool.keySet().iterator(); i.hasNext(); ) {
1060+
String host = i.next();
1061+
Map<SockIO,Long> sockets = pool.get( host );
9961062

997-
synchronized( this ) {
998-
for ( Iterator<String> i = pool.keySet().iterator(); i.hasNext(); ) {
999-
String host = i.next();
1000-
Map<SockIO,Long> sockets = pool.get( host );
1001-
1002-
for ( Iterator<SockIO> j = sockets.keySet().iterator(); j.hasNext(); ) {
1003-
SockIO socket = j.next();
1004-
1005-
try {
1006-
socket.trueClose();
1007-
}
1008-
catch ( IOException ioe ) {
1009-
log.error( "++++ failed to trueClose socket: " + socket.toString() + " for host: " + host );
1010-
}
1011-
1012-
j.remove();
1013-
socket = null;
1063+
for ( Iterator<SockIO> j = sockets.keySet().iterator(); j.hasNext(); ) {
1064+
SockIO socket = j.next();
1065+
1066+
try {
1067+
socket.trueClose();
1068+
}
1069+
catch ( IOException ioe ) {
1070+
log.error( "++++ failed to trueClose socket: " + socket.toString() + " for host: " + host );
10141071
}
1072+
1073+
j.remove();
1074+
socket = null;
10151075
}
1016-
}
1076+
}
10171077
}
10181078

10191079
/**
@@ -1441,7 +1501,30 @@ void close() {
14411501
* @return true if connected
14421502
*/
14431503
boolean isConnected() {
1444-
return (sock != null && sock.isConnected());
1504+
return ( sock != null && sock.isConnected() );
1505+
}
1506+
1507+
/*
1508+
* checks to see that the connection is still working
1509+
*
1510+
* @return true if still alive
1511+
*/
1512+
boolean isAlive() {
1513+
1514+
if ( !isConnected() )
1515+
return false;
1516+
1517+
// try to talk to the server w/ a dumb query to ask its version
1518+
try {
1519+
this.write( "version\r\n".getBytes() );
1520+
this.flush();
1521+
String response = this.readLine();
1522+
}
1523+
catch ( IOException ex ) {
1524+
return false;
1525+
}
1526+
1527+
return true;
14451528
}
14461529

14471530
/**

src/com/danga/MemCached/test/MemCachedBench.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,16 @@
3333

3434
import com.danga.MemCached.*;
3535
import java.util.*;
36-
import org.apache.log4j.*;
3736

3837
public class MemCachedBench {
3938

4039
public static void main(String[] args) {
4140

42-
BasicConfigurator.configure();
4341

4442
int runs = Integer.parseInt(args[0]);
4543
int start = Integer.parseInt(args[1]);
4644

47-
String[] serverlist = { "192.168.1.20:1624" };
45+
String[] serverlist = { "192.168.1.1:1624" };
4846

4947
// initialize the pool for memcache servers
5048
SockIOPool pool = SockIOPool.getInstance( "test" );
@@ -54,13 +52,15 @@ public static void main(String[] args) {
5452
pool.setMinConn( 100 );
5553
pool.setMaxConn( 500 );
5654
pool.setMaintSleep( 30 );
55+
pool.setAliveCheck( true );
5756

5857
pool.setNagle( false );
5958
pool.initialize();
6059

6160
// get client instance
6261
MemCachedClient mc = new MemCachedClient();
6362
mc.setCompressEnable( false );
63+
mc.setPoolName( "test" );
6464

6565
String keyBase = "testKey";
6666
String object = "This is a test of an object blah blah es, serialization does not seem to slow things down so much. The gzip compression is horrible horrible performance, so we only use it for very large objects. I have not done any heavy benchmarking recently";
@@ -71,24 +71,24 @@ public static void main(String[] args) {
7171
}
7272
long end = System.currentTimeMillis();
7373
long time = end - begin;
74-
System.out.println(runs + " gets: " + time + "ms");
74+
System.out.println(runs + " sets: " + time + "ms");
7575

7676
begin = System.currentTimeMillis();
7777
for (int i = start; i < start+runs; i++) {
7878
String str = (String) mc.get(keyBase + i);
7979
}
8080
end = System.currentTimeMillis();
8181
time = end - begin;
82-
System.out.println(runs + " deletes: " + time + "ms");
82+
System.out.println(runs + " gets: " + time + "ms");
8383

8484
begin = System.currentTimeMillis();
8585
for (int i = start; i < start+runs; i++) {
8686
mc.delete( keyBase + i );
8787
}
8888
end = System.currentTimeMillis();
8989
time = end - begin;
90-
System.out.println(runs + " gets: " + time + "ms");
90+
System.out.println(runs + " deletes: " + time + "ms");
9191

92-
SockIOPool.getInstance().shutDown();
92+
SockIOPool.getInstance( "test" ).shutDown();
9393
}
9494
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* * Test case to test memcached Java client failover when server list includes a server that is down
3+
* */
4+
package com.danga.MemCached.test;
5+
6+
import com.danga.MemCached.*;
7+
import org.apache.log4j.*;
8+
9+
public class TestMemcached {
10+
public static void main(String[] args) {
11+
// memcached should be running on port 11211 but NOT on 11212
12+
13+
BasicConfigurator.configure();
14+
String[] servers = { "192.168.1.1:1624", "192.168.1.1:1625" };
15+
SockIOPool pool = SockIOPool.getInstance();
16+
pool.setServers( servers );
17+
pool.setFailover( true );
18+
pool.setInitConn( 10 );
19+
pool.setMinConn( 5 );
20+
pool.setMaxConn( 250 );
21+
pool.setMaintSleep( 30 );
22+
pool.setNagle( false );
23+
pool.setSocketTO( 3000 );
24+
pool.setAliveCheck( true );
25+
pool.initialize();
26+
27+
MemCachedClient memCachedClient = new MemCachedClient();
28+
29+
// turn off most memcached client logging:
30+
com.danga.MemCached.Logger.getLogger( MemCachedClient.class.getName() ).setLevel( com.danga.MemCached.Logger.LEVEL_WARN );
31+
32+
for ( int i = 0; i < 10; i++ ) {
33+
boolean success = memCachedClient.set( "" + i, "Hello!" );
34+
String result = (String)memCachedClient.get( "" + i );
35+
System.out.println( String.format( "set( %d ): %s", i, success ) );
36+
System.out.println( String.format( "get( %d ): %s", i, result ) );
37+
}
38+
39+
System.out.println( "\n\t -- sleeping --\n" );
40+
try { Thread.sleep( 10000 ); } catch ( Exception ex ) { }
41+
42+
for ( int i = 0; i < 10; i++ ) {
43+
boolean success = memCachedClient.set( "" + i, "Hello!" );
44+
String result = (String)memCachedClient.get( "" + i );
45+
System.out.println( String.format( "set( %d ): %s", i, success ) );
46+
System.out.println( String.format( "get( %d ): %s", i, result ) );
47+
}
48+
}
49+
}

src/com/danga/MemCached/test/UnitTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,14 @@
3535

3636
import com.danga.MemCached.*;
3737
import java.util.*;
38+
import org.apache.log4j.Logger;
39+
import org.apache.log4j.BasicConfigurator;
3840

3941
public class UnitTests {
42+
43+
// logger
44+
private static Logger log =
45+
Logger.getLogger( UnitTests.class.getName() );
4046

4147
public static MemCachedClient mc = null;
4248

@@ -159,6 +165,8 @@ public static void test16() {
159165
*/
160166
public static void main(String[] args) {
161167

168+
BasicConfigurator.configure();
169+
162170
String[] serverlist = { "192.168.1.1:1624" };
163171

164172
// initialize the pool for memcache servers

0 commit comments

Comments
 (0)