Skip to content

Commit 70e26ae

Browse files
committed
start working towards NIO and non-blocking getMulti
1 parent ee420a9 commit 70e26ae

File tree

2 files changed

+313
-37
lines changed

2 files changed

+313
-37
lines changed

src/com/danga/MemCached/MemCachedClient.java

Lines changed: 271 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525

2626
import java.util.*;
2727
import java.util.zip.*;
28+
import java.nio.*;
29+
import java.nio.charset.*;
30+
import java.nio.channels.*;
31+
import java.nio.channels.spi.*;
2832
import java.io.*;
2933
import java.net.URLEncoder;
3034

@@ -161,6 +165,10 @@ public class MemCachedClient {
161165
private static Logger log =
162166
Logger.getLogger( MemCachedClient.class.getName() );
163167

168+
// default charset
169+
private static final Charset charSet =
170+
Charset.defaultCharset();
171+
164172
// return codes
165173
private static final String VALUE = "VALUE"; // start of value line from server
166174
private static final String STATS = "STAT"; // start of stats line from server
@@ -1200,7 +1208,7 @@ public Object get( String key, Integer hashCode, boolean asString ) {
12001208
errorHandler.handleErrorOnGet( this, e, key );
12011209

12021210
log.error( "failed to sanitize your key!", e );
1203-
return false;
1211+
return null;
12041212
}
12051213

12061214
// get SockIO obj using cache key
@@ -1359,7 +1367,7 @@ public Map<String,Object> getMulti( String[] keys, Integer[] hashCodes, boolean
13591367
return null;
13601368
}
13611369

1362-
Map<String,StringBuilder> sockKeys =
1370+
Map<String,StringBuilder> cmdMap =
13631371
new HashMap<String,StringBuilder>();
13641372

13651373
for ( int i = 0; i < keys.length; ++i ) {
@@ -1395,32 +1403,91 @@ public Map<String,Object> getMulti( String[] keys, Integer[] hashCodes, boolean
13951403
continue;
13961404

13971405
// store in map and list if not already
1398-
if ( !sockKeys.containsKey( sock.getHost() ) )
1399-
sockKeys.put( sock.getHost(), new StringBuilder() );
1406+
if ( !cmdMap.containsKey( sock.getHost() ) )
1407+
cmdMap.put( sock.getHost(), new StringBuilder( "get" ) );
14001408

1401-
sockKeys.get( sock.getHost() ).append( " " + key );
1409+
cmdMap.get( sock.getHost() ).append( " " + key );
14021410

14031411
// return to pool
14041412
sock.close();
14051413
}
14061414

1407-
log.info( "multi get socket count : " + sockKeys.size() );
1415+
log.info( "multi get socket count : " + cmdMap.size() );
14081416

14091417
// now query memcache
14101418
Map<String,Object> ret =
14111419
new HashMap<String,Object>( keys.length );
14121420

1413-
for ( Iterator<String> i = sockKeys.keySet().iterator(); i.hasNext(); ) {
1414-
// get SockIO obj from hostname
1421+
// now use new NIO implementation
1422+
loadItemsNIO( cmdMap, ret, asString, keys );
1423+
1424+
log.debug( "++++ memcache: got back " + ret.size() + " results" );
1425+
return ret;
1426+
}
1427+
1428+
/**
1429+
*
1430+
*
1431+
* @param cmdMap
1432+
* @param cmdMap
1433+
* @param ret
1434+
* @param ret
1435+
* @param asString
1436+
* @param keys
1437+
*/
1438+
private void loadItemsNIO( Map<String,StringBuilder> cmdMap, Map<String,Object> ret, boolean asString, String[] keys ) {
1439+
1440+
// selector for our channels
1441+
Selector selector = null;
1442+
try {
1443+
selector = SelectorProvider.provider().openSelector();
1444+
}
1445+
catch ( IOException e ) {
1446+
// if we have an errorHandler, use its hook
1447+
if ( errorHandler != null )
1448+
errorHandler.handleErrorOnGet( this, e, keys );
1449+
1450+
// exception thrown
1451+
log.error( "++++ exception thrown while grabbing Selector on getMulti" );
1452+
log.error( e.getMessage(), e );
1453+
1454+
return;
1455+
}
1456+
1457+
// map to hold our channels
1458+
Map<String,SockIOPool.SockIO> sockMap =
1459+
new HashMap<String,SockIOPool.SockIO>( cmdMap.keySet().size() );
1460+
1461+
// Convenience to move our Strings to CharBuffers
1462+
Map<String,CharBuffer> writeMap =
1463+
new HashMap<String,CharBuffer>( cmdMap.keySet().size() );
1464+
1465+
// map to store server response
1466+
// keyed off of the cache key
1467+
Map<String,byte[]> response =
1468+
new HashMap<String,byte[]>( keys.length );
1469+
1470+
// iterate through and flip the sockets to nonblocking
1471+
// get a selector and register the socket
1472+
for ( Iterator<String> i = cmdMap.keySet().iterator(); i.hasNext(); ) {
1473+
14151474
String host = i.next();
1416-
SockIOPool.SockIO sock = SockIOPool.getInstance( poolName ).getConnection( host );
1475+
SockIOPool.SockIO sock =
1476+
SockIOPool.getInstance( poolName ).getConnection( host );
14171477

14181478
try {
1419-
String cmd = "get" + sockKeys.get( host ) + "\r\n";
1420-
log.debug( "++++ memcache getMulti cmd: " + cmd );
1421-
sock.write( cmd.getBytes() );
1422-
sock.flush();
1423-
loadItems( sock, ret, asString );
1479+
// get a selector
1480+
SocketChannel channel = sock.getChannel();
1481+
channel.configureBlocking( false );
1482+
channel.register( selector, SelectionKey.OP_WRITE, host );
1483+
1484+
// store channel (so we can flip back to blocking when done)
1485+
sockMap.put( host, sock );
1486+
1487+
// store the string in a charbuffer and remove this entry
1488+
CharBuffer cb = CharBuffer.wrap( cmdMap.get( host ).append( "\r\n" ) );
1489+
writeMap.put( host, cb );
1490+
i.remove();
14241491
}
14251492
catch ( IOException e ) {
14261493

@@ -1429,12 +1496,14 @@ public Map<String,Object> getMulti( String[] keys, Integer[] hashCodes, boolean
14291496
errorHandler.handleErrorOnGet( this, e, keys );
14301497

14311498
// exception thrown
1432-
log.error( "++++ exception thrown while getting from cache on getMulti" );
1499+
log.error( "++++ exception thrown while setting up nonblocking channels on getMulti" );
14331500
log.error( e.getMessage(), e );
14341501

14351502
// clear this sockIO obj from the list
1436-
// and from the map containing keys
1503+
// and from all maps
14371504
i.remove();
1505+
1506+
// try to return to the pool
14381507
try {
14391508
sock.trueClose();
14401509
}
@@ -1443,14 +1512,194 @@ public Map<String,Object> getMulti( String[] keys, Integer[] hashCodes, boolean
14431512
}
14441513
sock = null;
14451514
}
1515+
}
14461516

1447-
// Return socket to pool
1448-
if ( sock != null )
1449-
sock.close();
1517+
// will be reused in loop
1518+
SocketChannel socket = null;
1519+
1520+
// grab a buffer to work with
1521+
ByteBuffer buf =
1522+
ByteBuffer.allocateDirect( 8192 );
1523+
1524+
// get an encoder
1525+
CharsetEncoder encoder = charSet.newEncoder();
1526+
1527+
// now lets start looping
1528+
while ( true ) {
1529+
1530+
try {
1531+
// block for event
1532+
selector.select();
1533+
}
1534+
catch ( IOException e ) {
1535+
// if we have an errorHandler, use its hook
1536+
if ( errorHandler != null )
1537+
errorHandler.handleErrorOnGet( this, e, keys );
1538+
1539+
// exception thrown
1540+
log.error( "++++ exception thrown while grabbing selecting on getMulti" );
1541+
log.error( e.getMessage(), e );
1542+
1543+
// need to put all the channels back to blocking
1544+
for ( String host : sockMap.keySet() ) {
1545+
1546+
SockIOPool.SockIO sock =
1547+
sockMap.get( host );
1548+
1549+
try {
1550+
sock.getChannel().configureBlocking( true );
1551+
}
1552+
catch ( IOException ioe ) {
1553+
// try to return to the pool since we failed to
1554+
// put back to blocking mode
1555+
try {
1556+
sock.trueClose();
1557+
}
1558+
catch ( IOException ioe2 ) { }
1559+
sock = null;
1560+
}
1561+
}
1562+
1563+
return;
1564+
}
1565+
1566+
// get our keys for this selector
1567+
Set<SelectionKey> readyKeys =
1568+
selector.selectedKeys();
1569+
1570+
Iterator<SelectionKey> i = readyKeys.iterator();
1571+
1572+
// iterate through all selection keys
1573+
while ( i.hasNext() ) {
1574+
SelectionKey sk = i.next();
1575+
i.remove();
1576+
1577+
if ( !sk.isValid() )
1578+
continue;
1579+
1580+
String host = (String)sk.attachment();
1581+
1582+
// handle a write op
1583+
if ( sk.isWritable() ) {
1584+
1585+
socket = (SocketChannel)sk.channel();
1586+
buf.clear();
1587+
1588+
if ( writeMap.containsKey( host ) ) {
1589+
CharBuffer in = writeMap.get( host );
1590+
log.error( "remaining bytes to send before encoding: " + in.remaining() );
1591+
1592+
// write what we need to write
1593+
encoder.reset();
1594+
CoderResult cr = encoder.encode( in, buf, false );
1595+
encoder.flush( buf );
1596+
1597+
log.error( "remaining bytes to send after encoding: " + in.remaining() );
1598+
1599+
// check to see if we are done?
1600+
if ( cr.isUnderflow() ) {
1601+
encoder.encode( in, buf, true );
1602+
encoder.flush( buf );
1603+
1604+
// and clear out the map
1605+
writeMap.remove( host );
1606+
}
1607+
1608+
// now send
1609+
try {
1610+
while ( buf.hasRemaining() ) {
1611+
int rc = socket.write( buf );
1612+
log.error( String.format( "wrote %d bytes to the server for host %s", rc, host ) );
1613+
}
1614+
}
1615+
catch ( IOException e ) {
1616+
// if we have an errorHandler, use its hook
1617+
if ( errorHandler != null )
1618+
errorHandler.handleErrorOnGet( this, e, keys );
1619+
1620+
log.error( "error writing to dead socket: " + e.getMessage(), e );
1621+
1622+
try {
1623+
socket.close();
1624+
sk.channel().close();
1625+
1626+
// make sure the pool knows about this
1627+
if ( sockMap.containsKey( host ) ) {
1628+
try {
1629+
sockMap.get( host ).trueClose();
1630+
}
1631+
catch ( IOException ioe ) { }
1632+
}
1633+
}
1634+
catch ( IOException ioe ) { }
1635+
1636+
sk.cancel();
1637+
1638+
// clear out any more attempts on this host
1639+
writeMap.remove( host );
1640+
sockMap.remove( host );
1641+
}
1642+
}
1643+
else {
1644+
// nothing to write, so lets switch to read
1645+
if ( sk.isValid() ) {
1646+
log.error( String.format( "Switching socket for host: %s to read in stream in getMulti", host ) );
1647+
sk.interestOps( SelectionKey.OP_READ );
1648+
}
1649+
}
1650+
1651+
continue;
1652+
}
1653+
1654+
// handle read operation
1655+
if ( sk.isReadable() ) {
1656+
1657+
// get socket and reset buffer
1658+
socket = (SocketChannel)sk.channel();
1659+
buf.clear();
1660+
1661+
int sz = 0;
1662+
try {
1663+
sz = socket.read( buf );
1664+
}
1665+
catch ( IOException e ) {
1666+
// if we have an errorHandler, use its hook
1667+
if ( errorHandler != null )
1668+
errorHandler.handleErrorOnGet( this, e, keys );
1669+
1670+
log.error( "error writing to dead socket: " + e.getMessage(), e );
1671+
1672+
try {
1673+
socket.close();
1674+
sk.channel().close();
1675+
}
1676+
catch ( IOException ioe ) { }
1677+
1678+
sk.cancel();
1679+
}
1680+
1681+
if ( sz == -1 ) {
1682+
log.info( "socket has hit EOS" );
1683+
try {
1684+
socket.close();
1685+
sk.channel().close();
1686+
}
1687+
catch ( IOException ioe ) { }
1688+
1689+
// cancel the key
1690+
sk.cancel();
1691+
}
1692+
else {
1693+
1694+
1695+
1696+
1697+
}
1698+
1699+
continue;
1700+
}
1701+
}
14501702
}
1451-
1452-
log.debug( "++++ memcache: got back " + ret.size() + " results" );
1453-
return ret;
14541703
}
14551704

14561705
/**

0 commit comments

Comments
 (0)