Skip to content

Commit f53e5ae

Browse files
committed
Merge pull request #104 from msgpack/v07-unpack-binary
Optimize unpackBinary
2 parents a2adc90 + ed0f7cc commit f53e5ae

File tree

11 files changed

+326
-33
lines changed

11 files changed

+326
-33
lines changed

msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public MessageUnpacker(byte[] arr) {
117117
* @param in
118118
*/
119119
public MessageUnpacker(InputStream in) {
120-
this(new InputStreamBufferInput(in));
120+
this(InputStreamBufferInput.newBufferInput(in));
121121
}
122122

123123
/**
@@ -992,6 +992,10 @@ public void readPayload(ByteBuffer dst) throws IOException {
992992
}
993993
}
994994

995+
public void readPayload(byte[] dst) throws IOException {
996+
readPayload(dst, 0, dst.length);
997+
}
998+
995999
/**
9961000
* Read up to len bytes of data into the destination array
9971001
*

msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,21 @@ public class ArrayBufferInput implements MessageBufferInput {
1212
private boolean isRead = false;
1313

1414
public ArrayBufferInput(byte[] arr) {
15-
this.buffer = MessageBuffer.wrap(checkNotNull(arr, "input array is null"));
15+
this(arr, 0, arr.length);
1616
}
1717

18+
public ArrayBufferInput(byte[] arr, int offset, int length) {
19+
checkArgument(offset + length <= arr.length);
20+
this.buffer = MessageBuffer.wrap(checkNotNull(arr, "input array is null")).slice(offset, length);
21+
}
22+
23+
1824
@Override
1925
public MessageBuffer next() throws IOException {
20-
if(isRead) {
26+
if(isRead)
2127
return null;
22-
} else {
23-
isRead = true;
24-
return buffer;
25-
}
28+
isRead = true;
29+
return buffer;
2630
}
2731

2832
@Override
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package org.msgpack.core.buffer;
2+
3+
import java.io.IOException;
4+
import java.nio.ByteBuffer;
5+
import static org.msgpack.core.Preconditions.*;
6+
7+
/**
8+
* {@link MessageBufferInput} adapter for {@link java.nio.ByteBuffer}
9+
*/
10+
public class ByteBufferInput implements MessageBufferInput {
11+
12+
private final ByteBuffer input;
13+
private boolean isRead = false;
14+
15+
public ByteBufferInput(ByteBuffer input) {
16+
this.input = checkNotNull(input, "input ByteBuffer is null");
17+
}
18+
19+
20+
@Override
21+
public MessageBuffer next() throws IOException {
22+
if(isRead)
23+
return null;
24+
25+
isRead = true;
26+
return MessageBuffer.wrap(input);
27+
}
28+
29+
30+
@Override
31+
public void close() throws IOException {
32+
// Nothing to do
33+
}
34+
}

msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,43 @@
44
import java.nio.ByteBuffer;
55
import java.nio.channels.ReadableByteChannel;
66

7-
import static org.msgpack.core.Preconditions.checkNotNull;
7+
import static org.msgpack.core.Preconditions.*;
88

99
/**
1010
* {@link MessageBufferInput} adapter for {@link java.nio.channels.ReadableByteChannel}
1111
*/
1212
public class ChannelBufferInput implements MessageBufferInput {
1313

1414
private final ReadableByteChannel channel;
15+
private boolean reachedEOF = false;
16+
private final int bufferSize;
1517

1618
public ChannelBufferInput(ReadableByteChannel channel) {
19+
this(channel, 8192);
20+
}
21+
22+
public ChannelBufferInput(ReadableByteChannel channel, int bufferSize) {
1723
this.channel = checkNotNull(channel, "input channel is null");
24+
checkArgument(bufferSize > 0, "buffer size must be > 0: " + bufferSize);
25+
this.bufferSize = bufferSize;
1826
}
1927

2028
@Override
2129
public MessageBuffer next() throws IOException {
22-
MessageBuffer m = MessageBuffer.newBuffer(8192);
23-
ByteBuffer b = m.toByteBuffer(0, m.size);
24-
for(int ret = 0; (ret = channel.read(b)) != -1; ) {
30+
31+
if(reachedEOF)
32+
return null;
33+
34+
MessageBuffer m = MessageBuffer.newBuffer(bufferSize);
35+
ByteBuffer b = m.toByteBuffer();
36+
while(!reachedEOF && b.remaining() > 0) {
37+
int ret = channel.read(b);
38+
if(ret == -1) {
39+
reachedEOF = true;
40+
}
2541
}
2642
b.flip();
27-
if(b.remaining() < m.size)
28-
return m.slice(0, b.remaining());
29-
else
30-
return m;
43+
return b.remaining() == 0 ? null : m.slice(0, b.limit());
3144
}
3245

3346
@Override
Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package org.msgpack.core.buffer;
22

3+
import java.io.ByteArrayInputStream;
4+
import java.io.FileInputStream;
35
import java.io.IOException;
46
import java.io.InputStream;
7+
import java.lang.reflect.Field;
58

69
import static org.msgpack.core.Preconditions.checkNotNull;
710

@@ -10,29 +13,82 @@
1013
*/
1114
public class InputStreamBufferInput implements MessageBufferInput {
1215

16+
private static Field bufField;
17+
private static Field bufPosField;
18+
private static Field bufCountField;
19+
20+
private static Field getField(String name) {
21+
Field f = null;
22+
try {
23+
f = ByteArrayInputStream.class.getDeclaredField(name);
24+
f.setAccessible(true);
25+
}
26+
catch(Exception e) {
27+
e.printStackTrace();
28+
}
29+
return f;
30+
}
31+
32+
static {
33+
bufField = getField("buf");
34+
bufPosField = getField("pos");
35+
bufCountField = getField("count");
36+
}
37+
1338
private final InputStream in;
14-
private byte[] buffer = new byte[8192];
39+
private final int bufferSize;
40+
private boolean reachedEOF = false;
41+
42+
public static MessageBufferInput newBufferInput(InputStream in) {
43+
checkNotNull(in, "InputStream is null");
44+
if(in.getClass() == ByteArrayInputStream.class) {
45+
ByteArrayInputStream b = (ByteArrayInputStream) in;
46+
try {
47+
// Extract a raw byte array from the ByteArrayInputStream
48+
byte[] buf = (byte[]) bufField.get(b);
49+
int pos = (Integer) bufPosField.get(b);
50+
int length = (Integer) bufCountField.get(b);
51+
return new ArrayBufferInput(buf, pos, length);
52+
}
53+
catch(Exception e) {
54+
// Failed to retrieve the raw byte array
55+
}
56+
} else if (in instanceof FileInputStream) {
57+
return new ChannelBufferInput(((FileInputStream) in).getChannel());
58+
}
59+
60+
return new InputStreamBufferInput(in);
61+
}
1562

1663
public InputStreamBufferInput(InputStream in) {
64+
this(in, 8192);
65+
}
66+
67+
public InputStreamBufferInput(InputStream in, int bufferSize) {
1768
this.in = checkNotNull(in, "input is null");
69+
this.bufferSize = bufferSize;
1870
}
1971

2072
@Override
2173
public MessageBuffer next() throws IOException {
22-
// Manage the allocated buffers
23-
MessageBuffer m = MessageBuffer.newBuffer(buffer.length);
74+
if(reachedEOF)
75+
return null;
2476

25-
// TODO reduce the number of memory copy
77+
byte[] buffer = null;
2678
int cursor = 0;
27-
while(cursor < buffer.length) {
28-
int readLen = in.read(buffer, cursor, buffer.length - cursor);
79+
while(!reachedEOF && cursor < bufferSize) {
80+
if(buffer == null)
81+
buffer = new byte[bufferSize];
82+
83+
int readLen = in.read(buffer, cursor, bufferSize - cursor);
2984
if(readLen == -1) {
85+
reachedEOF = true;
3086
break;
3187
}
3288
cursor += readLen;
3389
}
34-
m.putBytes(0, buffer, 0, cursor);
35-
return m;
90+
91+
return buffer == null ? null : MessageBuffer.wrap(buffer).slice(0, cursor);
3692
}
3793

3894
@Override
@@ -41,7 +97,7 @@ public void close() throws IOException {
4197
in.close();
4298
}
4399
finally {
44-
buffer = null;
100+
45101
}
46102
}
47103
}

msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,10 @@ public class MessageBuffer {
114114
* Reference is used to hold a reference to an object that holds the underlying memory so that it cannot be
115115
* released by the garbage collector.
116116
*/
117-
private final ByteBuffer reference;
117+
protected final ByteBuffer reference;
118118

119119
// TODO life-time managment of this buffer
120-
private AtomicInteger referenceCounter;
120+
//private AtomicInteger referenceCounter;
121121

122122

123123
static MessageBuffer newOffHeapBuffer(int length) {
@@ -243,7 +243,7 @@ else if(bb.hasArray()) {
243243
this.reference = null;
244244
}
245245

246-
private MessageBuffer(Object base, long address, int length, ByteBuffer reference) {
246+
protected MessageBuffer(Object base, long address, int length, ByteBuffer reference) {
247247
this.base = base;
248248
this.address = address;
249249
this.size = length;
@@ -259,7 +259,12 @@ private MessageBuffer(Object base, long address, int length, ByteBuffer referenc
259259

260260
public MessageBuffer slice(int offset, int length) {
261261
// TODO ensure deleting this slice does not collapse this MessageBuffer
262-
return new MessageBuffer(base, address + offset, length, reference);
262+
if(offset == 0 && length == size())
263+
return this;
264+
else {
265+
checkArgument(offset + length <= size());
266+
return new MessageBuffer(base, address + offset, length, reference);
267+
}
263268
}
264269

265270
public byte getByte(int index) {
@@ -388,6 +393,17 @@ public ByteBuffer toByteBuffer(int index, int length) {
388393
}
389394
}
390395

396+
public ByteBuffer toByteBuffer() {
397+
return toByteBuffer(0, size());
398+
}
399+
400+
public byte[] toByteArray() {
401+
byte[] b = new byte[size()];
402+
unsafe.copyMemory(base, address, b, ARRAY_BYTE_BASE_OFFSET, size());
403+
return b;
404+
}
405+
406+
391407
public void relocate(int offset, int length, int dst) {
392408
unsafe.copyMemory(base, address + offset, base, address+dst, length);
393409
}

msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferBE.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import java.nio.ByteBuffer;
44

5+
import static org.msgpack.core.Preconditions.checkArgument;
6+
57
/**
68
* MessageBufferBE is a {@link MessageBuffer} implementation tailored to big-endian machines.
79
* The specification of Message Pack demands writing short/int/float/long/double values in the big-endian format.
@@ -13,6 +15,20 @@ public class MessageBufferBE extends MessageBuffer {
1315
super(bb);
1416
}
1517

18+
private MessageBufferBE(Object base, long address, int length, ByteBuffer reference) {
19+
super(base, address, length, reference);
20+
}
21+
22+
@Override
23+
public MessageBufferBE slice(int offset, int length) {
24+
if(offset == 0 && length == size())
25+
return this;
26+
else {
27+
checkArgument(offset + length <= size());
28+
return new MessageBufferBE(base, address + offset, length, reference);
29+
}
30+
}
31+
1632
@Override
1733
public short getShort(int index) {
1834
return unsafe.getShort(base, address + index);

msgpack-core/src/test/scala/org/msgpack/core/MessagePackSpec.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,17 @@ import org.scalatest._
44
import xerial.core.log.{LogLevel, Logger}
55
import xerial.core.util.{TimeReport, Timer}
66
import scala.language.implicitConversions
7-
8-
trait MessagePackSpec extends WordSpec with Matchers with GivenWhenThen with OptionValues with BeforeAndAfter with Benchmark with Logger {
7+
import org.scalatest.prop.PropertyChecks
8+
9+
trait MessagePackSpec
10+
extends WordSpec
11+
with Matchers
12+
with GivenWhenThen
13+
with OptionValues
14+
with BeforeAndAfter
15+
with PropertyChecks
16+
with Benchmark
17+
with Logger {
918

1019
implicit def toTag(s:String) : Tag = Tag(s)
1120

msgpack-core/src/test/scala/org/msgpack/core/MessagePackTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import java.nio.CharBuffer
1111
/**
1212
* Created on 2014/05/07.
1313
*/
14-
class MessagePackTest extends MessagePackSpec with PropertyChecks {
14+
class MessagePackTest extends MessagePackSpec {
1515

1616
def isValidUTF8(s: String) = {
1717
MessagePack.UTF8.newEncoder().canEncode(s)

0 commit comments

Comments
 (0)