Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Fixes errors introduced while rebasing
  • Loading branch information
liancheng committed Jul 27, 2015
commit a543d102eeb54b1005cdc06ba74cae05723a14e2
Original file line number Diff line number Diff line change
Expand Up @@ -399,10 +399,6 @@ private[parquet] class CatalystSchemaConverter(
.length(CatalystSchemaConverter.minBytesForPrecision(precision))
.named(field.name)

case dec @ DecimalType.Unlimited if !followParquetFormatSpec =>
throw new AnalysisException(
s"Data type $dec is not supported. Decimal precision must be specified.")

// =====================================
// Decimals (follow Parquet format spec)
// =====================================
Expand Down Expand Up @@ -437,10 +433,6 @@ private[parquet] class CatalystSchemaConverter(
.length(CatalystSchemaConverter.minBytesForPrecision(precision))
.named(field.name)

case dec @ DecimalType.Unlimited if followParquetFormatSpec =>
throw new AnalysisException(
s"Data type $dec is not supported. Decimal precision must be specified.")

// ===================================================
// ArrayType and MapType (for Spark versions <= 1.4.x)
// ===================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo
Binary.fromByteArray(value.asInstanceOf[Array[Byte]]))
case DecimalType.Fixed(precision, _) =>
writeDecimal(value.asInstanceOf[Decimal], precision)
case d @ DecimalType.Unlimited =>
sys.error(s"Unsupported data type $d, cannot write to consumer")
case _ => sys.error(s"Do not know how to writer $schema to consumer")
}
}
Expand Down Expand Up @@ -199,7 +197,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo
writer.endGroup()
}

// Scratch array used to write decimals as fixed-length binary
// Scratch array used to write decimals as fixed-length byte array
private[this] var reusableDecimalBytes = new Array[Byte](16)

private[parquet] def writeDecimal(decimal: Decimal, precision: Int): Unit = {
Expand All @@ -223,7 +221,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo

case bytes if bytes.length <= reusableDecimalBytes.length =>
val signedByte = (if (bytes.head < 0) -1 else 0).toByte
util.Arrays.fill(reusableDecimalBytes, 0, numBytes - bytes.length, signedByte)
java.util.Arrays.fill(reusableDecimalBytes, 0, numBytes - bytes.length, signedByte)
System.arraycopy(bytes, 0, reusableDecimalBytes, numBytes - bytes.length, bytes.length)
Binary.fromByteArray(reusableDecimalBytes, 0, numBytes)

Expand All @@ -241,6 +239,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo

writer.addBinary(binary)
}

// array used to write Timestamp as Int96 (fixed-length binary)
private[this] val int96buf = new Array[Byte](12)

Expand Down Expand Up @@ -290,36 +289,12 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
case TimestampType => writeTimestamp(record.getLong(index))
case FloatType => writer.addFloat(record.getFloat(index))
case DoubleType => writer.addDouble(record.getDouble(index))
<<<<<<< HEAD
case StringType =>
writer.addBinary(Binary.fromByteArray(record.getUTF8String(index).getBytes))
case BinaryType =>
writer.addBinary(Binary.fromByteArray(record.getBinary(index)))
case d: DecimalType =>
if (d.precision > 18) {
sys.error(s"Unsupported datatype $d, cannot write to consumer")
}
writeDecimal(record.getDecimal(index), d.precision)
||||||| merged common ancestors
case StringType => writer.addBinary(
Binary.fromByteArray(record(index).asInstanceOf[UTF8String].getBytes))
case BinaryType => writer.addBinary(
Binary.fromByteArray(record(index).asInstanceOf[Array[Byte]]))
case d: DecimalType =>
if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
sys.error(s"Unsupported datatype $d, cannot write to consumer")
}
writeDecimal(record(index).asInstanceOf[Decimal], d.precisionInfo.get.precision)
=======
case StringType => writer.addBinary(
Binary.fromByteArray(record(index).asInstanceOf[UTF8String].getBytes))
case BinaryType => writer.addBinary(
Binary.fromByteArray(record(index).asInstanceOf[Array[Byte]]))
case DecimalType.Fixed(precision, _) =>
writeDecimal(record(index).asInstanceOf[Decimal], precision)
case d @ DecimalType.Unlimited =>
sys.error(s"Unsupported data type $d, cannot write to consumer")
>>>>>>> Supports decimals with precision > 18 for Parquet
writeDecimal(record.getDecimal(index), precision)
case _ => sys.error(s"Unsupported datatype $ctype, cannot write to consumer")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
checkAnswer(sqlContext.read.parquet(dir.getCanonicalPath), data.collect().toSeq)
}
}

// Unlimited-length decimals are not yet supported
intercept[Throwable] {
withTempPath { dir =>
makeDecimalRDD(DecimalType.Unlimited).write.parquet(dir.getCanonicalPath)
sqlContext.read.parquet(dir.getCanonicalPath).collect()
}
}
}

test("date type") {
Expand Down