Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor ByteBufferInputStream implementations.
This renames the existing implementation to SingleBufferInputStream,
moves the new implementation to MultiBufferInputStream, and adds an
interface that both implement to access slices of the backing arrays.
  • Loading branch information
rdblue committed Feb 20, 2018
commit a4fa05ac52c4b6f059613a7125de8a76a33c7cba
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOExc
int effectiveBitLength = valueCount * bitsPerValue;
int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength);
LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitsPerValue);
this.in = new ByteBufferInputStream(in, offset, length);

ByteBuffer buffer = in.duplicate();
in.position(in.position() + offset);
in.limit(in.position() + length);
this.in = ByteBufferInputStream.wrap(buffer);
this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
this.nextOffset = offset + length;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.parquet.column.values.delta;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.parquet.bytes.ByteBufferInputStream;
Expand All @@ -28,7 +27,6 @@
import org.apache.parquet.column.values.bitpacking.Packer;
import org.apache.parquet.io.ParquetDecodingException;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
Expand Down Expand Up @@ -68,7 +66,9 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
*/
@Override
public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException {
in = new ByteBufferInputStream(page, offset, page.limit() - offset);
ByteBuffer buffer = page.duplicate();
buffer.position(buffer.position() + offset);
in = ByteBufferInputStream.wrap(buffer);
this.config = DeltaBinaryPackingConfig.readConfig(in);
this.page = page;
this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
Expand Down Expand Up @@ -123,7 +123,7 @@ private void checkRead() {
}
}

private void loadNewBlockToBuffer() {
private void loadNewBlockToBuffer() throws IOException {
try {
minDeltaInCurrentBlock = BytesUtils.readZigZagVarLong(in);
} catch (IOException e) {
Expand Down Expand Up @@ -152,13 +152,13 @@ private void loadNewBlockToBuffer() {
*
* @param packer the packer created from bitwidth of current mini block
*/
private void unpackMiniBlock(BytePackerForLong packer) {
private void unpackMiniBlock(BytePackerForLong packer) throws IOException {
for (int j = 0; j < config.miniBlockSizeInValues; j += 8) {
unpack8Values(packer);
}
}

private void unpack8Values(BytePackerForLong packer) {
private void unpack8Values(BytePackerForLong packer) throws IOException {
//calculate the pos because the packer api uses array not stream
int pos = page.limit() - in.available();
packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ public DictionaryValuesReader(Dictionary dictionary) {
@Override
public void initFromPage(int valueCount, ByteBuffer page, int offset)
throws IOException {
this.in = new ByteBufferInputStream(page, offset, page.limit() - offset);
ByteBuffer buffer = page.duplicate();
buffer.position(buffer.position() + offset);
this.in = ByteBufferInputStream.wrap(buffer);
if (page.limit() - offset > 0) {
LOG.debug("init from page at offset {} for length {}", offset, (page.limit() - offset));
int bitWidth = BytesUtils.readIntLittleEndianOnOneByte(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOExc
}

private ByteBufferInputStream toInputStream(ByteBuffer in, int offset) {
return new ByteBufferInputStream(in.duplicate(), offset, in.limit() - offset);
ByteBuffer buffer = in.duplicate();
buffer.position(buffer.position() + offset);
return ByteBufferInputStream.wrap(buffer);
}

public static class DoublePlainValuesReader extends PlainValuesReader {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.bitpacking.BytePacker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public RunLengthBitPackingHybridValuesReader(int bitWidth) {

@Override
public void initFromPage(int valueCountL, ByteBuffer page, int offset) throws IOException {
ByteBufferInputStream in = new ByteBufferInputStream(page, offset, page.limit() - offset);
ByteBuffer buffer = page.duplicate();
buffer.position(buffer.position() + offset);
ByteBufferInputStream in = ByteBufferInputStream.wrap(buffer);
int length = BytesUtils.readIntLittleEndian(in);

decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private void doIntegrationTest(int bitWidth) throws Exception {
numValues += 1000;

ByteBuffer encodedBytes = encoder.toBytes().toByteBuffer();
ByteBufferInputStream in = new ByteBufferInputStream(encodedBytes);
ByteBufferInputStream in = ByteBufferInputStream.wrap(encodedBytes);

RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@
import static org.junit.Assert.assertEquals;

import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.junit.Test;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.bitpacking.BytePacker;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,82 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this white-space change intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I'll revert it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.bytes;

import java.io.IOException;
import java.io.EOFException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

/**
* This ByteBufferInputStream does not consume the ByteBuffer being passed in,
* but will create a slice of the current buffer.
*/
public class ByteBufferInputStream extends InputStream {

protected ByteBuffer byteBuf;
protected int initPos;
protected int count;
public ByteBufferInputStream(ByteBuffer buffer) {
this(buffer, buffer.position(), buffer.remaining());
}

public ByteBufferInputStream(ByteBuffer buffer, int offset, int count) {
ByteBuffer temp = buffer.duplicate();
temp.position(offset);
byteBuf = temp.slice();
byteBuf.limit(count);
this.initPos = offset;
this.count = count;
}

public ByteBuffer toByteBuffer() {
return byteBuf.slice();
}

@Override
public int read() throws IOException {
if (!byteBuf.hasRemaining()) {
return -1;
public abstract class ByteBufferInputStream extends InputStream {

public static ByteBufferInputStream wrap(ByteBuffer... buffers) {
if (buffers.length == 1) {
return new SingleBufferInputStream(buffers[0]);
} else {
return new MultiBufferInputStream(Arrays.asList(buffers));
}
//Workaround for unsigned byte
return byteBuf.get() & 0xFF;
}

@Override
public int read(byte[] bytes, int offset, int length) throws IOException {
int count = Math.min(byteBuf.remaining(), length);
if (count == 0) return -1;
byteBuf.get(bytes, offset, count);
return count;
}

@Override
public long skip(long n) {
if (n > byteBuf.remaining())
n = byteBuf.remaining();
int pos = byteBuf.position();
byteBuf.position((int)(pos + n));
return n;
public static ByteBufferInputStream wrap(List<ByteBuffer> buffers) {
if (buffers.size() == 1) {
return new SingleBufferInputStream(buffers.get(0));
} else {
return new MultiBufferInputStream(buffers);
}
}

public abstract long position();

public abstract List<ByteBuffer> sliceBuffers(long length) throws EOFException;

@Override
public int available() {
return byteBuf.remaining();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public ByteBuffer toByteBuffer() throws IOException {
* @throws IOException
*/
public InputStream toInputStream() throws IOException {
return new ByteBufferInputStream(toByteBuffer());
return ByteBufferInputStream.wrap(toByteBuffer());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
* under the License.
*/

package org.apache.parquet.hadoop.util;
package org.apache.parquet.bytes;

import org.apache.parquet.Preconditions;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -31,7 +29,7 @@
import java.util.List;
import java.util.NoSuchElementException;

public class ByteBufferInputStream extends InputStream {
class MultiBufferInputStream extends ByteBufferInputStream {
private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);

private final Collection<ByteBuffer> buffers;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A Collection has no defined element order while in this implementation the order of buffers matters. I think List would be better here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

Expand All @@ -45,7 +43,7 @@ public class ByteBufferInputStream extends InputStream {
private long markLimit = 0;
private List<ByteBuffer> markBuffers = new ArrayList<>();

public ByteBufferInputStream(Collection<ByteBuffer> buffers) {
MultiBufferInputStream(Collection<ByteBuffer> buffers) {
this.buffers = buffers;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is not a good practice to accept a mutable collection as is. I would suggest copying it at some point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because all of the uses are internal, I think it is safe. I'd rather not create extra lists and copy because I don't think it is likely that the lists passed in here are going to be reused.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


long totalLen = 0;
Expand Down Expand Up @@ -164,7 +162,7 @@ public int read() throws IOException {
while (true) {
if (current.remaining() > 0) {
this.position += 1;
return current.get();
return current.get() & 0xFF; // as unsigned
} else if (!nextBuffer()) {
// there are no more buffers
throw new EOFException();
Expand Down Expand Up @@ -204,7 +202,7 @@ public synchronized void reset() throws IOException {
discardMark();
nextBuffer(); // go back to the marked buffers
} else {
throw new RuntimeException("No mark defined");
throw new IOException("No mark defined");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error message might mislead the user. Reaching markLimit should also be mentioned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

}
}

Expand Down
Loading