Fixes compilation error introduced while rebasing
liancheng committed Aug 1, 2015
commit b37fe7724fa6780ae4c5952d71a4e6b2e10fd959
@@ -154,8 +154,8 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
       (row: InternalRow, ordinal: Int) =>
         recordConsumer.addBinary(Binary.fromByteArray(row.getBinary(ordinal)))
 
-      case DecimalType.Fixed(precision, _) =>
-        makeDecimalWriter(precision)
+      case DecimalType.Fixed(precision, scale) =>
+        makeDecimalWriter(precision, scale)
 
       case t: StructType =>
         val fieldWriters = t.map(_.dataType).map(makeWriter)
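Note on the hunk above: `InternalRow.getDecimal` takes the column's precision and scale along with the ordinal, so `makeDecimalWriter` now has to carry both through to every call site. A minimal sketch of why the scale must travel with the value (`decode` is a hypothetical helper, not Spark code):

```scala
// The unscaled long alone is ambiguous; only together with the scale
// does it denote one specific decimal value.
def decode(unscaled: Long, scale: Int): BigDecimal =
  BigDecimal(unscaled) / BigDecimal(10).pow(scale)

decode(12345L, 2) // 123.45
decode(12345L, 3) // 12.345
```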
@@ -185,7 +185,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
     }
   }
 
-  private def makeDecimalWriter(precision: Int): ValueWriter = {
+  private def makeDecimalWriter(precision: Int, scale: Int): ValueWriter = {
     assert(
       precision <= DecimalType.MAX_PRECISION,
       s"Precision overflow: $precision is greater than ${DecimalType.MAX_PRECISION}")
@@ -194,18 +194,18 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
 
     val int32Writer =
       (row: InternalRow, ordinal: Int) =>
-        recordConsumer.addInteger(row.getDecimal(ordinal).toUnscaledLong.toInt)
+        recordConsumer.addInteger(row.getDecimal(ordinal, precision, scale).toUnscaledLong.toInt)
 
     val int64Writer =
       (row: InternalRow, ordinal: Int) =>
-        recordConsumer.addLong(row.getDecimal(ordinal).toUnscaledLong)
+        recordConsumer.addLong(row.getDecimal(ordinal, precision, scale).toUnscaledLong)
 
     val binaryWriterUsingUnscaledLong =
       (row: InternalRow, ordinal: Int) => {
         // When the precision is low enough (<= 18) to squeeze the decimal value into a `Long`, we
         // can build a fixed-length byte array with length `numBytes` using the unscaled `Long`
         // value and the `decimalBuffer` for better performance.
-        val unscaled = row.getDecimal(ordinal).toUnscaledLong
+        val unscaled = row.getDecimal(ordinal, precision, scale).toUnscaledLong
         var i = 0
         var shift = 8 * (numBytes - 1)
@@ -220,14 +220,15 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
 
     val binaryWriterUsingUnscaledBytes =
       (row: InternalRow, ordinal: Int) => {
-        val bytes = row.getDecimal(ordinal).toJavaBigDecimal.unscaledValue().toByteArray
+        val decimal = row.getDecimal(ordinal, precision, scale)
+        val bytes = decimal.toJavaBigDecimal.unscaledValue().toByteArray
         val fixedLengthBytes = if (bytes.length == numBytes) {
           // If the length of the underlying byte array of the unscaled `BigInteger` happens to be
           // `numBytes`, just reuse it, so that we don't bother copying it to `decimalBuffer`.
           bytes
         } else {
           // Otherwise, the length must be less than `numBytes`. In this case we copy contents of
-          // the underlying bytes with enough sign bytes to `decimalBuffer` to form the result
+          // the underlying bytes with padding sign bytes to `decimalBuffer` to form the result
           // fixed-length byte array.
           val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
           util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte)
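For the padding branch above: `BigInteger.toByteArray` returns the minimal two's-complement encoding, so a shorter array must be left-padded with sign bytes (0x00 for non-negative values, 0xFF for negative ones) to reach the fixed length. A standalone sketch of that sign extension (`toFixedLength` is a hypothetical name; Spark fills the shared `decimalBuffer` instead of a fresh array):

```scala
import java.math.BigInteger
import java.util

// Hypothetical standalone version of the sign-extension padding shown above.
def toFixedLength(unscaled: BigInteger, numBytes: Int): Array[Byte] = {
  val bytes = unscaled.toByteArray
  if (bytes.length == numBytes) {
    bytes // already the right width, reuse as-is
  } else {
    // Left-pad with the sign byte, then copy the significant bytes.
    val buffer = new Array[Byte](numBytes)
    val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
    util.Arrays.fill(buffer, 0, numBytes - bytes.length, signByte)
    System.arraycopy(bytes, 0, buffer, numBytes - bytes.length, bytes.length)
    buffer
  }
}
```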
@@ -250,7 +251,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
 
       // All other cases:
       // - Standard mode, writes decimals with precision > 18 as FIXED_LEN_BYTE_ARRAY
-      // - Legacy mode, writes decimals with all precision as FIXED_LEN_BYTE_ARRAY
+      // - Legacy mode, writes decimals with precision > 18 as FIXED_LEN_BYTE_ARRAY
       case _ => binaryWriterUsingUnscaledBytes
     }
   }
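The comment block in this last hunk summarizes the dispatch: the smallest Parquet physical type that can hold the unscaled value is chosen by precision, with legacy mode differing as noted. A hedged sketch of the standard-mode rule, with thresholds taken from the Parquet DECIMAL specification (`physicalTypeFor` is illustrative, not a Spark API):

```scala
// Standard-mode thresholds per the Parquet DECIMAL spec.
def physicalTypeFor(precision: Int): String = precision match {
  case p if p <= 9  => "INT32"                // unscaled value fits in an Int
  case p if p <= 18 => "INT64"                // unscaled value fits in a Long
  case _            => "FIXED_LEN_BYTE_ARRAY" // arbitrary-width unscaled bytes
}
```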
@@ -38,12 +38,11 @@ import org.apache.parquet.schema.MessageType
 import org.apache.parquet.{Log => ParquetLog}
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD}
 import org.apache.spark.rdd.RDD._
+import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionSpec
-import org.apache.spark.sql.execution.{SqlNewHadoopPartition, SqlNewHadoopRDD}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}