SPARK-23972: Fix test failures and review comments.
rdblue committed May 7, 2018
commit 93f2a9274e8a0c13478f03427df7b7fad1c2180b
@@ -74,7 +74,7 @@ public final void readIntegers(int total, WritableColumnVector c, int rowId) {

     if (buffer.hasArray()) {
Contributor:
shall we assert buffer.hasArray() is always true?

Contributor Author:
No, there is no guarantee that the buffer from Parquet is on the heap.

       int offset = buffer.arrayOffset() + buffer.position();
-      c.putIntsLittleEndian(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET);
+      c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
     } else {
       for (int i = 0; i < total; i += 1) {
         c.putInt(rowId + i, buffer.getInt());
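
For readers following the thread above: ByteBuffer.hasArray() is true only for heap-backed, writable buffers, so asserting it would break if Parquet ever hands back a direct (off-heap) buffer. A minimal standalone sketch, not part of this patch (the BufferKinds class name is purely illustrative), of the two cases the branch distinguishes:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class BufferKinds {
  public static void main(String[] args) {
    // Heap buffer: hasArray() is true, so the backing byte[] can be passed to
    // the bulk put* methods using arrayOffset() + position() as the index.
    ByteBuffer heap = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
    System.out.println("heap.hasArray() = " + heap.hasArray());      // true

    // Direct buffer: hasArray() is false and array() would throw, so the
    // reader has to fall back to per-value getInt()/getLong()/... calls.
    ByteBuffer direct = ByteBuffer.allocateDirect(16).order(ByteOrder.LITTLE_ENDIAN);
    System.out.println("direct.hasArray() = " + direct.hasArray());  // false
  }
}
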
@@ -89,7 +89,7 @@ public final void readLongs(int total, WritableColumnVector c, int rowId) {

     if (buffer.hasArray()) {
       int offset = buffer.arrayOffset() + buffer.position();
-      c.putLongsLittleEndian(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET);
+      c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
     } else {
       for (int i = 0; i < total; i += 1) {
         c.putLong(rowId + i, buffer.getLong());
@@ -104,7 +104,7 @@ public final void readFloats(int total, WritableColumnVector c, int rowId) {

     if (buffer.hasArray()) {
       int offset = buffer.arrayOffset() + buffer.position();
-      c.putFloats(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET);
+      c.putFloats(rowId, total, buffer.array(), offset);
     } else {
       for (int i = 0; i < total; i += 1) {
         c.putFloat(rowId + i, buffer.getFloat());
@@ -119,24 +119,18 @@ public final void readDoubles(int total, WritableColumnVector c, int rowId) {

     if (buffer.hasArray()) {
       int offset = buffer.arrayOffset() + buffer.position();
-      c.putDoubles(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET);
+      c.putDoubles(rowId, total, buffer.array(), offset);
     } else {
       for (int i = 0; i < total; i += 1) {
         c.putDouble(rowId + i, buffer.getDouble());
       }
     }
   }

-  private byte getByte() {
-    try {
-      return (byte) in.read();
-    } catch (IOException e) {
-      throw new ParquetDecodingException("Failed to read a byte", e);
-    }
-  }
-
   @Override
   public final void readBytes(int total, WritableColumnVector c, int rowId) {
+    // Bytes are stored as a 4-byte little endian int. Just read the first byte.
+    // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
     int requiredBytes = total * 4;
     ByteBuffer buffer = getBuffer(requiredBytes);

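As an aside, not part of the diff: the comment above describes how Parquet's plain encoding stores byte values, each one widened to a 4-byte little-endian int, so only the first byte of every group of four carries data. A hypothetical sketch of that stride-4 read over a heap buffer (StrideFourRead and readBytesExample are illustrative names, not Spark API):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class StrideFourRead {
  // Hypothetical helper: pull `total` byte values out of a buffer in which each
  // value was written as a 4-byte little-endian int, so the first byte of each
  // group of four is the low-order byte we want.
  static byte[] readBytesExample(ByteBuffer buffer, int total) {
    byte[] out = new byte[total];
    if (buffer.hasArray()) {
      int base = buffer.arrayOffset() + buffer.position();
      byte[] src = buffer.array();
      for (int i = 0; i < total; i += 1) {
        out[i] = src[base + i * 4];        // stride of 4, keep only the first byte
      }
      buffer.position(buffer.position() + total * 4);
    } else {
      for (int i = 0; i < total; i += 1) {
        out[i] = (byte) buffer.getInt();   // low byte of the little-endian int
      }
    }
    return out;
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    buf.putInt(7).putInt(42).putInt(255);
    buf.flip();
    for (byte b : readBytesExample(buf, 3)) {
      System.out.println(b & 0xFF);        // prints 7, 42, 255
    }
  }
}
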
@@ -151,7 +145,11 @@ public final void readBytes(int total, WritableColumnVector c, int rowId) {
   public final boolean readBoolean() {
     // TODO: vectorize decoding and keep boolean[] instead of currentByte
     if (bitOffset == 0) {
-      currentByte = getByte();
+      try {
+        currentByte = (byte) in.read();
+      } catch (IOException e) {
+        throw new ParquetDecodingException("Failed to read a byte", e);
+      }
     }

     boolean v = (currentByte & (1 << bitOffset)) != 0;
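
For reference, a hedged sketch, separate from the patch, of the bit unpacking readBoolean performs above: booleans are packed eight to a byte, and the reader keeps a currentByte plus a bitOffset that wraps back to zero after bit 7. The class and stream setup below are illustrative, not Spark code:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class BitPackedBooleans {
  private final InputStream in;
  private byte currentByte = 0;
  private int bitOffset = 0;

  BitPackedBooleans(InputStream in) {
    this.in = in;
  }

  // Mirrors the shape of readBoolean: refill currentByte every eight calls and
  // test one bit per call, starting from the least significant bit.
  boolean readBoolean() throws IOException {
    if (bitOffset == 0) {
      currentByte = (byte) in.read();
    }
    boolean v = (currentByte & (1 << bitOffset)) != 0;
    bitOffset += 1;
    if (bitOffset == 8) {
      bitOffset = 0;
    }
    return v;
  }

  public static void main(String[] args) throws IOException {
    // 0b00000101 packs true, false, true, then five false values (LSB first).
    BitPackedBooleans reader =
        new BitPackedBooleans(new ByteArrayInputStream(new byte[] {0b00000101}));
    for (int i = 0; i < 8; i += 1) {
      System.out.print(reader.readBoolean() + " ");
    }
    System.out.println();
  }
}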