From 16a38b0a30d6a7129fdc23e3f5878b8a6eac1235 Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Mon, 25 May 2015 16:04:26 +0900 Subject: [PATCH 1/2] Fixes #222: Check whthere available() == 0 before calling blocking read() --- .../core/buffer/InputStreamBufferInput.java | 9 ++++-- .../core/buffer/MessageBufferInputTest.scala | 32 +++++++++++++++++-- 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java index d9a7c3466..feb473ea3 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java @@ -57,11 +57,14 @@ public MessageBuffer next() throws IOException { byte[] buffer = null; int cursor = 0; while(!reachedEOF && cursor < bufferSize) { - if(buffer == null) + if(buffer == null) { buffer = new byte[bufferSize]; + } - int readLen = in.read(buffer, cursor, bufferSize - cursor); - if(readLen == -1) { + int readLen = -1; + // available() == 0 means, it reached the end of the stream + if(in.available() == 0 || + (readLen = in.read(buffer, cursor, bufferSize - cursor)) == -1) { reachedEOF = true; break; } diff --git a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala index 379badf7e..8daa5929e 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala @@ -2,11 +2,11 @@ package org.msgpack.core.buffer import org.msgpack.core.{MessageUnpacker, MessagePack, MessagePackSpec} import java.io._ -import xerial.core.io.IOUtil +import xerial.core.io.IOUtil._ import scala.util.Random import java.util.zip.{GZIPOutputStream, GZIPInputStream} import java.nio.ByteBuffer -import org.msgpack.unpacker.MessagePackUnpacker + /** * Created on 5/30/14. @@ -49,7 +49,7 @@ class MessageBufferInputTest extends MessagePackSpec { val tmp = File.createTempFile("testbuf", ".dat", new File("target")) tmp.getParentFile.mkdirs() tmp.deleteOnExit() - IOUtil.withResource(new FileOutputStream(tmp)) { out => + withResource(new FileOutputStream(tmp)) { out => out.write(b) } tmp @@ -135,6 +135,32 @@ class MessageBufferInputTest extends MessagePackSpec { buf.reset(in1) readInt(buf) shouldBe 42 } + + "be non-blocking" taggedAs("non-blocking") in { + + withResource(new PipedOutputStream()) { pipedOutputStream => + withResource(new PipedInputStream()) { pipedInputStream => + pipedInputStream.connect(pipedOutputStream) + + val packer = MessagePack.newDefaultPacker(pipedOutputStream) + .packArrayHeader(2) + .packLong(42) + .packString("hello world") + + packer.flush + + val unpacker = MessagePack.newDefaultUnpacker(pipedInputStream) + unpacker.hasNext() shouldBe true + unpacker.unpackArrayHeader() shouldBe 2 + unpacker.unpackLong() shouldBe 42L + unpacker.unpackString() shouldBe "hello world" + + packer.close + unpacker.close + } + } + } + } "ChannelBufferInput" should { From af428dee61c3468e0db593e26980df2d03444f17 Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Tue, 26 May 2015 01:06:32 +0900 Subject: [PATCH 2/2] Use partial read results to avoid I/O block when reading at the end of a stream --- .../core/buffer/InputStreamBufferInput.java | 23 +++++-------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java index feb473ea3..7ccb61f7c 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java @@ -54,24 +54,13 @@ public MessageBuffer next() throws IOException { if(reachedEOF) return null; - byte[] buffer = null; - int cursor = 0; - while(!reachedEOF && cursor < bufferSize) { - if(buffer == null) { - buffer = new byte[bufferSize]; - } - - int readLen = -1; - // available() == 0 means, it reached the end of the stream - if(in.available() == 0 || - (readLen = in.read(buffer, cursor, bufferSize - cursor)) == -1) { - reachedEOF = true; - break; - } - cursor += readLen; + byte[] buffer = new byte[bufferSize]; + int readLen = in.read(buffer); + if(readLen == -1) { + reachedEOF = true; + return null; } - - return buffer == null ? null : MessageBuffer.wrap(buffer).slice(0, cursor); + return MessageBuffer.wrap(buffer).slice(0, readLen); } @Override