Skip to content

Commit 3f9e24c

Browse files
committed
feature: support nonblocking writes using thread pool per host
1 parent acd7149 commit 3f9e24c

File tree

6 files changed

+456
-55
lines changed

6 files changed

+456
-55
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* src/com/danga/MemCached/CacheTask.java
3+
*
4+
* @author $Author: $
5+
* @version $Revision: $ $Date: $
6+
* copyright (c) 2005 meetup, inc
7+
*
8+
* $Id: $
9+
*/
10+
package com.danga.MemCached;
11+
12+
import java.io.IOException;
13+
import org.apache.log4j.Logger;
14+
15+
public class CacheTask implements Runnable {
16+
17+
// logger
18+
private static Logger log =
19+
Logger.getLogger( CacheTask.class.getName() );
20+
21+
// this should get set by the threadpool
22+
// before it executes the task
23+
private SockIOPool.SockIO socket;
24+
25+
private String cmd;
26+
private byte[] val;
27+
28+
public CacheTask( String cmd, byte[] val ) {
29+
this.cmd = cmd;
30+
this.val = val;
31+
}
32+
33+
public void setSocket( SockIOPool.SockIO socket ) {
34+
this.socket = socket;
35+
}
36+
37+
public SockIOPool.SockIO getSocket() {
38+
return this.socket;
39+
}
40+
41+
public void run() {
42+
try {
43+
socket.write( cmd.getBytes() );
44+
if ( val != null ) {
45+
socket.write( val );
46+
socket.write( "\r\n".getBytes() );
47+
}
48+
socket.flush();
49+
}
50+
catch ( IOException ex ) {
51+
log.error( "++++ exception thrown while writing bytes to server" );
52+
log.error( ex.getMessage(), ex );
53+
54+
try {
55+
socket.trueClose();
56+
}
57+
catch ( IOException ioe ) {
58+
log.error( "++++ failed to close socket : " + socket.toString() );
59+
}
60+
61+
socket = null;
62+
}
63+
}
64+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/**
2+
* src/com/danga/MemCached/MemCacheThreadPoolExecutor.java
3+
*
4+
* @author $Author: $
5+
* @version $Revision: $ $Date: $
6+
* copyright (c) 2005 meetup, inc
7+
*
8+
* $Id: $
9+
*/
10+
package com.danga.MemCached;
11+
12+
import java.util.concurrent.*;
13+
import java.util.*;
14+
15+
public class MemCacheThreadPoolExecutor extends ThreadPoolExecutor {
16+
17+
private String poolName;
18+
private String host;
19+
private SockIOPool.SockIO socket;
20+
private boolean socketOnDemand;
21+
22+
/**
23+
*
24+
*
25+
* @param corePoolSize
26+
* @param maximumPoolSize
27+
* @param keepAliveTime
28+
* @param unit
29+
* @param workQueue
30+
* @param pooName
31+
* @param host
32+
*/
33+
public MemCacheThreadPoolExecutor( int corePoolSize, int maximumPoolSize,
34+
long keepAliveTime, TimeUnit unit,
35+
BlockingQueue<Runnable> workQueue, String poolName, String host ) {
36+
37+
super( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue );
38+
39+
if ( poolName == null
40+
|| host == null
41+
|| "".equals( poolName )
42+
|| "".equals( host ) )
43+
throw new IllegalArgumentException( "missing one of host: " + host + " or pool: " + poolName );
44+
45+
this.poolName = poolName;
46+
this.host = host;
47+
48+
// get socket if we have a pool of only a single thread
49+
// else we will get socket each time
50+
if ( corePoolSize == maximumPoolSize
51+
&& corePoolSize == 1 ) {
52+
this.socket =
53+
SockIOPool.getInstance( poolName ).getConnection( host );
54+
55+
this.socketOnDemand = false;
56+
}
57+
else {
58+
this.socketOnDemand = true;
59+
60+
}
61+
}
62+
63+
protected void beforeExecute( Thread t, Runnable r ) {
64+
super.beforeExecute( t, r );
65+
66+
// make sure we have a valid connected socket
67+
// if not, then get a new connection
68+
if ( socketOnDemand
69+
|| socket == null
70+
|| !socket.isConnected() )
71+
socket = SockIOPool.getInstance( poolName ).getConnection( host );
72+
73+
((CacheTask)r).setSocket( socket );
74+
75+
}
76+
77+
protected void afterExecute( Runnable r, Throwable t ) {
78+
super.afterExecute( r, t );
79+
80+
socket = ((CacheTask)r).getSocket();
81+
if ( socketOnDemand ) {
82+
// return to pool
83+
if ( socket != null )
84+
socket.close();
85+
}
86+
else {
87+
// freshen the socket so we have it around
88+
if ( socket != null )
89+
socket.touch();
90+
else
91+
this.socket = null;
92+
}
93+
}
94+
}

0 commit comments

Comments
 (0)