[SPARK-10113][SQL] Support for unsigned Parquet logical types
HyukjinKwon committed Nov 12, 2015
commit 49e162480e33a0379e1b04979666533e66e33f6e
CatalystSchemaConverter.scala
@@ -108,6 +108,9 @@ private[parquet] class CatalystSchemaConverter(
     def typeString =
       if (originalType == null) s"$typeName" else s"$typeName ($originalType)"
 
+    def typeNotSupported() =
+      throw new AnalysisException(s"Parquet type not supported: $typeString")
+
     def typeNotImplemented() =
       throw new AnalysisException(s"Parquet type not yet supported: $typeString")
 
@@ -142,6 +145,9 @@ private[parquet] class CatalystSchemaConverter(
           case INT_32 | null => IntegerType
           case DATE => DateType
           case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT32)
+          case UINT_8 => typeNotSupported()
+          case UINT_16 => typeNotSupported()
+          case UINT_32 => typeNotSupported()
           case TIME_MILLIS => typeNotImplemented()
           case _ => illegalType()
         }
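Context, not part of the diff: Spark has no unsigned types, so narrowing a UINT_32 value into the signed IntegerType would silently corrupt anything above Int.MaxValue, which is why these cases now fail fast instead. A quick standalone illustration in plain Scala:

```scala
// Why UINT_32 cannot be mapped onto Spark's signed IntegerType:
// the upper half of the unsigned range does not fit, and a naive
// narrowing conversion wraps around instead of failing.
val uint32Max = 4294967295L        // 2^32 - 1, the largest UINT_32 value
assert(uint32Max > Int.MaxValue)   // Int.MaxValue is 2147483647
assert(uint32Max.toInt == -1)      // silent corruption, not an error
```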
@@ -150,6 +156,7 @@ private[parquet] class CatalystSchemaConverter(
         originalType match {
           case INT_64 | null => LongType
           case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT64)
+          case UINT_64 => typeNotSupported()
           case TIMESTAMP_MILLIS => typeNotImplemented()
           case _ => illegalType()
         }
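The hunk above gives UINT_64 the same treatment; it is the worst case, since not even the widest JVM primitive can hold the full unsigned 64-bit range. Again a standalone check, not part of the diff:

```scala
// No JVM primitive covers all UINT_64 values, so there is no
// lossless widening available for LongType either.
val uint64Max = BigInt("18446744073709551615")   // 2^64 - 1
assert(uint64Max > BigInt(Long.MaxValue))        // Long.MaxValue is 9223372036854775807
```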
ParquetIOSuite.scala
@@ -206,6 +206,29 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-10113 Support for unsigned Parquet logical types") {
+    val parquetSchema = MessageTypeParser.parseMessageType(
+      """message root {
+        |  required int32 c(UINT_32);
+        |}
+      """.stripMargin)
+
+    withTempPath { location =>
+      val fileMetadata = new FileMetaData(parquetSchema, Map.empty[String, String].asJava, "Spark")
+      val path = new Path(location.getCanonicalPath)
+      val footer = List(
+        new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))
+      ).asJava
+
+      ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer)
+
+      val errorMessage = intercept[Throwable] {
+        sqlContext.read.parquet(path.toString).printSchema()
+      }.toString
+      assert(errorMessage.contains("Parquet type not supported"))
+    }
+  }
+
   test("compression codec") {
     def compressionCodecFor(path: String, codecName: String): String = {
       val codecs = for {
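The hunk above omits the test file's import section. A sketch of what the new test appears to rely on, assuming the Parquet 1.7 and Hadoop packages Spark depended on at the time (the commit's actual import list may differ):

```scala
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.{Footer, ParquetFileWriter}
import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
import org.apache.parquet.schema.MessageTypeParser
```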