From 49e162480e33a0379e1b04979666533e66e33f6e Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 12 Nov 2015 10:00:56 +0900
Subject: [PATCH 1/2] [SPARK-10113][SQL] Support for unsigned Parquet logical types

---
 .../parquet/CatalystSchemaConverter.scala    |  7 ++++++
 .../datasources/parquet/ParquetIOSuite.scala | 23 +++++++++++++++++++
 2 files changed, 30 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 7f3394c20ed3..f28a18e2756e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -108,6 +108,9 @@ private[parquet] class CatalystSchemaConverter(
     def typeString =
       if (originalType == null) s"$typeName" else s"$typeName ($originalType)"
 
+    def typeNotSupported() =
+      throw new AnalysisException(s"Parquet type not supported: $typeString")
+
     def typeNotImplemented() =
       throw new AnalysisException(s"Parquet type not yet supported: $typeString")
 
@@ -142,6 +145,9 @@ private[parquet] class CatalystSchemaConverter(
           case INT_32 | null => IntegerType
           case DATE => DateType
           case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT32)
+          case UINT_8 => typeNotSupported()
+          case UINT_16 => typeNotSupported()
+          case UINT_32 => typeNotSupported()
           case TIME_MILLIS => typeNotImplemented()
           case _ => illegalType()
         }
@@ -150,6 +156,7 @@ private[parquet] class CatalystSchemaConverter(
         originalType match {
           case INT_64 | null => LongType
           case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT64)
+          case UINT_64 => typeNotSupported()
           case TIMESTAMP_MILLIS => typeNotImplemented()
           case _ => illegalType()
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 72744799897b..7e1bc284bc94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -206,6 +206,29 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-10113 Support for unsigned Parquet logical types") {
+    val parquetSchema = MessageTypeParser.parseMessageType(
+      """message root {
+        | required int32 c(UINT_32);
+        |}
+      """.stripMargin)
+
+    withTempPath { location =>
+      val fileMetadata = new FileMetaData(parquetSchema, Map.empty[String, String].asJava ,"Spark")
+      val path = new Path(location.getCanonicalPath)
+      val footer = List(
+        new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))
+      ).asJava
+
+      ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer)
+
+      val errorMessage = intercept[Throwable] {
+        sqlContext.read.parquet(path.toString).printSchema()
+      }.toString
+      assert(errorMessage.contains("Parquet type not supported"))
+    }
+  }
+
   test("compression codec") {
     def compressionCodecFor(path: String, codecName: String): String = {
       val codecs = for {

From 2752c14331539bda7ccd7e1bc5a2887da2a2b4cd Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 12 Nov 2015 10:01:26 +0900
Subject: [PATCH 2/2] [SPARK-10113][SQL] Add a variable for readability

---
 .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 7e1bc284bc94..82a42d68fedc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -214,7 +214,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       """.stripMargin)
 
     withTempPath { location =>
-      val fileMetadata = new FileMetaData(parquetSchema, Map.empty[String, String].asJava ,"Spark")
+      val extraMetadata = Map.empty[String, String].asJava
+      val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
       val path = new Path(location.getCanonicalPath)
       val footer = List(
         new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))
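
Taken together, the converter change boils down to a three-way dispatch on the Parquet original-type annotation: signed INT32/INT64 annotations map to Catalyst types, the unsigned UINT_* annotations now fail fast with a descriptive "Parquet type not supported" error, and anything else is rejected as illegal. The Scala sketch below illustrates that dispatch in isolation; it is not Spark's actual CatalystSchemaConverter, and every name in it (UnsignedParquetTypeSketch, convertInt32, convertInt64, the local type enums) is hypothetical.

// Self-contained sketch only: mirrors the dispatch this patch adds, but none of
// these names exist in Spark or parquet-mr under these signatures.
object UnsignedParquetTypeSketch {

  sealed trait OriginalType
  case object INT_32 extends OriginalType
  case object DATE extends OriginalType
  case object UINT_8 extends OriginalType
  case object UINT_16 extends OriginalType
  case object UINT_32 extends OriginalType
  case object INT_64 extends OriginalType
  case object UINT_64 extends OriginalType

  sealed trait CatalystType
  case object IntegerType extends CatalystType
  case object DateType extends CatalystType
  case object LongType extends CatalystType

  // Analogue of the new typeNotSupported(): fail with a descriptive message
  // instead of falling through to an opaque MatchError.
  private def typeNotSupported(t: OriginalType): Nothing =
    throw new UnsupportedOperationException(s"Parquet type not supported: $t")

  private def illegalType(t: OriginalType): Nothing =
    throw new IllegalArgumentException(s"Illegal Parquet type: $t")

  // INT32 physical type: signed annotations map to Catalyst types,
  // unsigned annotations are rejected outright.
  def convertInt32(originalType: Option[OriginalType]): CatalystType = originalType match {
    case Some(INT_32) | None                    => IntegerType
    case Some(DATE)                             => DateType
    case Some(t @ (UINT_8 | UINT_16 | UINT_32)) => typeNotSupported(t)
    case Some(other)                            => illegalType(other)
  }

  // INT64 physical type: same idea, with UINT_64 rejected.
  def convertInt64(originalType: Option[OriginalType]): CatalystType = originalType match {
    case Some(INT_64) | None => LongType
    case Some(UINT_64)       => typeNotSupported(UINT_64)
    case Some(other)         => illegalType(other)
  }

  def main(args: Array[String]): Unit = {
    println(convertInt32(Some(DATE)))   // DateType
    println(convertInt64(None))         // LongType
    try convertInt32(Some(UINT_32))
    catch { case e: UnsupportedOperationException => println(e.getMessage) }
  }
}

Introducing a distinct typeNotSupported() rather than reusing typeNotImplemented() makes the intent explicit: unsigned integers are deliberately rejected, since Spark SQL has no unsigned integral types, as opposed to annotations that simply have not been wired up yet.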