From 8306ab283272637526d3896c077377b7bc5e1fdd Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 19 Feb 2021 22:29:03 +0800 Subject: [PATCH 1/3] avoid NPE in DataFrameReader.schema(StructType) --- .../scala/org/apache/spark/sql/DataFrameReader.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 1a1954281cd06..9954ae4c44512 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -73,8 +73,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def schema(schema: StructType): DataFrameReader = { - val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType] - this.userSpecifiedSchema = Option(replaced) + if (schema != null) { + val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType] + this.userSpecifiedSchema = Option(replaced) + } this } @@ -90,10 +92,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 2.3.0 */ def schema(schemaString: String): DataFrameReader = { - val rawSchema = StructType.fromDDL(schemaString) - val schema = CharVarcharUtils.failIfHasCharVarchar(rawSchema).asInstanceOf[StructType] - this.userSpecifiedSchema = Option(schema) - this + schema(StructType.fromDDL(schemaString)) } /** From 8a38f52f293daa520dde845949fd8449ebaa682f Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 22 Feb 2021 11:32:31 +0800 Subject: [PATCH 2/3] update streaming side --- .../org/apache/spark/sql/streaming/DataStreamReader.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 2ed2487c83b01..2365f9b93a6bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -64,8 +64,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * @since 2.0.0 */ def schema(schema: StructType): DataStreamReader = { - val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType] - this.userSpecifiedSchema = Option(replaced) + if (schema != null) { + val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType] + this.userSpecifiedSchema = Option(replaced) + } this } From 643f002379a712a7fe1da3245ac396a1c4c8ad42 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 22 Feb 2021 17:18:29 +0800 Subject: [PATCH 3/3] address comments --- .../org/apache/spark/sql/streaming/DataStreamReader.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 2365f9b93a6bb..a6913fab97a40 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -79,10 +79,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * @since 2.3.0 */ def schema(schemaString: String): DataStreamReader = { - val rawSchema = StructType.fromDDL(schemaString) - val schema = CharVarcharUtils.failIfHasCharVarchar(rawSchema).asInstanceOf[StructType] - this.userSpecifiedSchema = Option(schema) - this + schema(StructType.fromDDL(schemaString)) } /**