Skip to content

Conversation

@ulysses-you
Copy link
Contributor

@ulysses-you ulysses-you commented May 4, 2023

What changes were proposed in this pull request?

AQE can not reuse subquery if it is pushed into InMemoryTableScan. There are two issues:

  • ReuseAdaptiveSubquery can not support reuse subquery if two subquery have the same exprId
  • InMemoryTableScan miss apply ReuseAdaptiveSubquery when wrap TableCacheQueryStageExec

For example:

Seq(1).toDF("c1").cache().createOrReplaceTempView("t1")
Seq(2).toDF("c2").createOrReplaceTempView("t2")
spark.sql("SELECT * FROM t1 WHERE c1 < (SELECT c2 FROM t2)")

There are two subquery#27 but have no ReusedSubquery

AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Filter (c1#14 < Subquery subquery#27, [id=#20])
   :  +- Subquery subquery#27, [id=#20]
   :     +- AdaptiveSparkPlan isFinalPlan=true
   :        +- LocalTableScan [c2#25]
   +- TableCacheQueryStage 0
      +- InMemoryTableScan [c1#14], [(c1#14 < Subquery subquery#27, [id=#20])]
            :- InMemoryRelation [c1#14], StorageLevel(disk, memory, deserialized, 1 replicas)
            :     +- LocalTableScan [c1#14]
            +- Subquery subquery#27, [id=#20]
               +- AdaptiveSparkPlan isFinalPlan=true
                  +- LocalTableScan [c2#25]

Why are the changes needed?

Improve the coverage of reuse subquery.

Note that, it is not a real perf issue because the subquery has been already reused (the same Java object). This pr just makes the plan clearer about subquery reuse.

Does this PR introduce any user-facing change?

no

How was this patch tested?

add test

@github-actions github-actions bot added the SQL label May 4, 2023
reuseMap.get(sub.plan.canonicalized).map { subquery =>
sub.withNewPlan(ReusedSubqueryExec(subquery))
}.getOrElse {
reuseMap.put(sub.plan.canonicalized, sub.plan)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have atomicity concerns?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behavior should be same with getOrElseUpdate. I changed put to putIfAbsent to fully equivalent with getOrElseUpdate that the first put will win if there is race condition.

Copy link
Contributor

@peter-toth peter-toth May 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct? 2 concurent .get()s for 2 semantic equal plans can return Nones and then none of the plans will be reused (wrapped into ReusedSubqueryExec).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behavior follows the getOrElseUpdate. It does not affect perf and I'm not sure how many possible can that happen, so I do not lock it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think getOrElseUpdate() on TrieMap is atomic, a .get() followed by a .put() is not.

Copy link
Contributor

@peter-toth peter-toth May 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, getOrElseUpdate() is basically a .get() followed by a .putIfAbsent() as you do. But you should then check if putIfAbsent() was successful the map contains the key you put into it and depending on that do the reuse.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you @peter-toth , I see it now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! 99f810c looks ok to me.

}
assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in the plan")
assert(reusedSubqueryIds.size == 5,
"Missing or unexpected reused ReusedSubqueryExec in the plan")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reuse subquery bahavior is now same with non-AQE.

} else {
// `InsertAdaptiveSparkPlan` compiles subquery for each exprId, then the java object
// is always `eq` if two subqueries have same exprId.
// Check if the subquery can be reused manually instead of call `getOrElseUpdate`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's make the comment more explicit

// The subquery can be already reused (the same Java object) due to filter pushdown
// of table cache. If it happens, we just need to wrap the current subquery with `ReusedSubqueryExec`
// and no need to update te `reuseMap`.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed

@cloud-fan
Copy link
Contributor

Can we mention it in the PR description that this is not a real perf issue, but just make the plan clearer about subquery reuse?

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in b54a94b May 6, 2023
@ulysses-you
Copy link
Contributor Author

thank you @cloud-fan @peter-toth @dongjoon-hyun

@ulysses-you ulysses-you deleted the aqe-subquery branch May 6, 2023 06:31
LuciferYang pushed a commit to LuciferYang/spark that referenced this pull request May 10, 2023
### What changes were proposed in this pull request?

AQE can not reuse subquery if it is pushed into `InMemoryTableScan`. There are two issues:
- `ReuseAdaptiveSubquery` can not support reuse subquery if two subquery have the same exprId
-  `InMemoryTableScan` miss apply `ReuseAdaptiveSubquery` when wrap `TableCacheQueryStageExec`

For example:
```
Seq(1).toDF("c1").cache().createOrReplaceTempView("t1")
Seq(2).toDF("c2").createOrReplaceTempView("t2")
spark.sql("SELECT * FROM t1 WHERE c1 < (SELECT c2 FROM t2)")
```
There are two `subquery#27` but have no `ReusedSubquery`

```
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Filter (c1#14 < Subquery subquery#27, [id=apache#20])
   :  +- Subquery subquery#27, [id=apache#20]
   :     +- AdaptiveSparkPlan isFinalPlan=true
   :        +- LocalTableScan [c2#25]
   +- TableCacheQueryStage 0
      +- InMemoryTableScan [c1#14], [(c1#14 < Subquery subquery#27, [id=apache#20])]
            :- InMemoryRelation [c1#14], StorageLevel(disk, memory, deserialized, 1 replicas)
            :     +- LocalTableScan [c1#14]
            +- Subquery subquery#27, [id=apache#20]
               +- AdaptiveSparkPlan isFinalPlan=true
                  +- LocalTableScan [c2#25]
```

### Why are the changes needed?

Improve the coverage of reuse subquery.

Note that, it is not a real perf issue because the subquery has been already reused (the same Java object). This pr just makes the plan clearer about subquery reuse.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

add test

Closes apache#41046 from ulysses-you/aqe-subquery.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
// The subquery can be already reused (the same Java object) due to filter pushdown
// of table cache. If it happens, we just need to wrap the current subquery with
// `ReusedSubqueryExec` and no need to update the `reuseMap`.
reuseMap.get(sub.plan.canonicalized).map { subquery =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just noticed that this rule is not idempotent now. If we run it twice, then all subqueries will become ReusedSubqueryExec because all subqueries are in the reuseMap.

cloud-fan pushed a commit that referenced this pull request Jun 7, 2023
… subquery

### What changes were proposed in this pull request?

#41046 make `ReuseAdaptiveSubquery` become not idempotent. This pr reverts the change in `ReuseAdaptiveSubquery`.

To solve the same instance issue when planning and reusing subquery in AQE, this pr makes `subqueryMap` hold the executed plan in subquery. Then in `PlanAdaptiveSubqueries`, each logical subquery plan can build their own instance of physical subquery.

### Why are the changes needed?

To improve reuse subquery in AQE.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Pass CI

Closes #41454 from ulysses-you/SPARK-43376-2.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
czxm pushed a commit to czxm/spark that referenced this pull request Jun 12, 2023
… subquery

### What changes were proposed in this pull request?

apache#41046 make `ReuseAdaptiveSubquery` become not idempotent. This pr reverts the change in `ReuseAdaptiveSubquery`.

To solve the same instance issue when planning and reusing subquery in AQE, this pr makes `subqueryMap` hold the executed plan in subquery. Then in `PlanAdaptiveSubqueries`, each logical subquery plan can build their own instance of physical subquery.

### Why are the changes needed?

To improve reuse subquery in AQE.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Pass CI

Closes apache#41454 from ulysses-you/SPARK-43376-2.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants