-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-43781][SQL] Fix IllegalStateException when cogrouping two datasets derived from the same source #41554
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
d45eab2
785bcec
edfa2f3
f45b8c6
e111a40
45f0505
3a9bad5
f8b0a28
8bf5fb3
b7d4ece
f88d1ea
410b236
acb9455
7f54c5c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis | |
|
|
||
| import scala.collection.mutable | ||
|
|
||
| import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, AttributeSet, NamedExpression, OuterReference, SubqueryExpression} | ||
| import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, NamedExpression, OuterReference, SubqueryExpression} | ||
| import org.apache.spark.sql.catalyst.plans.logical._ | ||
| import org.apache.spark.sql.catalyst.rules.Rule | ||
| import org.apache.spark.sql.catalyst.trees.TreePattern._ | ||
|
|
@@ -220,7 +220,46 @@ object DeduplicateRelations extends Rule[LogicalPlan] { | |
| if (attrMap.isEmpty) { | ||
| planWithNewChildren | ||
| } else { | ||
| planWithNewChildren.rewriteAttrs(attrMap) | ||
| def rewriteAttrsMatchWithSubPlan[T <: Expression]( | ||
| attrs: Seq[T], | ||
Hisoka-X marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| attrMap: AttributeMap[Attribute], | ||
| planOutput: Seq[Attribute]): Seq[T] = { | ||
Hisoka-X marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| val canRewriteAttrs = attrMap.filter(a => planOutput.contains(a._2)) | ||
| attrs.map(attr => { | ||
Hisoka-X marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| attr.transformWithPruning(_.containsPattern(ATTRIBUTE_REFERENCE)) { | ||
| case a: AttributeReference => | ||
| canRewriteAttrs.getOrElse(a, a) | ||
| }.asInstanceOf[T] | ||
| }) | ||
| } | ||
|
|
||
| planWithNewChildren match { | ||
| case c: CoGroup => | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // SPARK-43781: CoGroup is a special case; `rewriteAttrs` will incorrectly update | ||
| // some fields that do not need to be updated, so we need to update the output | ||
| // attributes of CoGroup manually. | ||
| val newLeftAttr = c.leftAttr.map(attr => attrMap.getOrElse(attr, attr)) | ||
|
||
| val newRightAttr = rewriteAttrsMatchWithSubPlan(c.rightAttr, attrMap, | ||
| c.right.output) | ||
| val newLeftGroup = rewriteAttrsMatchWithSubPlan(c.leftGroup, attrMap, c.left.output) | ||
| val newRightGroup = rewriteAttrsMatchWithSubPlan(c.rightGroup, attrMap, | ||
| c.right.output) | ||
| val newLeftOrder = rewriteAttrsMatchWithSubPlan(c.leftOrder, attrMap, | ||
| c.left.output) | ||
| val newRightOrder = rewriteAttrsMatchWithSubPlan(c.rightOrder, attrMap, | ||
| c.right.output) | ||
| val newKeyDes = c.keyDeserializer.asInstanceOf[UnresolvedDeserializer] | ||
| .copy(inputAttributes = newLeftGroup) | ||
| val newLeftDes = c.leftDeserializer.asInstanceOf[UnresolvedDeserializer] | ||
| .copy(inputAttributes = newLeftAttr) | ||
| val newRightDes = c.rightDeserializer.asInstanceOf[UnresolvedDeserializer] | ||
| .copy(inputAttributes = newRightAttr) | ||
| c.copy(keyDeserializer = newKeyDes, leftDeserializer = newLeftDes, | ||
| rightDeserializer = newRightDes, leftGroup = newLeftGroup, | ||
| rightGroup = newRightGroup, leftAttr = newLeftAttr, rightAttr = newRightAttr, | ||
| leftOrder = newLeftOrder, rightOrder = newRightOrder) | ||
| case _ => planWithNewChildren.rewriteAttrs(attrMap) | ||
| } | ||
| } | ||
| } else { | ||
| planWithNewSubquery.withNewChildren(newChildren.toSeq) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.