Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public MessageUnpacker(byte[] arr) {
* @param in
*/
public MessageUnpacker(InputStream in) {
this(new InputStreamBufferInput(in));
this(InputStreamBufferInput.newBufferInput(in));
}

/**
Expand Down Expand Up @@ -992,6 +992,10 @@ public void readPayload(ByteBuffer dst) throws IOException {
}
}

public void readPayload(byte[] dst) throws IOException {
readPayload(dst, 0, dst.length);
}

/**
* Read up to len bytes of data into the destination array
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,25 @@ public class ArrayBufferInput implements MessageBufferInput {

private MessageBuffer buffer;
private boolean isRead = false;
private final int length;

public ArrayBufferInput(byte[] arr) {
this.buffer = MessageBuffer.wrap(checkNotNull(arr, "input array is null"));
this(arr, 0, arr.length);
}

public ArrayBufferInput(byte[] arr, int offset, int length) {
this.buffer = MessageBuffer.wrap(checkNotNull(arr, "input array is null")).slice(offset, length);
checkArgument(length <= arr.length);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to check offset + length <= arr.length?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. I will fix this statement.

this.length = length;
}


@Override
public MessageBuffer next() throws IOException {
if(isRead) {
if(isRead)
return null;
} else {
isRead = true;
return buffer;
}
isRead = true;
return buffer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very good optimization 👍

}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.msgpack.core.buffer;

import java.io.IOException;
import java.nio.ByteBuffer;
import static org.msgpack.core.Preconditions.*;

/**
* {@link MessageBufferInput} adapter for {@link java.nio.ByteBuffer}
*/
public class ByteBufferInput implements MessageBufferInput {

private final ByteBuffer input;
private boolean isRead = false;

public ByteBufferInput(ByteBuffer input) {
this.input = checkNotNull(input, "input ByteBuffer is null");
}


@Override
public MessageBuffer next() throws IOException {
if(isRead)
return null;

isRead = true;
return MessageBuffer.wrap(input);
}


@Override
public void close() throws IOException {
// Nothing to do
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,43 @@
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

import static org.msgpack.core.Preconditions.checkNotNull;
import static org.msgpack.core.Preconditions.*;

/**
* {@link MessageBufferInput} adapter for {@link java.nio.channels.ReadableByteChannel}
*/
public class ChannelBufferInput implements MessageBufferInput {

private final ReadableByteChannel channel;
private boolean reachedEOF = false;
private final int bufferSize;

public ChannelBufferInput(ReadableByteChannel channel) {
this(channel, 8192);
}

public ChannelBufferInput(ReadableByteChannel channel, int bufferSize) {
this.channel = checkNotNull(channel, "input channel is null");
checkArgument(bufferSize > 0, "buffer size must be > 0: " + bufferSize);
this.bufferSize = bufferSize;
}

@Override
public MessageBuffer next() throws IOException {
MessageBuffer m = MessageBuffer.newBuffer(8192);
ByteBuffer b = m.toByteBuffer(0, m.size);
for(int ret = 0; (ret = channel.read(b)) != -1; ) {

if(reachedEOF)
return null;

MessageBuffer m = MessageBuffer.newBuffer(bufferSize);
ByteBuffer b = m.toByteBuffer();
while(!reachedEOF && b.remaining() > 0) {
int ret = channel.read(b);
if(ret == -1) {
reachedEOF = true;
}
}
b.flip();
if(b.remaining() < m.size)
return m.slice(0, b.remaining());
else
return m;
return b.remaining() == 0 ? null : m.slice(0, b.limit());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.msgpack.core.buffer;

import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;

import static org.msgpack.core.Preconditions.checkNotNull;

Expand All @@ -10,29 +13,82 @@
*/
public class InputStreamBufferInput implements MessageBufferInput {


private static Field bufField;
private static Field bufPosField;
private static Field bufCountField;

private static Field getField(String name) {
Field f = null;
try {
f = ByteArrayInputStream.class.getDeclaredField(name);
f.setAccessible(true);
}
catch(Exception e) {
e.printStackTrace();
}
return f;
}

static {
bufField = getField("buf");
bufPosField = getField("pos");
bufCountField = getField("count");
}

private final InputStream in;
private byte[] buffer = new byte[8192];
private final int bufferSize;
private boolean reachedEOF = false;

public static MessageBufferInput newBufferInput(InputStream in) {
if(in instanceof ByteArrayInputStream) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think instanceof ByteArrayInputStream is not safe because in could be an instance of extended type of ByteArrayInputStream which overrides read(byte[]) method.
This is safe: ByteArrayInputStream.class.equals(in.getClass())

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true. We should compare the exact class type.

ByteArrayInputStream b = (ByteArrayInputStream) in;
try {
// Extract a raw byte array from the ByteArrayInputStream
byte[] buf = (byte[]) bufField.get(b);
int pos = (Integer) bufPosField.get(b);
int length = (Integer) bufCountField.get(b);
return new ArrayBufferInput(buf, pos, length);
}
catch(Exception e) {
// Failed to retrieve the raw byte array
}
} else if (in instanceof FileInputStream) {
return new ChannelBufferInput(((FileInputStream) in).getChannel());
}

return new InputStreamBufferInput(in);
}

public InputStreamBufferInput(InputStream in) {
this(in, 8192);
}

public InputStreamBufferInput(InputStream in, int bufferSize) {
this.in = checkNotNull(in, "input is null");
this.bufferSize = bufferSize;
}

@Override
public MessageBuffer next() throws IOException {
// Manage the allocated buffers
MessageBuffer m = MessageBuffer.newBuffer(buffer.length);
if(reachedEOF)
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need reachedEOF field? I think we can assume InputStream.read always returns -1 once it returns -1.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Thanks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested to read InputStream after it returns -1. Instead of returning -1, it blocks. I used GZipInputStream(ByteArrayInputStream) as an InputStream. So we should use reachedEOF here.


// TODO reduce the number of memory copy
byte[] buffer = null;
int cursor = 0;
while(cursor < buffer.length) {
int readLen = in.read(buffer, cursor, buffer.length - cursor);
while(!reachedEOF && cursor < bufferSize) {
if(buffer == null)
buffer = new byte[bufferSize];

int readLen = in.read(buffer, cursor, bufferSize - cursor);
if(readLen == -1) {
reachedEOF = true;
break;
}
cursor += readLen;
}
m.putBytes(0, buffer, 0, cursor);
return m;

return buffer == null ? null : MessageBuffer.wrap(buffer).slice(0, cursor);
}

@Override
Expand All @@ -41,7 +97,7 @@ public void close() throws IOException {
in.close();
}
finally {
buffer = null;

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,10 @@ private MessageBuffer(Object base, long address, int length, ByteBuffer referenc

public MessageBuffer slice(int offset, int length) {
// TODO ensure deleting this slice does not collapse this MessageBuffer
return new MessageBuffer(base, address + offset, length, reference);
if(offset == 0 && length == size())
return this;
else
return new MessageBuffer(base, address + offset, length, reference);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about checking here if the sum of offset + length doesn't exceed size?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. If that test fails this code should throw IllegalArgumentException.

}

public byte getByte(int index) {
Expand Down Expand Up @@ -388,6 +391,17 @@ public ByteBuffer toByteBuffer(int index, int length) {
}
}

public ByteBuffer toByteBuffer() {
return toByteBuffer(0, size());
}

public byte[] toByteArray() {
byte[] b = new byte[size()];
unsafe.copyMemory(base, address, b, ARRAY_BYTE_BASE_OFFSET, size());
return b;
}


public void relocate(int offset, int length, int dst) {
unsafe.copyMemory(base, address + offset, base, address+dst, length);
}
Expand Down
13 changes: 11 additions & 2 deletions msgpack-core/src/test/scala/org/msgpack/core/MessagePackSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,17 @@ import org.scalatest._
import xerial.core.log.{LogLevel, Logger}
import xerial.core.util.{TimeReport, Timer}
import scala.language.implicitConversions

trait MessagePackSpec extends WordSpec with Matchers with GivenWhenThen with OptionValues with BeforeAndAfter with Benchmark with Logger {
import org.scalatest.prop.PropertyChecks

trait MessagePackSpec
extends WordSpec
with Matchers
with GivenWhenThen
with OptionValues
with BeforeAndAfter
with PropertyChecks
with Benchmark
with Logger {

implicit def toTag(s:String) : Tag = Tag(s)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.math.BigInteger
/**
* Created on 2014/05/07.
*/
class MessagePackTest extends MessagePackSpec with PropertyChecks {
class MessagePackTest extends MessagePackSpec {

"MessagePack" should {
"detect fixint values" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,10 @@ class MessageUnpackerTest extends MessagePackSpec {
t("v7").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax
}

import org.msgpack.`type`.{ValueType=>ValueTypeV6}

"be faster than msgpack-v6 read value" taggedAs("cmp-unpack") in {

import org.msgpack.`type`.{ValueType=>ValueTypeV6}

def readValueV6(unpacker:org.msgpack.unpacker.MessagePackUnpacker) {
val vt = unpacker.getNextType()
vt match {
Expand Down Expand Up @@ -409,6 +408,47 @@ class MessageUnpackerTest extends MessagePackSpec {

}

"be faster for reading binary than v6" taggedAs("cmp-binary") in {

val bos = new ByteArrayOutputStream()
val packer = new MessagePacker(bos)
val L = 10000
val R = 100
(0 until R).foreach { i =>
packer.packBinaryHeader(L)
packer.writePayload(new Array[Byte](L))
}
packer.close()

val b = bos.toByteArray
time("unpackBinary", repeat=100) {
block("v6") {
val v6 = new org.msgpack.MessagePack()
val unpacker = new org.msgpack.unpacker.MessagePackUnpacker(v6, new ByteArrayInputStream(b))
var i = 0
while(i < R) {
val out = unpacker.readByteArray()
i += 1
}
unpacker.close()
}

block("v7") {
//val unpacker = new MessageUnpacker(b)
val unpacker = new MessageUnpacker(new ByteArrayInputStream(b))
var i = 0
while(i < R) {
val len = unpacker.unpackBinaryHeader()
val out = new Array[Byte](len)
unpacker.readPayload(out, 0, len)
i += 1
}
unpacker.close()
}
}

}



}
Expand Down
Loading