Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Alternative approach.
  • Loading branch information
dilipbiswal committed Mar 11, 2016
commit cd0c001367aa3a39b9a26e4162b733bea225812f
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,6 @@ case class MultiAlias(child: Expression, names: Seq[String])

override def toString: String = s"$child AS $names"

/**
 * SQL text for this multi-alias: the child's SQL followed by `AS (...)`, where the
 * parenthesised list holds every name passed through `quoteIdentifier`, comma-joined.
 */
override def sql: String = {
  val quotedNames = names.map(name => quoteIdentifier(name))
  s"${child.sql} AS (${quotedNames.mkString(",")})"
}
}

/**
Expand Down
74 changes: 44 additions & 30 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import scala.util.control.NonFatal
import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiAlias
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.CollapseProject
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -95,7 +94,10 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
case Distinct(p: Project) =>
projectToSQL(p, isDistinct = true)

case g : Generate =>
case p@ Project(_, g: Generate) =>
generateToSQL(p)

case g: Generate =>
generateToSQL(g)

case p: Project =>
Expand Down Expand Up @@ -212,36 +214,15 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
segments.map(_.trim).filter(_.nonEmpty).mkString(" ")

// Renders a Project as a SELECT statement by handing `build` these segments: the SELECT
// keyword, "DISTINCT" when `isDistinct` is set (an empty string otherwise), the projected
// expressions' SQL joined with ", ", the FROM keyword (an empty string when the input plan
// is OneRowRelation), and the SQL of the input plan.
// NOTE(review): this span appears to interleave two versions of the argument list -- one
// that goes through getProjectListExprs (projectListExprs / planToProcess) and one that
// reads plan.projectList / plan.child directly. Only one of them should remain; confirm
// against the intended revision before editing the code.
private def projectToSQL(plan: Project, isDistinct: Boolean): String = {
val (projectListExprs, planToProcess) = getProjectListExprs(plan)
build(
"SELECT",
if (isDistinct) "DISTINCT" else "",
projectListExprs.map(_.sql).mkString(", "),
if (planToProcess == OneRowRelation) "" else "FROM",
toSQL(planToProcess)
plan.projectList.map(_.sql).mkString(", "),
if (plan.child == OneRowRelation) "" else "FROM",
toSQL(plan.child)
)
}

/**
 * Chooses the expressions to render in the SELECT list and the plan to render after FROM.
 *
 * When the Project sits directly on a Generate that has no qualifier, the generator output
 * columns in the projection list are collapsed: every generated column except the first is
 * dropped, and the first is replaced by a MultiAlias of the generator carrying all generated
 * column names. The plan returned in that case is the Generate's child. For any other
 * Project, the projection list and child are returned untouched.
 */
private def getProjectListExprs(plan: Project): (Seq[NamedExpression], LogicalPlan) = plan match {
  case Project(projectList, generate: Generate) if generate.qualifier.isEmpty =>
    // True for generated columns other than the first one.
    def isTrailingGeneratedColumn(expr: NamedExpression): Boolean =
      generate.generatorOutput.tail.exists(_.semanticEquals(expr))

    // True for any generated column.
    def isGeneratedColumn(expr: NamedExpression): Boolean =
      generate.generatorOutput.exists(_.semanticEquals(expr))

    // After the filter only the first generated column can still match, so it becomes
    // the single Generator expression aliased with all output names.
    val rewritten = projectList
      .filterNot(expr => isTrailingGeneratedColumn(expr))
      .map { expr =>
        if (isGeneratedColumn(expr)) {
          MultiAlias(generate.generator, generate.generatorOutput.map(_.name))
        } else {
          expr
        }
      }
    (rewritten, generate.child)

  case _ =>
    (plan.projectList, plan.child)
}

private def aggregateToSQL(plan: Aggregate): String = {
val groupingSQL = plan.groupingExpressions.map(_.sql).mkString(", ")
build(
Expand Down Expand Up @@ -332,9 +313,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
toSQL(w.child)
}

/* This function handles the SQL generation when generators are specified in the
* LATERAL VIEW clause. SQL generation of generators specified in projection lists
* is handled in projectToSQL.
/* This function handles the SQL generation for generators.
* sample plan :
* +- Project [mycol2#192]
* +- Generate explode(myCol#191), true, false, Some(mytable2), [mycol2#192]
Expand All @@ -347,7 +326,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
val generatorAlias = if (plan.qualifier.isEmpty) "" else plan.qualifier.get
val outerClause = if (plan.outer) "OUTER" else ""
build(
toSQL(plan.child),
if (plan.child == OneRowRelation) s"(SELECT 1) ${SQLBuilder.newSubqueryName}" else toSQL(plan.child),
"LATERAL VIEW",
outerClause,
plan.generator.sql,
Expand All @@ -357,6 +336,37 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
)
}

/**
 * Generates SQL for a Project whose child is a Generate, i.e. a generator that appears in
 * the projection list. The generator is expressed as a LATERAL VIEW, and the projected
 * generator output columns are qualified with the lateral view's alias.
 *
 * @param plan a Project whose child must be a Generate
 * @return the SELECT statement built from the qualified projection list and the Generate
 * @throws IllegalArgumentException if the child of `plan` is not a Generate
 */
private def generateToSQL(plan: Project): String = {
  // Match instead of casting so that a misuse fails with a descriptive message.
  val generate = plan.child match {
    case g: Generate => g
    case other =>
      throw new IllegalArgumentException(
        s"Expected a Generate as the child of Project, found ${other.getClass.getSimpleName}")
  }

  // A LATERAL VIEW needs a qualifier: reuse the existing one, or mint a unique name when
  // the generator came from the projection list and therefore has none.
  val generatorAlias: String = generate.qualifier.getOrElse(SQLBuilder.newGeneratorName)

  // Qualify the generator output attributes in the projection list with that alias.
  val qualifiedProjectList = plan.projectList.map {
    case a if generate.generatorOutput.exists(_.semanticEquals(a)) =>
      a.toAttribute.withQualifiers(Seq(generatorAlias))
    case o => o
  }

  // generatorAlias equals the existing qualifier whenever one is present, so this copy
  // only changes the plan when the qualifier was missing.
  val qualifiedGenerate = generate.copy(qualifier = Some(generatorAlias))

  build(
    "SELECT",
    // Same ", " separator as the other SELECT-list builders in this file.
    qualifiedProjectList.map(_.sql).mkString(", "),
    "FROM",
    toSQL(qualifiedGenerate)
  )
}

object Canonicalizer extends RuleExecutor[LogicalPlan] {
override protected def batches: Seq[Batch] = Seq(
Batch("Collapse Project", FixedPoint(100),
Expand Down Expand Up @@ -461,4 +471,8 @@ object SQLBuilder {
// Counter that supplies the numeric suffix of generated subquery names.
private val nextSubqueryId = new AtomicLong(0)

/** Returns "gen_subquery_" followed by the current counter value, then advances the counter. */
private def newSubqueryName: String = {
  val id = nextSubqueryId.getAndIncrement()
  s"gen_subquery_${id}"
}

// Counter that supplies the numeric suffix of generated generator (lateral view) names.
private val nextGeneratorId = new AtomicLong(0)

/** Returns "gen_generator_" followed by the current counter value, then advances the counter. */
private def newGeneratorName: String = {
  val id = nextGeneratorId.getAndIncrement()
  s"gen_generator_${id}"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -588,23 +588,28 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
test("SQL generation for lateral views") {
// Filter and OUTER clause
checkHiveQl(
"""SELECT key, value
|FROM t1 LATERAL VIEW OUTER explode(value) gentab as gencol
"""
|SELECT key, value
|FROM t1
|LATERAL VIEW OUTER explode(value) gentab as gencol
|WHERE key = 1
""".stripMargin
)

// single lateral view
checkHiveQl(
"""SELECT *
|FROM t1 LATERAL VIEW explode(array(1,2,3)) gentab AS gencol
"""
|SELECT *
|FROM t1
|LATERAL VIEW explode(array(1,2,3)) gentab AS gencol
|SORT BY key ASC, gencol ASC LIMIT 1
""".stripMargin
)

// multiple lateral views
checkHiveQl(
"""SELECT gentab2.*
"""
|SELECT gentab2.*
|FROM t1
|LATERAL VIEW explode(array(array(1,2,3))) gentab1 AS gencol1
|LATERAL VIEW explode(gentab1.gencol1) gentab2 AS gencol2 LIMIT 3
Expand All @@ -614,22 +619,24 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
// No generated column aliases
checkHiveQl(
"""SELECT gentab.*
|FROM
|t1 LATERAL VIEW explode(map('key1', 100, 'key2', 200)) gentab limit 2
|FROM t1
|LATERAL VIEW explode(map('key1', 100, 'key2', 200)) gentab limit 2
""".stripMargin
)
}

test("SQL generation for lateral views in subquery") {
// Subqueries in FROM clause using Generate
checkHiveQl(
"""SELECT subq.gencol
"""
|SELECT subq.gencol
|FROM
|(SELECT * from t1 LATERAL VIEW explode(value) gentab AS gencol) subq
""".stripMargin)

checkHiveQl(
"""SELECT subq.key
"""
|SELECT subq.key
|FROM
|(SELECT key, value from t1 LATERAL VIEW explode(value) gentab AS gencol) subq
""".stripMargin
Expand Down