Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[SPARK-12719] SQL generation support for generators (including UDTF)
  • Loading branch information
dilipbiswal committed Mar 11, 2016
commit 1088c30c0c965ee9f32170658c7b24c4c3b836af
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ case class MultiAlias(child: Expression, names: Seq[String])

override def toString: String = s"$child AS $names"

override def sql: String = {
val aliasNames = names.map(quoteIdentifier(_)).mkString(",")
s"${child.sql} AS ($aliasNames)"
}
}

/**
Expand Down
56 changes: 53 additions & 3 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.util.control.NonFatal
import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiAlias
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.CollapseProject
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -94,6 +95,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
case Distinct(p: Project) =>
projectToSQL(p, isDistinct = true)

case g : Generate =>
generateToSQL(g)

case p: Project =>
projectToSQL(p, isDistinct = false)

Expand Down Expand Up @@ -208,15 +212,36 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
segments.map(_.trim).filter(_.nonEmpty).mkString(" ")

private def projectToSQL(plan: Project, isDistinct: Boolean): String = {
val (projectListExprs, planToProcess) = getProjectListExprs(plan)
build(
"SELECT",
if (isDistinct) "DISTINCT" else "",
plan.projectList.map(_.sql).mkString(", "),
if (plan.child == OneRowRelation) "" else "FROM",
toSQL(plan.child)
projectListExprs.map(_.sql).mkString(", "),
if (planToProcess == OneRowRelation) "" else "FROM",
toSQL(planToProcess)
)
}

private def getProjectListExprs(plan: Project): (Seq[NamedExpression], LogicalPlan) = {
plan match {
case p @ Project(_, g: Generate) if g.qualifier.isEmpty =>
// Only keep the first generated column in the list so that we can
// transform it to a Generator expression in the following step.
val projList = p.projectList.filter {
case e: Expression if g.generatorOutput.tail.exists(_.semanticEquals(e)) => false
case _ => true
}
val exprs = projList.map {
case e: Expression if g.generatorOutput.exists(_.semanticEquals(e)) =>
val names = g.generatorOutput.map(_.name)
MultiAlias(g.generator, names)
case other => other
}
(exprs, g.child)
case _ => (plan.projectList, plan.child)
}
}

private def aggregateToSQL(plan: Aggregate): String = {
val groupingSQL = plan.groupingExpressions.map(_.sql).mkString(", ")
build(
Expand Down Expand Up @@ -305,6 +330,30 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
(w.child.output ++ w.windowExpressions).map(_.sql).mkString(", "),
if (w.child == OneRowRelation) "" else "FROM",
toSQL(w.child)
}

/* This function handles the SQL generation when generators are specified in the
* LATERAL VIEW clause. SQL generation of generators specified in projection lists
* are handled in projectToSQL.
* sample plan :
* +- Project [mycol2#192]
* +- Generate explode(myCol#191), true, false, Some(mytable2), [mycol2#192]
* +- Generate explode(array(array(1, 2, 3))), true, false, Some(mytable), [mycol#191]
* +- MetastoreRelation default, src, None
*
*/
private def generateToSQL(plan: Generate): String = {
val columnAliases = plan.generatorOutput.map(a => quoteIdentifier(a.name)).mkString(",")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can just do plan.generatorOutput.map(_.sql).mkString(", ")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Thx

val generatorAlias = if (plan.qualifier.isEmpty) "" else plan.qualifier.get
val outerClause = if (plan.outer) "OUTER" else ""
build(
toSQL(plan.child),
"LATERAL VIEW ",
outerClause,
plan.generator.sql,
quoteIdentifier(generatorAlias),
"AS",
columnAliases
)
}

Expand Down Expand Up @@ -360,6 +409,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi

case plan @ Project(_,
_: SubqueryAlias
| _: Generate
| _: Filter
| _: Join
| _: MetastoreRelation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
sql("DROP TABLE IF EXISTS parquet_t1")
sql("DROP TABLE IF EXISTS parquet_t2")
sql("DROP TABLE IF EXISTS t0")
sql("DROP TABLE IF EXISTS t3")
sql("DROP TABLE IF EXISTS t4")

val tuples: Seq[(String, String)] =
("1", """{"f1": "value1", "f2": "value2", "f3": 3, "f5": 5.23}""") ::
("2", """{"f1": "value12", "f3": "value3", "f2": 2, "f4": 4.01}""") ::
("3", """{"f1": "value13", "f4": "value44", "f3": "value33", "f2": 2, "f5": 5.01}""") ::
("4", null) ::
("5", """{"f1": "", "f5": null}""") ::
("6", "[invalid JSON string]") ::
Nil

sqlContext.range(10).write.saveAsTable("parquet_t0")
sql("CREATE TABLE t0 AS SELECT * FROM parquet_t0")
Expand All @@ -45,13 +56,20 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
.select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
.write
.saveAsTable("parquet_t2")

sqlContext.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd).write.saveAsTable("t2")

tuples.toDF("key", "jstring").write.saveAsTable("t3")
sql("CREATE TABLE t4 as select key, array(value) as value from parquet_t1 limit 20")
}

override protected def afterAll(): Unit = {
sql("DROP TABLE IF EXISTS parquet_t0")
sql("DROP TABLE IF EXISTS parquet_t1")
sql("DROP TABLE IF EXISTS parquet_t2")
sql("DROP TABLE IF EXISTS t0")
sql("DROP TABLE IF EXISTS t3")
sql("DROP TABLE IF EXISTS t4")
}

private def checkHiveQl(hiveQl: String): Unit = {
Expand Down Expand Up @@ -550,4 +568,86 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
|WINDOW w AS (PARTITION BY key % 5 ORDER BY key)
""".stripMargin)
}

test("SQL generation for generate") {
sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}")
// The function source code can be found at:
// https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
sql(
"""
|CREATE TEMPORARY FUNCTION udtf_count2
|AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
""".stripMargin)

// Basic Explode
checkHiveQl("SELECT explode(array(1,2,3)) FROM src")

// Explode with Alias
checkHiveQl("SELECT explode(array(1,2,3)) as value FROM src")

// Explode without FROM
checkHiveQl("select explode(array(1,2,3)) AS gencol")

// Explode with columns other than generated columns in projection list
checkHiveQl("SELECT key as c1, explode(array(1,2,3)) as c2, value as c3 FROM t4")

// Explode with a column reference as input to generator
checkHiveQl("SELECT key, value from t4 LATERAL VIEW explode(value) gentab")

// json_tuple
checkHiveQl("SELECT key, json_tuple(jstring, 'f1', 'f2', 'f3', 'f4', 'f5') FROM t3")

// udtf
checkHiveQl("SELECT key, gencol FROM t4 LATERAL VIEW udtf_count2(value) gentab AS gencol")

// udtf
checkHiveQl("SELECT udtf_count2(c1) FROM (SELECT 1 AS c1 FROM t4 LIMIT 3) g1")

// Filter and OUTER clause
checkHiveQl(
"""SELECT key, value
|FROM t4 LATERAL VIEW OUTER explode(value) gentab as gencol
|WHERE key = 1
""".stripMargin
)

// single lateral view
checkHiveQl(
"""SELECT *
|FROM t4 LATERAL VIEW explode(array(1,2,3)) gentab AS gencol
|SORT BY key ASC, gencol ASC LIMIT 1
""".stripMargin
)

// multiple lateral views
checkHiveQl(
"""SELECT gentab2.*
|FROM t4
|LATERAL VIEW explode(array(array(1,2,3))) gentab1 AS gencol1
|LATERAL VIEW explode(gentab1.gencol1) gentab2 AS gencol2 LIMIT 3
""".stripMargin
)

// Subquries in FROM clause using Generate
checkHiveQl(
"""SELECT subq.gencol
|FROM
|(SELECT * from t4 LATERAL VIEW explode(value) gentab AS gencol) subq
""".stripMargin)

checkHiveQl(
"""SELECT subq.key
|FROM
|(SELECT key, value from t4 LATERAL VIEW explode(value) gentab AS gencol) subq
""".stripMargin
)

checkHiveQl(
"""SELECT gentab.*
|FROM
|t4 LATERAL VIEW explode(map('key1', 100, 'key2', 200)) gentab limit 2
""".stripMargin
)
sql("DROP TEMPORARY FUNCTION udtf_count2")
}
}