Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
PARQUET-787: Update byte buffer input streams for review comments.
  • Loading branch information
rdblue committed Feb 21, 2018
commit 4abba3e7ac146a4d2444b44954538c6fe861c477
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand All @@ -32,7 +31,7 @@
class MultiBufferInputStream extends ByteBufferInputStream {
private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);

private final Collection<ByteBuffer> buffers;
private final List<ByteBuffer> buffers;
private final long length;

private Iterator<ByteBuffer> iterator;
Expand All @@ -43,7 +42,7 @@ class MultiBufferInputStream extends ByteBufferInputStream {
private long markLimit = 0;
private List<ByteBuffer> markBuffers = new ArrayList<>();

MultiBufferInputStream(Collection<ByteBuffer> buffers) {
MultiBufferInputStream(List<ByteBuffer> buffers) {
this.buffers = buffers;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is not a good practice to accept a mutable collection as is. I would suggest copying it at some point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because all of the uses are internal, I think it is safe. I'd rather not create extra lists and copy because I don't think it is likely that the lists passed in here are going to be reused.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


long totalLen = 0;
Expand Down Expand Up @@ -171,10 +170,11 @@ public List<ByteBuffer> sliceBuffers(long len) throws EOFException {
}

List<ByteBuffer> buffers = new ArrayList<>();
int bytesAccumulated = 0;
long bytesAccumulated = 0;
while (bytesAccumulated < len) {
if (current.remaining() > 0) {
// get a slice of the current buffer to return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that we could avoid slicing ByteBuffers that fit entirely in the requested range, but since it's a cheap operation, I'm not sure it is worth the effort of explicitly handling them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to duplicate the buffer either way, so we'd only be able to avoid setting the position and limit, which doesn't seem worth it to me.

// always fits in an int because remaining returns an int that is >= 0
int bufLen = (int) Math.min(len - bytesAccumulated, current.remaining());
ByteBuffer slice = current.duplicate();
slice.limit(slice.position() + bufLen);
Expand Down Expand Up @@ -211,6 +211,9 @@ public List<ByteBuffer> remainingBuffers() {
@Override
public int read(byte[] bytes, int off, int len) {
if (len <= 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might swallow an error here in case of len is negative. It might worth checking and throwing an IndexOutOfBoundsException if required.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

if (len < 0) {
throw new IndexOutOfBoundsException("Read length must be greater than 0: " + len);
}
return 0;
}

Expand Down Expand Up @@ -267,7 +270,7 @@ public int available() {
}

@Override
public synchronized void mark(int readlimit) {
public void mark(int readlimit) {
if (mark >= 0) {
discardMark();
}
Expand All @@ -279,7 +282,7 @@ public synchronized void mark(int readlimit) {
}

@Override
public synchronized void reset() throws IOException {
public void reset() throws IOException {
if (mark >= 0 && position < markLimit) {
this.position = mark;
// replace the current iterator with one that adds back the buffers that
Expand All @@ -288,11 +291,11 @@ public synchronized void reset() throws IOException {
discardMark();
nextBuffer(); // go back to the marked buffers
} else {
throw new IOException("No mark defined");
throw new IOException("No mark defined or has read past the previous mark limit");
}
}

private synchronized void discardMark() {
private void discardMark() {
this.mark = -1;
this.markLimit = 0;
markBuffers = new ArrayList<>();
Expand Down Expand Up @@ -353,13 +356,18 @@ public boolean hasNext() {

@Override
public E next() {
if (!hasNext()) {
if (useFirst && !first.hasNext()) {
useFirst = false;
}

if (!useFirst && !second.hasNext()) {
throw new NoSuchElementException();
}

if (useFirst) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An Iterator shall not rely on that hasNext() is invoked every time before next(). As a private implementation it is good enough however it might worth a comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do, good catch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the logic to conform to the contract.

return first.next();
}

return second.next();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,12 @@ public List<ByteBuffer> remainingBuffers() {
}

@Override
public synchronized void mark(int readlimit) {
public void mark(int readlimit) {
this.mark = buffer.position();
}

@Override
public synchronized void reset() throws IOException {
public void reset() throws IOException {
if (mark >= 0) {
buffer.position(mark);
this.mark = -1;
Expand Down