diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 760335bba5d4..71c78313e861 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -94,6 +94,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
     case Distinct(p: Project) =>
       projectToSQL(p, isDistinct = true)
 
+    case p @ Project(_, g: Generate) =>
+      generateToSQL(p)
+
+    case g: Generate =>
+      generateToSQL(g)
+
     case p: Project =>
       projectToSQL(p, isDistinct = false)
 
@@ -308,6 +314,65 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
     )
   }
 
+  /**
+   * This function handles SQL generation for generators.
+   * Sample plan:
+   * +- Project [mycol2#192]
+   *    +- Generate explode(myCol#191), true, false, Some(mytable2), [mycol2#192]
+   *       +- Generate explode(array(array(1, 2, 3))), true, false, Some(mytable), [mycol#191]
+   *          +- MetastoreRelation default, src, None
+   */
+  private def generateToSQL(plan: Generate): String = {
+    val columnAliases = plan.generatorOutput.map(_.sql).mkString(",")
+    val generatorAlias = if (plan.qualifier.isEmpty) "" else plan.qualifier.get
+    val outerClause = if (plan.outer) "OUTER" else ""
+    build(
+      if (plan.child == OneRowRelation) {
+        s"(SELECT 1) ${SQLBuilder.newSubqueryName}"
+      }
+      else {
+        toSQL(plan.child)
+      },
+      "LATERAL VIEW",
+      outerClause,
+      plan.generator.sql,
+      quoteIdentifier(generatorAlias),
+      "AS",
+      columnAliases
+    )
+  }
+
+  private def generateToSQL(plan: Project): String = {
+    val generate = plan.child.asInstanceOf[Generate]
+    assert(generate.join || plan.projectList.size == 1)
+    // Generators that appear in the projection list are expressed as LATERAL VIEW.
+    // A qualifier is needed for a LATERAL VIEW.
+    val generatorAlias: String = generate.qualifier.getOrElse(SQLBuilder.newGeneratorName)
+
+    // Qualify the attributes in the projection list.
+    val newProjList = plan.projectList.map {
+      case a if generate.generatorOutput.exists(_.semanticEquals(a)) =>
+        a.toAttribute.withQualifiers(Seq(generatorAlias))
+      case o => o
+    }
+
+    // If Generate is missing the qualifier (it's in the projection list), add one here.
+    val planToProcess =
+      if (generate.qualifier.isEmpty) {
+        generate.copy(qualifier = Some(generatorAlias))
+      }
+      else {
+        generate
+      }
+
+    build(
+      "SELECT",
+      newProjList.map(a => a.sql).mkString(","),
+      "FROM",
+      toSQL(planToProcess)
+    )
+  }
+
   object Canonicalizer extends RuleExecutor[LogicalPlan] {
     override protected def batches: Seq[Batch] = Seq(
       Batch("Collapse Project", FixedPoint(100),
@@ -360,6 +425,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
 
         case plan @ Project(_,
           _: SubqueryAlias
+            | _: Generate
            | _: Filter
            | _: Join
            | _: MetastoreRelation
@@ -411,4 +477,8 @@ object SQLBuilder {
   private val nextSubqueryId = new AtomicLong(0)
 
   private def newSubqueryName: String = s"gen_subquery_${nextSubqueryId.getAndIncrement()}"
+
+  private val nextGeneratorId = new AtomicLong(0)
+
+  private def newGeneratorName: String = s"gen_generator_${nextGeneratorId.getAndIncrement()}"
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index 198652b355fe..8ecbe1fd33b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -29,7 +29,18 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
     sql("DROP TABLE IF EXISTS parquet_t0")
     sql("DROP TABLE IF EXISTS parquet_t1")
     sql("DROP TABLE IF EXISTS parquet_t2")
+    sql("DROP TABLE IF EXISTS parquet_t3")
     sql("DROP TABLE IF EXISTS t0")
+    sql("DROP TABLE IF EXISTS t1")
+
+    val tuples: Seq[(String, String)] =
+      ("1", """{"f1": "value1", "f2": "value2", "f3": 3, "f5": 5.23}""") ::
+      ("2", """{"f1": "value12", "f3": "value3", "f2": 2, "f4": 4.01}""") ::
+      ("3", """{"f1": "value13", "f4": "value44", "f3": "value33", "f2": 2, "f5": 5.01}""") ::
+      ("4", null) ::
+      ("5", """{"f1": "", "f5": null}""") ::
+      ("6", "[invalid JSON string]") ::
+      Nil
 
     sqlContext.range(10).write.saveAsTable("parquet_t0")
     sql("CREATE TABLE t0 AS SELECT * FROM parquet_t0")
@@ -45,13 +56,18 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
       .select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
       .write
       .saveAsTable("parquet_t2")
+
+    tuples.toDF("key", "jstring").write.saveAsTable("parquet_t3")
+    sql("CREATE TABLE t1 AS SELECT key, array(value) AS value FROM parquet_t1 LIMIT 20")
   }
 
   override protected def afterAll(): Unit = {
     sql("DROP TABLE IF EXISTS parquet_t0")
     sql("DROP TABLE IF EXISTS parquet_t1")
     sql("DROP TABLE IF EXISTS parquet_t2")
+    sql("DROP TABLE IF EXISTS parquet_t3")
     sql("DROP TABLE IF EXISTS t0")
+    sql("DROP TABLE IF EXISTS t1")
   }
 
   private def checkHiveQl(hiveQl: String): Unit = {
@@ -550,4 +566,107 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
         |WINDOW w AS (PARTITION BY key % 5 ORDER BY key)
       """.stripMargin)
   }
+
+  test("SQL generator for explode in projection list") {
+    // Basic Explode
+    checkHiveQl("SELECT explode(array(1,2,3)) FROM src")
+
+    // Explode with Alias
+    checkHiveQl("SELECT explode(array(1,2,3)) as value FROM src")
+
+    // Explode without FROM
+    checkHiveQl("select explode(array(1,2,3)) AS gencol")
+
+    // non-generated columns in projection list
+    checkHiveQl("SELECT key as c1, explode(array(1,2,3)) as c2, value as c3 FROM t1")
+  }
+
+  test("SQL generation for json_tuple as generator") {
+    checkHiveQl("SELECT key, json_tuple(jstring, 'f1', 'f2', 'f3', 'f4', 'f5') FROM parquet_t3")
+  }
+
+  test("Lateral view with join") {
+    checkHiveQl(
+      """
+        |SELECT gencol, explode(array(1,2,3)), x1.key
+        |FROM (t1 LATERAL VIEW OUTER explode(value) gentab as gencol), t1 as x1
+      """.stripMargin
+    )
+  }
+
+  test("SQL generation for lateral views") {
+    // Filter and OUTER clause
+    checkHiveQl(
+      """
+        |SELECT key, value
+        |FROM t1
+        |LATERAL VIEW OUTER explode(value) gentab as gencol
+        |WHERE key = 1
+      """.stripMargin
+    )
+
+    // single lateral view
+    checkHiveQl(
+      """
+        |SELECT *
+        |FROM t1
+        |LATERAL VIEW explode(array(1,2,3)) gentab AS gencol
+        |SORT BY key ASC, gencol ASC LIMIT 1
+      """.stripMargin
+    )
+
+    // multiple lateral views
+    checkHiveQl(
+      """
+        |SELECT gentab2.*
+        |FROM t1
+        |LATERAL VIEW explode(array(array(1,2,3))) gentab1 AS gencol1
+        |LATERAL VIEW explode(gentab1.gencol1) gentab2 AS gencol2 LIMIT 3
+      """.stripMargin
+    )
+
+    // No generated column aliases
+    checkHiveQl(
+      """SELECT gentab.*
+        |FROM t1
+        |LATERAL VIEW explode(map('key1', 100, 'key2', 200)) gentab limit 2
+      """.stripMargin
+    )
+  }
+
+  test("SQL generation for lateral views in subquery") {
+    // Subqueries in the FROM clause using Generate
+    checkHiveQl(
+      """
+        |SELECT subq.gencol
+        |FROM
+        |(SELECT * from t1 LATERAL VIEW explode(value) gentab AS gencol) subq
+      """.stripMargin)
+
+    checkHiveQl(
+      """
+        |SELECT subq.key
+        |FROM
+        |(SELECT key, value from t1 LATERAL VIEW explode(value) gentab AS gencol) subq
+      """.stripMargin
+    )
+  }
+
+  test("SQL generation for UDTF") {
+    sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}")
+
+    // The function source code can be found at:
+    // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
+    sql(
+      """
+        |CREATE TEMPORARY FUNCTION udtf_count2
+        |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+      """.stripMargin)
+
+    checkHiveQl("SELECT key, gencol FROM t1 LATERAL VIEW udtf_count2(value) gentab AS gencol")
+
+    checkHiveQl("SELECT udtf_count2(c1) FROM (SELECT 1 AS c1 FROM t1 LIMIT 3) g1")
+
+    sql("DROP TEMPORARY FUNCTION udtf_count2")
+  }
 }
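
Note (illustration only, not part of the patch): for the sample plan in the generateToSQL Scaladoc above, these code paths should emit SQL of roughly the following shape. The `default`.`src` rendering of the MetastoreRelation, the exact quoting, and the qualified projection are assumptions read off the build(...) calls, not captured output:

    SELECT `mytable2`.`mycol2`
    FROM `default`.`src`
    LATERAL VIEW explode(array(array(1, 2, 3))) `mytable` AS `mycol`
    LATERAL VIEW explode(`mycol`) `mytable2` AS `mycol2`

The innermost Generate becomes the first LATERAL VIEW clause, the outer Generate the second, and the Project on top becomes the SELECT list qualified with the generator alias; when a Generate carries no qualifier, SQLBuilder.newGeneratorName supplies a gen_generator_N alias instead.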