2828 ******************************************************************************/
2929package com .schooner .MemCached ;
3030
31- import java .io .BufferedReader ;
3231import java .io .ByteArrayInputStream ;
3332import java .io .ByteArrayOutputStream ;
3433import java .io .IOException ;
3534import java .io .InputStream ;
36- import java .io .StringReader ;
3735import java .io .UnsupportedEncodingException ;
3836import java .net .URLEncoder ;
3937import java .nio .ByteBuffer ;
@@ -184,14 +182,7 @@ public boolean delete(String key, Integer hashCode, Date expiry) {
184182
185183 // if we get appropriate response back, then we return true
186184 // get result code
187- sock .readBuf .clear ();
188- String line ;
189- sock .getChannel ().read (sock .readBuf );
190- sock .readBuf .flip ();
191- byte [] temp = new byte [sock .readBuf .remaining ()];
192- sock .readBuf .get (temp );
193- line = new String (temp );
194-
185+ String line = new SockInputStream (sock , Integer .MAX_VALUE ).getLine ();
195186 if (DELETED .equals (line )) { // successful
196187 if (log .isInfoEnabled ())
197188 log .info (new StringBuffer ().append ("++++ deletion of key: " ).append (key ).append (
@@ -428,14 +419,7 @@ private boolean set(String cmdname, String key, Object value, Date expiry, Integ
428419 // now write the data to the cache server
429420 sock .flush ();
430421 // get result code
431- sock .readBuf .clear ();
432- String line ; // response from server
433- // read response from server, and store it in readBuf
434- sock .getChannel ().read (sock .readBuf );
435- byte [] temp = new byte [sock .readBuf .position ()];
436- sock .readBuf .flip ();
437- sock .readBuf .get (temp );
438- line = new String (temp );
422+ String line = new SockInputStream (sock , Integer .MAX_VALUE ).getLine ();
439423 if (STORED .equals (line )) {
440424 /*
441425 * Successfully set here.
@@ -578,15 +562,8 @@ private long incrdecr(String cmdname, String key, long inc, Integer hashCode) {
578562 String cmd = new StringBuffer ().append (cmdname ).append (" " ).append (key ).append (" " ).append (inc ).append (
579563 "\r \n " ).toString ();
580564 sock .write (cmd .getBytes ());
581- // get result back
582565 // get result code
583- sock .readBuf .clear ();
584- String line ;
585- sock .getChannel ().read (sock .readBuf );
586- sock .readBuf .flip ();
587- byte [] temp = new byte [sock .readBuf .limit ()];
588- sock .readBuf .get (temp );
589- line = new String (temp ).split ("\r \n " )[0 ];
566+ String line = new SockInputStream (sock , Integer .MAX_VALUE ).getLine ().split ("\r \n " )[0 ];
590567 if (line .matches ("\\ d+" )) {
591568 // Sucessfully increase.
592569 // return sock to pool and return result
@@ -714,23 +691,23 @@ private Object get(String cmd, String key, Integer hashCode, boolean asString) {
714691 int flag = 0 ;
715692
716693 // get result code
717- sock .readBuf .clear ();
718- sock .getChannel ().read (sock .readBuf );
719-
694+ SockInputStream input = new SockInputStream (sock , Integer .MAX_VALUE );
720695 // Then analysis the return metadata from server
721696 // including key, flag and data size
722697 boolean stop = false ;
723698 StringBuffer sb = new StringBuffer ();
724- sock .readBuf .flip ();
725- byte b ;
699+ int b ;
726700 int index = 0 ;
727701 while (!stop ) {
728702 /*
729703 * Critical block to parse the response header.
730704 */
731- b = sock . readBuf . get ();
705+ b = input . read ();
732706 if (b == ' ' || b == '\r' ) {
733707 switch (index ) {
708+ case 0 :
709+ if (END .startsWith (sb .toString ()))
710+ return null ;
734711 case 1 :
735712 break ;
736713 case 2 :
@@ -744,7 +721,7 @@ private Object get(String cmd, String key, Integer hashCode, boolean asString) {
744721 index ++;
745722 sb = new StringBuffer ();
746723 if (b == '\r' ) {
747- sock . readBuf . get ();
724+ input . read ();
748725 stop = true ;
749726 }
750727 continue ;
@@ -753,7 +730,6 @@ private Object get(String cmd, String key, Integer hashCode, boolean asString) {
753730 }
754731
755732 Object o = null ;
756- SockInputStream input = new SockInputStream (sock );
757733 input .willRead (dataSize );
758734 // we can only take out serialized objects
759735 if (dataSize > 0 ) {
@@ -787,7 +763,11 @@ private Object get(String cmd, String key, Integer hashCode, boolean asString) {
787763 o = ((ObjectTransCoder ) transCoder ).decode (in , classLoader );
788764 }
789765 }
790- sock .readBuf .clear ();
766+ input .willRead (Integer .MAX_VALUE );
767+ // Skip "\r\n" after each data block for VALUE
768+ input .getLine ();
769+ // Skip "END\r\n" after get
770+ input .getLine ();
791771 return o ;
792772 } catch (Exception ce ) {
793773 // if we have an errorHandler, use its hook
@@ -853,23 +833,23 @@ public MemcachedItem gets(String cmd, String key, Integer hashCode, boolean asSt
853833 MemcachedItem item = new MemcachedItem ();
854834
855835 // get result code
856- sock .readBuf .clear ();
857- sock .getChannel ().read (sock .readBuf );
858-
836+ SockInputStream input = new SockInputStream (sock , Integer .MAX_VALUE );
859837 // Then analysis the return metadata from server
860838 // including key, flag and data size
861839 boolean stop = false ;
862840 StringBuffer sb = new StringBuffer ();
863- sock .readBuf .flip ();
864- byte b ;
841+ int b ;
865842 int index = 0 ;
866843 while (!stop ) {
867844 /*
868845 * Critical block to parse the response header.
869846 */
870- b = sock . readBuf . get ();
847+ b = input . read ();
871848 if (b == ' ' || b == '\r' ) {
872849 switch (index ) {
850+ case 0 :
851+ if (END .startsWith (sb .toString ()))
852+ return null ;
873853 case 1 :
874854 break ;
875855 case 2 :
@@ -887,15 +867,14 @@ public MemcachedItem gets(String cmd, String key, Integer hashCode, boolean asSt
887867 index ++;
888868 sb = new StringBuffer ();
889869 if (b == '\r' ) {
890- sock . readBuf . get ();
870+ input . read ();
891871 stop = true ;
892872 }
893873 continue ;
894874 }
895875 sb .append ((char ) b );
896876 }
897877 Object o = null ;
898- SockInputStream input = new SockInputStream (sock );
899878 input .willRead (dataSize );
900879 // we can only take out serialized objects
901880 if (dataSize > 0 ) {
@@ -926,8 +905,12 @@ public MemcachedItem gets(String cmd, String key, Integer hashCode, boolean asSt
926905 o = transCoder .decode (in );
927906 }
928907 }
929- sock .readBuf .clear ();
930908 item .value = o ;
909+ input .willRead (Integer .MAX_VALUE );
910+ // Skip "\r\n" after each data block for VALUE
911+ input .getLine ();
912+ // Skip "END\r\n" after get
913+ input .getLine ();
931914 return item ;
932915
933916 } catch (Exception ce ) {
@@ -1287,13 +1270,7 @@ public boolean flushAll(String[] servers) {
12871270 sock .write (command .getBytes ());
12881271 // if we get appropriate response back, then we return true
12891272 // get result code
1290- sock .readBuf .clear ();
1291- String line ;
1292- sock .getChannel ().read (sock .readBuf );
1293- sock .readBuf .flip ();
1294- byte [] temp = new byte [sock .readBuf .remaining ()];
1295- sock .readBuf .get (temp );
1296- line = new String (temp );
1273+ String line = new SockInputStream (sock , Integer .MAX_VALUE ).getLine ();
12971274 success = (OK .equals (line )) ? success && true : false ;
12981275 } catch (IOException e ) {
12991276
@@ -1391,15 +1368,10 @@ private Map<String, Map<String, String>> stats(String[] servers, String command,
13911368 // map to hold key value pairs
13921369 Map <String , String > stats = new HashMap <String , String >();
13931370 // get result code
1394- sock . readBuf . clear ( );
1371+ SockInputStream input = new SockInputStream ( sock , Integer . MAX_VALUE );
13951372 String line ;
1396- sock .getChannel ().read (sock .readBuf );
1397- sock .readBuf .flip ();
1398- byte [] temp = new byte [sock .readBuf .remaining ()];
1399- sock .readBuf .get (temp );
1400- BufferedReader reader = new BufferedReader (new StringReader (new String (temp )));
14011373 // loop over results
1402- while ((line = reader . readLine ()) != null ) {
1374+ while ((line = input . getLine ()) != null ) {
14031375
14041376 if (line .startsWith (lineStart )) {
14051377 String [] info = line .split (" " , 3 );
@@ -1683,14 +1655,7 @@ public boolean sync(String key, Integer hashCode) {
16831655
16841656 // if we get appropriate response back, then we return true
16851657 // get result code
1686- sock .readBuf .clear ();
1687- String line ;
1688- sock .getChannel ().read (sock .readBuf );
1689- sock .readBuf .flip ();
1690- byte [] temp = new byte [sock .readBuf .remaining ()];
1691- sock .readBuf .get (temp );
1692- line = new String (temp );
1693-
1658+ String line = new SockInputStream (sock , Integer .MAX_VALUE ).getLine ();
16941659 if (SYNCED .equals (line )) {
16951660 if (log .isInfoEnabled ())
16961661 log .info (new StringBuffer ().append ("++++ sync of key: " ).append (key ).append (
@@ -1773,13 +1738,7 @@ public boolean syncAll(String[] servers) {
17731738 sock .write (command .getBytes ());
17741739 // if we get appropriate response back, then we return true
17751740 // get result code
1776- sock .readBuf .clear ();
1777- String line ;
1778- sock .getChannel ().read (sock .readBuf );
1779- sock .readBuf .flip ();
1780- byte [] temp = new byte [sock .readBuf .remaining ()];
1781- sock .readBuf .get (temp );
1782- line = new String (temp );
1741+ String line = new SockInputStream (sock , Integer .MAX_VALUE ).getLine ();
17831742 success = (SYNCED .equals (line )) ? success && true : false ;
17841743 } catch (IOException e ) {
17851744 // exception thrown
0 commit comments