-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-21417][SQL] Infer join conditions using propagated constraints #18692
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #79804 has started for PR 18692 at commit |
|
I think we already did it via constraint propagation, didn't we? |
|
@cloud-fan which rule do you mean? |
|
We need to use the propagated constraints to infer the join conditions. |
|
@gatorsmile thanks for the input. Let me check that I understood everything correctly. So, I keep it as a separate rule that is applied only if constraint propagation enabled. Inside the rule, I rely on I guess that |
|
You are on the right track. You might find some bugs/issues when you implement it. Sorry, too busy recently. |
|
Test build #80056 has finished for PR 18692 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InnerLike?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also thought about this but decided to start with a smaller scope. The motivation was that "SELECT * FROM t1, t2" is resolved into an Inner Join and one has to explicitly use the Cross Join syntax to allow cartesian products. I was not sure if it was OK to replace an explicit Cross Join with a join of a different type. Semantically, we can have InnerLike here.
|
You also need to resolve another case: Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
Seq(1, 2).toDF("col").write.saveAsTable("t2")
sql("SELECT * FROM t1, t2 WHERE t1.col1 = t1.col2 AND t1.col1 = 1 AND t2.col = 1")This new rule can infer the unneeded join conditions, |
|
Test build #80058 has finished for PR 18692 at commit
|
|
BTW, your PR title and descriptions are out of dated. |
|
@gatorsmile I took a look at the case above. Indeed, the proposed rule triggers this issue but only indirectly. In the example above, the optimizer will never reach a fixed point. Please, find my investigation below. After that, Correct me if I am wrong, but it seems like an issue with the existing rules. |
|
Test build #80362 has finished for PR 18692 at commit
|
|
@gatorsmile I updated the rule to cover cross join cases. Regarding the case with the redundant condition mentioned by you, I opened SPARK-21652. It is an existing issue and is not caused by the proposed rule. BTW, I can try to fix it once we agree on a solution. |
|
@aokolnychyi Thanks for finding the non-convergent case! Let me see how to fix it. |
|
@gatorsmile what is our decision here? Shall we wait until SPARK-21652 is resolved? In the meantime, I can add some tests and see how the proposed rule works together with all others. |
|
Sorry for the delay. @jiangxb1987 will submit a simple fix for the issue you mentioned. That will not be a perfect fix but it partially resolve the issue. In the future, we need to move the filter removal to a separate batch for cost-based optimization instead of doing it with filter inference in the same RBO batch. |
|
Can we restrict this to cartesian product ONLY ? One clear downside of doing this for other joins is that it will potentially add shuffle in case of (bucketing queries) and (subqueries in general). After adding the inferred join conditions, it might lead to the child node's partitioning NOT satisfying the JOIN node's requirements which otherwise could have. |
|
In this PR, we should limit it to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also mention the reason why we are restricting this to cross joins only ?
For other join types, adding inferred join conditions would potentially shuffle children as child node's partitioning won't satisfying the JOIN node's requirements which otherwise could have.
|
Test build #81390 has finished for PR 18692 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on the discussion we did above, we only enable this rule for cartesian products. That means, the above codes should be like
case join @ Join(left, right, Cross, None) =>
val leftConstraints = join.constraints.filter(_.references.subsetOf(left.outputSet))
val rightConstraints = join.constraints.filter(_.references.subsetOf(right.outputSet))
val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints)
val newConditionOpt = inferredJoinPredicates.reduceOption(And)
if (newConditionOpt.isDefined) Join(left, right, Inner, newConditionOpt) else joinThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And what about CROSS joins with join conditions? Not sure if they will benefit from the proposed rule, but it is better to ask.
Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t2")
val df = spark.sql("SELECT * FROM t1 CROSS JOIN t2 ON t1.col1 >= t2.col1 WHERE t1.col1 = 1 AND t2.col1 = 1")
df.explain(true)
== Optimized Logical Plan ==
Join Cross, (col1#40 >= col1#42)
:- Filter (isnotnull(col1#40) && (col1#40 = 1))
: +- Relation[col1#40,col2#41] parquet
+- Filter (isnotnull(col1#42) && (col1#42 = 1))
+- Relation[col1#42,col2#43] parquet
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please see the rule CheckCartesianProducts . The example above is not a CROSS join.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile Thanks for getting back.
CheckCartesianProducts identifies a join of type Inner | LeftOuter | RightOuter | FullOuter as a cartesian product if there is no join predicate that has references to both relations.
If we agree to ignore joins of type Cross that have a condition (in this PR), then the use case in this discussion is no longer possible (even if you remove t1.col1 >= t2.col1). Correct? PushPredicateThroughJoin will push t1.col1 = t1.col2 + t2.col2 and t2.col1 = t1.col2 + t2.col2 into the join condition and the proposed rule will not infer anything and the
final join will be of type Cross with a condition that covers both relations. According to the logic of CheckCartesianProducts, it is not considered to be a cartesian product (since there exists a join predicate that covers both relations, e.g. t1.col1 = t1.col2 + t2.col2).
So, if I have a confirmation that we need to consider only joins of type Cross and without any join conditions, I can update the PR accordingly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. In this PR, we just need to consider cross join without any join condition.
In the future, we can extend it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function can be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of adding a new rule, we should improve the rule InferFiltersFromConstraints.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also thought about this but InferFiltersFromConstraints does not change considered join types. Therefore, I kept them separated. In addition, I thought about renaming it to EliminateCrossJoin.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Since we decide to focus on cross join only, we should rename it to EliminateCrossJoin , like what you proposed.
Isn't it an existing problem? the current constraint propagation framework infers as many predicates as possible, so we may already hit this problem. I think we should revisit the constraint propagation framework to think about how to avoid adding more shuffles, instead of stopping improving this framework to infer more predicates. |
|
@cloud-fan : In event when the (set of join keys) is a superset of (child node's partitioning keys), its possible to avoid shuffle : #19054 ... this can help with 2 cases - when users unknowingly join over extra columns in addition to bucket columns
|
|
cc @gengliangwang Review this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to separate the constraints as left only and right only.
The following case can infer t1.col1 = t2.col1:
Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t2")
val df = spark.sql("SELECT * FROM t1 CROSS JOIN t2 ON t1.col1 >= t2.col1 " +
"WHERE t1.col1 = t1.col2 + t2.col2 and t2.col1 = t1.col2 + t2.col2")There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gengliangwang Yeah, makes sense. So, PushPredicateThroughJoin would push the where clause into the join and the proposed rule will infer t1.col1 = t2.col1 and change the join type to INNER. As a result, the final join condition will be t1.col1 = t2.col1 and t1.col1 >= t2.col1 and (t1.col1 = t1.col2 + t2.col2 and t2.col1 = t1.col2 + t2.col2). Am I right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you are right.
ab7d966 to
3e090f9
Compare
|
Test build #84176 has finished for PR 18692 at commit
|
|
Test build #84177 has finished for PR 18692 at commit
|
| * node's requirements which otherwise could have. | ||
| * | ||
| * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right | ||
| * relation has 'b = 1', the rule infers 'a = b' as a join predicate. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For instance, given a CROSS join with the constraint 'a = 1' from the left child and the constraint 'b = 1' from the right child, this rule infers a new join predicate 'a = b' and convert it to an Inner join.
| } | ||
|
|
||
| private def eliminateCrossJoin(plan: LogicalPlan): LogicalPlan = plan transform { | ||
| case join@Join(leftPlan, rightPlan, Cross, None) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: join@Join -> join @ Join
| } | ||
|
|
||
| // the purpose of this class is to treat 'a === 1 and 1 === 'a as the same expressions | ||
| implicit class SemanticExpression(private val expr: Expression) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we reuse EquivalentExpressions? You can search the code base and see how the others use it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we just need the case class inside EquivalentExpressions since we have to map all semantically equivalent expressions into a set of attributes (as opposed to mapping an expression into a set of equivalent expressions).
I see two ways to go:
- Expose the case class inside
EquivalentExpressionswith minimum changes in the code base (e.g., using a companion object):
object EquivalentExpressions {
/**
* Wrapper around an Expression that provides semantic equality.
*/
implicit class SemanticExpr(private val e: Expression) {
override def equals(o: Any): Boolean = o match {
case other: SemanticExpr => e.semanticEquals(other.e)
case _ => false
}
override def hashCode: Int = e.semanticHash()
}
}
- Keep
EquivalentExpressionsas it is and maintain a separate map from expressions to attributes in the proposed rule.
Personally, I lean toward the first idea since it might be useful to have SemanticExpr alone. However, there can be other drawbacks that did not come into my mind.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about building a new class to process all the cases similar to this one?
An Attribute is also an Expression. Basically, the internal will be still a hash map mutable.HashMap.empty[SemanticEqualExpr, mutable.MutableList[Expression]]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a Set instead of a List might be beneficial in the proposed rule. What about the following?
class EquivalentExpressionMap {
private val equivalenceMap = mutable.HashMap.empty[SemanticallyEqualExpr, mutable.Set[Expression]]
def put(expression: Expression, equivalentExpression: Expression): Unit = {
val equivalentExpressions = equivalenceMap.getOrElse(expression, mutable.Set.empty)
if (!equivalentExpressions.contains(equivalentExpression)) {
equivalenceMap(expression) = equivalentExpressions += equivalentExpression
}
}
// produce an immutable copy to avoid any modifications from outside
def get(expression: Expression): Set[Expression] =
equivalenceMap.get(expression).fold(Set.empty[Expression])(_.toSet)
}
object EquivalentExpressionMap {
private implicit class SemanticallyEqualExpr(private val expr: Expression) {
override def equals(o: Any): Boolean = o match {
case other: SemanticallyEqualExpr => expr.semanticEquals(other.expr)
case _ => false
}
override def hashCode: Int = expr.semanticHash()
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not check it carefully, but how about ExpressionSet?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am afraid ExpressionSet will not help here since we need to map a semantically equivalent expression into a set of attributes that correspond to it. It is not enough to check if there is an equivalent expression. Therefore, EquivalentExpressions and ExpressionSet are not applicable (as far as I see).
EquivalentExpressionMap from the previous comment assumes the following workflow:
val equivalentExressionMap = new EquivalentExpressionMap
...
equivalentExressionMap.put(1 * 2, t1.a)
equivalentExressionMap.put(3, t1.b)
...
equivalentExressionMap.get(1 * 2) // Set(t1.a)
equivalentExressionMap.get(2 * 1) // Set(t1.a)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean using ExpressionSet in EquivalentExpressionMap
private val equivalenceMap = mutable.HashMap.empty[SemanticallyEqualExpr, ExpressionSet]
def get(expression: Expression): Set[Expression]
def put(expression: Expression, equivalentExpression: Expression): Unit|
Also add a test case for non-deterministic cases. For example, given the left child has |
| set | ||
| } | ||
|
|
||
| val empty: ExpressionSet = ExpressionSet(Nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought that writing ExpressionSet.empty would be more readable than ExpressionSet(Nil). Usually, mutable collections have def empty() and immutable ones have separate objects that represent empty collections (e.g., Nil, Stream.Empty). I defined val empty since ExpressionSet is immutable.
|
Test build #84349 has finished for PR 18692 at commit
|
|
Test build #84351 has finished for PR 18692 at commit
|
|
LGTM Thanks for your patience! It looks much good now. Really appreciate for your contributions! Welcome to make more contributions! Thanks! Merged to master. |
| /** | ||
| * A rule that eliminates CROSS joins by inferring join conditions from propagated constraints. | ||
| * | ||
| * The optimization is applicable only to CROSS joins. For other join types, adding inferred join |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we apply this optimization to all joins after #19054?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It sounds promising.
|
@aokolnychyi After rethinking about it, we might need to revert this PR. Although it converts a CROSS Join to an Inner join, it does not improve the performance. What do you think? |
|
Yea I have the same feeling. If the left side has a |
|
Sure, if you guys think it does not give any performance benefits, then let's revert it. I also had similar concerns but my understanding was that having an inner join with some equality condition can be beneficial during the generation of a physical plan. In other words, Spark should be able to select a more efficient join implementation. I am not sure how it is right now but previously you could have only |
|
I took a look at |
|
Even if we use |
|
Yeah, correct. So, we should revert then. |
|
Will do it. Thanks! |
|
Done. Reverted. |
|
Hi, All. |
|
@aokolnychyi Could you rethink about it by using some cases like |
|
I am not sure we can infer table 'a' table 'b' Do I miss anything? |
|
you are right, then I don't know if there is any valid use case for inferring join condition from literals... |
|
Yeah. That is a wrong case. Let us revisit it if we can find any useful case here. Thank you! |
|
Hi, its unfortunate to see this PR having gotten reverted
The point of this issue is not performance improvement, but that some (in our case automatically generated) queries do not work at all with SPARK, whereas there is no problem with these queries in PostgreSQL and MySQL. @aokolnychyi |
I'm surprised to hear this, did you turn on CROSS JOIN via |
What changes were proposed in this pull request?
This PR adds an optimization rule that infers join conditions using propagated constraints.
For instance, if there is a join, where the left relation has 'a = 1' and the right relation has 'b = 1', then the rule infers 'a = b' as a join predicate. Only semantically new predicates are appended to the existing join condition.
Refer to the corresponding ticket and tests for more details.
How was this patch tested?
This patch comes with a new test suite to cover the implemented logic.