@@ -205,6 +205,8 @@ atomExpression
| whenExpression
| (functionName LPAREN) => function
| tableOrColumn
| (LPAREN KW_SELECT) => subQueryExpression
-> ^(TOK_SUBQUERY_EXPR ^(TOK_SUBQUERY_OP) subQueryExpression)
| LPAREN! expression RPAREN!
;
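
A hedged reading of this alternative: the (LPAREN KW_SELECT) syntactic predicate means an atom beginning with "( select" is parsed as a subquery expression rather than a parenthesized scalar expression, and the rewrite wraps it in a TOK_SUBQUERY_EXPR node with an empty TOK_SUBQUERY_OP child. Roughly (child token names inferred from the rewrite rule, not verified against every grammar version):

    select (select max(b) from s) ss from t
           ^^^^^^^^^^^^^^^^^^^^^^
           parsed as:  TOK_SUBQUERY_EXPR
                         TOK_SUBQUERY_OP      (empty for a scalar subquery)
                         <subQueryExpression subtree for "select max(b) from s">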

@@ -667,6 +667,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
UnresolvedAttribute(nameParts :+ cleanIdentifier(attr))
case other => UnresolvedExtractValue(other, Literal(cleanIdentifier(attr)))
}
case Token("TOK_SUBQUERY_EXPR", Token("TOK_SUBQUERY_OP", Nil) :: subquery :: Nil) =>
ScalarSubquery(nodeToPlan(subquery))

Contributor:
This might sound exceedingly dumb, but I cannot find ScalarSubquery or SubqueryExpression. Are they already in the code base, or did you create this branch on top of another branch?

Contributor:
Never mind, I just found the other PR...

Contributor (Author):
I missed a file, sorry.
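
For anyone else who hits the same question: ScalarSubquery and SubqueryExpression come from the companion PR mentioned above. Reconstructed from how this diff uses them (an assumption, not the authoritative definition), the interface is roughly:

    // Sketch inferred from usage in this diff; see the companion PR for the
    // real definitions.
    abstract class SubqueryExpression extends Expression {
      def query: LogicalPlan                                  // the subquery's logical plan
      def withNewPlan(plan: LogicalPlan): SubqueryExpression  // copy with a rewritten plan
    }

A ScalarSubquery wraps a plan that must produce one column and at most one row; its value is filled in before execution via updateResult (see the SparkPlan changes below).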


/* Stars (*) */
case Token("TOK_ALLCOLREF", Nil) => UnresolvedStar(None)
@@ -80,6 +80,7 @@ class Analyzer(
ResolveGenerate ::
ResolveFunctions ::
ResolveAliases ::
ResolveSubquery ::
ResolveWindowOrder ::
ResolveWindowFrame ::
ResolveNaturalJoin ::
@@ -120,7 +121,13 @@
withAlias.getOrElse(relation)
}
substituted.getOrElse(u)
case other =>

Contributor:
Add a quick comment on why this isn't in ResolveSubquery.

Contributor (Author):
Done.

other transformExpressions {
case e: SubqueryExpression =>
e.withNewPlan(substituteCTE(e.query, cteRelations))
}
}

}
}
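
To see why the substitution must descend into subquery expressions, consider the CTE test near the bottom of this page:

    with t2 as (select 1 as b, 2 as c)
    select a from (select 1 as a union all select 2 as a) t
    where a = (select max(b) from t2)

Here t2 is referenced only inside the scalar subquery, so substituting CTEs in the outer plan alone would leave an unresolved t2 behind.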

@@ -693,6 +700,28 @@
}
}

/**
 * This rule resolves subqueries inside expressions.
 */

Contributor:
Fix the indentation. Maybe also comment that CTEs are handled elsewhere.

object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper {

private def hasSubquery(e: Expression): Boolean = {
e.find(_.isInstanceOf[SubqueryExpression]).isDefined
}

private def hasSubquery(q: LogicalPlan): Boolean = {
q.expressions.exists(hasSubquery)
}

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case q: LogicalPlan if q.childrenResolved && hasSubquery(q) =>
q transformExpressions {
case e: SubqueryExpression if !e.query.resolved =>
e.withNewPlan(execute(e.query))
}
}
}
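
A minimal sketch of what the rule guarantees, assuming a hypothetical test harness with parser, analyzer, and tables t and s registered (the harness names are assumptions, not part of this PR):

    val parsed = parser.parsePlan("select (select max(b) from s) ss from t")
    val analyzed = analyzer.execute(parsed)
    // After analysis, no subquery expression should hold an unresolved plan;
    // since the rule calls execute() recursively, this holds for subqueries
    // nested inside subqueries as well.
    assert(analyzed.expressions.forall(_.collect {
      case s: SubqueryExpression if !s.query.resolved => s
    }.isEmpty))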

/**
* Turns projections that contain aggregate expressions into aggregations.
*/
@@ -201,4 +201,12 @@ class CatalystQlSuite extends PlanTest {
parser.parsePlan("select sum(product + 1) over (partition by (product + (1)) order by 2) " +
"from windowData")
}

test("subquery") {
parser.parsePlan("select (select max(b) from s) ss from t")

Contributor:
The only thing we are testing here is that things don't go really, really wrong. I'd prefer it if we tested the plan as well.

Contributor (Author):
Since plan checks are too easy to break, I added tests for the plans but ultimately removed them.

Contributor:
Ok, that makes sense.

parser.parsePlan("select * from t where a = (select b from s)")
parser.parsePlan("select * from t where a > (select b from s)")
parser.parsePlan("select * from t group by g having a = (select b from s)")
parser.parsePlan("select * from t group by g having a > (select b from s)")
}
}
@@ -20,17 +20,20 @@ package org.apache.spark.sql.execution
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.Logging
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric}
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.ThreadUtils

/**
* The base class for physical operators.
@@ -122,7 +125,33 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable
final def prepare(): Unit = {
if (prepareCalled.compareAndSet(false, true)) {
doPrepare()

// collect all the subqueries and submit jobs to execute them in background
val queryResults = ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]()
val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e})
allSubqueries.foreach { e =>
val futureResult = scala.concurrent.future {
val df = DataFrame(sqlContext, e.query)
df.queryExecution.toRdd.collect()
}(SparkPlan.subqueryExecutionContext)
queryResults += e -> futureResult
}

children.foreach(_.prepare())

// fill in the result of subqueries
queryResults.foreach {

Contributor:
We should move the blocking phase into execute(); otherwise, if multiple nodes have subqueries, it becomes blocking.

Contributor:
Ah, OK, you can't have a general execute. I guess this is why some query engines have init and then prepare.

Contributor:
Or a subquery is now blocking broadcasting...

Contributor (Author):
This is called after doPrepare(), and after prepare() of its children, so it will NOT block broadcasting (they will happen at the same time).

Contributor:
What if there is a broadcast join after this?

Contributor (Author):
The broadcast will be issued before this.

case (e, futureResult) =>
val rows = Await.result(futureResult, Duration.Inf)

Contributor:
Do we really want to wait that long?

Contributor (Author):
Will use sqlContext.conf.broadcastTimeout.

Contributor (Author):
After offline discussion with @rxin and @marmbrus, we decided not to have a timeout here; the Spark job is visible in the UI and can be cancelled at any time if it blocks.

if (rows.length > 1) {
sys.error(s"Scalar subquery should return at most one row, but got ${rows.length}: " +

Contributor:
We can use Postgres's error message: "more than one row returned by a subquery used as an expression".

Contributor (Author):
I had never thought we should exactly match PostgreSQL's error messages; that's great.

Contributor (Author):
The current error message has more information than Postgres's. Should we change it?

Contributor:
Not 100% sure. Maybe it's better to just say "more than one", so we don't need to run the whole plan (e.g., I'm thinking maybe we should inject a limit of 2 into the subquery).

Contributor (Author):
Good idea; changed to call executeTake(2).

s"${e.query.treeString}")
}
// Analyzer will make sure that it only return on column

Contributor:
"Analyzer should make sure this only returns one column", and add an assert after this.

if (rows.length > 0) {

Contributor:
Can rows.length ever be 0 here? If it can only be 1, why are we testing > 0?

Contributor (Author):
The length could be zero; in that case the value is null.

Contributor:
How do we write a query with 0 columns? The comment above said the analyzer would make sure there's only one column. If it is possible to have 0 columns, then I'd make it explicit here and set the value to null.

Contributor:
Also, if it is possible to have 0 columns, we need to add a test case.

Contributor (Author):
rows.length means the number of rows, not the number of columns.

Contributor:
Ok, makes sense.

Please change the check to rows.length == 1. It's pretty confusing to first check that it's greater than 1, and then check that it's greater than 0, when you are just expecting 1.

Contributor:
And the same thing applies: the test coverage for this is pretty bad. Add a test case where the subquery returns 0 or more than 1 rows.

Contributor (Author):
To repeat: rows.length could be 0, and then the value will be null; I will add a comment for that.

Contributor:
Yeah, it'd be better to make it more explicit, e.g.

if (rows.length == 0) {
  e.updateResult(null)
} else {
  assert(rows.length == 1)
  e.updateResult(rows(0).get(0, e.dataType))
}

e.updateResult(rows(0).get(0, e.dataType))
}
}
}
}
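
Pulling together the review outcomes above (executeTake(2) instead of a full collect(), the Postgres-style error message, no timeout, and an explicit zero-row branch), the subquery handling might end up roughly like the sketch below; this is a sketch under those assumptions, not the final committed code.

    // Submission side, inside prepare(), for each ScalarSubquery e:
    val futureResult = Future {
      // Take at most two rows: enough to detect the more-than-one-row error
      // without materializing the whole subquery result.
      DataFrame(sqlContext, e.query).queryExecution.executedPlan.executeTake(2)
    }(SparkPlan.subqueryExecutionContext)
    queryResults += e -> futureResult

    // Waiting side, after children.foreach(_.prepare()):
    queryResults.foreach { case (e, futureResult) =>
      // No timeout, per the discussion above: the job is visible in the UI
      // and can be cancelled there if it blocks.
      val rows = Await.result(futureResult, Duration.Inf)
      if (rows.length > 1) {
        sys.error("more than one row returned by a subquery used as an expression:\n" +
          e.query.treeString)
      }
      if (rows.length == 1) {
        // The analyzer guarantees the subquery returns exactly one column.
        assert(rows(0).numFields == 1)
        e.updateResult(rows(0).get(0, e.dataType))
      } else {
        // Zero rows: the scalar subquery evaluates to null.
        e.updateResult(null)
      }
    }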

@@ -231,6 +260,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable
}
}

object SparkPlan {
private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService(

Contributor:
What thread pool are broadcasts done on? Should it be the same?

Contributor (Author):
This could be refactored later to use the same thread pool for all of them.

Contributor:
BroadcastHashJoin defines a ThreadPool for broadcasting. I am moving that into exchange.scala as part of #11083. We could use that one.

ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
}
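
A note on the pool: newDaemonCachedThreadPool creates daemon threads (so a pending subquery can never block JVM shutdown) and, via the second argument, caps the pool at 16 threads, which bounds how many uncorrelated subqueries a single driver executes concurrently.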

private[sql] trait LeafNode extends SparkPlan {
override def children: Seq[SparkPlan] = Nil
override def producedAttributes: AttributeSet = outputSet
12 changes: 12 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2105,6 +2105,18 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
assert(error.getMessage contains "grouping_id() can only be used with GroupingSets/Cube/Rollup")
}

test("uncorrelated scalar subquery") {

Contributor:
About test coverage: let's create a subquery suite and move the test cases there.

Also, it would be great to have at least one test case that actually runs on a dataset that is not generated by just "select x", because I worry that in the future we'll add some special optimizations and then all the test cases here become no-ops.

Contributor:
Also, we should test the behavior when no rows are returned.

assertResult(Array(Row(1))) {
sql("select (select 1 as b) as b").collect()
}

assertResult(Array(Row(1))) {
sql("with t2 as (select 1 as b, 2 as c) " +
"select a from (select 1 as a union all select 2 as a) t " +
"where a = (select max(b) from t2) ").collect()

Contributor:
If we support nested subqueries, can we add a test case?

Contributor (Author):
Added (see the sketch after this test).

}
}
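
A hypothetical shape for the nested case mentioned in the thread above (the exact test the author added is not shown in this diff):

    test("nested scalar subquery") {
      // Inner (select 1) yields 1, the middle subquery yields 2, and the
      // outer expression adds 1 more.
      assertResult(Array(Row(3))) {
        sql("select (select (select 1) + 1) + 1").collect()
      }
    }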

test("SPARK-13056: Null in map value causes NPE") {
val df = Seq(1 -> Map("abc" -> "somestring", "cba" -> null)).toDF("key", "value")
withTempTable("maptest") {