diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java index d9a7c3466..7ccb61f7c 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java @@ -54,21 +54,13 @@ public MessageBuffer next() throws IOException { if(reachedEOF) return null; - byte[] buffer = null; - int cursor = 0; - while(!reachedEOF && cursor < bufferSize) { - if(buffer == null) - buffer = new byte[bufferSize]; - - int readLen = in.read(buffer, cursor, bufferSize - cursor); - if(readLen == -1) { - reachedEOF = true; - break; - } - cursor += readLen; + byte[] buffer = new byte[bufferSize]; + int readLen = in.read(buffer); + if(readLen == -1) { + reachedEOF = true; + return null; } - - return buffer == null ? null : MessageBuffer.wrap(buffer).slice(0, cursor); + return MessageBuffer.wrap(buffer).slice(0, readLen); } @Override diff --git a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala index 379badf7e..8daa5929e 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala @@ -2,11 +2,11 @@ package org.msgpack.core.buffer import org.msgpack.core.{MessageUnpacker, MessagePack, MessagePackSpec} import java.io._ -import xerial.core.io.IOUtil +import xerial.core.io.IOUtil._ import scala.util.Random import java.util.zip.{GZIPOutputStream, GZIPInputStream} import java.nio.ByteBuffer -import org.msgpack.unpacker.MessagePackUnpacker + /** * Created on 5/30/14. @@ -49,7 +49,7 @@ class MessageBufferInputTest extends MessagePackSpec { val tmp = File.createTempFile("testbuf", ".dat", new File("target")) tmp.getParentFile.mkdirs() tmp.deleteOnExit() - IOUtil.withResource(new FileOutputStream(tmp)) { out => + withResource(new FileOutputStream(tmp)) { out => out.write(b) } tmp @@ -135,6 +135,32 @@ class MessageBufferInputTest extends MessagePackSpec { buf.reset(in1) readInt(buf) shouldBe 42 } + + "be non-blocking" taggedAs("non-blocking") in { + + withResource(new PipedOutputStream()) { pipedOutputStream => + withResource(new PipedInputStream()) { pipedInputStream => + pipedInputStream.connect(pipedOutputStream) + + val packer = MessagePack.newDefaultPacker(pipedOutputStream) + .packArrayHeader(2) + .packLong(42) + .packString("hello world") + + packer.flush + + val unpacker = MessagePack.newDefaultUnpacker(pipedInputStream) + unpacker.hasNext() shouldBe true + unpacker.unpackArrayHeader() shouldBe 2 + unpacker.unpackLong() shouldBe 42L + unpacker.unpackString() shouldBe "hello world" + + packer.close + unpacker.close + } + } + } + } "ChannelBufferInput" should {