Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-22431][SQL] Ensure that the datatype in the schema for the tab…
…le/view metadata is parseable by Spark before persisting it

## What changes were proposed in this pull request?
* JIRA:  [SPARK-22431](https://issues.apache.org/jira/browse/SPARK-22431)  : Creating Permanent view with illegal type

**Description:**
- It is possible in Spark SQL to create a permanent view that uses a nested field with an illegal name.
- For example if we create the following view:
```create view x as select struct('a' as `$q`, 1 as b) q```
- A simple select fails with the following exception:

```
select * from x;

org.apache.spark.SparkException: Cannot recognize hive type string: struct<$q:string,b:int>
  at org.apache.spark.sql.hive.client.HiveClientImpl$.fromHiveColumn(HiveClientImpl.scala:812)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:378)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:378)
...
```
**Issue/Analysis**: Right now, we can create a view with a schema that cannot be read back by Spark from the Hive metastore. For more details, please see the discussion about the analysis and the proposed fix options in comment 1 and comment 2 of [SPARK-22431](https://issues.apache.org/jira/browse/SPARK-22431).

**Proposed changes**:
 - Fix the hive table/view codepath to check whether the schema datatype is parseable by Spark before persisting it in the metastore. This change is localized to HiveClientImpl, which performs a check similar to the one in fromHiveColumn. This is fail-fast: we avoid the scenario where we write something to the metastore that we cannot read back.
- Added new unit tests
- Ran the sql related unit test suites ( hive/test, sql/test, catalyst/test) OK

With the fix:
```
create view x as select struct('a' as `$q`, 1 as b) q;
17/11/28 10:44:55 ERROR SparkSQLDriver: Failed in [create view x as select struct('a' as `$q`, 1 as b) q]
org.apache.spark.SparkException: Cannot recognize hive type string: struct<$q:string,b:int>
	at org.apache.spark.sql.hive.client.HiveClientImpl$.org$apache$spark$sql$hive$client$HiveClientImpl$$getSparkSQLDataType(HiveClientImpl.scala:884)
	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$org$apache$spark$sql$hive$client$HiveClientImpl$$verifyColumnDataType$1.apply(HiveClientImpl.scala:906)
	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$org$apache$spark$sql$hive$client$HiveClientImpl$$verifyColumnDataType$1.apply(HiveClientImpl.scala:906)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
...
```
## How was this patch tested?
- New unit tests have been added.

hvanhovell, Please review and share your thoughts/comments.  Thank you so much.

Author: Sunitha Kambhampati <[email protected]>

Closes apache#19747 from skambha/spark22431.
  • Loading branch information
skambha authored and hvanhovell committed Nov 28, 2017
commit a10b328dbc056aafaa696579f9a6e2b0cb8eb25f
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,21 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
}
}

test("SPARK-22431: table with nested type col with special char") {
  // A data source (Parquet) table whose nested struct field name contains a
  // special character must be persisted and read back without error.
  withTable("t") {
    val ddl = "CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) USING PARQUET"
    spark.sql(ddl)
    checkAnswer(spark.table("t"), Nil)
  }
}

test("SPARK-22431: view with nested type") {
  // Views over structs with special-char and plain field names must be
  // creatable and readable through the in-memory catalog.
  withView("t", "v") {
    spark.sql("CREATE VIEW t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
    checkAnswer(spark.table("t"), Row(Row("a", 1)) :: Nil)
    spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
    // Bug fix: the original assertion re-checked view `t` here, so the freshly
    // created view `v` was never actually verified.
    checkAnswer(spark.table("v"), Row(Row("a", 1)) :: Nil)
  }
}
}

abstract class DDLSuite extends QueryTest with SQLTestUtils {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ private[hive] class HiveClientImpl(
}

override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
// Fail fast if any column's data type string cannot be parsed back by Spark;
// otherwise the table would be persisted to the metastore but be unreadable.
verifyColumnDataType(table.dataSchema)
client.createTable(toHiveTable(table, Some(userName)), ignoreIfExists)
}

Expand All @@ -507,6 +508,7 @@ private[hive] class HiveClientImpl(
// these properties are still available to the others that share the same Hive metastore.
// If users explicitly alter these Hive-specific properties through ALTER TABLE DDL, we respect
// these user-specified values.
verifyColumnDataType(table.dataSchema)
val hiveTable = toHiveTable(
table.copy(properties = table.ignoredProperties ++ table.properties), Some(userName))
// Do not use `table.qualifiedName` here because this may be a rename
Expand All @@ -520,6 +522,7 @@ private[hive] class HiveClientImpl(
newDataSchema: StructType,
schemaProps: Map[String, String]): Unit = withHiveState {
val oldTable = client.getTable(dbName, tableName)
verifyColumnDataType(newDataSchema)
val hiveCols = newDataSchema.map(toHiveColumn)
oldTable.setFields(hiveCols.asJava)

Expand Down Expand Up @@ -872,15 +875,19 @@ private[hive] object HiveClientImpl {
new FieldSchema(c.name, typeString, c.getComment().orNull)
}

/** Builds the native StructField from Hive's FieldSchema. */
def fromHiveColumn(hc: FieldSchema): StructField = {
val columnType = try {
/** Parse the Hive type string of a FieldSchema into a Spark SQL [[DataType]]. */
private def getSparkSQLDataType(hc: FieldSchema): DataType = {
  val hiveTypeString = hc.getType
  try {
    CatalystSqlParser.parseDataType(hiveTypeString)
  } catch {
    // Surface an unparseable Hive type string as a SparkException so callers
    // fail fast instead of persisting an unreadable schema.
    case e: ParseException =>
      throw new SparkException("Cannot recognize hive type string: " + hiveTypeString, e)
  }
}

/** Builds the native StructField from Hive's FieldSchema. */
def fromHiveColumn(hc: FieldSchema): StructField = {
val columnType = getSparkSQLDataType(hc)
val metadata = if (hc.getType != columnType.catalogString) {
new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
} else {
Expand All @@ -895,6 +902,10 @@ private[hive] object HiveClientImpl {
Option(hc.getComment).map(field.withComment).getOrElse(field)
}

/**
 * Verify that every column's data type, once converted to Hive's type string,
 * can be parsed back by Spark. Throws SparkException on the first failure.
 */
private def verifyColumnDataType(schema: StructType): Unit = {
  schema.foreach { field =>
    getSparkSQLDataType(toHiveColumn(field))
  }
}

// Reflectively load the named class and cast it to a Hadoop InputFormat class.
private def toInputFormat(name: String) =
Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,88 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
test("alter datasource table add columns - partitioned - orc") {
testAddColumnPartitioned("orc")
}

test("SPARK-22431: illegal nested type") {
  // Each statement tries to persist a schema whose nested field name Spark's
  // parser cannot read back; all of them must fail fast with SparkException.
  val queries = Seq(
    "CREATE TABLE t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q",
    "CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT)",
    "CREATE VIEW t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")

  for (query <- queries) {
    val message = intercept[SparkException] {
      spark.sql(query)
    }.getMessage
    assert(message.contains("Cannot recognize hive type string"))
  }

  withView("v") {
    // A legal nested field name still works, and altering the view to an
    // illegal one is rejected.
    spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
    checkAnswer(sql("SELECT q.`a`, q.b FROM v"), Row("a", 1) :: Nil)

    val alterMessage = intercept[SparkException] {
      spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
    }.getMessage
    assert(alterMessage.contains("Cannot recognize hive type string"))
  }
}

test("SPARK-22431: table with nested type") {
  withTable("t", "x") {
    // Both a data source table (with a special-char nested field name) and a
    // plain Hive table should be created and readable.
    Seq(
      "t" -> "CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) USING PARQUET",
      "x" -> "CREATE TABLE x (q STRUCT<col1:INT, col2:STRING>, i1 INT)"
    ).foreach { case (tableName, ddl) =>
      spark.sql(ddl)
      checkAnswer(spark.table(tableName), Nil)
    }
  }
}

test("SPARK-22431: view with nested type") {
  withView("v") {
    spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
    checkAnswer(spark.table("v"), Row(Row("a", 1)) :: Nil)

    // Redefining the view renames the top-level column; data stays the same.
    spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `b`, 1 AS b) q1")
    val altered = spark.table("v")
    assert(altered.schema.fields(0).name == "q1")
    checkAnswer(altered, Row(Row("a", 1)) :: Nil)
  }
}

test("SPARK-22431: alter table tests with nested types") {
  withTable("t1", "t2", "t3") {
    // Hive table: adding a column with a back-quoted (legal) nested field name.
    spark.sql("CREATE TABLE t1 (q STRUCT<col1:INT, col2:STRING>, i1 INT)")
    spark.sql("ALTER TABLE t1 ADD COLUMNS (newcol1 STRUCT<`col1`:STRING, col2:Int>)")
    val addedCol = spark.sql("SELECT * FROM t1").schema.fields(2).name
    assert(addedCol == "newcol1")

    // Data source table: special characters in nested field names are allowed,
    // both for new columns and alongside plain names.
    spark.sql("CREATE TABLE t2(q STRUCT<`a`:INT, col2:STRING>, i1 INT) USING PARQUET")
    spark.sql("ALTER TABLE t2 ADD COLUMNS (newcol1 STRUCT<`$col1`:STRING, col2:Int>)")
    spark.sql("ALTER TABLE t2 ADD COLUMNS (newcol2 STRUCT<`col1`:STRING, col2:Int>)")
    val df2 = spark.table("t2")
    checkAnswer(df2, Nil)
    assert(df2.schema.fields(2).name == "newcol1")
    assert(df2.schema.fields(3).name == "newcol2")

    // Same as above, but the original schema itself has a special character.
    spark.sql("CREATE TABLE t3(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) USING PARQUET")
    spark.sql("ALTER TABLE t3 ADD COLUMNS (newcol1 STRUCT<`$col1`:STRING, col2:Int>)")
    spark.sql("ALTER TABLE t3 ADD COLUMNS (newcol2 STRUCT<`col1`:STRING, col2:Int>)")
    val df3 = spark.table("t3")
    checkAnswer(df3, Nil)
    assert(df3.schema.fields(2).name == "newcol1")
    assert(df3.schema.fields(3).name == "newcol2")
  }
}

test("SPARK-22431: negative alter table tests with nested types") {
  withTable("t1") {
    // Adding a column whose nested field name cannot be parsed back by Spark
    // must be rejected before it is persisted to the metastore.
    spark.sql("CREATE TABLE t1 (q STRUCT<col1:INT, col2:STRING>, i1 INT)")
    val message = intercept[SparkException] {
      spark.sql("ALTER TABLE t1 ADD COLUMNS (newcol1 STRUCT<`$col1`:STRING, col2:Int>)")
    }.getMessage
    assert(message.contains("Cannot recognize hive type string:"))
  }
}
}

class HiveDDLSuite
Expand Down