Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,9 @@ object Decimal {
val ROUND_CEILING = BigDecimal.RoundingMode.CEILING
val ROUND_FLOOR = BigDecimal.RoundingMode.FLOOR

/**
 * Maximum number of decimal digits an Int can represent: every 9-digit value fits in an Int,
 * but not every 10-digit value does.
 */
val MAX_INT_DIGITS = 9

/**
 * Maximum number of decimal digits a Long can represent: every 18-digit value fits in a Long,
 * but not every 19-digit value does.
 */
val MAX_LONG_DIGITS = 18

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,17 @@ object DecimalType extends AbstractDataType {
}
}

/**
 * Returns true if `dt` is a DecimalType whose precision is at most Decimal.MAX_INT_DIGITS,
 * i.e. a decimal that fits inside an Int; returns false for every other data type.
 */
def is32BitDecimalType(dt: DataType): Boolean = dt match {
  case decimal: DecimalType => decimal.precision <= Decimal.MAX_INT_DIGITS
  case _ => false
}

/**
* Returns if dt is a DecimalType that fits inside a long
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,7 @@ private void initializeInternal() throws IOException {
throw new IOException("Unsupported type: " + t);
}
if (originalTypes[i] == OriginalType.DECIMAL &&
primitiveType.getDecimalMetadata().getPrecision() >
CatalystSchemaConverter.MAX_PRECISION_FOR_INT64()) {
primitiveType.getDecimalMetadata().getPrecision() > Decimal.MAX_LONG_DIGITS()) {
throw new IOException("Decimal with high precision is not supported.");
}
if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
Expand Down Expand Up @@ -439,7 +438,7 @@ private void decodeFixedLenArrayAsDecimalBatch(int col, int num) throws IOExcept
PrimitiveType type = requestedSchema.getFields().get(col).asPrimitiveType();
int precision = type.getDecimalMetadata().getPrecision();
int scale = type.getDecimalMetadata().getScale();
Preconditions.checkState(precision <= CatalystSchemaConverter.MAX_PRECISION_FOR_INT64(),
Preconditions.checkState(precision <= Decimal.MAX_LONG_DIGITS(),
"Unsupported precision.");

for (int n = 0; n < num; ++n) {
Expand Down Expand Up @@ -611,6 +610,11 @@ private boolean next() throws IOException {
*/
private void readBatch(int total, ColumnVector column) throws IOException {
int rowId = 0;
if (useDictionary) {
dictionaryIds = column.reserveDictionaryIds(total);
} else {
column.setDictionary(null);
}
while (total > 0) {
// Compute the number of values we want to read in this page.
int leftInPage = (int)(endOfPageValueCount - valuesRead);
Expand All @@ -620,13 +624,6 @@ private void readBatch(int total, ColumnVector column) throws IOException {
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove dictionaryIds from this class.

int num = Math.min(total, leftInPage);
if (useDictionary) {
// Data is dictionary encoded. We will vector decode the ids and then resolve the values.
if (dictionaryIds == null) {
dictionaryIds = ColumnVector.allocate(total, DataTypes.IntegerType, MemoryMode.ON_HEAP);
} else {
dictionaryIds.reset();
dictionaryIds.reserve(total);
}
// Read and decode dictionary ids.
defColumn.readIntegers(
num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
Expand Down Expand Up @@ -672,21 +669,13 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column) {
switch (descriptor.getType()) {
case INT32:
if (column.dataType() == DataTypes.IntegerType) {
for (int i = rowId; i < rowId + num; ++i) {
column.putInt(i, dictionary.decodeToInt(dictionaryIds.getInt(i)));
}
column.setDictionary(dictionary);
} else if (column.dataType() == DataTypes.ByteType) {
for (int i = rowId; i < rowId + num; ++i) {
column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getInt(i)));
}
column.setDictionary(dictionary);
} else if (column.dataType() == DataTypes.ShortType) {
for (int i = rowId; i < rowId + num; ++i) {
column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getInt(i)));
}
} else if (DecimalType.is64BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
column.putLong(i, dictionary.decodeToInt(dictionaryIds.getInt(i)));
}
column.setDictionary(dictionary);
} else if (DecimalType.is32BitDecimalType(column.dataType())) {
column.setDictionary(dictionary);
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
}
Expand All @@ -695,28 +684,28 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column) {
case INT64:
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
column.putLong(i, dictionary.decodeToLong(dictionaryIds.getInt(i)));
}
column.setDictionary(dictionary);
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
}
break;

case FLOAT:
for (int i = rowId; i < rowId + num; ++i) {
column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getInt(i)));
}
column.setDictionary(dictionary);
break;

case DOUBLE:
for (int i = rowId; i < rowId + num; ++i) {
column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getInt(i)));
}
column.setDictionary(dictionary);
break;

case FIXED_LEN_BYTE_ARRAY:
if (DecimalType.is64BitDecimalType(column.dataType())) {
// DecimalType written in the legacy mode
if (DecimalType.is32BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
column.putInt(i,(int) CatalystRowConverter.binaryToUnscaledLong(v));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing space after ,

}
} else if (DecimalType.is64BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
column.putLong(i, CatalystRowConverter.binaryToUnscaledLong(v));
Expand All @@ -727,14 +716,7 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column) {
break;

case BINARY:
// TODO: this is incredibly inefficient as it blows up the dictionary right here. We
// need to do this better. We should probably add the dictionary data to the ColumnVector
// and reuse it across batches. This should mean adding a ByteArray would just update
// the length and offset.
for (int i = rowId; i < rowId + num; ++i) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
column.putByteArray(i, v.getBytes());
}
column.setDictionary(dictionary);
break;

default:
Expand All @@ -756,15 +738,13 @@ private void readBooleanBatch(int rowId, int num, ColumnVector column) throws IO
private void readIntBatch(int rowId, int num, ColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType) {
if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
DecimalType.is32BitDecimalType(column.dataType())) {
defColumn.readIntegers(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (column.dataType() == DataTypes.ByteType) {
defColumn.readBytes(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (DecimalType.is64BitDecimalType(column.dataType())) {
defColumn.readIntsAsLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (column.dataType() == DataTypes.ShortType) {
defColumn.readShorts(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
Expand Down Expand Up @@ -822,7 +802,16 @@ private void readFixedLenByteArrayBatch(int rowId, int num,
VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
if (DecimalType.is64BitDecimalType(column.dataType())) {
if (DecimalType.is32BitDecimalType(column.dataType())) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putInt(rowId + i,
(int) CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
} else {
column.putNull(rowId + i);
}
}
} else if (DecimalType.is64BitDecimalType(column.dataType())) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putLong(rowId + i,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.execution.vectorized.ColumnVector;

/**
Expand Down Expand Up @@ -239,38 +238,6 @@ public void readBooleans(int total, ColumnVector c,
}
}

/**
 * Reads `total` entries and stores them into `c` as longs, starting at `rowId`.
 *
 * For each entry whose level value equals `level`, the next integer is pulled from `data`
 * and written with putLong; every other entry is written as null. Entries are consumed
 * group by group, a new group being loaded whenever the current one is exhausted.
 */
public void readIntsAsLongs(int total, ColumnVector c,
    int rowId, int level, VectorizedValuesReader data) {
  int remaining = total;
  while (remaining > 0) {
    if (this.currentCount == 0) {
      this.readNextGroup();
    }
    final int batch = Math.min(remaining, this.currentCount);
    switch (mode) {
      case RLE:
        // The whole run shares one level value, so it is either all values or all nulls.
        if (currentValue != level) {
          c.putNulls(rowId, batch);
        } else {
          for (int k = 0; k < batch; k++) {
            c.putLong(rowId + k, data.readInteger());
          }
        }
        break;
      case PACKED:
        // Each entry carries its own level value, so decide per entry.
        for (int k = 0; k < batch; ++k) {
          final boolean defined = currentBuffer[currentBufferIdx++] == level;
          if (defined) {
            c.putLong(rowId + k, data.readInteger());
          } else {
            c.putNull(rowId + k);
          }
        }
        break;
    }
    rowId += batch;
    remaining -= batch;
    currentCount -= batch;
  }
}

public void readBytes(int total, ColumnVector c,
int rowId, int level, VectorizedValuesReader data) {
int left = total;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import java.math.BigDecimal;
import java.math.BigInteger;

import org.apache.commons.lang.NotImplementedException;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.io.api.Binary;

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
Expand All @@ -27,8 +31,6 @@
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

import org.apache.commons.lang.NotImplementedException;

/**
* This class represents a column of values and provides the main APIs to access the data
* values. It supports all the types and contains get/put APIs as well as their batched versions.
Expand Down Expand Up @@ -157,7 +159,7 @@ public Object[] array() {
} else if (dt instanceof StringType) {
for (int i = 0; i < length; i++) {
if (!data.getIsNull(offset + i)) {
list[i] = ColumnVectorUtils.toString(data.getByteArray(offset + i));
list[i] = getUTF8String(i).toString();
}
}
} else if (dt instanceof CalendarIntervalType) {
Expand Down Expand Up @@ -204,28 +206,17 @@ public float getFloat(int ordinal) {

@Override
public Decimal getDecimal(int ordinal, int precision, int scale) {
if (precision <= Decimal.MAX_LONG_DIGITS()) {
return Decimal.apply(getLong(ordinal), precision, scale);
} else {
byte[] bytes = getBinary(ordinal);
BigInteger bigInteger = new BigInteger(bytes);
BigDecimal javaDecimal = new BigDecimal(bigInteger, scale);
return Decimal.apply(javaDecimal, precision, scale);
}
return data.getDecimal(offset + ordinal, precision, scale);
}

@Override
public UTF8String getUTF8String(int ordinal) {
Array child = data.getByteArray(offset + ordinal);
return UTF8String.fromBytes(child.byteArray, child.byteArrayOffset, child.length);
return data.getUTF8String(offset + ordinal);
}

@Override
public byte[] getBinary(int ordinal) {
ColumnVector.Array array = data.getByteArray(offset + ordinal);
byte[] bytes = new byte[array.length];
System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length);
return bytes;
return data.getBinary(offset + ordinal);
}

@Override
Expand Down Expand Up @@ -534,12 +525,57 @@ public final int putByteArray(int rowId, byte[] value) {
/**
* Returns the value for rowId.
*/
public final Array getByteArray(int rowId) {
private Array getByteArray(int rowId) {
Array array = getArray(rowId);
array.data.loadBytes(array);
return array;
}

/**
 * Returns the decimal for rowId.
 *
 * The unscaled value is read with getInt when the precision is at most
 * Decimal.MAX_INT_DIGITS, with getLong when it is at most Decimal.MAX_LONG_DIGITS, and
 * otherwise from the bytes returned by getBinary, interpreted as a BigInteger.
 */
public final Decimal getDecimal(int rowId, int precision, int scale) {
  if (precision <= Decimal.MAX_INT_DIGITS()) {
    return Decimal.apply(getInt(rowId), precision, scale);
  }
  if (precision <= Decimal.MAX_LONG_DIGITS()) {
    return Decimal.apply(getLong(rowId), precision, scale);
  }
  // TODO: best perf?
  BigInteger unscaled = new BigInteger(getBinary(rowId));
  return Decimal.apply(new BigDecimal(unscaled, scale), precision, scale);
}

/**
 * Returns the UTF8String for rowId.
 *
 * When a dictionary is set, the id stored in dictionaryIds at rowId is decoded through it;
 * otherwise the string is built from this column's own byte array for rowId.
 */
public final UTF8String getUTF8String(int rowId) {
  if (dictionary != null) {
    Binary entry = dictionary.decodeToBinary(dictionaryIds.getInt(rowId));
    return UTF8String.fromBytes(entry.getBytes());
  }
  ColumnVector.Array slice = getByteArray(rowId);
  return UTF8String.fromBytes(slice.byteArray, slice.byteArrayOffset, slice.length);
}

/**
 * Returns the byte array for rowId.
 *
 * When a dictionary is set, the id stored in dictionaryIds at rowId is decoded through it
 * and the decoded bytes are returned; otherwise the bytes for rowId are copied out of this
 * column's byte array into a new array.
 */
public final byte[] getBinary(int rowId) {
  if (dictionary != null) {
    Binary entry = dictionary.decodeToBinary(dictionaryIds.getInt(rowId));
    return entry.getBytes();
  }
  ColumnVector.Array slice = getByteArray(rowId);
  byte[] copy = new byte[slice.length];
  System.arraycopy(slice.byteArray, slice.byteArrayOffset, copy, 0, copy.length);
  return copy;
}

/**
* Append APIs. These APIs all behave similarly and will append data to the current vector. It
* is not valid to mix the put and append APIs. The append APIs are slower and should only be
Expand Down Expand Up @@ -816,6 +852,39 @@ public final int appendStruct(boolean isNull) {
*/
protected final ColumnarBatch.Row resultStruct;

/**
 * The Dictionary for this column.
 *
 * If it's not null, it is used to decode the value in getXXX(); for example getUTF8String()
 * and getBinary() decode the id stored in dictionaryIds for the requested row through it.
 */
protected Dictionary dictionary;

/**
 * Reusable integer column holding the dictionary ids; it is allocated on the first call to
 * reserveDictionaryIds() and reset and re-reserved on later calls.
 */
protected ColumnVector dictionaryIds;

/**
 * Updates the dictionary used to decode the values of this column.
 *
 * @param dictionary the dictionary to decode with, or null so that accessors such as
 *                   getUTF8String() and getBinary() read the bytes stored in the column itself
 */
public void setDictionary(Dictionary dictionary) {
this.dictionary = dictionary;
}

/**
 * Reserves an integer column for the ids of the dictionary and returns it.
 *
 * The column is allocated on the first call, on-heap when this vector is an
 * OnHeapColumnVector and off-heap otherwise. On later calls the existing column is reset
 * and reserved to the requested capacity.
 */
public ColumnVector reserveDictionaryIds(int capacity) {
  if (dictionaryIds != null) {
    dictionaryIds.reset();
    dictionaryIds.reserve(capacity);
    return dictionaryIds;
  }
  MemoryMode mode =
      (this instanceof OnHeapColumnVector) ? MemoryMode.ON_HEAP : MemoryMode.OFF_HEAP;
  dictionaryIds = allocate(capacity, DataTypes.IntegerType, mode);
  return dictionaryIds;
}

/**
* Sets up the common state and also handles creating the child columns if this is a nested
* type.
Expand Down
Loading