From 446da7db21f70daac7837c15e95be19a7e56346c Mon Sep 17 00:00:00 2001 From: Min Date: Thu, 28 Jul 2016 02:13:26 +0900 Subject: [PATCH 01/12] Add ByteBuffer input support again --- .../java/org/msgpack/core/MessagePack.java | 24 ++ .../java/org/msgpack/core/MessagePacker.java | 6 +- .../org/msgpack/core/MessageUnpacker.java | 18 +- .../msgpack/core/buffer/ByteBufferInput.java | 69 +++++ .../msgpack/core/buffer/MessageBuffer.java | 86 ++++++- .../msgpack/core/MessageUnpackerTest.scala | 236 ++++++++++-------- .../msgpack/core/buffer/ByteStringTest.scala | 10 + .../core/buffer/MessageBufferInputTest.scala | 5 + .../core/buffer/MessageBufferTest.scala | 72 +++++- 9 files changed, 398 insertions(+), 128 deletions(-) create mode 100644 msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java diff --git a/msgpack-core/src/main/java/org/msgpack/core/MessagePack.java b/msgpack-core/src/main/java/org/msgpack/core/MessagePack.java index 7e420c521..7737aa664 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/MessagePack.java +++ b/msgpack-core/src/main/java/org/msgpack/core/MessagePack.java @@ -16,6 +16,7 @@ package org.msgpack.core; import org.msgpack.core.buffer.ArrayBufferInput; +import org.msgpack.core.buffer.ByteBufferInput; import org.msgpack.core.buffer.ChannelBufferInput; import org.msgpack.core.buffer.ChannelBufferOutput; import org.msgpack.core.buffer.InputStreamBufferInput; @@ -25,6 +26,7 @@ import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.charset.Charset; @@ -236,6 +238,17 @@ public static MessageUnpacker newDefaultUnpacker(byte[] contents, int offset, in return DEFAULT_UNPACKER_CONFIG.newUnpacker(contents, offset, length); } + /** + * Create an unpacker that reads the data from a given ByteBuffer + * + * @param contents + * @return + */ + public static MessageUnpacker newDefaultUnpacker(ByteBuffer contents) + { + return DEFAULT_UNPACKER_CONFIG.newUnpacker(contents); + } + /** * MessagePacker configuration. */ @@ -524,6 +537,17 @@ public MessageUnpacker newUnpacker(byte[] contents, int offset, int length) return newUnpacker(new ArrayBufferInput(contents, offset, length)); } + /** + * Create an unpacker that reads the data from a given ByteBuffer + * + * @param contents + * @return + */ + public MessageUnpacker newUnpacker(ByteBuffer contents) + { + return newUnpacker(new ByteBufferInput(contents)); + } + /** * Allow unpackBinaryHeader to read str format family (default: true) */ diff --git a/msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java b/msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java index 17dc3a169..7c6bfae70 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java +++ b/msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java @@ -531,8 +531,7 @@ else if (s.length() < (1 << 8)) { throw new IllegalArgumentException("Unexpected UTF-8 encoder state"); } // move 1 byte backward to expand 3-byte header region to 3 bytes - buffer.putBytes(position + 3, - buffer.array(), buffer.arrayOffset() + position + 2, written); + buffer.putMessageBuffer(position + 3, buffer, position + 2, written); // write 3-byte header buffer.putByte(position++, STR16); buffer.putShort(position, (short) written); @@ -560,8 +559,7 @@ else if (s.length() < (1 << 16)) { throw new IllegalArgumentException("Unexpected UTF-8 encoder state"); } // move 2 bytes backward to expand 3-byte header region to 5 bytes - buffer.putBytes(position + 5, - buffer.array(), buffer.arrayOffset() + position + 3, written); + buffer.putMessageBuffer(position + 5, buffer, position + 3, written); // write 3-byte header header buffer.putByte(position++, STR32); buffer.putInt(position, written); diff --git a/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java b/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java index d36d2825c..399e126dc 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java +++ b/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java @@ -213,14 +213,9 @@ private MessageBuffer prepareNumberBuffer(int readLength) // fill the temporary buffer from the current data fragment and // next fragment(s). - // TODO buffer.array() doesn't work if MessageBuffer is allocated by - // newDirectBuffer. dd copy method to MessageBuffer to solve this issue. - int off = 0; if (remaining > 0) { - numberBuffer.putBytes(0, - buffer.array(), buffer.arrayOffset() + position, - remaining); + numberBuffer.putMessageBuffer(0, buffer, position, remaining); readLength -= remaining; off += remaining; } @@ -229,16 +224,12 @@ private MessageBuffer prepareNumberBuffer(int readLength) nextBuffer(); int nextSize = buffer.size(); if (nextSize >= readLength) { - numberBuffer.putBytes(off, - buffer.array(), buffer.arrayOffset(), - readLength); + numberBuffer.putMessageBuffer(off, buffer, 0, readLength); position = readLength; break; } else { - numberBuffer.putBytes(off, - buffer.array(), buffer.arrayOffset(), - nextSize); + numberBuffer.putMessageBuffer(off, buffer, 0, nextSize); readLength -= nextSize; off += nextSize; } @@ -1041,7 +1032,8 @@ private void handleCoderError(CoderResult cr) private String decodeStringFastPath(int length) { if (actionOnMalformedString == CodingErrorAction.REPLACE && - actionOnUnmappableString == CodingErrorAction.REPLACE) { + actionOnUnmappableString == CodingErrorAction.REPLACE && + buffer.hasArray()) { String s = new String(buffer.array(), buffer.arrayOffset() + position, length, MessagePack.UTF8); position += length; return s; diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java new file mode 100644 index 000000000..13a7db1d7 --- /dev/null +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java @@ -0,0 +1,69 @@ +// +// MessagePack for Java +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.msgpack.core.buffer; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.msgpack.core.Preconditions.checkNotNull; + +/** + * {@link MessageBufferInput} adapter for {@link java.nio.ByteBuffer} + */ +public class ByteBufferInput + implements MessageBufferInput +{ + private ByteBuffer input; + private boolean isRead = false; + + public ByteBufferInput(ByteBuffer input) + { + this.input = checkNotNull(input, "input ByteBuffer is null"); + } + + /** + * Reset buffer. This method doesn't close the old resource. + * + * @param input new buffer + * @return the old resource + */ + public ByteBuffer reset(ByteBuffer input) + { + ByteBuffer old = this.input; + this.input = input; + isRead = false; + return old; + } + + @Override + public MessageBuffer next() + throws IOException + { + if (isRead) { + return null; + } + + isRead = true; + return MessageBuffer.wrap(input); + } + + @Override + public void close() + throws IOException + { + // Nothing to do + } +} \ No newline at end of file diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java index d4d5f2238..d7eff7ba7 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java @@ -43,6 +43,7 @@ public class MessageBuffer * Reference to MessageBuffer Constructors */ private static final Constructor mbArrConstructor; + private static final Constructor mbBBConstructor; /** * The offset from the object memory header to its byte array data @@ -145,6 +146,11 @@ public class MessageBuffer Constructor mbArrCstr = bufferCls.getDeclaredConstructor(byte[].class, int.class, int.class); mbArrCstr.setAccessible(true); mbArrConstructor = mbArrCstr; + + // MessageBufferX(ByteBuffer) constructor + Constructor mbBBCstr = bufferCls.getDeclaredConstructor(ByteBuffer.class); + mbBBCstr.setAccessible(true); + mbBBConstructor = mbBBCstr; } catch (Exception e) { e.printStackTrace(System.err); @@ -170,6 +176,12 @@ public class MessageBuffer */ protected final int size; + /** + * Reference is used to hold a reference to an object that holds the underlying memory so that it cannot be + * released by the garbage collector. + */ + protected final ByteBuffer reference; + public static MessageBuffer allocate(int length) { return wrap(new byte[length]); @@ -185,6 +197,11 @@ public static MessageBuffer wrap(byte[] array, int offset, int length) return newMessageBuffer(array, offset, length); } + public static MessageBuffer wrap(ByteBuffer bb) + { + return newMessageBuffer(bb).slice(bb.position(), bb.remaining()); + } + /** * Creates a new MessageBuffer instance backed by a java heap array * @@ -202,11 +219,32 @@ private static MessageBuffer newMessageBuffer(byte[] arr, int off, int len) } } + /** + * Creates a new MessageBuffer instance backed by ByteBuffer + * + * @param bb + * @return + */ + private static MessageBuffer newMessageBuffer(ByteBuffer bb) + { + checkNotNull(bb); + try { + // We need to use reflection to create MessageBuffer instances in order to prevent TypeProfile generation for getInt method. TypeProfile will be + // generated to resolve one of the method references when two or more classes overrides the method. + return (MessageBuffer) mbBBConstructor.newInstance(bb); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + public static void releaseBuffer(MessageBuffer buffer) { if (isUniversalBuffer || buffer.base instanceof byte[]) { // We have nothing to do. Wait until the garbage-collector collects this array object } + else if (DirectBufferAccess.isDirectByteBufferInstance(buffer.base)) { + DirectBufferAccess.clean(buffer.base); + } else { // Maybe cannot reach here unsafe.freeMemory(buffer.address); @@ -225,6 +263,35 @@ public static void releaseBuffer(MessageBuffer buffer) this.base = arr; this.address = ARRAY_BYTE_BASE_OFFSET + offset; this.size = length; + this.reference = null; + } + + /** + * Create a MessageBuffer instance from a given ByteBuffer instance + * + * @param bb + */ + MessageBuffer(ByteBuffer bb) + { + if (bb.isDirect()) { + if (isUniversalBuffer) { + throw new IllegalStateException("Cannot create MessageBuffer from DirectBuffer"); + } + // Direct buffer or off-heap memory + this.base = null; + this.address = DirectBufferAccess.getAddress(bb); + this.size = bb.capacity(); + this.reference = bb; + } + else if (bb.hasArray()) { + this.base = bb.array(); + this.address = ARRAY_BYTE_BASE_OFFSET; + this.size = bb.array().length; + this.reference = null; + } + else { + throw new IllegalArgumentException("Only the array-backed ByteBuffer or DirectBuffer are supported"); + } } protected MessageBuffer(Object base, long address, int length) @@ -232,6 +299,7 @@ protected MessageBuffer(Object base, long address, int length) this.base = base; this.address = address; this.size = length; + this.reference = null; } /** @@ -393,6 +461,11 @@ else if (src.hasArray()) { } } + public void putMessageBuffer(int index, MessageBuffer src, int srcOffset, int len) + { + unsafe.copyMemory(src.base, src.address + srcOffset, base, address + index, len); + } + /** * Create a ByteBuffer view of the range [index, index+length) of this memory * @@ -402,7 +475,13 @@ else if (src.hasArray()) { */ public ByteBuffer sliceAsByteBuffer(int index, int length) { - return ByteBuffer.wrap((byte[]) base, (int) ((address - ARRAY_BYTE_BASE_OFFSET) + index), length); + if (hasArray()) { + return ByteBuffer.wrap((byte[]) base, (int) ((address - ARRAY_BYTE_BASE_OFFSET) + index), length); + } + else { + assert (!isUniversalBuffer); + return DirectBufferAccess.newByteBuffer(address, index, length, reference); + } } /** @@ -415,6 +494,11 @@ public ByteBuffer sliceAsByteBuffer() return sliceAsByteBuffer(0, size()); } + public boolean hasArray() + { + return base instanceof byte[]; + } + /** * Get a copy of this buffer * diff --git a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala index 96809f626..173620282 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala @@ -16,6 +16,7 @@ package org.msgpack.core import java.io._ +import java.nio.ByteBuffer import org.msgpack.core.buffer._ import org.msgpack.value.ValueType @@ -188,33 +189,47 @@ class MessageUnpackerTest extends MessagePackSpec { u.hasNext shouldBe false } + def unpackers(data: Array[Byte]) : Seq[MessageUnpacker] = { + val bb = ByteBuffer.allocate(data.length) + val db = ByteBuffer.allocateDirect(data.length) + bb.put(data).flip() + db.put(data).flip() + Seq( + MessagePack.newDefaultUnpacker(data), + MessagePack.newDefaultUnpacker(bb), + MessagePack.newDefaultUnpacker(db) + ) + } + "MessageUnpacker" should { "parse message packed data" taggedAs ("unpack") in { val arr = testData - val unpacker = MessagePack.newDefaultUnpacker(arr) + for (unpacker <- unpackers(arr)) { - var count = 0 - while (unpacker.hasNext) { - count += 1 - readValue(unpacker) + var count = 0 + while (unpacker.hasNext) { + count += 1 + readValue(unpacker) + } + count shouldBe 6 + unpacker.getTotalReadBytes shouldBe arr.length } - count shouldBe 6 - unpacker.getTotalReadBytes shouldBe arr.length } "skip reading values" in { - val unpacker = MessagePack.newDefaultUnpacker(testData) - var skipCount = 0 - while (unpacker.hasNext) { - unpacker.skipValue() - skipCount += 1 - } + for (unpacker <- unpackers(testData)) { + var skipCount = 0 + while (unpacker.hasNext) { + unpacker.skipValue() + skipCount += 1 + } - skipCount shouldBe 2 - unpacker.getTotalReadBytes shouldBe testData.length + skipCount shouldBe 2 + unpacker.getTotalReadBytes shouldBe testData.length + } } "compare skip performance" taggedAs ("skip") in { @@ -223,21 +238,23 @@ class MessageUnpackerTest extends MessagePackSpec { time("skip performance", repeat = 100) { block("switch") { - val unpacker = MessagePack.newDefaultUnpacker(data) - var skipCount = 0 - while (unpacker.hasNext) { - unpacker.skipValue() - skipCount += 1 + for (unpacker <- unpackers(data)) { + var skipCount = 0 + while (unpacker.hasNext) { + unpacker.skipValue() + skipCount += 1 + } + skipCount shouldBe N } - skipCount shouldBe N } } time("bulk skip performance", repeat = 100) { block("switch") { - val unpacker = MessagePack.newDefaultUnpacker(data) - unpacker.skipValue(N) - unpacker.hasNext shouldBe false + for (unpacker <- unpackers(data)) { + unpacker.skipValue(N) + unpacker.hasNext shouldBe false + } } } @@ -247,26 +264,27 @@ class MessageUnpackerTest extends MessagePackSpec { debug(intSeq.mkString(", ")) - val ib = Seq.newBuilder[Int] + for (unpacker <- unpackers(testData2)) { + val ib = Seq.newBuilder[Int] - val unpacker = MessagePack.newDefaultUnpacker(testData2) - while (unpacker.hasNext) { - val f = unpacker.getNextFormat - f.getValueType match { - case ValueType.INTEGER => - val i = unpacker.unpackInt() - trace(f"read int: $i%,d") - ib += i - case ValueType.BOOLEAN => - val b = unpacker.unpackBoolean() - trace(s"read boolean: $b") - case other => - unpacker.skipValue() + while (unpacker.hasNext) { + val f = unpacker.getNextFormat + f.getValueType match { + case ValueType.INTEGER => + val i = unpacker.unpackInt() + trace(f"read int: $i%,d") + ib += i + case ValueType.BOOLEAN => + val b = unpacker.unpackBoolean() + trace(s"read boolean: $b") + case other => + unpacker.skipValue() + } } - } - ib.result shouldBe intSeq - unpacker.getTotalReadBytes shouldBe testData2.length + ib.result shouldBe intSeq + unpacker.getTotalReadBytes shouldBe testData2.length + } } @@ -276,29 +294,30 @@ class MessageUnpackerTest extends MessagePackSpec { trait SplitTest { val data: Array[Byte] def run { - val unpacker = MessagePack.newDefaultUnpacker(data) - val numElems = { - var c = 0 - while (unpacker.hasNext) { - readValue(unpacker) - c += 1 + for (unpacker <- unpackers(data)) { + val numElems = { + var c = 0 + while (unpacker.hasNext) { + readValue(unpacker) + c += 1 + } + c } - c - } - for (splitPoint <- 1 until data.length - 1) { - debug(s"split at $splitPoint") - val (h, t) = data.splitAt(splitPoint) - val bin = new SplitMessageBufferInput(Array(h, t)) - val unpacker = MessagePack.newDefaultUnpacker(bin) - var count = 0 - while (unpacker.hasNext) { - count += 1 - val f = unpacker.getNextFormat - readValue(unpacker) + for (splitPoint <- 1 until data.length - 1) { + debug(s"split at $splitPoint") + val (h, t) = data.splitAt(splitPoint) + val bin = new SplitMessageBufferInput(Array(h, t)) + val unpacker = MessagePack.newDefaultUnpacker(bin) + var count = 0 + while (unpacker.hasNext) { + count += 1 + val f = unpacker.getNextFormat + readValue(unpacker) + } + count shouldBe numElems + unpacker.getTotalReadBytes shouldBe data.length } - count shouldBe numElems - unpacker.getTotalReadBytes shouldBe data.length } } } @@ -316,7 +335,7 @@ class MessageUnpackerTest extends MessagePackSpec { packer.close val data = packer.toByteArray - val unpacker = MessagePack.newDefaultUnpacker(new InputStreamBufferInput(new ByteArrayInputStream(data), 8192)) + val unpacker = MessagePack.newDefaultUnpacker(new InputStreamBufferInput(new ByteArrayInputStream(data), 8192)) (0 until 1170).foreach { i => unpacker.unpackLong() shouldBe 0x0011223344556677L unpacker.unpackString() shouldBe "hello" @@ -349,7 +368,7 @@ class MessageUnpackerTest extends MessagePackSpec { } block("v7") { - val unpacker = MessagePack.newDefaultUnpacker(data) + val unpacker = Random.shuffle(unpackers(data)).head var count = 0 try { while (unpacker.hasNext) { @@ -451,7 +470,7 @@ class MessageUnpackerTest extends MessagePackSpec { } block("v7") { - val unpacker = MessagePack.newDefaultUnpacker(data) + val unpacker = Random.shuffle(unpackers(data)).head var count = 0 try { while (unpacker.hasNext) { @@ -495,7 +514,7 @@ class MessageUnpackerTest extends MessagePackSpec { } block("v7") { - val unpacker = MessagePack.newDefaultUnpacker(b) + val unpacker = Random.shuffle(unpackers(b)).head var i = 0 while (i < R) { val len = unpacker.unpackBinaryHeader() @@ -507,7 +526,7 @@ class MessageUnpackerTest extends MessagePackSpec { } block("v7-ref") { - val unpacker = MessagePack.newDefaultUnpacker(b) + val unpacker = Random.shuffle(unpackers(b)).head var i = 0 while (i < R) { val len = unpacker.unpackBinaryHeader() @@ -533,16 +552,17 @@ class MessageUnpackerTest extends MessagePackSpec { packer.writePayload(data) packer.close() - val unpacker = MessagePack.newDefaultUnpacker(b.toByteArray) - val len = unpacker.unpackBinaryHeader() - len shouldBe s - val ref = unpacker.readPayloadAsReference(len) - unpacker.close() - ref.size() shouldBe s - val stored = new Array[Byte](len) - ref.getBytes(0, stored, 0, len) + for (unpacker <- unpackers(b.toByteArray)) { + val len = unpacker.unpackBinaryHeader() + len shouldBe s + val ref = unpacker.readPayloadAsReference(len) + unpacker.close() + ref.size() shouldBe s + val stored = new Array[Byte](len) + ref.getBytes(0, stored, 0, len) - stored shouldBe data + stored shouldBe data + } } } @@ -552,35 +572,36 @@ class MessageUnpackerTest extends MessagePackSpec { val data = intSeq val b = createMessagePackData(packer => data foreach packer.packInt) - val unpacker = MessagePack.newDefaultUnpacker(b) + for (unpacker <- unpackers(b)) { - val unpacked = Array.newBuilder[Int] - while (unpacker.hasNext) { - unpacked += unpacker.unpackInt() - } - unpacker.close - unpacked.result shouldBe data - - val data2 = intSeq - val b2 = createMessagePackData(packer => data2 foreach packer.packInt) - val bi = new ArrayBufferInput(b2) - unpacker.reset(bi) - val unpacked2 = Array.newBuilder[Int] - while (unpacker.hasNext) { - unpacked2 += unpacker.unpackInt() - } - unpacker.close - unpacked2.result shouldBe data2 - - // reused the buffer input instance - bi.reset(b2) - unpacker.reset(bi) - val unpacked3 = Array.newBuilder[Int] - while (unpacker.hasNext) { - unpacked3 += unpacker.unpackInt() + val unpacked = Array.newBuilder[Int] + while (unpacker.hasNext) { + unpacked += unpacker.unpackInt() + } + unpacker.close + unpacked.result shouldBe data + + val data2 = intSeq + val b2 = createMessagePackData(packer => data2 foreach packer.packInt) + val bi = new ArrayBufferInput(b2) + unpacker.reset(bi) + val unpacked2 = Array.newBuilder[Int] + while (unpacker.hasNext) { + unpacked2 += unpacker.unpackInt() + } + unpacker.close + unpacked2.result shouldBe data2 + + // reused the buffer input instance + bi.reset(b2) + unpacker.reset(bi) + val unpacked3 = Array.newBuilder[Int] + while (unpacker.hasNext) { + unpacked3 += unpacker.unpackInt() + } + unpacker.close + unpacked3.result shouldBe data2 } - unpacker.close - unpacked3.result shouldBe data2 } @@ -678,13 +699,14 @@ class MessageUnpackerTest extends MessagePackSpec { Seq(8191, 8192, 8193, 16383, 16384, 16385).foreach { n => val arr = createLargeData(n) - val unpacker = MessagePack.newDefaultUnpacker(arr) + for (unpacker <- unpackers(arr)) { - unpacker.unpackArrayHeader shouldBe 2 - unpacker.unpackString.length shouldBe n - unpacker.unpackInt shouldBe 1 + unpacker.unpackArrayHeader shouldBe 2 + unpacker.unpackString.length shouldBe n + unpacker.unpackInt shouldBe 1 - unpacker.getTotalReadBytes shouldBe arr.length + unpacker.getTotalReadBytes shouldBe arr.length + } } } diff --git a/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala index 2c080b59a..18876ddb8 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala @@ -43,4 +43,14 @@ class ByteStringTest MessagePack.newDefaultUnpacker(input).unpackString() } + + "Unpacking a ByteString's ByteBuffer" should { + "fail with a regular MessageBuffer" in { + + // can't demonstrate with new ByteBufferInput(byteString.asByteBuffer) + // as Travis tests run with JDK6 that picks up MessageBufferU + a[RuntimeException] shouldBe thrownBy(unpackString(new + MessageBuffer(byteString.asByteBuffer))) + } + } } diff --git a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala index 1638806ee..6b1c0da48 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala @@ -94,6 +94,11 @@ class MessageBufferInputTest ArrayBufferInput(_)) } + "support ByteBuffers" in { + runTest(b => new + ByteBufferInput(b.toByteBuffer)) + } + "support InputStreams" taggedAs ("is") in { runTest(b => new diff --git a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferTest.scala index 75ef00a11..a585b9537 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferTest.scala @@ -34,19 +34,28 @@ class MessageBufferTest info(s"MessageBuffer type: ${b.getClass.getName}") } - "wrap ByteBuffer considering position and remaining values" taggedAs ("wrap-bb") in { + "wrap byte array considering position and remaining values" taggedAs ("wrap-ba") in { val d = Array[Byte](10, 11, 12, 13, 14, 15, 16, 17, 18, 19) val mb = MessageBuffer.wrap(d, 2, 2) mb.getByte(0) shouldBe 12 mb.size() shouldBe 2 } + "wrap ByteBuffer considering position and remaining values" taggedAs ("wrap-bb") in { + val d = Array[Byte](10, 11, 12, 13, 14, 15, 16, 17, 18, 19) + val subset = ByteBuffer.wrap(d, 2, 2) + val mb = MessageBuffer.wrap(subset) + mb.getByte(0) shouldBe 12 + mb.size() shouldBe 2 + } + "have better performance than ByteBuffer" in { val N = 1000000 val M = 64 * 1024 * 1024 val ub = MessageBuffer.allocate(M) + val ud = MessageBuffer.wrap(ByteBuffer.allocateDirect(M)) val hb = ByteBuffer.allocate(M) val db = ByteBuffer.allocateDirect(M) @@ -82,6 +91,14 @@ class MessageBufferTest } } + block("unsafe direct") { + var i = 0 + while (i < N) { + ud.getInt((i * 4) % M) + i += 1 + } + } + block("allocate") { var i = 0 while (i < N) { @@ -108,6 +125,14 @@ class MessageBufferTest } } + block("unsafe direct") { + var i = 0 + while (i < N) { + ud.getInt((rs(i) * 4) % M) + i += 1 + } + } + block("allocate") { var i = 0 while (i < N) { @@ -128,7 +153,9 @@ class MessageBufferTest "convert to ByteBuffer" in { for (t <- Seq( - MessageBuffer.allocate(10)) + MessageBuffer.allocate(10), + MessageBuffer.wrap(ByteBuffer.allocate(10)), + MessageBuffer.wrap(ByteBuffer.allocateDirect(10))) ) { val bb = t.sliceAsByteBuffer bb.position shouldBe 0 @@ -139,7 +166,9 @@ class MessageBufferTest "put ByteBuffer on itself" in { for (t <- Seq( - MessageBuffer.allocate(10)) + MessageBuffer.allocate(10), + MessageBuffer.wrap(ByteBuffer.allocate(10)), + MessageBuffer.wrap(ByteBuffer.allocateDirect(10))) ) { val b = Array[Byte](0x02, 0x03) val srcArray = ByteBuffer.wrap(b) @@ -163,11 +192,46 @@ class MessageBufferTest } } + "put MessageBuffer on itself" in { + for (t <- Seq( + MessageBuffer.allocate(10), + MessageBuffer.wrap(ByteBuffer.allocate(10)), + MessageBuffer.wrap(ByteBuffer.allocateDirect(10))) + ) { + val b = Array[Byte](0x02, 0x03) + val srcArray = ByteBuffer.wrap(b) + val srcHeap = ByteBuffer.allocate(b.length) + srcHeap.put(b).flip + val srcOffHeap = ByteBuffer.allocateDirect(b.length) + srcOffHeap.put(b).flip + + for (src <- Seq(MessageBuffer.wrap(srcArray), MessageBuffer.wrap(srcHeap), MessageBuffer.wrap(srcOffHeap))) { + // Write header bytes + val header = Array[Byte](0x00, 0x01) + t.putBytes(0, header, 0, header.length) + // Write src after the header + t.putMessageBuffer(header.length, src, 0, header.length) + + t.getByte(0) shouldBe 0x00 + t.getByte(1) shouldBe 0x01 + t.getByte(2) shouldBe 0x02 + t.getByte(3) shouldBe 0x03 + } + } + } + "copy sliced buffer" in { def prepareBytes : Array[Byte] = { Array[Byte](0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07) } + def prepareDirectBuffer : ByteBuffer = { + val directBuffer = ByteBuffer.allocateDirect(prepareBytes.length) + directBuffer.put(prepareBytes) + directBuffer.flip + directBuffer + } + def checkSliceAndCopyTo(srcBuffer: MessageBuffer, dstBuffer: MessageBuffer) = { val sliced = srcBuffer.slice(2, 5) @@ -191,6 +255,8 @@ class MessageBufferTest } checkSliceAndCopyTo(MessageBuffer.wrap(prepareBytes), MessageBuffer.wrap(prepareBytes)) + checkSliceAndCopyTo(MessageBuffer.wrap(ByteBuffer.wrap(prepareBytes)), MessageBuffer.wrap(ByteBuffer.wrap(prepareBytes))) + checkSliceAndCopyTo(MessageBuffer.wrap(prepareDirectBuffer), MessageBuffer.wrap(prepareDirectBuffer)) } } } From b1db42dfe0c5d4ab4a9d654a7ab9acc28c19c6d7 Mon Sep 17 00:00:00 2001 From: Sadayuki Furuhashi Date: Wed, 27 Jul 2016 14:22:44 -0700 Subject: [PATCH 02/12] ArrayBufferInput and ByteBufferInput will never throw IOException --- .../main/java/org/msgpack/core/buffer/ArrayBufferInput.java | 2 -- .../main/java/org/msgpack/core/buffer/ByteBufferInput.java | 4 +--- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java index 3fbc97208..3d11ad9ed 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java @@ -80,7 +80,6 @@ public void reset(byte[] arr, int offset, int len) @Override public MessageBuffer next() - throws IOException { if (isEmpty) { return null; @@ -91,7 +90,6 @@ public MessageBuffer next() @Override public void close() - throws IOException { buffer = null; isEmpty = true; diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java index 13a7db1d7..41936d9ab 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java @@ -50,7 +50,6 @@ public ByteBuffer reset(ByteBuffer input) @Override public MessageBuffer next() - throws IOException { if (isRead) { return null; @@ -62,8 +61,7 @@ public MessageBuffer next() @Override public void close() - throws IOException { // Nothing to do } -} \ No newline at end of file +} From 2908767605d4c00f083f3a647dddc04d55eba136 Mon Sep 17 00:00:00 2001 From: Sadayuki Furuhashi Date: Wed, 27 Jul 2016 14:23:22 -0700 Subject: [PATCH 03/12] ByteBufferInput should not be affected when given ByteBuffer's position is modified --- .../java/org/msgpack/core/buffer/ByteBufferInput.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java index 41936d9ab..f644d18d1 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java @@ -31,19 +31,19 @@ public class ByteBufferInput public ByteBufferInput(ByteBuffer input) { - this.input = checkNotNull(input, "input ByteBuffer is null"); + this.input = checkNotNull(input, "input ByteBuffer is null").slice(); } /** - * Reset buffer. This method doesn't close the old resource. + * Reset buffer. * * @param input new buffer - * @return the old resource + * @return the old buffer */ public ByteBuffer reset(ByteBuffer input) { ByteBuffer old = this.input; - this.input = input; + this.input = checkNotNull(input, "input ByteBuffer is null").slice(); isRead = false; return old; } @@ -55,8 +55,9 @@ public MessageBuffer next() return null; } + MessageBuffer b = MessageBuffer.wrap(input); isRead = true; - return MessageBuffer.wrap(input); + return b; } @Override From 131eda02bc7d54518fdf069496e55235dfe499fe Mon Sep 17 00:00:00 2001 From: Sadayuki Furuhashi Date: Wed, 27 Jul 2016 14:23:54 -0700 Subject: [PATCH 04/12] fix import lines at MessagePackExample that made sbt failed --- .../java/org/msgpack/core/example/MessagePackExample.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/msgpack-core/src/test/java/org/msgpack/core/example/MessagePackExample.java b/msgpack-core/src/test/java/org/msgpack/core/example/MessagePackExample.java index b4250c40c..5feb15dbc 100644 --- a/msgpack-core/src/test/java/org/msgpack/core/example/MessagePackExample.java +++ b/msgpack-core/src/test/java/org/msgpack/core/example/MessagePackExample.java @@ -15,9 +15,13 @@ // package org.msgpack.core.example; -import org.msgpack.core.*; +import org.msgpack.core.MessagePack; import org.msgpack.core.MessagePack.PackerConfig; import org.msgpack.core.MessagePack.UnpackerConfig; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessageFormat; +import org.msgpack.core.MessagePacker; +import org.msgpack.core.MessageUnpacker; import org.msgpack.value.ArrayValue; import org.msgpack.value.ExtensionValue; import org.msgpack.value.FloatValue; From 51e4a0a18d594c93121e9da4211aee285e663cb2 Mon Sep 17 00:00:00 2001 From: Sadayuki Furuhashi Date: Wed, 27 Jul 2016 14:24:51 -0700 Subject: [PATCH 05/12] MessageBuffer: exception and validation fixes with docs --- .../msgpack/core/buffer/MessageBuffer.java | 116 +++++++++++++++--- 1 file changed, 101 insertions(+), 15 deletions(-) diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java index d7eff7ba7..3fa6a3be2 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java @@ -19,6 +19,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -182,24 +183,76 @@ public class MessageBuffer */ protected final ByteBuffer reference; - public static MessageBuffer allocate(int length) + /** + * Allocates a new MessageBuffer backed by a byte array. + * + * @throws IllegalArgumentException If the capacity is a negative integer + * + */ + public static MessageBuffer allocate(int size) { - return wrap(new byte[length]); + if (size < 0) { + throw new IllegalArgumentException("size must not be negative"); + } + return wrap(new byte[size]); } + /** + * Wraps a ByteBuffer into a MessageBuffer. + * + * The new MessageBuffer will be backed by the given byte array. Modifications to the new MessageBuffer will cause the byte array to be modified and vice versa. + * + * The new buffer's size will be array.length. hasArray() will return true. + * + * @param array the byte array that will gack this MessageBuffer + * @return + * + */ public static MessageBuffer wrap(byte[] array) { return newMessageBuffer(array, 0, array.length); } + /** + * Wraps a ByteBuffer into a MessageBuffer. + * + * The new MessageBuffer will be backed by the given byte array. Modifications to the new MessageBuffer will cause the byte array to be modified and vice versa. + * + * The new buffer's size will be length. hasArray() will return true. + * + * @param array the byte array that will gack this MessageBuffer + * @param offset The offset of the subarray to be used; must be non-negative and no larger than array.length + * @param length The length of the subarray to be used; must be non-negative and no larger than array.length - offset + * @return + * + */ public static MessageBuffer wrap(byte[] array, int offset, int length) { return newMessageBuffer(array, offset, length); } + /** + * Wraps a ByteBuffer into a MessageBuffer. + * + * The new MessageBuffer will be backed by the given byte buffer. Modifications to the new MessageBuffer will cause the byte buffer to be modified and vice versa. However, change of position, limit, or mark of given byte buffer doesn't affect MessageBuffer. + * + * The new buffer's size will be bb.remaining(). hasArray() will return the same result with bb.hasArray(). + * + * @param bb the byte buffer that will gack this MessageBuffer + * @throws IllegalArgumentException given byte buffer returns false both from hasArray() and isDirect() + * @throws UnsupportedOperationException given byte buffer is a direct buffer and this platform doesn't support Unsafe API + * @return + * + */ public static MessageBuffer wrap(ByteBuffer bb) { - return newMessageBuffer(bb).slice(bb.position(), bb.remaining()); + MessageBuffer b = newMessageBuffer(bb); + if (bb.position() > 0 || bb.limit() != bb.capacity()) { + return b.slice(bb.position(), bb.remaining()); + } + else { + return b; + } } /** @@ -214,8 +267,24 @@ private static MessageBuffer newMessageBuffer(byte[] arr, int off, int len) try { return (MessageBuffer) mbArrConstructor.newInstance(arr, off, len); } - catch (Throwable e) { - throw new RuntimeException(e); + catch (InstantiationException e) { + // should never happen + throw new IllegalStateException(e); + } + catch (IllegalAccessException e) { + // should never happen unless security manager restricts this reflection + throw new IllegalStateException(e); + } + catch (InvocationTargetException e) { + if (e.getCause() instanceof RuntimeException) { + // underlaying constructor may throw RuntimeException + throw (RuntimeException) e.getCause(); + } + else if (e.getCause() instanceof Error) { + throw (Error) e.getCause(); + } + // should never happen + throw new IllegalStateException(e.getCause()); } } @@ -232,18 +301,35 @@ private static MessageBuffer newMessageBuffer(ByteBuffer bb) // We need to use reflection to create MessageBuffer instances in order to prevent TypeProfile generation for getInt method. TypeProfile will be // generated to resolve one of the method references when two or more classes overrides the method. return (MessageBuffer) mbBBConstructor.newInstance(bb); - } catch (Exception e) { - throw new RuntimeException(e); + } + catch (InstantiationException e) { + // should never happen + throw new IllegalStateException(e); + } + catch (IllegalAccessException e) { + // should never happen unless security manager restricts this reflection + throw new IllegalStateException(e); + } + catch (InvocationTargetException e) { + if (e.getCause() instanceof RuntimeException) { + // underlaying constructor may throw RuntimeException + throw (RuntimeException) e.getCause(); + } + else if (e.getCause() instanceof Error) { + throw (Error) e.getCause(); + } + // should never happen + throw new IllegalStateException(e.getCause()); } } public static void releaseBuffer(MessageBuffer buffer) { - if (isUniversalBuffer || buffer.base instanceof byte[]) { + if (isUniversalBuffer || buffer.base != null) { // We have nothing to do. Wait until the garbage-collector collects this array object } - else if (DirectBufferAccess.isDirectByteBufferInstance(buffer.base)) { - DirectBufferAccess.clean(buffer.base); + else if (DirectBufferAccess.isDirectByteBufferInstance(buffer.reference)) { + DirectBufferAccess.clean(buffer.reference); } else { // Maybe cannot reach here @@ -260,7 +346,7 @@ else if (DirectBufferAccess.isDirectByteBufferInstance(buffer.base)) { */ MessageBuffer(byte[] arr, int offset, int length) { - this.base = arr; + this.base = arr; // non-null is already checked at newMessageBuffer this.address = ARRAY_BYTE_BASE_OFFSET + offset; this.size = length; this.reference = null; @@ -275,7 +361,7 @@ else if (DirectBufferAccess.isDirectByteBufferInstance(buffer.base)) { { if (bb.isDirect()) { if (isUniversalBuffer) { - throw new IllegalStateException("Cannot create MessageBuffer from DirectBuffer"); + throw new UnsupportedOperationException("Cannot create MessageBuffer from a DirectBuffer on this platform"); } // Direct buffer or off-heap memory this.base = null; @@ -290,7 +376,7 @@ else if (bb.hasArray()) { this.reference = null; } else { - throw new IllegalArgumentException("Only the array-backed ByteBuffer or DirectBuffer are supported"); + throw new IllegalArgumentException("Only the array-backed ByteBuffer or DirectBuffer is supported"); } } @@ -475,7 +561,7 @@ public void putMessageBuffer(int index, MessageBuffer src, int srcOffset, int le */ public ByteBuffer sliceAsByteBuffer(int index, int length) { - if (hasArray()) { + if (base != null) { return ByteBuffer.wrap((byte[]) base, (int) ((address - ARRAY_BYTE_BASE_OFFSET) + index), length); } else { @@ -496,7 +582,7 @@ public ByteBuffer sliceAsByteBuffer() public boolean hasArray() { - return base instanceof byte[]; + return base != null; } /** From 4cb93245f809dface61c5c3647a41924c05fdc6d Mon Sep 17 00:00:00 2001 From: Sadayuki Furuhashi Date: Thu, 28 Jul 2016 17:21:49 -0700 Subject: [PATCH 06/12] MessageBuffer.base != null check should all call hasArray for ease of future change --- .../main/java/org/msgpack/core/buffer/MessageBuffer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java index 3fa6a3be2..1d32d2b1d 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java @@ -325,7 +325,7 @@ else if (e.getCause() instanceof Error) { public static void releaseBuffer(MessageBuffer buffer) { - if (isUniversalBuffer || buffer.base != null) { + if (isUniversalBuffer || buffer.hasArray()) { // We have nothing to do. Wait until the garbage-collector collects this array object } else if (DirectBufferAccess.isDirectByteBufferInstance(buffer.reference)) { @@ -536,7 +536,7 @@ else if (src.hasArray()) { src.position(src.position() + len); } else { - if (base != null) { + if (hasArray()) { src.get((byte[]) base, index, len); } else { @@ -561,7 +561,7 @@ public void putMessageBuffer(int index, MessageBuffer src, int srcOffset, int le */ public ByteBuffer sliceAsByteBuffer(int index, int length) { - if (base != null) { + if (hasArray()) { return ByteBuffer.wrap((byte[]) base, (int) ((address - ARRAY_BYTE_BASE_OFFSET) + index), length); } else { From ee8c341c1f8bdc23cadb980a433dfe619109eb4e Mon Sep 17 00:00:00 2001 From: Sadayuki Furuhashi Date: Thu, 28 Jul 2016 17:22:23 -0700 Subject: [PATCH 07/12] removed nunsed import --- .../src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java | 2 -- .../src/main/java/org/msgpack/core/buffer/ByteBufferInput.java | 1 - 2 files changed, 3 deletions(-) diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java index 3d11ad9ed..5c6454e69 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java @@ -15,8 +15,6 @@ // package org.msgpack.core.buffer; -import java.io.IOException; - import static org.msgpack.core.Preconditions.checkNotNull; /** diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java index f644d18d1..fd0311b83 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java @@ -15,7 +15,6 @@ // package org.msgpack.core.buffer; -import java.io.IOException; import java.nio.ByteBuffer; import static org.msgpack.core.Preconditions.checkNotNull; From 5c77085e7b212bb7105c786bf4aaa18f3814c5de Mon Sep 17 00:00:00 2001 From: "Min(Dongmin Yu)" Date: Thu, 4 Aug 2016 12:50:50 +0900 Subject: [PATCH 08/12] Refactor creating MessageBuffer instance --- .../msgpack/core/buffer/MessageBuffer.java | 40 +++++++------------ 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java index 1d32d2b1d..0f4b39e0b 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java @@ -198,7 +198,7 @@ public static MessageBuffer allocate(int size) } /** - * Wraps a ByteBuffer into a MessageBuffer. + * Wraps a byte array into a MessageBuffer. * * The new MessageBuffer will be backed by the given byte array. Modifications to the new MessageBuffer will cause the byte array to be modified and vice versa. * @@ -214,7 +214,7 @@ public static MessageBuffer wrap(byte[] array) } /** - * Wraps a ByteBuffer into a MessageBuffer. + * Wraps a byte array into a MessageBuffer. * * The new MessageBuffer will be backed by the given byte array. Modifications to the new MessageBuffer will cause the byte array to be modified and vice versa. * @@ -264,28 +264,7 @@ public static MessageBuffer wrap(ByteBuffer bb) private static MessageBuffer newMessageBuffer(byte[] arr, int off, int len) { checkNotNull(arr); - try { - return (MessageBuffer) mbArrConstructor.newInstance(arr, off, len); - } - catch (InstantiationException e) { - // should never happen - throw new IllegalStateException(e); - } - catch (IllegalAccessException e) { - // should never happen unless security manager restricts this reflection - throw new IllegalStateException(e); - } - catch (InvocationTargetException e) { - if (e.getCause() instanceof RuntimeException) { - // underlaying constructor may throw RuntimeException - throw (RuntimeException) e.getCause(); - } - else if (e.getCause() instanceof Error) { - throw (Error) e.getCause(); - } - // should never happen - throw new IllegalStateException(e.getCause()); - } + return newInstance(mbArrConstructor, arr, off, len); } /** @@ -297,10 +276,21 @@ else if (e.getCause() instanceof Error) { private static MessageBuffer newMessageBuffer(ByteBuffer bb) { checkNotNull(bb); + return newInstance(mbBBConstructor, bb); + } + + /** + * Creates a new MessageBuffer instance + * + * @param constructor A MessageBuffer constructor + * @return new MessageBuffer instance + */ + private static MessageBuffer newInstance(Constructor constructor, Object... args) + { try { // We need to use reflection to create MessageBuffer instances in order to prevent TypeProfile generation for getInt method. TypeProfile will be // generated to resolve one of the method references when two or more classes overrides the method. - return (MessageBuffer) mbBBConstructor.newInstance(bb); + return (MessageBuffer) constructor.newInstance(args); } catch (InstantiationException e) { // should never happen From 4ce46a3b987c5d60bd100015ad9da242fe5d03cd Mon Sep 17 00:00:00 2001 From: "Min(Dongmin Yu)" Date: Tue, 9 Aug 2016 10:40:33 +0900 Subject: [PATCH 09/12] Add more tests for ByteBuffer --- .../msgpack/core/MessageUnpackerTest.scala | 189 ++++++++++++++---- 1 file changed, 145 insertions(+), 44 deletions(-) diff --git a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala index 173620282..3aaedb8b8 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala @@ -345,6 +345,22 @@ class MessageUnpackerTest extends MessagePackSpec { "be faster then msgpack-v6 skip" taggedAs ("cmp-skip") in { + trait Fixture { + val unpacker: MessageUnpacker + def run { + var count = 0 + try { + while (unpacker.hasNext) { + unpacker.skipValue() + count += 1 + } + } + finally { + unpacker.close() + } + } + } + val data = testData3(10000) val N = 100 @@ -367,21 +383,29 @@ class MessageUnpackerTest extends MessagePackSpec { unpacker.close() } - block("v7") { - val unpacker = Random.shuffle(unpackers(data)).head - var count = 0 - try { - while (unpacker.hasNext) { - unpacker.skipValue() - count += 1 - } - } - finally - unpacker.close() + block("v7-array") { + new Fixture { override val unpacker = MessagePack.newDefaultUnpacker(data) }.run + } + + block("v7-array-buffer") { + new Fixture { + val bb = ByteBuffer.allocate(data.length) + bb.put(data).flip() + override val unpacker = MessagePack.newDefaultUnpacker(bb) + }.run + } + block("v7-direct-buffer") { + new Fixture { + val db = ByteBuffer.allocateDirect(data.length) + db.put(data).flip() + override val unpacker = MessagePack.newDefaultUnpacker(db) + }.run } } - t("v7").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax + t("v7-array").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax + t("v7-array-buffer").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax + t("v7-direct-buffer").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax } import org.msgpack.`type`.{ValueType => ValueTypeV6} @@ -448,6 +472,20 @@ class MessageUnpackerTest extends MessagePackSpec { unpacker.skipValue() } } + trait Fixture { + val unpacker : MessageUnpacker + def run { + var count = 0 + try { + while (unpacker.hasNext) { + readValue(unpacker) + count += 1 + } + } + finally + unpacker.close() + } + } val data = testData3(10000) val N = 100 @@ -469,22 +507,29 @@ class MessageUnpackerTest extends MessagePackSpec { unpacker.close() } - block("v7") { - val unpacker = Random.shuffle(unpackers(data)).head - var count = 0 - try { - while (unpacker.hasNext) { - readValue(unpacker) - count += 1 - } - } - finally - unpacker.close() + block("v7-array") { + new Fixture { override val unpacker = MessagePack.newDefaultUnpacker(data) }.run } - } - t("v7").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax + block("v7-array-buffer") { + new Fixture { + val bb = ByteBuffer.allocate(data.length) + bb.put(data).flip() + override val unpacker = MessagePack.newDefaultUnpacker(bb) + }.run + } + block("v7-direct-buffer") { + new Fixture { + val db = ByteBuffer.allocateDirect(data.length) + db.put(data).flip() + override val unpacker = MessagePack.newDefaultUnpacker(db) + }.run + } + } + t("v7-array").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax + t("v7-array-buffer").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax + t("v7-direct-buffer").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax } @@ -500,6 +545,35 @@ class MessageUnpackerTest extends MessagePackSpec { } packer.close() + trait Fixture { + val unpacker : MessageUnpacker + val loop : Int + def run { + var i = 0 + try { + while (i < loop) { + val len = unpacker.unpackBinaryHeader() + val out = new Array[Byte](len) + unpacker.readPayload(out, 0, len) + i += 1 + } + } + finally + unpacker.close() + } + def runRef { + var i = 0 + try { + while (i < loop) { + val len = unpacker.unpackBinaryHeader() + val out = unpacker.readPayloadAsReference(len) + i += 1 + } + } + finally + unpacker.close() + } + } val b = bos.toByteArray time("unpackBinary", repeat = 100) { block("v6") { @@ -513,27 +587,54 @@ class MessageUnpackerTest extends MessagePackSpec { unpacker.close() } - block("v7") { - val unpacker = Random.shuffle(unpackers(b)).head - var i = 0 - while (i < R) { - val len = unpacker.unpackBinaryHeader() - val out = new Array[Byte](len) - unpacker.readPayload(out, 0, len) - i += 1 - } - unpacker.close() + block("v7-array") { + new Fixture { + override val unpacker = MessagePack.newDefaultUnpacker(b) + override val loop = R + }.run } - block("v7-ref") { - val unpacker = Random.shuffle(unpackers(b)).head - var i = 0 - while (i < R) { - val len = unpacker.unpackBinaryHeader() - val out = unpacker.readPayloadAsReference(len) - i += 1 - } - unpacker.close() + block("v7-array-buffer") { + new Fixture { + val bb = ByteBuffer.allocate(b.length) + bb.put(b).flip() + override val unpacker = MessagePack.newDefaultUnpacker(bb) + override val loop = R + }.run + } + + block("v7-direct-buffer") { + new Fixture { + val db = ByteBuffer.allocateDirect(b.length) + db.put(b).flip() + override val unpacker = MessagePack.newDefaultUnpacker(db) + override val loop = R + }.run + } + + block("v7-ref-array") { + new Fixture { + override val unpacker = MessagePack.newDefaultUnpacker(b) + override val loop = R + }.run + } + + block("v7-ref-array-buffer") { + new Fixture { + val bb = ByteBuffer.allocate(b.length) + bb.put(b).flip() + override val unpacker = MessagePack.newDefaultUnpacker(bb) + override val loop = R + }.run + } + + block("v7-ref-direct-buffer") { + new Fixture { + val db = ByteBuffer.allocateDirect(b.length) + db.put(b).flip() + override val unpacker = MessagePack.newDefaultUnpacker(db) + override val loop = R + }.run } } } From c24eb5f7684bd7cd183e69f452dd6f52019d23b6 Mon Sep 17 00:00:00 2001 From: Min Date: Wed, 10 Aug 2016 17:16:41 +0900 Subject: [PATCH 10/12] Add ByteBuffer constructor at MessageBufferU and MessageBufferBE --- .../main/java/org/msgpack/core/buffer/MessageBufferBE.java | 7 +++++++ .../main/java/org/msgpack/core/buffer/MessageBufferU.java | 6 ++++++ 2 files changed, 13 insertions(+) diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferBE.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferBE.java index 1326b396e..0676de6da 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferBE.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferBE.java @@ -15,6 +15,8 @@ // package org.msgpack.core.buffer; +import java.nio.ByteBuffer; + import static org.msgpack.core.Preconditions.checkArgument; /** @@ -30,6 +32,11 @@ public class MessageBufferBE super(arr, offset, length); } + MessageBufferBE(ByteBuffer bb) + { + super(bb); + } + private MessageBufferBE(Object base, long address, int length) { super(base, address, length); diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferU.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferU.java index 1e8783738..1ac4b129c 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferU.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferU.java @@ -37,6 +37,12 @@ public class MessageBufferU this.wrap = bb.slice(); } + MessageBufferU(ByteBuffer bb) + { + super(bb); + this.wrap = bb.slice(); + } + private MessageBufferU(Object base, long address, int length, ByteBuffer wrap) { super(base, address, length); From f09f595b16dac8d3814f4ddacc1b4e30dfbb2d0c Mon Sep 17 00:00:00 2001 From: "Min(Dongmin Yu)" Date: Fri, 12 Aug 2016 14:11:09 +0900 Subject: [PATCH 11/12] Fix test failures at universal MessageBuffer --- .../msgpack/core/buffer/MessageBuffer.java | 16 +++------ .../msgpack/core/buffer/MessageBufferU.java | 11 +++--- .../msgpack/core/MessageUnpackerTest.scala | 27 ++++++++------ .../core/buffer/MessageBufferTest.scala | 35 +++++++++---------- 4 files changed, 45 insertions(+), 44 deletions(-) diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java index 0f4b39e0b..50d472ae5 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java @@ -246,13 +246,7 @@ public static MessageBuffer wrap(byte[] array, int offset, int length) */ public static MessageBuffer wrap(ByteBuffer bb) { - MessageBuffer b = newMessageBuffer(bb); - if (bb.position() > 0 || bb.limit() != bb.capacity()) { - return b.slice(bb.position(), bb.remaining()); - } - else { - return b; - } + return newMessageBuffer(bb); } /** @@ -355,14 +349,14 @@ else if (DirectBufferAccess.isDirectByteBufferInstance(buffer.reference)) { } // Direct buffer or off-heap memory this.base = null; - this.address = DirectBufferAccess.getAddress(bb); - this.size = bb.capacity(); + this.address = DirectBufferAccess.getAddress(bb) + bb.position(); + this.size = bb.remaining(); this.reference = bb; } else if (bb.hasArray()) { this.base = bb.array(); - this.address = ARRAY_BYTE_BASE_OFFSET; - this.size = bb.array().length; + this.address = ARRAY_BYTE_BASE_OFFSET + bb.arrayOffset() + bb.position(); + this.size = bb.remaining(); this.reference = null; } else { diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferU.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferU.java index 1ac4b129c..6af9f8d7d 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferU.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferU.java @@ -31,10 +31,7 @@ public class MessageBufferU MessageBufferU(byte[] arr, int offset, int length) { super(arr, offset, length); - ByteBuffer bb = ByteBuffer.wrap(arr); - bb.position(offset); - bb.limit(offset + length); - this.wrap = bb.slice(); + this.wrap = ByteBuffer.wrap(arr, offset, length).slice(); } MessageBufferU(ByteBuffer bb) @@ -248,6 +245,12 @@ public void copyTo(int index, MessageBuffer dst, int offset, int length) } } + @Override + public void putMessageBuffer(int index, MessageBuffer src, int srcOffset, int len) + { + putBytes(index, src.toByteArray(), srcOffset, len); + } + @Override public byte[] toByteArray() { diff --git a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala index 3aaedb8b8..8677644ff 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala @@ -46,6 +46,7 @@ import MessageUnpackerTest._ class MessageUnpackerTest extends MessagePackSpec { + val universal = MessageBuffer.allocate(0).isInstanceOf[MessageBufferU] def testData: Array[Byte] = { val out = new ByteArrayOutputStream() val packer = MessagePack.newDefaultPacker(out) @@ -194,11 +195,14 @@ class MessageUnpackerTest extends MessagePackSpec { val db = ByteBuffer.allocateDirect(data.length) bb.put(data).flip() db.put(data).flip() - Seq( - MessagePack.newDefaultUnpacker(data), - MessagePack.newDefaultUnpacker(bb), - MessagePack.newDefaultUnpacker(db) - ) + val builder = Seq.newBuilder[MessageUnpacker] + builder += MessagePack.newDefaultUnpacker(data) + builder += MessagePack.newDefaultUnpacker(bb) + if (!universal) { + builder += MessagePack.newDefaultUnpacker(db) + } + + builder.result() } "MessageUnpacker" should { @@ -394,7 +398,7 @@ class MessageUnpackerTest extends MessagePackSpec { override val unpacker = MessagePack.newDefaultUnpacker(bb) }.run } - block("v7-direct-buffer") { + if (!universal) block("v7-direct-buffer") { new Fixture { val db = ByteBuffer.allocateDirect(data.length) db.put(data).flip() @@ -405,7 +409,7 @@ class MessageUnpackerTest extends MessagePackSpec { t("v7-array").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax t("v7-array-buffer").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax - t("v7-direct-buffer").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax + if (!universal) t("v7-direct-buffer").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax } import org.msgpack.`type`.{ValueType => ValueTypeV6} @@ -518,7 +522,8 @@ class MessageUnpackerTest extends MessagePackSpec { override val unpacker = MessagePack.newDefaultUnpacker(bb) }.run } - block("v7-direct-buffer") { + + if (!universal) block("v7-direct-buffer") { new Fixture { val db = ByteBuffer.allocateDirect(data.length) db.put(data).flip() @@ -529,7 +534,7 @@ class MessageUnpackerTest extends MessagePackSpec { t("v7-array").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax t("v7-array-buffer").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax - t("v7-direct-buffer").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax + if (!universal) t("v7-direct-buffer").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax } @@ -603,7 +608,7 @@ class MessageUnpackerTest extends MessagePackSpec { }.run } - block("v7-direct-buffer") { + if (!universal) block("v7-direct-buffer") { new Fixture { val db = ByteBuffer.allocateDirect(b.length) db.put(b).flip() @@ -628,7 +633,7 @@ class MessageUnpackerTest extends MessagePackSpec { }.run } - block("v7-ref-direct-buffer") { + if (!universal) block("v7-ref-direct-buffer") { new Fixture { val db = ByteBuffer.allocateDirect(b.length) db.put(b).flip() diff --git a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferTest.scala index a585b9537..b7871065b 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferTest.scala @@ -29,6 +29,7 @@ class MessageBufferTest "MessageBuffer" should { + val universal = MessageBuffer.allocate(0).isInstanceOf[MessageBufferU] "check buffer type" in { val b = MessageBuffer.allocate(0) info(s"MessageBuffer type: ${b.getClass.getName}") @@ -55,7 +56,7 @@ class MessageBufferTest val M = 64 * 1024 * 1024 val ub = MessageBuffer.allocate(M) - val ud = MessageBuffer.wrap(ByteBuffer.allocateDirect(M)) + val ud = if (universal) MessageBuffer.wrap(ByteBuffer.allocate(M)) else MessageBuffer.wrap(ByteBuffer.allocateDirect(M)) val hb = ByteBuffer.allocate(M) val db = ByteBuffer.allocateDirect(M) @@ -150,13 +151,14 @@ class MessageBufferTest } } } + val builder = Seq.newBuilder[MessageBuffer] + builder += MessageBuffer.allocate(10) + builder += MessageBuffer.wrap(ByteBuffer.allocate(10)) + if (!universal) builder += MessageBuffer.wrap(ByteBuffer.allocateDirect(10)) + val buffers = builder.result() "convert to ByteBuffer" in { - for (t <- Seq( - MessageBuffer.allocate(10), - MessageBuffer.wrap(ByteBuffer.allocate(10)), - MessageBuffer.wrap(ByteBuffer.allocateDirect(10))) - ) { + for (t <- buffers) { val bb = t.sliceAsByteBuffer bb.position shouldBe 0 bb.limit shouldBe 10 @@ -165,11 +167,7 @@ class MessageBufferTest } "put ByteBuffer on itself" in { - for (t <- Seq( - MessageBuffer.allocate(10), - MessageBuffer.wrap(ByteBuffer.allocate(10)), - MessageBuffer.wrap(ByteBuffer.allocateDirect(10))) - ) { + for (t <- buffers) { val b = Array[Byte](0x02, 0x03) val srcArray = ByteBuffer.wrap(b) val srcHeap = ByteBuffer.allocate(b.length) @@ -193,19 +191,18 @@ class MessageBufferTest } "put MessageBuffer on itself" in { - for (t <- Seq( - MessageBuffer.allocate(10), - MessageBuffer.wrap(ByteBuffer.allocate(10)), - MessageBuffer.wrap(ByteBuffer.allocateDirect(10))) - ) { + for (t <- buffers) { val b = Array[Byte](0x02, 0x03) val srcArray = ByteBuffer.wrap(b) val srcHeap = ByteBuffer.allocate(b.length) srcHeap.put(b).flip val srcOffHeap = ByteBuffer.allocateDirect(b.length) srcOffHeap.put(b).flip + val builder = Seq.newBuilder[ByteBuffer] + builder ++= Seq(srcArray, srcHeap) + if (!universal) builder += srcOffHeap - for (src <- Seq(MessageBuffer.wrap(srcArray), MessageBuffer.wrap(srcHeap), MessageBuffer.wrap(srcOffHeap))) { + for (src <- builder.result().map(d => MessageBuffer.wrap(d))) { // Write header bytes val header = Array[Byte](0x00, 0x01) t.putBytes(0, header, 0, header.length) @@ -256,7 +253,9 @@ class MessageBufferTest checkSliceAndCopyTo(MessageBuffer.wrap(prepareBytes), MessageBuffer.wrap(prepareBytes)) checkSliceAndCopyTo(MessageBuffer.wrap(ByteBuffer.wrap(prepareBytes)), MessageBuffer.wrap(ByteBuffer.wrap(prepareBytes))) - checkSliceAndCopyTo(MessageBuffer.wrap(prepareDirectBuffer), MessageBuffer.wrap(prepareDirectBuffer)) + if (!universal) { + checkSliceAndCopyTo(MessageBuffer.wrap(prepareDirectBuffer), MessageBuffer.wrap(prepareDirectBuffer)) + } } } } From ed1c3a33c9aea7e7410a8b69ff654fe66f5d833f Mon Sep 17 00:00:00 2001 From: "Min(Dongmin Yu)" Date: Fri, 12 Aug 2016 14:50:41 +0900 Subject: [PATCH 12/12] Remove side effect of unpacker benchmark --- .../msgpack/core/MessageUnpackerTest.scala | 36 +++++++++---------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala index 8677644ff..1c8864c7c 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala @@ -367,6 +367,10 @@ class MessageUnpackerTest extends MessagePackSpec { val data = testData3(10000) val N = 100 + val bb = ByteBuffer.allocate(data.length) + bb.put(data).flip() + val db = ByteBuffer.allocateDirect(data.length) + db.put(data).flip() val t = time("skip performance", repeat = N) { block("v6") { @@ -393,15 +397,11 @@ class MessageUnpackerTest extends MessagePackSpec { block("v7-array-buffer") { new Fixture { - val bb = ByteBuffer.allocate(data.length) - bb.put(data).flip() override val unpacker = MessagePack.newDefaultUnpacker(bb) }.run } if (!universal) block("v7-direct-buffer") { new Fixture { - val db = ByteBuffer.allocateDirect(data.length) - db.put(data).flip() override val unpacker = MessagePack.newDefaultUnpacker(db) }.run } @@ -493,6 +493,11 @@ class MessageUnpackerTest extends MessagePackSpec { val data = testData3(10000) val N = 100 + val bb = ByteBuffer.allocate(data.length) + bb.put(data).flip() + val db = ByteBuffer.allocateDirect(data.length) + db.put(data).flip() + val t = time("unpack performance", repeat = N) { block("v6") { val v6 = new org.msgpack.MessagePack() @@ -517,16 +522,12 @@ class MessageUnpackerTest extends MessagePackSpec { block("v7-array-buffer") { new Fixture { - val bb = ByteBuffer.allocate(data.length) - bb.put(data).flip() override val unpacker = MessagePack.newDefaultUnpacker(bb) }.run } if (!universal) block("v7-direct-buffer") { new Fixture { - val db = ByteBuffer.allocateDirect(data.length) - db.put(data).flip() override val unpacker = MessagePack.newDefaultUnpacker(db) }.run } @@ -580,6 +581,11 @@ class MessageUnpackerTest extends MessagePackSpec { } } val b = bos.toByteArray + val bb = ByteBuffer.allocate(b.length) + bb.put(b).flip() + val db = ByteBuffer.allocateDirect(b.length) + db.put(b).flip() + time("unpackBinary", repeat = 100) { block("v6") { val v6 = new org.msgpack.MessagePack() @@ -601,8 +607,6 @@ class MessageUnpackerTest extends MessagePackSpec { block("v7-array-buffer") { new Fixture { - val bb = ByteBuffer.allocate(b.length) - bb.put(b).flip() override val unpacker = MessagePack.newDefaultUnpacker(bb) override val loop = R }.run @@ -610,8 +614,6 @@ class MessageUnpackerTest extends MessagePackSpec { if (!universal) block("v7-direct-buffer") { new Fixture { - val db = ByteBuffer.allocateDirect(b.length) - db.put(b).flip() override val unpacker = MessagePack.newDefaultUnpacker(db) override val loop = R }.run @@ -621,25 +623,21 @@ class MessageUnpackerTest extends MessagePackSpec { new Fixture { override val unpacker = MessagePack.newDefaultUnpacker(b) override val loop = R - }.run + }.runRef } block("v7-ref-array-buffer") { new Fixture { - val bb = ByteBuffer.allocate(b.length) - bb.put(b).flip() override val unpacker = MessagePack.newDefaultUnpacker(bb) override val loop = R - }.run + }.runRef } if (!universal) block("v7-ref-direct-buffer") { new Fixture { - val db = ByteBuffer.allocateDirect(b.length) - db.put(b).flip() override val unpacker = MessagePack.newDefaultUnpacker(db) override val loop = R - }.run + }.runRef } } }