Changes from 1 commit
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -587,8 +587,10 @@ case class AdaptiveSparkPlanExec(
           BroadcastQueryStageExec(currentStageId, newPlan, e.canonicalized)
         }
       case i: InMemoryTableScanExec =>
-        // No need to optimize `InMemoryTableScanExec` as it's a leaf node.
-        TableCacheQueryStageExec(currentStageId, i)
+        // Apply `queryStageOptimizerRules` so that we can reuse subqueries.
+        // No need to apply `postStageCreationRules` for `InMemoryTableScanExec`
+        // as it's a leaf node.
+        TableCacheQueryStageExec(currentStageId, optimizeQueryStage(i, isFinalStage = false))
     }
     currentStageId += 1
     setLogicalLinkForNewQueryStage(queryStage, plan)
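
The key change here is that the `InMemoryTableScanExec` is no longer wrapped verbatim: it first goes through `queryStageOptimizerRules` via `optimizeQueryStage`, so rules such as subquery reuse can act on the plan inside the cached relation. As a rough illustration of the shape of that call, here is a minimal sketch with hypothetical stand-in types (`Plan`, `Rule`), not Spark's actual `SparkPlan` and `Rule[SparkPlan]`:

```scala
// Hypothetical stand-ins for Spark's SparkPlan and Rule[SparkPlan],
// used only to illustrate the structure of the call in the diff above.
trait Plan
trait Rule { def apply(plan: Plan): Plan }

object OptimizeQueryStageSketch {
  // Fold the stage's plan through each optimizer rule in order, before the
  // query stage is created. For a leaf node like an in-memory table scan,
  // only these rules apply; postStageCreationRules can be skipped, as the
  // new comment in the diff notes.
  def optimizeQueryStage(plan: Plan, rules: Seq[Rule]): Plan =
    rules.foldLeft(plan)((p, rule) => rule(p))
}
```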

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -125,7 +125,7 @@ case class InsertAdaptiveSparkPlan(
   /**
    * Returns an expression-id-to-execution-plan map for all the sub-queries.
    * For each sub-query, generate the adaptive execution plan for each sub-query by applying this
-   * rule, or reuse the execution plan from another sub-query of the same semantics if possible.
+   * rule.
    */
   private def buildSubqueryMap(plan: SparkPlan): Map[Long, BaseSubqueryExec] = {
     val subqueryMap = mutable.HashMap.empty[Long, BaseSubqueryExec]
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ case class ReuseAdaptiveSubquery(

     plan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
       case sub: ExecSubqueryExpression =>
-        val newPlan = reuseMap.getOrElseUpdate(sub.plan.canonicalized, sub.plan)
-        if (newPlan.ne(sub.plan)) {
-          sub.withNewPlan(ReusedSubqueryExec(newPlan))
-        } else {
+        // `InsertAdaptiveSparkPlan` compiles a subquery for each exprId, so the Java
+        // object is always `eq` if two subqueries have the same exprId. Check whether
+        // the subquery can be reused manually instead of calling `getOrElseUpdate`.

Contributor: let's make the comment more explicit:

    // The subquery may already be reused (the same Java object) due to filter
    // pushdown of table cache. If that happens, we just need to wrap the current
    // subquery with `ReusedSubqueryExec`, and there is no need to update the
    // `reuseMap`.

Author: addressed

+        reuseMap.get(sub.plan.canonicalized).map { subquery =>

Contributor: I just noticed that this rule is not idempotent now. If we run it twice,
then all subqueries will become ReusedSubqueryExec, because all subqueries are in the
reuseMap.

+          sub.withNewPlan(ReusedSubqueryExec(subquery))
+        }.getOrElse {
+          reuseMap.put(sub.plan.canonicalized, sub.plan)

Contributor: do we have atomicity concerns?

Author: The behavior should be the same as getOrElseUpdate. I changed put to putIfAbsent
to be fully equivalent with getOrElseUpdate, so that the first put wins if there is a
race condition.

peter-toth (Contributor, May 5, 2023): Is this correct? Two concurrent .get()s for two
semantically equal plans can both return None, and then neither of the plans will be
reused (wrapped into ReusedSubqueryExec).

Author: The behavior follows getOrElseUpdate. It does not affect performance, and I'm
not sure how likely that is to happen, so I do not add locking.

Contributor: I think getOrElseUpdate() on TrieMap is atomic; a .get() followed by a
.put() is not.

peter-toth (Contributor, May 5, 2023): I mean, getOrElseUpdate() is basically a .get()
followed by a .putIfAbsent(), as you do here. But you should then check whether
putIfAbsent() was successful (i.e. whether the map already contained the key you put
into it) and do the reuse depending on that.

Author: thank you @peter-toth, I see it now.

Contributor: Thanks! 99f810c looks ok to me.

           sub
         }
     }
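
The atomicity question above is easy to reproduce outside Spark. On a `scala.collection.concurrent.TrieMap`, `getOrElseUpdate` is a single atomic operation, while a separate `.get()` followed by `.put()` is not; the fix peter-toth describes is to call `putIfAbsent` and branch on its return value. A self-contained sketch, with toy `String` values standing in for canonicalized plans and illustrative names only:

```scala
import scala.collection.concurrent.TrieMap

object SubqueryReuseSketch {
  private val reuseMap = new TrieMap[String, String]()

  // Racy variant, matching the diff above: two concurrent calls with the same
  // key can both see None from get(), and then neither result is marked reused.
  def reuseRacy(key: String, plan: String): String =
    reuseMap.get(key) match {
      case Some(existing) => s"Reused($existing)"
      case None =>
        reuseMap.put(key, plan)
        plan
    }

  // Variant suggested in the review: putIfAbsent returns Some(existing) iff the
  // key was already present, so exactly one caller wins the race and every
  // other caller reuses the winner's value.
  def reuseAtomic(key: String, plan: String): String =
    reuseMap.putIfAbsent(key, plan) match {
      case Some(existing) => s"Reused($existing)"
      case None => plan // this call inserted the entry; keep the original plan
    }

  def main(args: Array[String]): Unit = {
    println(reuseAtomic("q1", "planA")) // planA: first caller wins
    println(reuseAtomic("q1", "planB")) // Reused(planA)
  }
}
```

This shape also exhibits the non-idempotency noted in the thread: once a plan is registered, a second pass over the same expression finds it in the map and wraps it again.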

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala (28 additions & 16 deletions)
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants
 import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, RDDScanExec, SparkPlan}
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEPropagateEmptyRelation}
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.functions._

@@ -823,21 +823,33 @@ class CachedTableSuite extends QueryTest with SQLTestUtils

test("SPARK-19993 subquery with cached underlying relation") {
withTempView("t1") {
Seq(1).toDF("c1").createOrReplaceTempView("t1")
spark.catalog.cacheTable("t1")

// underlying table t1 is cached as well as the query that refers to it.
val sqlText =
"""
|SELECT * FROM t1
|WHERE
|NOT EXISTS (SELECT * FROM t1)
""".stripMargin
val ds = sql(sqlText)
assert(getNumInMemoryRelations(ds) == 2)

val cachedDs = sql(sqlText).cache()
assert(getNumInMemoryTablesRecursively(cachedDs.queryExecution.sparkPlan) == 3)
Seq(false, true).foreach { enabled =>
withSQLConf(
SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> enabled.toString,
SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key ->
AQEPropagateEmptyRelation.ruleName) {

Seq(1).toDF("c1").createOrReplaceTempView("t1")
spark.catalog.cacheTable("t1")

// underlying table t1 is cached as well as the query that refers to it.
val sqlText =
"""
|SELECT * FROM t1
|WHERE
|NOT EXISTS (SELECT * FROM t1)
""".stripMargin
val ds = sql(sqlText)
assert(getNumInMemoryRelations(ds) == 2)

val cachedDs = sql(sqlText).cache()
cachedDs.collect()
assert(getNumInMemoryTablesRecursively(cachedDs.queryExecution.executedPlan) == 3)

cachedDs.unpersist()
spark.catalog.uncacheTable("t1")
}
}
}
}
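
Note that the updated test materializes the Dataset with `collect()` and then inspects `executedPlan` rather than `sparkPlan`: under AQE the final physical plan is only settled once the query has run. For context, the counting that the suite's `getNumInMemoryTablesRecursively` helper performs could plausibly look like the following sketch (a hypothetical simplification, not the suite's actual code):

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

object InMemoryTableCounter {
  // Count in-memory table scans in a physical plan, recursing into each cached
  // relation's own plan, which may itself scan another cached table, as in the
  // test above where both the outer query and the underlying t1 are cached.
  def countInMemoryTables(plan: SparkPlan): Int =
    plan.collect {
      case scan: InMemoryTableScanExec =>
        1 + countInMemoryTables(scan.relation.cachedPlan)
    }.sum
}
```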

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2826,6 +2826,21 @@ class AdaptiveQueryExecSuite
         .executedPlan.isInstanceOf[LocalTableScanExec])
     }
   }
+
+  test("SPARK-43376: Improve reuse subquery with table cache") {
+    withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
+      withTable("t1", "t2") {
+        withCache("t1") {
+          Seq(1).toDF("c1").cache().createOrReplaceTempView("t1")
+          Seq(2).toDF("c2").createOrReplaceTempView("t2")
+
+          val (_, adaptive) = runAdaptiveAndVerifyResult(
+            "SELECT * FROM t1 WHERE c1 < (SELECT c2 FROM t2)")
+          assert(findReusedSubquery(adaptive).size == 1)
+        }
+      }
+    }
+  }
 }

 /**