Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
address comments
  • Loading branch information
Davies Liu committed Feb 20, 2016
commit 3a8f08d286a70ecf00055c51c41baf567c8bcb7a
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,16 @@ import org.apache.spark.sql.types.DataType
/**
* An interface for subquery that is used in expressions.
*/
abstract class SubqueryExpression extends LeafExpression{
abstract class SubqueryExpression extends LeafExpression {

/**
* The logical plan of the query.
*/
def query: LogicalPlan
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an helper function used in Analyzer and Optimizer, or we need to do type conversion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the base class for both logical plan and physical plan, kind of weird. This is to make the generateTreeString works in QueryPlan

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Analyzer and Optimizer only applies to logical plan right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes


/**
* The underline plan for the query, could be logical plan or physical plan.
*
* This is used to generate tree string.
* Either a logical plan or a physical plan. The generated tree string (explain output) uses this
* field to explain the subquery.
*/
def plan: QueryPlan[_]

Expand All @@ -48,7 +47,9 @@ abstract class SubqueryExpression extends LeafExpression{
/**
* A subquery that will return only one row and one column.
*
* This is not evaluable, it should be converted into SparkScalaSubquery.
* This will be converted into [[execution.ScalarSubquery]] during physical planning.
*
* Note: `exprId` is used to have unique name in explain string output.
*/
case class ScalarSubquery(
query: LogicalPlan,
Expand All @@ -63,7 +64,7 @@ case class ScalarSubquery(

override def checkInputDataTypes(): TypeCheckResult = {
if (query.schema.length != 1) {
TypeCheckResult.TypeCheckFailure("Scalar subquery can only have 1 column, but got " +
TypeCheckResult.TypeCheckFailure("Scalar subquery must return only one column, but got " +
query.schema.length.toString)
} else {
TypeCheckResult.TypeCheckSuccess
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,16 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
Batch("LocalRelation", FixedPoint(100),
ConvertToLocalRelation) ::
Batch("Subquery", Once,
Subquery) :: Nil
OptimizeSubqueries) :: Nil
}

/**
* Optimize all the subqueries inside expression.
*/
object Subquery extends Rule[LogicalPlan] {
object OptimizeSubqueries extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
case subquery: SubqueryExpression =>
subquery.withNewPlan(execute(subquery.query))
subquery.withNewPlan(Optimizer.this.execute(subquery.query))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,47 +204,8 @@ class CatalystQlSuite extends PlanTest {
}

test("subquery") {
comparePlans(
parser.parsePlan("select (select max(b) from s) ss from t"),
Project(
UnresolvedAlias(
Alias(
ScalarSubquery(
Project(
UnresolvedAlias(
UnresolvedFunction("max", UnresolvedAttribute("b") :: Nil, false)) :: Nil,
UnresolvedRelation(TableIdentifier("s")))),
"ss")(ExprId(0))) :: Nil,
UnresolvedRelation(TableIdentifier("t"))))
comparePlans(
parser.parsePlan("select * from t where a = (select b from s)"),
Project(
UnresolvedAlias(
UnresolvedStar(None)) :: Nil,
Filter(
EqualTo(
UnresolvedAttribute("a"),
ScalarSubquery(
Project(
UnresolvedAlias(
UnresolvedAttribute("b")) :: Nil,
UnresolvedRelation(TableIdentifier("s"))))),
UnresolvedRelation(TableIdentifier("t")))))
comparePlans(
parser.parsePlan("select * from t group by g having a > (select b from s)"),
Filter(
Cast(
GreaterThan(
UnresolvedAttribute("a"),
ScalarSubquery(
Project(
UnresolvedAlias(
UnresolvedAttribute("b")) :: Nil,
UnresolvedRelation(TableIdentifier("s"))))),
BooleanType),
Aggregate(
UnresolvedAttribute("g") :: Nil,
UnresolvedAlias(UnresolvedStar(None)) :: Nil,
UnresolvedRelation(TableIdentifier("t")))))
parser.parsePlan("select (select max(b) from s) ss from t")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only thing we are testing here is that things don't go really really wrong. I'd prefer it if we test the plan as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since plan checking is too easy to break, I added test for plan, finally remove them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok that makes sense.

parser.parsePlan("select * from t where a = (select b from s)")
parser.parsePlan("select * from t group by g having a > (select b from s)")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,15 @@ class AnalysisErrorSuite extends AnalysisTest {
val dateLit = Literal.create(null, DateType)

errorTest(
"invalid scalar subquery",
"scalar subquery with 2 columns",
testRelation.select(
(ScalarSubquery(testRelation.select('a, dateLit.as('b))) + Literal(1)).as('a)),
"Scalar subquery can only have 1 column, but got 2" :: Nil)
"Scalar subquery must return only one column, but got 2" :: Nil)

errorTest(
"scalar subquery with no column",
testRelation.select(ScalarSubquery(LocalRelation()).as('a)),
"Scalar subquery must return only one column, but got 0" :: Nil)

errorTest(
"single invalid type, single arg",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ class SQLContext private[sql](
@transient
protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
val batches = Seq(
Batch("Subquery", Once, ConvertSubquery(self)),
Batch("Subquery", Once, PlanSubqueries(self)),
Batch("Add exchange", Once, EnsureRequirements(self)),
Batch("Whole stage codegen", Once, CollapseCodegenStages(self))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
doPrepare()

// collect all the subqueries and submit jobs to execute them in background
val queryResults = ArrayBuffer[(SparkScalarSubquery, Future[Array[InternalRow]])]()
val allSubqueries = expressions.flatMap(_.collect {case e: SparkScalarSubquery => e})
val queryResults = ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]()
val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e})
allSubqueries.foreach { e =>
val futureResult = Future {
e.plan.executeCollect()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e.executedPlan rather than e.plan, if you want to keep executedPlan

Expand All @@ -146,9 +146,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
sys.error(s"Scalar subquery should return at most one row, but got ${rows.length}: " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can use postgres' error message: "more than one row returned by a subquery used as an expression"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have never thought we should match the exactly error message with PostgreSQL, that's great.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current error message has more information than postgres', should we change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not 100% sure. maybe it's better to just say more than one, so we don't need to run the whole plan (e..g i'm thinking maybe we should inject a limit of 2 to subquery)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, changed to call executeTake(2)

s"${e.query.treeString}")
}
// Analyzer will make sure that it only return on column
if (rows.length > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can rows.length ever be 0 here? if it can only be 1, why we are testing > 0 here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the length could be zero, then the value is null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we write a query with 0 column? The comment above said the analyzer would make sure there's only one column.

If it is possible to have 0 column, then I'd make it explicitly here to set the value to null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also if it is possible to have 0 column, we also need to add a test case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rows.length means number of rows, not number of columns.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok makes sense.

please change the check to rows.length == 1

it's pretty confusing to first check it's greater than 1, and then check it is greater than 0, when you are just expecting 1.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and same thing applies - the test coverage for this is pretty bad. add a test case where the subquery returns 0 or more than 1 rows.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repeated: rows.length could be 0, then the value will be null, will add a comment for that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, it'd be better to make it more explicit, e.g.

if (rows.length == 0) {
  e.updateResult(null)
} else {
  assert(rows.length == 1)
  e.updateResult(rows(0).get(0, e.dataType))
}

assert(rows(0).numFields == 1, "Analyzer should make sure this only returns one column")
e.updateResult(rows(0).get(0, e.dataType))
} else {
// the result should be null, since the expression already have null as default value,
// we don't need to update that.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not set it explicitly here so the behavior is defined entirely in this tiny function?

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.spark.sql.execution

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{ExprId, ScalarSubquery, SubqueryExpression}
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{ExprId, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
Expand All @@ -30,7 +30,7 @@ import org.apache.spark.sql.types.DataType
*
* This is the physical copy of ScalarSubquery to be used inside SparkPlan.
*/
case class SparkScalarSubquery(
case class ScalarSubquery(
@transient executedPlan: SparkPlan,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd just call this physicalPlan, or plan.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually plan is probably the best.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plan is already used to return a Subquery

exprId: ExprId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

document why we need exprId (for explain string output?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please document it.

extends SubqueryExpression with CodegenFallback {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this interact with whole stage codegen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is LeafExpression, whole stage codegen support that (passing null to eval()).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but then it boxes everything?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I think it's fine, one boxing is not the end of world.

Expand Down Expand Up @@ -58,14 +58,13 @@ case class SparkScalarSubquery(
/**
* Convert the subquery from logical plan into executed plan.
*/
private[sql] case class ConvertSubquery(sqlContext: SQLContext) extends Rule[SparkPlan] {
private[sql] case class PlanSubqueries(sqlContext: SQLContext) extends Rule[SparkPlan] {
def apply(plan: SparkPlan): SparkPlan = {
plan.transformAllExpressions {
// Only scalar subquery will be executed separately, all others will be written as join.
case subquery: ScalarSubquery =>
case subquery: expressions.ScalarSubquery =>
val sparkPlan = sqlContext.planner.plan(ReturnAnswer(subquery.query)).next()
val executedPlan = sqlContext.prepareForExecution.execute(sparkPlan)
SparkScalarSubquery(executedPlan, subquery.exprId)
ScalarSubquery(executedPlan, subquery.exprId)
}
}
}