Skip to content

Conversation

@peter-toth
Copy link
Contributor

@peter-toth peter-toth commented Sep 10, 2022

What changes were proposed in this pull request?

Change canonicalization to a one pass process and move logic from Canonicalize.reorderCommutativeOperators to the respective commutative operators' canonicalize.

Why are the changes needed?

#34883 improved expression canonicalization performance but introduced regression when a commutative operator is under a BinaryComparison. This is because children reorder by their hashcode can't happen in preCanonicalized phase when children are not yet "final".

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added new UT.

@ahshahid
Copy link
Contributor

ahshahid commented Sep 10, 2022

Suggest you run the benchmark test of cloud_fan
May be something like nested add ( which his benchmar test uses) > 11

@peter-toth
Copy link
Contributor Author

peter-toth commented Sep 11, 2022

The benchmark:

  test("benchmark") {
    val col = Literal(true)
    var and = And(col, col)
    var i = 0
    while (i < 2000) {
      and = And(and, col)
      i += 1
    }
    and.canonicalized
  }

is fine, 140ms on my machine. That's because we don't alter the bottom-up traversal, just at the BinaryComparison nodes we can swap the already ordered children.

But I need to look into the test failures, though...

@ahshahid
Copy link
Contributor

Got it.. your change is better solution.

@peter-toth peter-toth force-pushed the SPARK-40362-fix-binarycomparison-canonicalization branch from 22222cf to a4e0e0e Compare September 11, 2022 16:27
@peter-toth peter-toth force-pushed the SPARK-40362-fix-binarycomparison-canonicalization branch from b45d6fe to 88cb006 Compare September 12, 2022 13:17
case bc: GreaterThanOrEqual => orderBinaryComparison(bc, LessThanOrEqual)
case bc: LessThanOrEqual => orderBinaryComparison(bc, GreaterThanOrEqual)

case _ => e.mapChildren(preCanonicalizeAndReorderOperators).preCanonicalized
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we do reorder first and then do pre-canonicalization?

Copy link
Contributor Author

@peter-toth peter-toth Sep 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say that we pre-canonicalize and reorder the node's children first and then pre-canonicalize the node.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately 88cb006 didn't work because HigherOrderFunction.preCanonicalized modifies its children's NamedLambdaVariables and reorder should happen after these modifications.

But in 99f8b61 and 6a38a00 I'm proposing a change that we break the full tree traversal in Canonicalize.reorderOperators, and instead, we could let non commutative operator's preCanonicalized to call their children'scanonicalized to initialte Canonicalize.reorderOperators on their children.

Copy link
Contributor Author

@peter-toth peter-toth Sep 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this approach can be refectored further and we can move all logic from preCanonicalized to canonicalized, just we need to override commutative operators's canonicalized to call Canonicalize.reorderCommutativeOperators: 726f7f0

@ahshahid
Copy link
Contributor

ahshahid commented Sep 12, 2022

I see that pre-canonicalized has been removed, which makes sense & is a single pass.
This also caches the intermediate canonicalized expression which is a good thing in the sense it is doing with reorder..
It would have slightly detrimental impact on actual benchmark but well within tolerance.
and since tree traversal has been removed from the reorder for non-commutative expressions which makes sense and improves perf.

This is good to go I suppose

gatherCommutative(e, f).sortBy(_.hashCode())

def reorderCommutativeOperators(e: Expression): Expression = e match {
// TODO: do not reorder consecutive `Add`s or `Multiply`s with different `failOnError` flags
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to keep this TODO somewhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, added back in dead8b6

// the actual "user-facing API" of expression canonicalization. Only the root node of the
// expression tree will instantiate the `canonicalized` variable. This is different from
// `preCanonicalized`, because `canonicalized` does "global" canonicalization and most of the time
// you cannot reuse the canonicalization result of the children.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's good that we now have a better and simplified version, but we should still have a detailed comment to explain the new workflow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a process comment in 6c68d8b, let me know if it needs more details.

Copy link
Contributor Author

@peter-toth peter-toth Sep 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've also updated the PR description.

f: PartialFunction[Expression, Seq[Expression]]): Seq[Expression] = e match {
case c if f.isDefinedAt(c) => f(c).flatMap(gatherCommutative(_, f))
case other => reorderCommutativeOperators(other) :: Nil
case other => other.canonicalized :: Nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the key change. This means that we will first reorder leaf-most adjacent commutative operators, and then do this recursively bottom-up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably better to have a new trait CommutativeExpression and remove this object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, fixed in ff2fe32

@peter-toth peter-toth force-pushed the SPARK-40362-fix-binarycomparison-canonicalization branch from 5d8a184 to 6c68d8b Compare September 13, 2022 08:13
@cloud-fan cloud-fan closed this in 3f97cd6 Sep 13, 2022
@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan
Copy link
Contributor

@peter-toth can you open a backport PR for 3.3? it has conflicts. Thanks!

peter-toth added a commit to peter-toth/spark that referenced this pull request Sep 13, 2022
Change canonicalization to a one pass process and move logic from `Canonicalize.reorderCommutativeOperators` to the respective commutative operators' `canonicalize`.

apache#34883 improved expression canonicalization performance but introduced regression when a commutative operator is under a `BinaryComparison`. This is because children reorder by their hashcode can't happen in `preCanonicalized` phase when children are not yet "final".

No.

Added new UT.

Closes apache#37851 from peter-toth/SPARK-40362-fix-binarycomparison-canonicalization.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@peter-toth
Copy link
Contributor Author

peter-toth commented Sep 13, 2022

thanks, merging to master!

Thanks for the review!

@peter-toth can you open a backport PR for 3.3? it has conflicts. Thanks!

Sure, opened here: #37866

cloud-fan pushed a commit that referenced this pull request Feb 10, 2023
…imization for canonicalizing large trees of commutative expressions

### What changes were proposed in this pull request?
- This PR introduces a new expression called `MultiCommutativeOp` which is used by the commutative expressions (e.g., `Add`, `Multiply`, `And`, `Or`, `BitwiseOr`, `BitwiseAnd`, `BitwiseXor`) during canonicalization.
- During canonicalization, when there is a list of consecutive commutative expressions, we now create a MultiCommutative expression with references to original operands, instead of creating new objects.
- This new expression is added as a memory optimization to reduce generating a large number of intermediate objects during canonicalization.

### Why are the changes needed?
- With the [recent changes](#37851) in the expression canonicalization, a complex query with a large number of commutative operations could end up consuming significantly more (sometimes > 10X) memory on the executors.
- In our case, this issue happens for a specific complex query that has a huge expression tree containing Add operators interleaved by non Add operators.
- The issue is related to canonicalization and why it is causing issues in the executors is because the codegen component relies on expression canonicalization to deduplicate expressions.
- When we have a large number of Adds interleaved by non-Add operators, [this line](https://github.com/apache/spark/pull/37851/files#diff-7278f2db37934522ee7c74b71525153234cff245cefaf996957e4a9ff3dbaacdR1171) ends up materializing a new canonicalized expression tree at every non-Add operator.
- In our case, analyzing the executor heap histogram shows that the additional memory is consumed by a large number of Add objects.
- The high memory usage causes the executors to lose heartbeat signals and results in task failures.
- The proposed `MultiCommutativeOp` expression avoids generating new Add expressions and keeps the extra memory usage to a minimum.

### Does this PR introduce _any_ user-facing change?
- No

### How was this patch tested?
- Existing unit tests and new unit tests.

Closes #39722 from db-scnakandala/SPARK-42162.

Authored-by: Supun Nakandala <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan pushed a commit that referenced this pull request Feb 10, 2023
…imization for canonicalizing large trees of commutative expressions

### What changes were proposed in this pull request?
- This PR introduces a new expression called `MultiCommutativeOp` which is used by the commutative expressions (e.g., `Add`, `Multiply`, `And`, `Or`, `BitwiseOr`, `BitwiseAnd`, `BitwiseXor`) during canonicalization.
- During canonicalization, when there is a list of consecutive commutative expressions, we now create a MultiCommutative expression with references to original operands, instead of creating new objects.
- This new expression is added as a memory optimization to reduce generating a large number of intermediate objects during canonicalization.

### Why are the changes needed?
- With the [recent changes](#37851) in the expression canonicalization, a complex query with a large number of commutative operations could end up consuming significantly more (sometimes > 10X) memory on the executors.
- In our case, this issue happens for a specific complex query that has a huge expression tree containing Add operators interleaved by non Add operators.
- The issue is related to canonicalization and why it is causing issues in the executors is because the codegen component relies on expression canonicalization to deduplicate expressions.
- When we have a large number of Adds interleaved by non-Add operators, [this line](https://github.com/apache/spark/pull/37851/files#diff-7278f2db37934522ee7c74b71525153234cff245cefaf996957e4a9ff3dbaacdR1171) ends up materializing a new canonicalized expression tree at every non-Add operator.
- In our case, analyzing the executor heap histogram shows that the additional memory is consumed by a large number of Add objects.
- The high memory usage causes the executors to lose heartbeat signals and results in task failures.
- The proposed `MultiCommutativeOp` expression avoids generating new Add expressions and keeps the extra memory usage to a minimum.

### Does this PR introduce _any_ user-facing change?
- No

### How was this patch tested?
- Existing unit tests and new unit tests.

Closes #39722 from db-scnakandala/SPARK-42162.

Authored-by: Supun Nakandala <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 99431e2)
Signed-off-by: Wenchen Fan <[email protected]>
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
…imization for canonicalizing large trees of commutative expressions

### What changes were proposed in this pull request?
- This PR introduces a new expression called `MultiCommutativeOp` which is used by the commutative expressions (e.g., `Add`, `Multiply`, `And`, `Or`, `BitwiseOr`, `BitwiseAnd`, `BitwiseXor`) during canonicalization.
- During canonicalization, when there is a list of consecutive commutative expressions, we now create a MultiCommutative expression with references to original operands, instead of creating new objects.
- This new expression is added as a memory optimization to reduce generating a large number of intermediate objects during canonicalization.

### Why are the changes needed?
- With the [recent changes](apache#37851) in the expression canonicalization, a complex query with a large number of commutative operations could end up consuming significantly more (sometimes > 10X) memory on the executors.
- In our case, this issue happens for a specific complex query that has a huge expression tree containing Add operators interleaved by non Add operators.
- The issue is related to canonicalization and why it is causing issues in the executors is because the codegen component relies on expression canonicalization to deduplicate expressions.
- When we have a large number of Adds interleaved by non-Add operators, [this line](https://github.com/apache/spark/pull/37851/files#diff-7278f2db37934522ee7c74b71525153234cff245cefaf996957e4a9ff3dbaacdR1171) ends up materializing a new canonicalized expression tree at every non-Add operator.
- In our case, analyzing the executor heap histogram shows that the additional memory is consumed by a large number of Add objects.
- The high memory usage causes the executors to lose heartbeat signals and results in task failures.
- The proposed `MultiCommutativeOp` expression avoids generating new Add expressions and keeps the extra memory usage to a minimum.

### Does this PR introduce _any_ user-facing change?
- No

### How was this patch tested?
- Existing unit tests and new unit tests.

Closes apache#39722 from db-scnakandala/SPARK-42162.

Authored-by: Supun Nakandala <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 99431e2)
Signed-off-by: Wenchen Fan <[email protected]>
jlfsdtc pushed a commit to jlfsdtc/spark that referenced this pull request Jan 24, 2024
…imization for canonicalizing large trees of commutative expressions

### What changes were proposed in this pull request?
- This PR introduces a new expression called `MultiCommutativeOp` which is used by the commutative expressions (e.g., `Add`, `Multiply`, `And`, `Or`, `BitwiseOr`, `BitwiseAnd`, `BitwiseXor`) during canonicalization.
- During canonicalization, when there is a list of consecutive commutative expressions, we now create a MultiCommutative expression with references to original operands, instead of creating new objects.
- This new expression is added as a memory optimization to reduce generating a large number of intermediate objects during canonicalization.

### Why are the changes needed?
- With the [recent changes](apache#37851) in the expression canonicalization, a complex query with a large number of commutative operations could end up consuming significantly more (sometimes > 10X) memory on the executors.
- In our case, this issue happens for a specific complex query that has a huge expression tree containing Add operators interleaved by non Add operators.
- The issue is related to canonicalization and why it is causing issues in the executors is because the codegen component relies on expression canonicalization to deduplicate expressions.
- When we have a large number of Adds interleaved by non-Add operators, [this line](https://github.com/apache/spark/pull/37851/files#diff-7278f2db37934522ee7c74b71525153234cff245cefaf996957e4a9ff3dbaacdR1171) ends up materializing a new canonicalized expression tree at every non-Add operator.
- In our case, analyzing the executor heap histogram shows that the additional memory is consumed by a large number of Add objects.
- The high memory usage causes the executors to lose heartbeat signals and results in task failures.
- The proposed `MultiCommutativeOp` expression avoids generating new Add expressions and keeps the extra memory usage to a minimum.

### Does this PR introduce _any_ user-facing change?
- No

### How was this patch tested?
- Existing unit tests and new unit tests.

Closes apache#39722 from db-scnakandala/SPARK-42162.

Authored-by: Supun Nakandala <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 99431e2)
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants