Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,20 @@ public MessagePacker(MessageBufferOutput out, MessagePack.Config config) {
this.position = 0;
}

public void reset(MessageBufferOutput out) throws IOException {
/**
* Reset output. This method doesn't close the old resource.
* @param out new output
* @return the old resource
*/
public MessageBufferOutput reset(MessageBufferOutput out) throws IOException {
// Validate the argument
MessageBufferOutput newOut = checkNotNull(out, "MessageBufferOutput is null");

try {
if(this.out != newOut) {
this.out.close();
}
}
finally {
// Reset the internal states here for the exception safety
this.out = newOut;
this.position = 0;
}
// Reset the internal states
MessageBufferOutput old = this.out;
this.out = newOut;
this.position = 0;
return old;
}


Expand Down
32 changes: 16 additions & 16 deletions msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,24 +146,24 @@ public MessageUnpacker(MessageBufferInput in, MessagePack.Config config) {
this.config = checkNotNull(config, "Config");
}

public void reset(MessageBufferInput in) throws IOException {
/**
* Reset input. This method doesn't close the old resource.
* @param in new input
* @return the old resource
*/
public MessageBufferInput reset(MessageBufferInput in) throws IOException {
MessageBufferInput newIn = checkNotNull(in, "MessageBufferInput is null");

try {
if(in != newIn) {
close();
}
}
finally {
// Reset the internal states here for the exception safety
this.in = newIn;
this.buffer = EMPTY_BUFFER;
this.position = 0;
this.totalReadBytes = 0;
this.secondaryBuffer = null;
this.reachedEOF = false;
// No need to initialize the already allocated string decoder here since we can reuse it.
}
// Reset the internal states
MessageBufferInput old = this.in;
this.in = newIn;
this.buffer = EMPTY_BUFFER;
this.position = 0;
this.totalReadBytes = 0;
this.secondaryBuffer = null;
this.reachedEOF = false;
// No need to initialize the already allocated string decoder here since we can reuse it.
return old;
}

public long getTotalReadBytes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,16 @@ public ArrayBufferInput(byte[] arr, int offset, int length) {
this.buffer = MessageBuffer.wrap(checkNotNull(arr, "input array is null")).slice(offset, length);
}

public void reset(MessageBuffer buf) {
/**
* Reset buffer. This method doesn't close the old resource.
* @param buf new buffer
* @return the old resource
*/
public MessageBuffer reset(MessageBuffer buf) {
MessageBuffer old = this.buffer;
this.buffer = buf;
this.isRead = false;
return old;
}

public void reset(byte[] arr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,16 @@ public ByteBufferInput(ByteBuffer input) {
this.input = checkNotNull(input, "input ByteBuffer is null");
}

public void reset(ByteBuffer input) {
/**
* Reset buffer. This method doesn't close the old resource.
* @param input new buffer
* @return the old resource
*/
public ByteBuffer reset(ByteBuffer input) {
ByteBuffer old = this.input;
this.input = input;
isRead = false;
return old;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,18 @@ public ChannelBufferInput(ReadableByteChannel channel, int bufferSize) {
this.bufferSize = bufferSize;
}

public void reset(ReadableByteChannel channel) throws IOException {
this.channel.close();
/**
* Reset channel. This method doesn't close the old resource.
* @param channel new channel
* @return the old resource
*/
public ReadableByteChannel reset(ReadableByteChannel channel) throws IOException {
ReadableByteChannel old = this.channel;
this.channel = channel;
this.reachedEOF = false;
return old;
}

@Override
public MessageBuffer next() throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@ public ChannelBufferOutput(WritableByteChannel channel) {
this.channel = checkNotNull(channel, "output channel is null");
}

public void reset(WritableByteChannel channel) throws IOException {
this.channel.close();
/**
* Reset channel. This method doesn't close the old resource.
* @param channel new channel
* @return the old resource
*/
public WritableByteChannel reset(WritableByteChannel channel) throws IOException {
WritableByteChannel old = this.channel;
this.channel = channel;
return old;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,16 @@ public InputStreamBufferInput(InputStream in, int bufferSize) {
this.bufferSize = bufferSize;
}

public void reset(InputStream in) throws IOException {
this.in.close();
/**
* Reset Stream. This method doesn't close the old resource.
* @param in new stream
* @return the old resource
*/
public InputStream reset(InputStream in) throws IOException {
InputStream old = this.in;
this.in = in;
reachedEOF = false;
return old;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@ public OutputStreamBufferOutput(OutputStream out) {
this.out = checkNotNull(out, "output is null");
}

public void reset(OutputStream out) throws IOException {
this.out.close();
/**
* Reset Stream. This method doesn't close the old resource.
* @param out new stream
* @return the old resource
*/
public OutputStream reset(OutputStream out) throws IOException {
OutputStream old = this.out;
this.out = out;
return old;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
//
package org.msgpack.core

import java.io.ByteArrayOutputStream
import java.io.{FileInputStream, FileOutputStream, File, ByteArrayOutputStream}

import org.msgpack.core.buffer.OutputStreamBufferOutput
import org.msgpack.core.buffer.{ChannelBufferOutput, MessageBufferOutput, OutputStreamBufferOutput}
import xerial.core.io.IOUtil

import scala.util.Random
Expand All @@ -41,6 +41,24 @@ class MessagePackerTest extends MessagePackSpec {
result shouldBe answer
}

def createTempFile = {
val f = File.createTempFile("msgpackTest", "msgpack")
f.deleteOnExit
f
}

def createTempFileWithOutputStream = {
val f = createTempFile
val out = new FileOutputStream(f)
(f, out)
}

def createTempFileWithChannel = {
val (f, out) = createTempFileWithOutputStream
val ch = out.getChannel
(f, ch)
}

"MessagePacker" should {

"reset the internal states" in {
Expand Down Expand Up @@ -130,5 +148,59 @@ class MessagePackerTest extends MessagePackSpec {
case (bufferSize, stringSize) => test(bufferSize, stringSize)
}
}

"reset OutputStreamBufferOutput" in {
val (f0, out0) = createTempFileWithOutputStream
val packer = MessagePack.newDefaultPacker(out0)
packer.packInt(99)
packer.close

val up0 = MessagePack.newDefaultUnpacker(new FileInputStream(f0))
up0.unpackInt shouldBe 99
up0.hasNext shouldBe false
up0.close

val (f1, out1) = createTempFileWithOutputStream
packer.reset(new OutputStreamBufferOutput(out1))
packer.packInt(99)
packer.flush
packer.reset(new OutputStreamBufferOutput(out1))
packer.packString("hello")
packer.close

val up1 = MessagePack.newDefaultUnpacker(new FileInputStream(f1))
up1.unpackInt shouldBe 99
up1.unpackString shouldBe "hello"
up1.hasNext shouldBe false
up1.close
}

"reset ChannelBufferOutput" in {
val (f0, out0) = createTempFileWithChannel
val packer = MessagePack.newDefaultPacker(out0)
packer.packInt(99)
packer.close

val up0 = MessagePack.newDefaultUnpacker(new FileInputStream(f0))
up0.unpackInt shouldBe 99
up0.hasNext shouldBe false
up0.close

val (f1, out1) = createTempFileWithChannel
packer.reset(new ChannelBufferOutput(out1))
packer.packInt(99)
packer.flush
packer.reset(new ChannelBufferOutput(out1))
packer.packString("hello")
packer.close

val up1 = MessagePack.newDefaultUnpacker(new FileInputStream(f1))
up1.unpackInt shouldBe 99
up1.unpackString shouldBe "hello"
up1.hasNext shouldBe false
up1.close
}

}

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.msgpack.core

import java.io.{EOFException, ByteArrayInputStream, ByteArrayOutputStream}
import java.io._
import scala.util.Random
import org.msgpack.core.buffer.{MessageBuffer, MessageBufferInput, OutputStreamBufferOutput, ArrayBufferInput}
import org.msgpack.core.buffer._
import org.msgpack.value.ValueType
import xerial.core.io.IOUtil

Expand Down Expand Up @@ -143,6 +143,19 @@ class MessageUnpackerTest extends MessagePackSpec {
}
}

def createTempFile = {
val f = File.createTempFile("msgpackTest", "msgpack")
f.deleteOnExit
val p = MessagePack.newDefaultPacker(new FileOutputStream(f))
p.packInt(99)
p.close
f
}

def checkFile(u:MessageUnpacker) = {
u.unpackInt shouldBe 99
u.hasNext shouldBe false
}

"MessageUnpacker" should {

Expand Down Expand Up @@ -581,5 +594,29 @@ class MessageUnpackerTest extends MessagePackSpec {
// This performance comparition is too close, so we disabled it
// t("reuse-array-input").averageWithoutMinMax should be <= t("no-buffer-reset").averageWithoutMinMax
}

"reset ChannelBufferInput" in {
val f0 = createTempFile
val u = MessagePack.newDefaultUnpacker(new FileInputStream(f0).getChannel)
checkFile(u)

val f1 = createTempFile
val ch = new FileInputStream(f1).getChannel
u.reset(new ChannelBufferInput(ch))
checkFile(u)
u.close
}

"reset InputStreamBufferInput" in {
val f0 = createTempFile
val u = MessagePack.newDefaultUnpacker(new FileInputStream(f0))
checkFile(u)

val f1 = createTempFile
val in = new FileInputStream(f1)
u.reset(new InputStreamBufferInput(in))
checkFile(u)
u.close
}
}
}