diff --git a/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java b/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java index 7a90b7a9a..a1b28b640 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java +++ b/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java @@ -208,36 +208,44 @@ private MessageBuffer prepareNumberBuffer(int readLength) position += readLength; // here assumes following buffer.getXxx never throws exception return buffer; // Return the default buffer } - else if (remaining == 0) { - buffer = getNextBuffer(); - position = readLength; - nextReadPosition = 0; - return buffer; - } else { - // When the default buffer doesn't contain the whole length - - // TODO This doesn't work if MessageBuffer is allocated by newDirectBuffer. - // Add copy method to MessageBuffer to solve this issue. - - // Copy the data fragment from the current buffer - - numberBuffer.putBytes(0, - buffer.array(), buffer.arrayOffset() + position, - remaining); - - // TODO loop this method until castBuffer is filled - MessageBuffer next = getNextBuffer(); + // When the default buffer doesn't contain the whole length, + // fill the temporary buffer from the current data fragment and + // next fragment(s). + + // TODO buffer.array() doesn't work if MessageBuffer is allocated by + // newDirectBuffer. dd copy method to MessageBuffer to solve this issue. + + int off = 0; + if (remaining > 0) { + numberBuffer.putBytes(0, + buffer.array(), buffer.arrayOffset() + position, + remaining); + readLength -= remaining; + off += remaining; + } - numberBuffer.putBytes(remaining, - next.array(), next.arrayOffset(), - readLength - remaining); + while (true) { + nextBuffer(); + int nextSize = buffer.size(); + if (nextSize >= readLength) { + numberBuffer.putBytes(off, + buffer.array(), buffer.arrayOffset(), + readLength); + position = readLength; + break; + } + else { + numberBuffer.putBytes(off, + buffer.array(), buffer.arrayOffset(), + nextSize); + readLength -= nextSize; + off += nextSize; + } + } - buffer = next; - position = readLength - remaining; nextReadPosition = 0; - - return numberBuffer; // Return the numberBuffer + return numberBuffer; } } diff --git a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala index 033ee86b7..33eecffbb 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala @@ -16,17 +16,33 @@ package org.msgpack.core import java.io._ -import java.nio.ByteBuffer import org.msgpack.core.buffer._ import org.msgpack.value.ValueType -import xerial.core.io.IOUtil +import xerial.core.io.IOUtil._ import scala.util.Random -/** - * Created on 2014/05/07. - */ +object MessageUnpackerTest { + class SplitMessageBufferInput(array: Array[Array[Byte]]) extends MessageBufferInput { + var cursor = 0 + override def next(): MessageBuffer = { + if (cursor < array.length) { + val a = array(cursor) + cursor += 1 + MessageBuffer.wrap(a) + } + else { + null + } + } + + override def close(): Unit = {} + } +} + +import MessageUnpackerTest._ + class MessageUnpackerTest extends MessagePackSpec { def testData: Array[Byte] = { @@ -246,21 +262,6 @@ class MessageUnpackerTest extends MessagePackSpec { } - class SplitMessageBufferInput(array: Array[Array[Byte]]) extends MessageBufferInput { - var cursor = 0 - override def next(): MessageBuffer = { - if (cursor < array.length) { - val a = array(cursor) - cursor += 1 - MessageBuffer.wrap(a) - } - else { - null - } - } - - override def close(): Unit = {} - } "read data at the buffer boundary" taggedAs ("boundary") in { @@ -587,7 +588,7 @@ class MessageUnpackerTest extends MessagePackSpec { val N = 1000 val t = time("unpacker", repeat = 10) { block("no-buffer-reset") { - IOUtil.withResource(MessagePack.newDefaultUnpacker(arr)) { unpacker => + withResource(MessagePack.newDefaultUnpacker(arr)) { unpacker => for (i <- 0 until N) { val buf = new ArrayBufferInput(arr) unpacker.reset(buf) @@ -598,7 +599,7 @@ class MessageUnpackerTest extends MessagePackSpec { } block("reuse-array-input") { - IOUtil.withResource(MessagePack.newDefaultUnpacker(arr)) { unpacker => + withResource(MessagePack.newDefaultUnpacker(arr)) { unpacker => val buf = new ArrayBufferInput(arr) for (i <- 0 until N) { buf.reset(arr) @@ -610,7 +611,7 @@ class MessageUnpackerTest extends MessagePackSpec { } block("reuse-message-buffer") { - IOUtil.withResource(MessagePack.newDefaultUnpacker(arr)) { unpacker => + withResource(MessagePack.newDefaultUnpacker(arr)) { unpacker => val buf = new ArrayBufferInput(arr) for (i <- 0 until N) { buf.reset(mb) @@ -703,5 +704,32 @@ class MessageUnpackerTest extends MessagePackSpec { Seq(8185, 8186, 8187, 8188, 16377, 16378, 16379, 16380).foreach { n => check(s, n)} } } + + def readTest(input:MessageBufferInput): Unit = { + withResource(MessagePack.newDefaultUnpacker(input)) { unpacker => + while (unpacker.hasNext) { + unpacker.unpackValue() + } + } + } + + "read value length at buffer boundary" taggedAs("number-boundary") in { + val input = new SplitMessageBufferInput(Array( + Array[Byte](MessagePack.Code.STR16), + Array[Byte](0x00), + Array[Byte](0x05), // STR16 length at the boundary + "hello".getBytes(MessagePack.UTF8)) + ) + readTest(input) + + val input2 = new SplitMessageBufferInput(Array( + Array[Byte](MessagePack.Code.STR32), + Array[Byte](0x00), + Array[Byte](0x00, 0x00), + Array[Byte](0x05), // STR32 length at the boundary + "hello".getBytes(MessagePack.UTF8)) + ) + readTest(input2) + } } }