Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,13 @@ public MessageBuffer next() throws IOException {
if(reachedEOF)
return null;

byte[] buffer = null;
int cursor = 0;
while(!reachedEOF && cursor < bufferSize) {
if(buffer == null)
buffer = new byte[bufferSize];

int readLen = in.read(buffer, cursor, bufferSize - cursor);
if(readLen == -1) {
reachedEOF = true;
break;
}
cursor += readLen;
byte[] buffer = new byte[bufferSize];
int readLen = in.read(buffer);
if(readLen == -1) {
reachedEOF = true;
return null;
}

return buffer == null ? null : MessageBuffer.wrap(buffer).slice(0, cursor);
return MessageBuffer.wrap(buffer).slice(0, readLen);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package org.msgpack.core.buffer

import org.msgpack.core.{MessageUnpacker, MessagePack, MessagePackSpec}
import java.io._
import xerial.core.io.IOUtil
import xerial.core.io.IOUtil._
import scala.util.Random
import java.util.zip.{GZIPOutputStream, GZIPInputStream}
import java.nio.ByteBuffer
import org.msgpack.unpacker.MessagePackUnpacker


/**
* Created on 5/30/14.
Expand Down Expand Up @@ -49,7 +49,7 @@ class MessageBufferInputTest extends MessagePackSpec {
val tmp = File.createTempFile("testbuf", ".dat", new File("target"))
tmp.getParentFile.mkdirs()
tmp.deleteOnExit()
IOUtil.withResource(new FileOutputStream(tmp)) { out =>
withResource(new FileOutputStream(tmp)) { out =>
out.write(b)
}
tmp
Expand Down Expand Up @@ -135,6 +135,32 @@ class MessageBufferInputTest extends MessagePackSpec {
buf.reset(in1)
readInt(buf) shouldBe 42
}

"be non-blocking" taggedAs("non-blocking") in {

withResource(new PipedOutputStream()) { pipedOutputStream =>
withResource(new PipedInputStream()) { pipedInputStream =>
pipedInputStream.connect(pipedOutputStream)

val packer = MessagePack.newDefaultPacker(pipedOutputStream)
.packArrayHeader(2)
.packLong(42)
.packString("hello world")

packer.flush

val unpacker = MessagePack.newDefaultUnpacker(pipedInputStream)
unpacker.hasNext() shouldBe true
unpacker.unpackArrayHeader() shouldBe 2
unpacker.unpackLong() shouldBe 42L
unpacker.unpackString() shouldBe "hello world"

packer.close
unpacker.close
}
}
}

}

"ChannelBufferInput" should {
Expand Down