diff --git a/msgpack-core/src/main/java/org/msgpack/core/MessagePack.java b/msgpack-core/src/main/java/org/msgpack/core/MessagePack.java index 7e420c521..7737aa664 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/MessagePack.java +++ b/msgpack-core/src/main/java/org/msgpack/core/MessagePack.java @@ -16,6 +16,7 @@ package org.msgpack.core; import org.msgpack.core.buffer.ArrayBufferInput; +import org.msgpack.core.buffer.ByteBufferInput; import org.msgpack.core.buffer.ChannelBufferInput; import org.msgpack.core.buffer.ChannelBufferOutput; import org.msgpack.core.buffer.InputStreamBufferInput; @@ -25,6 +26,7 @@ import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.charset.Charset; @@ -236,6 +238,17 @@ public static MessageUnpacker newDefaultUnpacker(byte[] contents, int offset, in return DEFAULT_UNPACKER_CONFIG.newUnpacker(contents, offset, length); } + /** + * Create an unpacker that reads the data from a given ByteBuffer + * + * @param contents + * @return + */ + public static MessageUnpacker newDefaultUnpacker(ByteBuffer contents) + { + return DEFAULT_UNPACKER_CONFIG.newUnpacker(contents); + } + /** * MessagePacker configuration. */ @@ -524,6 +537,17 @@ public MessageUnpacker newUnpacker(byte[] contents, int offset, int length) return newUnpacker(new ArrayBufferInput(contents, offset, length)); } + /** + * Create an unpacker that reads the data from a given ByteBuffer + * + * @param contents + * @return + */ + public MessageUnpacker newUnpacker(ByteBuffer contents) + { + return newUnpacker(new ByteBufferInput(contents)); + } + /** * Allow unpackBinaryHeader to read str format family (default: true) */ diff --git a/msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java b/msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java index 17dc3a169..7c6bfae70 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java +++ b/msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java @@ -531,8 +531,7 @@ else if (s.length() < (1 << 8)) { throw new IllegalArgumentException("Unexpected UTF-8 encoder state"); } // move 1 byte backward to expand 3-byte header region to 3 bytes - buffer.putBytes(position + 3, - buffer.array(), buffer.arrayOffset() + position + 2, written); + buffer.putMessageBuffer(position + 3, buffer, position + 2, written); // write 3-byte header buffer.putByte(position++, STR16); buffer.putShort(position, (short) written); @@ -560,8 +559,7 @@ else if (s.length() < (1 << 16)) { throw new IllegalArgumentException("Unexpected UTF-8 encoder state"); } // move 2 bytes backward to expand 3-byte header region to 5 bytes - buffer.putBytes(position + 5, - buffer.array(), buffer.arrayOffset() + position + 3, written); + buffer.putMessageBuffer(position + 5, buffer, position + 3, written); // write 3-byte header header buffer.putByte(position++, STR32); buffer.putInt(position, written); diff --git a/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java b/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java index d36d2825c..399e126dc 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java +++ b/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java @@ -213,14 +213,9 @@ private MessageBuffer prepareNumberBuffer(int readLength) // fill the temporary buffer from the current data fragment and // next fragment(s). - // TODO buffer.array() doesn't work if MessageBuffer is allocated by - // newDirectBuffer. dd copy method to MessageBuffer to solve this issue. - int off = 0; if (remaining > 0) { - numberBuffer.putBytes(0, - buffer.array(), buffer.arrayOffset() + position, - remaining); + numberBuffer.putMessageBuffer(0, buffer, position, remaining); readLength -= remaining; off += remaining; } @@ -229,16 +224,12 @@ private MessageBuffer prepareNumberBuffer(int readLength) nextBuffer(); int nextSize = buffer.size(); if (nextSize >= readLength) { - numberBuffer.putBytes(off, - buffer.array(), buffer.arrayOffset(), - readLength); + numberBuffer.putMessageBuffer(off, buffer, 0, readLength); position = readLength; break; } else { - numberBuffer.putBytes(off, - buffer.array(), buffer.arrayOffset(), - nextSize); + numberBuffer.putMessageBuffer(off, buffer, 0, nextSize); readLength -= nextSize; off += nextSize; } @@ -1041,7 +1032,8 @@ private void handleCoderError(CoderResult cr) private String decodeStringFastPath(int length) { if (actionOnMalformedString == CodingErrorAction.REPLACE && - actionOnUnmappableString == CodingErrorAction.REPLACE) { + actionOnUnmappableString == CodingErrorAction.REPLACE && + buffer.hasArray()) { String s = new String(buffer.array(), buffer.arrayOffset() + position, length, MessagePack.UTF8); position += length; return s; diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java index 3fbc97208..5c6454e69 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java @@ -15,8 +15,6 @@ // package org.msgpack.core.buffer; -import java.io.IOException; - import static org.msgpack.core.Preconditions.checkNotNull; /** @@ -80,7 +78,6 @@ public void reset(byte[] arr, int offset, int len) @Override public MessageBuffer next() - throws IOException { if (isEmpty) { return null; @@ -91,7 +88,6 @@ public MessageBuffer next() @Override public void close() - throws IOException { buffer = null; isEmpty = true; diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java new file mode 100644 index 000000000..fd0311b83 --- /dev/null +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java @@ -0,0 +1,67 @@ +// +// MessagePack for Java +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.msgpack.core.buffer; + +import java.nio.ByteBuffer; + +import static org.msgpack.core.Preconditions.checkNotNull; + +/** + * {@link MessageBufferInput} adapter for {@link java.nio.ByteBuffer} + */ +public class ByteBufferInput + implements MessageBufferInput +{ + private ByteBuffer input; + private boolean isRead = false; + + public ByteBufferInput(ByteBuffer input) + { + this.input = checkNotNull(input, "input ByteBuffer is null").slice(); + } + + /** + * Reset buffer. + * + * @param input new buffer + * @return the old buffer + */ + public ByteBuffer reset(ByteBuffer input) + { + ByteBuffer old = this.input; + this.input = checkNotNull(input, "input ByteBuffer is null").slice(); + isRead = false; + return old; + } + + @Override + public MessageBuffer next() + { + if (isRead) { + return null; + } + + MessageBuffer b = MessageBuffer.wrap(input); + isRead = true; + return b; + } + + @Override + public void close() + { + // Nothing to do + } +} diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java index d4d5f2238..50d472ae5 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java @@ -19,6 +19,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -43,6 +44,7 @@ public class MessageBuffer * Reference to MessageBuffer Constructors */ private static final Constructor mbArrConstructor; + private static final Constructor mbBBConstructor; /** * The offset from the object memory header to its byte array data @@ -145,6 +147,11 @@ public class MessageBuffer Constructor mbArrCstr = bufferCls.getDeclaredConstructor(byte[].class, int.class, int.class); mbArrCstr.setAccessible(true); mbArrConstructor = mbArrCstr; + + // MessageBufferX(ByteBuffer) constructor + Constructor mbBBCstr = bufferCls.getDeclaredConstructor(ByteBuffer.class); + mbBBCstr.setAccessible(true); + mbBBConstructor = mbBBCstr; } catch (Exception e) { e.printStackTrace(System.err); @@ -170,21 +177,78 @@ public class MessageBuffer */ protected final int size; - public static MessageBuffer allocate(int length) + /** + * Reference is used to hold a reference to an object that holds the underlying memory so that it cannot be + * released by the garbage collector. + */ + protected final ByteBuffer reference; + + /** + * Allocates a new MessageBuffer backed by a byte array. + * + * @throws IllegalArgumentException If the capacity is a negative integer + * + */ + public static MessageBuffer allocate(int size) { - return wrap(new byte[length]); + if (size < 0) { + throw new IllegalArgumentException("size must not be negative"); + } + return wrap(new byte[size]); } + /** + * Wraps a byte array into a MessageBuffer. + * + * The new MessageBuffer will be backed by the given byte array. Modifications to the new MessageBuffer will cause the byte array to be modified and vice versa. + * + * The new buffer's size will be array.length. hasArray() will return true. + * + * @param array the byte array that will gack this MessageBuffer + * @return + * + */ public static MessageBuffer wrap(byte[] array) { return newMessageBuffer(array, 0, array.length); } + /** + * Wraps a byte array into a MessageBuffer. + * + * The new MessageBuffer will be backed by the given byte array. Modifications to the new MessageBuffer will cause the byte array to be modified and vice versa. + * + * The new buffer's size will be length. hasArray() will return true. + * + * @param array the byte array that will gack this MessageBuffer + * @param offset The offset of the subarray to be used; must be non-negative and no larger than array.length + * @param length The length of the subarray to be used; must be non-negative and no larger than array.length - offset + * @return + * + */ public static MessageBuffer wrap(byte[] array, int offset, int length) { return newMessageBuffer(array, offset, length); } + /** + * Wraps a ByteBuffer into a MessageBuffer. + * + * The new MessageBuffer will be backed by the given byte buffer. Modifications to the new MessageBuffer will cause the byte buffer to be modified and vice versa. However, change of position, limit, or mark of given byte buffer doesn't affect MessageBuffer. + * + * The new buffer's size will be bb.remaining(). hasArray() will return the same result with bb.hasArray(). + * + * @param bb the byte buffer that will gack this MessageBuffer + * @throws IllegalArgumentException given byte buffer returns false both from hasArray() and isDirect() + * @throws UnsupportedOperationException given byte buffer is a direct buffer and this platform doesn't support Unsafe API + * @return + * + */ + public static MessageBuffer wrap(ByteBuffer bb) + { + return newMessageBuffer(bb); + } + /** * Creates a new MessageBuffer instance backed by a java heap array * @@ -194,19 +258,63 @@ public static MessageBuffer wrap(byte[] array, int offset, int length) private static MessageBuffer newMessageBuffer(byte[] arr, int off, int len) { checkNotNull(arr); + return newInstance(mbArrConstructor, arr, off, len); + } + + /** + * Creates a new MessageBuffer instance backed by ByteBuffer + * + * @param bb + * @return + */ + private static MessageBuffer newMessageBuffer(ByteBuffer bb) + { + checkNotNull(bb); + return newInstance(mbBBConstructor, bb); + } + + /** + * Creates a new MessageBuffer instance + * + * @param constructor A MessageBuffer constructor + * @return new MessageBuffer instance + */ + private static MessageBuffer newInstance(Constructor constructor, Object... args) + { try { - return (MessageBuffer) mbArrConstructor.newInstance(arr, off, len); + // We need to use reflection to create MessageBuffer instances in order to prevent TypeProfile generation for getInt method. TypeProfile will be + // generated to resolve one of the method references when two or more classes overrides the method. + return (MessageBuffer) constructor.newInstance(args); + } + catch (InstantiationException e) { + // should never happen + throw new IllegalStateException(e); + } + catch (IllegalAccessException e) { + // should never happen unless security manager restricts this reflection + throw new IllegalStateException(e); } - catch (Throwable e) { - throw new RuntimeException(e); + catch (InvocationTargetException e) { + if (e.getCause() instanceof RuntimeException) { + // underlaying constructor may throw RuntimeException + throw (RuntimeException) e.getCause(); + } + else if (e.getCause() instanceof Error) { + throw (Error) e.getCause(); + } + // should never happen + throw new IllegalStateException(e.getCause()); } } public static void releaseBuffer(MessageBuffer buffer) { - if (isUniversalBuffer || buffer.base instanceof byte[]) { + if (isUniversalBuffer || buffer.hasArray()) { // We have nothing to do. Wait until the garbage-collector collects this array object } + else if (DirectBufferAccess.isDirectByteBufferInstance(buffer.reference)) { + DirectBufferAccess.clean(buffer.reference); + } else { // Maybe cannot reach here unsafe.freeMemory(buffer.address); @@ -222,9 +330,38 @@ public static void releaseBuffer(MessageBuffer buffer) */ MessageBuffer(byte[] arr, int offset, int length) { - this.base = arr; + this.base = arr; // non-null is already checked at newMessageBuffer this.address = ARRAY_BYTE_BASE_OFFSET + offset; this.size = length; + this.reference = null; + } + + /** + * Create a MessageBuffer instance from a given ByteBuffer instance + * + * @param bb + */ + MessageBuffer(ByteBuffer bb) + { + if (bb.isDirect()) { + if (isUniversalBuffer) { + throw new UnsupportedOperationException("Cannot create MessageBuffer from a DirectBuffer on this platform"); + } + // Direct buffer or off-heap memory + this.base = null; + this.address = DirectBufferAccess.getAddress(bb) + bb.position(); + this.size = bb.remaining(); + this.reference = bb; + } + else if (bb.hasArray()) { + this.base = bb.array(); + this.address = ARRAY_BYTE_BASE_OFFSET + bb.arrayOffset() + bb.position(); + this.size = bb.remaining(); + this.reference = null; + } + else { + throw new IllegalArgumentException("Only the array-backed ByteBuffer or DirectBuffer is supported"); + } } protected MessageBuffer(Object base, long address, int length) @@ -232,6 +369,7 @@ protected MessageBuffer(Object base, long address, int length) this.base = base; this.address = address; this.size = length; + this.reference = null; } /** @@ -382,7 +520,7 @@ else if (src.hasArray()) { src.position(src.position() + len); } else { - if (base != null) { + if (hasArray()) { src.get((byte[]) base, index, len); } else { @@ -393,6 +531,11 @@ else if (src.hasArray()) { } } + public void putMessageBuffer(int index, MessageBuffer src, int srcOffset, int len) + { + unsafe.copyMemory(src.base, src.address + srcOffset, base, address + index, len); + } + /** * Create a ByteBuffer view of the range [index, index+length) of this memory * @@ -402,7 +545,13 @@ else if (src.hasArray()) { */ public ByteBuffer sliceAsByteBuffer(int index, int length) { - return ByteBuffer.wrap((byte[]) base, (int) ((address - ARRAY_BYTE_BASE_OFFSET) + index), length); + if (hasArray()) { + return ByteBuffer.wrap((byte[]) base, (int) ((address - ARRAY_BYTE_BASE_OFFSET) + index), length); + } + else { + assert (!isUniversalBuffer); + return DirectBufferAccess.newByteBuffer(address, index, length, reference); + } } /** @@ -415,6 +564,11 @@ public ByteBuffer sliceAsByteBuffer() return sliceAsByteBuffer(0, size()); } + public boolean hasArray() + { + return base != null; + } + /** * Get a copy of this buffer * diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferBE.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferBE.java index 1326b396e..0676de6da 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferBE.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferBE.java @@ -15,6 +15,8 @@ // package org.msgpack.core.buffer; +import java.nio.ByteBuffer; + import static org.msgpack.core.Preconditions.checkArgument; /** @@ -30,6 +32,11 @@ public class MessageBufferBE super(arr, offset, length); } + MessageBufferBE(ByteBuffer bb) + { + super(bb); + } + private MessageBufferBE(Object base, long address, int length) { super(base, address, length); diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferU.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferU.java index 1e8783738..6af9f8d7d 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferU.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferU.java @@ -31,9 +31,12 @@ public class MessageBufferU MessageBufferU(byte[] arr, int offset, int length) { super(arr, offset, length); - ByteBuffer bb = ByteBuffer.wrap(arr); - bb.position(offset); - bb.limit(offset + length); + this.wrap = ByteBuffer.wrap(arr, offset, length).slice(); + } + + MessageBufferU(ByteBuffer bb) + { + super(bb); this.wrap = bb.slice(); } @@ -242,6 +245,12 @@ public void copyTo(int index, MessageBuffer dst, int offset, int length) } } + @Override + public void putMessageBuffer(int index, MessageBuffer src, int srcOffset, int len) + { + putBytes(index, src.toByteArray(), srcOffset, len); + } + @Override public byte[] toByteArray() { diff --git a/msgpack-core/src/test/java/org/msgpack/core/example/MessagePackExample.java b/msgpack-core/src/test/java/org/msgpack/core/example/MessagePackExample.java index b4250c40c..5feb15dbc 100644 --- a/msgpack-core/src/test/java/org/msgpack/core/example/MessagePackExample.java +++ b/msgpack-core/src/test/java/org/msgpack/core/example/MessagePackExample.java @@ -15,9 +15,13 @@ // package org.msgpack.core.example; -import org.msgpack.core.*; +import org.msgpack.core.MessagePack; import org.msgpack.core.MessagePack.PackerConfig; import org.msgpack.core.MessagePack.UnpackerConfig; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessageFormat; +import org.msgpack.core.MessagePacker; +import org.msgpack.core.MessageUnpacker; import org.msgpack.value.ArrayValue; import org.msgpack.value.ExtensionValue; import org.msgpack.value.FloatValue; diff --git a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala index 96809f626..1c8864c7c 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala @@ -16,6 +16,7 @@ package org.msgpack.core import java.io._ +import java.nio.ByteBuffer import org.msgpack.core.buffer._ import org.msgpack.value.ValueType @@ -45,6 +46,7 @@ import MessageUnpackerTest._ class MessageUnpackerTest extends MessagePackSpec { + val universal = MessageBuffer.allocate(0).isInstanceOf[MessageBufferU] def testData: Array[Byte] = { val out = new ByteArrayOutputStream() val packer = MessagePack.newDefaultPacker(out) @@ -188,33 +190,50 @@ class MessageUnpackerTest extends MessagePackSpec { u.hasNext shouldBe false } + def unpackers(data: Array[Byte]) : Seq[MessageUnpacker] = { + val bb = ByteBuffer.allocate(data.length) + val db = ByteBuffer.allocateDirect(data.length) + bb.put(data).flip() + db.put(data).flip() + val builder = Seq.newBuilder[MessageUnpacker] + builder += MessagePack.newDefaultUnpacker(data) + builder += MessagePack.newDefaultUnpacker(bb) + if (!universal) { + builder += MessagePack.newDefaultUnpacker(db) + } + + builder.result() + } + "MessageUnpacker" should { "parse message packed data" taggedAs ("unpack") in { val arr = testData - val unpacker = MessagePack.newDefaultUnpacker(arr) + for (unpacker <- unpackers(arr)) { - var count = 0 - while (unpacker.hasNext) { - count += 1 - readValue(unpacker) + var count = 0 + while (unpacker.hasNext) { + count += 1 + readValue(unpacker) + } + count shouldBe 6 + unpacker.getTotalReadBytes shouldBe arr.length } - count shouldBe 6 - unpacker.getTotalReadBytes shouldBe arr.length } "skip reading values" in { - val unpacker = MessagePack.newDefaultUnpacker(testData) - var skipCount = 0 - while (unpacker.hasNext) { - unpacker.skipValue() - skipCount += 1 - } + for (unpacker <- unpackers(testData)) { + var skipCount = 0 + while (unpacker.hasNext) { + unpacker.skipValue() + skipCount += 1 + } - skipCount shouldBe 2 - unpacker.getTotalReadBytes shouldBe testData.length + skipCount shouldBe 2 + unpacker.getTotalReadBytes shouldBe testData.length + } } "compare skip performance" taggedAs ("skip") in { @@ -223,21 +242,23 @@ class MessageUnpackerTest extends MessagePackSpec { time("skip performance", repeat = 100) { block("switch") { - val unpacker = MessagePack.newDefaultUnpacker(data) - var skipCount = 0 - while (unpacker.hasNext) { - unpacker.skipValue() - skipCount += 1 + for (unpacker <- unpackers(data)) { + var skipCount = 0 + while (unpacker.hasNext) { + unpacker.skipValue() + skipCount += 1 + } + skipCount shouldBe N } - skipCount shouldBe N } } time("bulk skip performance", repeat = 100) { block("switch") { - val unpacker = MessagePack.newDefaultUnpacker(data) - unpacker.skipValue(N) - unpacker.hasNext shouldBe false + for (unpacker <- unpackers(data)) { + unpacker.skipValue(N) + unpacker.hasNext shouldBe false + } } } @@ -247,26 +268,27 @@ class MessageUnpackerTest extends MessagePackSpec { debug(intSeq.mkString(", ")) - val ib = Seq.newBuilder[Int] + for (unpacker <- unpackers(testData2)) { + val ib = Seq.newBuilder[Int] - val unpacker = MessagePack.newDefaultUnpacker(testData2) - while (unpacker.hasNext) { - val f = unpacker.getNextFormat - f.getValueType match { - case ValueType.INTEGER => - val i = unpacker.unpackInt() - trace(f"read int: $i%,d") - ib += i - case ValueType.BOOLEAN => - val b = unpacker.unpackBoolean() - trace(s"read boolean: $b") - case other => - unpacker.skipValue() + while (unpacker.hasNext) { + val f = unpacker.getNextFormat + f.getValueType match { + case ValueType.INTEGER => + val i = unpacker.unpackInt() + trace(f"read int: $i%,d") + ib += i + case ValueType.BOOLEAN => + val b = unpacker.unpackBoolean() + trace(s"read boolean: $b") + case other => + unpacker.skipValue() + } } - } - ib.result shouldBe intSeq - unpacker.getTotalReadBytes shouldBe testData2.length + ib.result shouldBe intSeq + unpacker.getTotalReadBytes shouldBe testData2.length + } } @@ -276,29 +298,30 @@ class MessageUnpackerTest extends MessagePackSpec { trait SplitTest { val data: Array[Byte] def run { - val unpacker = MessagePack.newDefaultUnpacker(data) - val numElems = { - var c = 0 - while (unpacker.hasNext) { - readValue(unpacker) - c += 1 + for (unpacker <- unpackers(data)) { + val numElems = { + var c = 0 + while (unpacker.hasNext) { + readValue(unpacker) + c += 1 + } + c } - c - } - for (splitPoint <- 1 until data.length - 1) { - debug(s"split at $splitPoint") - val (h, t) = data.splitAt(splitPoint) - val bin = new SplitMessageBufferInput(Array(h, t)) - val unpacker = MessagePack.newDefaultUnpacker(bin) - var count = 0 - while (unpacker.hasNext) { - count += 1 - val f = unpacker.getNextFormat - readValue(unpacker) + for (splitPoint <- 1 until data.length - 1) { + debug(s"split at $splitPoint") + val (h, t) = data.splitAt(splitPoint) + val bin = new SplitMessageBufferInput(Array(h, t)) + val unpacker = MessagePack.newDefaultUnpacker(bin) + var count = 0 + while (unpacker.hasNext) { + count += 1 + val f = unpacker.getNextFormat + readValue(unpacker) + } + count shouldBe numElems + unpacker.getTotalReadBytes shouldBe data.length } - count shouldBe numElems - unpacker.getTotalReadBytes shouldBe data.length } } } @@ -316,7 +339,7 @@ class MessageUnpackerTest extends MessagePackSpec { packer.close val data = packer.toByteArray - val unpacker = MessagePack.newDefaultUnpacker(new InputStreamBufferInput(new ByteArrayInputStream(data), 8192)) + val unpacker = MessagePack.newDefaultUnpacker(new InputStreamBufferInput(new ByteArrayInputStream(data), 8192)) (0 until 1170).foreach { i => unpacker.unpackLong() shouldBe 0x0011223344556677L unpacker.unpackString() shouldBe "hello" @@ -326,8 +349,28 @@ class MessageUnpackerTest extends MessagePackSpec { "be faster then msgpack-v6 skip" taggedAs ("cmp-skip") in { + trait Fixture { + val unpacker: MessageUnpacker + def run { + var count = 0 + try { + while (unpacker.hasNext) { + unpacker.skipValue() + count += 1 + } + } + finally { + unpacker.close() + } + } + } + val data = testData3(10000) val N = 100 + val bb = ByteBuffer.allocate(data.length) + bb.put(data).flip() + val db = ByteBuffer.allocateDirect(data.length) + db.put(data).flip() val t = time("skip performance", repeat = N) { block("v6") { @@ -348,21 +391,25 @@ class MessageUnpackerTest extends MessagePackSpec { unpacker.close() } - block("v7") { - val unpacker = MessagePack.newDefaultUnpacker(data) - var count = 0 - try { - while (unpacker.hasNext) { - unpacker.skipValue() - count += 1 - } - } - finally - unpacker.close() + block("v7-array") { + new Fixture { override val unpacker = MessagePack.newDefaultUnpacker(data) }.run + } + + block("v7-array-buffer") { + new Fixture { + override val unpacker = MessagePack.newDefaultUnpacker(bb) + }.run + } + if (!universal) block("v7-direct-buffer") { + new Fixture { + override val unpacker = MessagePack.newDefaultUnpacker(db) + }.run } } - t("v7").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax + t("v7-array").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax + t("v7-array-buffer").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax + if (!universal) t("v7-direct-buffer").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax } import org.msgpack.`type`.{ValueType => ValueTypeV6} @@ -429,9 +476,28 @@ class MessageUnpackerTest extends MessagePackSpec { unpacker.skipValue() } } + trait Fixture { + val unpacker : MessageUnpacker + def run { + var count = 0 + try { + while (unpacker.hasNext) { + readValue(unpacker) + count += 1 + } + } + finally + unpacker.close() + } + } val data = testData3(10000) val N = 100 + val bb = ByteBuffer.allocate(data.length) + bb.put(data).flip() + val db = ByteBuffer.allocateDirect(data.length) + db.put(data).flip() + val t = time("unpack performance", repeat = N) { block("v6") { val v6 = new org.msgpack.MessagePack() @@ -450,22 +516,26 @@ class MessageUnpackerTest extends MessagePackSpec { unpacker.close() } - block("v7") { - val unpacker = MessagePack.newDefaultUnpacker(data) - var count = 0 - try { - while (unpacker.hasNext) { - readValue(unpacker) - count += 1 - } - } - finally - unpacker.close() + block("v7-array") { + new Fixture { override val unpacker = MessagePack.newDefaultUnpacker(data) }.run } - } - t("v7").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax + block("v7-array-buffer") { + new Fixture { + override val unpacker = MessagePack.newDefaultUnpacker(bb) + }.run + } + if (!universal) block("v7-direct-buffer") { + new Fixture { + override val unpacker = MessagePack.newDefaultUnpacker(db) + }.run + } + } + + t("v7-array").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax + t("v7-array-buffer").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax + if (!universal) t("v7-direct-buffer").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax } @@ -481,7 +551,41 @@ class MessageUnpackerTest extends MessagePackSpec { } packer.close() + trait Fixture { + val unpacker : MessageUnpacker + val loop : Int + def run { + var i = 0 + try { + while (i < loop) { + val len = unpacker.unpackBinaryHeader() + val out = new Array[Byte](len) + unpacker.readPayload(out, 0, len) + i += 1 + } + } + finally + unpacker.close() + } + def runRef { + var i = 0 + try { + while (i < loop) { + val len = unpacker.unpackBinaryHeader() + val out = unpacker.readPayloadAsReference(len) + i += 1 + } + } + finally + unpacker.close() + } + } val b = bos.toByteArray + val bb = ByteBuffer.allocate(b.length) + bb.put(b).flip() + val db = ByteBuffer.allocateDirect(b.length) + db.put(b).flip() + time("unpackBinary", repeat = 100) { block("v6") { val v6 = new org.msgpack.MessagePack() @@ -494,27 +598,46 @@ class MessageUnpackerTest extends MessagePackSpec { unpacker.close() } - block("v7") { - val unpacker = MessagePack.newDefaultUnpacker(b) - var i = 0 - while (i < R) { - val len = unpacker.unpackBinaryHeader() - val out = new Array[Byte](len) - unpacker.readPayload(out, 0, len) - i += 1 - } - unpacker.close() + block("v7-array") { + new Fixture { + override val unpacker = MessagePack.newDefaultUnpacker(b) + override val loop = R + }.run } - block("v7-ref") { - val unpacker = MessagePack.newDefaultUnpacker(b) - var i = 0 - while (i < R) { - val len = unpacker.unpackBinaryHeader() - val out = unpacker.readPayloadAsReference(len) - i += 1 - } - unpacker.close() + block("v7-array-buffer") { + new Fixture { + override val unpacker = MessagePack.newDefaultUnpacker(bb) + override val loop = R + }.run + } + + if (!universal) block("v7-direct-buffer") { + new Fixture { + override val unpacker = MessagePack.newDefaultUnpacker(db) + override val loop = R + }.run + } + + block("v7-ref-array") { + new Fixture { + override val unpacker = MessagePack.newDefaultUnpacker(b) + override val loop = R + }.runRef + } + + block("v7-ref-array-buffer") { + new Fixture { + override val unpacker = MessagePack.newDefaultUnpacker(bb) + override val loop = R + }.runRef + } + + if (!universal) block("v7-ref-direct-buffer") { + new Fixture { + override val unpacker = MessagePack.newDefaultUnpacker(db) + override val loop = R + }.runRef } } } @@ -533,16 +656,17 @@ class MessageUnpackerTest extends MessagePackSpec { packer.writePayload(data) packer.close() - val unpacker = MessagePack.newDefaultUnpacker(b.toByteArray) - val len = unpacker.unpackBinaryHeader() - len shouldBe s - val ref = unpacker.readPayloadAsReference(len) - unpacker.close() - ref.size() shouldBe s - val stored = new Array[Byte](len) - ref.getBytes(0, stored, 0, len) + for (unpacker <- unpackers(b.toByteArray)) { + val len = unpacker.unpackBinaryHeader() + len shouldBe s + val ref = unpacker.readPayloadAsReference(len) + unpacker.close() + ref.size() shouldBe s + val stored = new Array[Byte](len) + ref.getBytes(0, stored, 0, len) - stored shouldBe data + stored shouldBe data + } } } @@ -552,35 +676,36 @@ class MessageUnpackerTest extends MessagePackSpec { val data = intSeq val b = createMessagePackData(packer => data foreach packer.packInt) - val unpacker = MessagePack.newDefaultUnpacker(b) + for (unpacker <- unpackers(b)) { - val unpacked = Array.newBuilder[Int] - while (unpacker.hasNext) { - unpacked += unpacker.unpackInt() - } - unpacker.close - unpacked.result shouldBe data - - val data2 = intSeq - val b2 = createMessagePackData(packer => data2 foreach packer.packInt) - val bi = new ArrayBufferInput(b2) - unpacker.reset(bi) - val unpacked2 = Array.newBuilder[Int] - while (unpacker.hasNext) { - unpacked2 += unpacker.unpackInt() - } - unpacker.close - unpacked2.result shouldBe data2 - - // reused the buffer input instance - bi.reset(b2) - unpacker.reset(bi) - val unpacked3 = Array.newBuilder[Int] - while (unpacker.hasNext) { - unpacked3 += unpacker.unpackInt() + val unpacked = Array.newBuilder[Int] + while (unpacker.hasNext) { + unpacked += unpacker.unpackInt() + } + unpacker.close + unpacked.result shouldBe data + + val data2 = intSeq + val b2 = createMessagePackData(packer => data2 foreach packer.packInt) + val bi = new ArrayBufferInput(b2) + unpacker.reset(bi) + val unpacked2 = Array.newBuilder[Int] + while (unpacker.hasNext) { + unpacked2 += unpacker.unpackInt() + } + unpacker.close + unpacked2.result shouldBe data2 + + // reused the buffer input instance + bi.reset(b2) + unpacker.reset(bi) + val unpacked3 = Array.newBuilder[Int] + while (unpacker.hasNext) { + unpacked3 += unpacker.unpackInt() + } + unpacker.close + unpacked3.result shouldBe data2 } - unpacker.close - unpacked3.result shouldBe data2 } @@ -678,13 +803,14 @@ class MessageUnpackerTest extends MessagePackSpec { Seq(8191, 8192, 8193, 16383, 16384, 16385).foreach { n => val arr = createLargeData(n) - val unpacker = MessagePack.newDefaultUnpacker(arr) + for (unpacker <- unpackers(arr)) { - unpacker.unpackArrayHeader shouldBe 2 - unpacker.unpackString.length shouldBe n - unpacker.unpackInt shouldBe 1 + unpacker.unpackArrayHeader shouldBe 2 + unpacker.unpackString.length shouldBe n + unpacker.unpackInt shouldBe 1 - unpacker.getTotalReadBytes shouldBe arr.length + unpacker.getTotalReadBytes shouldBe arr.length + } } } diff --git a/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala index 2c080b59a..18876ddb8 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala @@ -43,4 +43,14 @@ class ByteStringTest MessagePack.newDefaultUnpacker(input).unpackString() } + + "Unpacking a ByteString's ByteBuffer" should { + "fail with a regular MessageBuffer" in { + + // can't demonstrate with new ByteBufferInput(byteString.asByteBuffer) + // as Travis tests run with JDK6 that picks up MessageBufferU + a[RuntimeException] shouldBe thrownBy(unpackString(new + MessageBuffer(byteString.asByteBuffer))) + } + } } diff --git a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala index 1638806ee..6b1c0da48 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala @@ -94,6 +94,11 @@ class MessageBufferInputTest ArrayBufferInput(_)) } + "support ByteBuffers" in { + runTest(b => new + ByteBufferInput(b.toByteBuffer)) + } + "support InputStreams" taggedAs ("is") in { runTest(b => new diff --git a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferTest.scala index 75ef00a11..b7871065b 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferTest.scala @@ -29,24 +29,34 @@ class MessageBufferTest "MessageBuffer" should { + val universal = MessageBuffer.allocate(0).isInstanceOf[MessageBufferU] "check buffer type" in { val b = MessageBuffer.allocate(0) info(s"MessageBuffer type: ${b.getClass.getName}") } - "wrap ByteBuffer considering position and remaining values" taggedAs ("wrap-bb") in { + "wrap byte array considering position and remaining values" taggedAs ("wrap-ba") in { val d = Array[Byte](10, 11, 12, 13, 14, 15, 16, 17, 18, 19) val mb = MessageBuffer.wrap(d, 2, 2) mb.getByte(0) shouldBe 12 mb.size() shouldBe 2 } + "wrap ByteBuffer considering position and remaining values" taggedAs ("wrap-bb") in { + val d = Array[Byte](10, 11, 12, 13, 14, 15, 16, 17, 18, 19) + val subset = ByteBuffer.wrap(d, 2, 2) + val mb = MessageBuffer.wrap(subset) + mb.getByte(0) shouldBe 12 + mb.size() shouldBe 2 + } + "have better performance than ByteBuffer" in { val N = 1000000 val M = 64 * 1024 * 1024 val ub = MessageBuffer.allocate(M) + val ud = if (universal) MessageBuffer.wrap(ByteBuffer.allocate(M)) else MessageBuffer.wrap(ByteBuffer.allocateDirect(M)) val hb = ByteBuffer.allocate(M) val db = ByteBuffer.allocateDirect(M) @@ -82,6 +92,14 @@ class MessageBufferTest } } + block("unsafe direct") { + var i = 0 + while (i < N) { + ud.getInt((i * 4) % M) + i += 1 + } + } + block("allocate") { var i = 0 while (i < N) { @@ -108,6 +126,14 @@ class MessageBufferTest } } + block("unsafe direct") { + var i = 0 + while (i < N) { + ud.getInt((rs(i) * 4) % M) + i += 1 + } + } + block("allocate") { var i = 0 while (i < N) { @@ -125,11 +151,14 @@ class MessageBufferTest } } } + val builder = Seq.newBuilder[MessageBuffer] + builder += MessageBuffer.allocate(10) + builder += MessageBuffer.wrap(ByteBuffer.allocate(10)) + if (!universal) builder += MessageBuffer.wrap(ByteBuffer.allocateDirect(10)) + val buffers = builder.result() "convert to ByteBuffer" in { - for (t <- Seq( - MessageBuffer.allocate(10)) - ) { + for (t <- buffers) { val bb = t.sliceAsByteBuffer bb.position shouldBe 0 bb.limit shouldBe 10 @@ -138,9 +167,7 @@ class MessageBufferTest } "put ByteBuffer on itself" in { - for (t <- Seq( - MessageBuffer.allocate(10)) - ) { + for (t <- buffers) { val b = Array[Byte](0x02, 0x03) val srcArray = ByteBuffer.wrap(b) val srcHeap = ByteBuffer.allocate(b.length) @@ -163,11 +190,45 @@ class MessageBufferTest } } + "put MessageBuffer on itself" in { + for (t <- buffers) { + val b = Array[Byte](0x02, 0x03) + val srcArray = ByteBuffer.wrap(b) + val srcHeap = ByteBuffer.allocate(b.length) + srcHeap.put(b).flip + val srcOffHeap = ByteBuffer.allocateDirect(b.length) + srcOffHeap.put(b).flip + val builder = Seq.newBuilder[ByteBuffer] + builder ++= Seq(srcArray, srcHeap) + if (!universal) builder += srcOffHeap + + for (src <- builder.result().map(d => MessageBuffer.wrap(d))) { + // Write header bytes + val header = Array[Byte](0x00, 0x01) + t.putBytes(0, header, 0, header.length) + // Write src after the header + t.putMessageBuffer(header.length, src, 0, header.length) + + t.getByte(0) shouldBe 0x00 + t.getByte(1) shouldBe 0x01 + t.getByte(2) shouldBe 0x02 + t.getByte(3) shouldBe 0x03 + } + } + } + "copy sliced buffer" in { def prepareBytes : Array[Byte] = { Array[Byte](0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07) } + def prepareDirectBuffer : ByteBuffer = { + val directBuffer = ByteBuffer.allocateDirect(prepareBytes.length) + directBuffer.put(prepareBytes) + directBuffer.flip + directBuffer + } + def checkSliceAndCopyTo(srcBuffer: MessageBuffer, dstBuffer: MessageBuffer) = { val sliced = srcBuffer.slice(2, 5) @@ -191,6 +252,10 @@ class MessageBufferTest } checkSliceAndCopyTo(MessageBuffer.wrap(prepareBytes), MessageBuffer.wrap(prepareBytes)) + checkSliceAndCopyTo(MessageBuffer.wrap(ByteBuffer.wrap(prepareBytes)), MessageBuffer.wrap(ByteBuffer.wrap(prepareBytes))) + if (!universal) { + checkSliceAndCopyTo(MessageBuffer.wrap(prepareDirectBuffer), MessageBuffer.wrap(prepareDirectBuffer)) + } } } }