@EnricoMi EnricoMi commented Jan 20, 2023

What changes were proposed in this pull request?

This deduplicates attributes that exist on both sides of a CoGroup by aliasing the occurrence on the right side.

Why are the changes needed?

Usually, the DeduplicateRelations rule does exactly this. However, the generic QueryPlan.rewriteAttrs replaces all occurrences of the duplicate reference with the new reference, whereas CoGroup uses the old reference in both its left and right group attributes, value attributes, and group order. Only the occurrences in the right-side attributes must be replaced.

Further, the right deserialization expression is not touched at all.
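
The effect of a blanket rewrite can be illustrated with a small, Spark-free sketch. Attr, ToyCoGroup and rewriteAll are hypothetical stand-ins invented for this illustration, not Catalyst classes or methods, and the toy node models only four of CoGroup's fields. It shows that replacing every occurrence of the duplicate reference changes the left-side fields as well, even though the left child still produces the old reference.

```scala
// Toy stand-ins for illustration only; none of these are Spark/Catalyst APIs.
final case class Attr(name: String, exprId: Long)

// Simplified node: only the group and value attributes of each side.
final case class ToyCoGroup(
    leftGroup: Seq[Attr],
    rightGroup: Seq[Attr],
    leftAttr: Seq[Attr],
    rightAttr: Seq[Attr])

object BlanketRewrite {
  // Replaces every attribute found in the mapping, regardless of the side it belongs to.
  def rewriteAll(node: ToyCoGroup, mapping: Map[Attr, Attr]): ToyCoGroup = {
    def rw(attrs: Seq[Attr]): Seq[Attr] = attrs.map(a => mapping.getOrElse(a, a))
    ToyCoGroup(rw(node.leftGroup), rw(node.rightGroup), rw(node.leftAttr), rw(node.rightAttr))
  }

  def main(args: Array[String]): Unit = {
    val id0 = Attr("id", 0L)   // reference produced by both children of the self-cogroup
    val id13 = Attr("id", 13L) // fresh reference meant for the right side only
    val selfCoGroup = ToyCoGroup(Seq(id0), Seq(id0), Seq(id0), Seq(id0))
    val rewritten = rewriteAll(selfCoGroup, Map(id0 -> id13))
    // The left-side fields were rewritten too, so they no longer match the left child.
    println(s"left group after rewrite: ${rewritten.leftGroup}")
    println(s"right group after rewrite: ${rewritten.rightGroup}")
  }
}
```

In this toy model the left group ends up pointing at the new reference, which mirrors the plan below where all group and value attributes carry the same rewritten id.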

The following DataFrame cannot be evaluated:

val df = spark.range(3)

val left_grouped_df = df.groupBy("id").as[Long, Long]
val right_grouped_df = df.groupBy("id").as[Long, Long]

val cogroup_df = left_grouped_df.cogroup(right_grouped_df) {
  case (key, left, right) => left
}

The query plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SerializeFromObject [input[0, bigint, false] AS value#12L]
   +- CoGroup, id#0: bigint, id#0: bigint, id#0: bigint, [id#13L], [id#13L], [id#13L], [id#13L], obj#11: bigint
      :- !Sort [id#13L ASC NULLS FIRST], false, 0
      :  +- !Exchange hashpartitioning(id#13L, 200), ENSURE_REQUIREMENTS, [plan_id=16]
      :     +- Range (0, 3, step=1, splits=16)
      +- Sort [id#13L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#13L, 200), ENSURE_REQUIREMENTS, [plan_id=17]
            +- Range (0, 3, step=1, splits=16)

Evaluating this plan fails with:

Caused by: java.lang.IllegalStateException: Couldn't find id#19L in [id#0L]

Does this PR introduce any user-facing change?

This fixes correctness.

How was this patch tested?

Unit test in DataFrameSuite.

@github-actions github-actions bot added the SQL label Jan 20, 2023
EnricoMi commented Jan 20, 2023

Ideally, QueryPlan.rewriteAttrs would not replace occurrences of id#0L with id#13L in all fields of CoGroup, but only in rightDeserializer, rightGroup, rightAttr, and rightOrder:

case class CoGroup(
    func: (Any, Iterator[Any], Iterator[Any]) => TraversableOnce[Any],
    keyDeserializer: Expression,
    leftDeserializer: Expression,
    rightDeserializer: Expression,
    leftGroup: Seq[Attribute],
    rightGroup: Seq[Attribute],
    leftAttr: Seq[Attribute],
    rightAttr: Seq[Attribute],
    leftOrder: Seq[SortOrder],
    rightOrder: Seq[SortOrder],
    outputObjAttr: Attribute,
    left: LogicalPlan,
    right: LogicalPlan) extends BinaryNode with ObjectProducer { ... }
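
A side-restricted rewrite could look roughly like the following Spark-free sketch. Ref, MiniCoGroup and rewriteRight are hypothetical names made up for this illustration and are not part of Spark; the toy node keeps only the group and value attributes and omits rightDeserializer and rightOrder, which would need the same treatment in the real class.

```scala
// Toy stand-ins for illustration only; none of these are Spark/Catalyst APIs.
final case class Ref(name: String, exprId: Long)

// Simplified node: the real CoGroup also has deserializers, sort orders and children.
final case class MiniCoGroup(
    leftGroup: Seq[Ref],
    rightGroup: Seq[Ref],
    leftAttr: Seq[Ref],
    rightAttr: Seq[Ref])

object RightOnlyRewrite {
  // Applies the mapping to right-side fields only; left-side fields stay untouched.
  def rewriteRight(node: MiniCoGroup, mapping: Map[Ref, Ref]): MiniCoGroup = {
    def rw(refs: Seq[Ref]): Seq[Ref] = refs.map(r => mapping.getOrElse(r, r))
    node.copy(rightGroup = rw(node.rightGroup), rightAttr = rw(node.rightAttr))
  }

  def main(args: Array[String]): Unit = {
    val id0 = Ref("id", 0L)   // reference shared by both sides before deduplication
    val id13 = Ref("id", 13L) // fresh reference for the right side
    val node = MiniCoGroup(Seq(id0), Seq(id0), Seq(id0), Seq(id0))
    val deduplicated = rewriteRight(node, Map(id0 -> id13))
    println(s"left group: ${deduplicated.leftGroup}")
    println(s"right group: ${deduplicated.rightGroup}")
  }
}
```

After this rewrite the two sides of the toy node no longer share a reference, which is the state the deduplication is meant to reach.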

@AmplabJenkins

Can one of the admins verify this patch?

@EnricoMi EnricoMi marked this pull request as draft January 25, 2023 10:37
@EnricoMi EnricoMi marked this pull request as ready for review January 26, 2023 16:16
@EnricoMi EnricoMi force-pushed the branch-cogroup-self-deduplication-bug branch 3 times, most recently from 4e5901e to 16721f1 Compare January 30, 2023 16:07
@shardulm94

cc: @cloud-fan @viirya @gengliangwang For help with reviews.

mridulm commented Apr 26, 2023

+CC @HyukjinKwon, @cloud-fan

Contributor

It's a bit weird to do dedup here. Can we update the DeduplicateRelations rule to handle CoGroup specially?

@EnricoMi EnricoMi Apr 26, 2023

I don't really see how DeduplicateRelations can be modified to not rewrite all attributes of CoGroup.

The DeduplicateRelations.apply method calls renewDuplicatedRelations, which calls rewriteAttrs(attrMap) on the CoGroup, which in turn rewrites all attributes.

If you are suggesting adding case cogroup @ CoGroup(...) => to

  newPlan.resolveOperatorsUpWithPruning(
    _.containsAnyPattern(JOIN, LATERAL_JOIN, AS_OF_JOIN, INTERSECT, EXCEPT, UNION, COMMAND),
    ruleId) {
    case p: LogicalPlan if !p.childrenResolved => p
    // To resolve duplicate expression IDs for Join.
    case j @ Join(left, right, _, _, _) if !j.duplicateResolved =>
      j.copy(right = dedupRight(left, right))
    // Resolve duplicate output for LateralJoin.
    case j @ LateralJoin(left, right, _, _) if right.resolved && !j.duplicateResolved =>
      j.copy(right = right.withNewPlan(dedupRight(left, right.plan)))
    // Resolve duplicate output for AsOfJoin.
    case j @ AsOfJoin(left, right, _, _, _, _, _) if !j.duplicateResolved =>
      j.copy(right = dedupRight(left, right))
    // intersect/except will be rewritten to join at the beginning of optimizer. Here we need to
    // deduplicate the right side plan, so that we won't produce an invalid self-join later.
    case i @ Intersect(left, right, _) if !i.duplicateResolved =>
      i.copy(right = dedupRight(left, right))
    case e @ Except(left, right, _) if !e.duplicateResolved =>
      e.copy(right = dedupRight(left, right))
    // Only after we finish by-name resolution for Union
    case u: Union if !u.byName && !u.duplicateResolved =>
      // Use projection-based de-duplication for Union to avoid breaking the checkpoint sharing
      // feature in streaming.
      val newChildren = u.children.foldRight(Seq.empty[LogicalPlan]) { (head, tail) =>
        head +: tail.map {
          case child if head.outputSet.intersect(child.outputSet).isEmpty =>
            child
          case child =>
            val projectList = child.output.map { attr =>
              Alias(attr, attr.name)()
            }
            Project(projectList, child)
        }
      }
      u.copy(children = newChildren)
    case merge: MergeIntoTable if !merge.duplicateResolved =>
      merge.copy(sourceTable = dedupRight(merge.targetTable, merge.sourceTable))
  }
}

then this won't work because all attributes of CoGroup have been rewritten at this point.

I'd appreciate some pointers or sketch of a solution.

Contributor

@Hisoka-X is working on it: #41554

@EnricoMi EnricoMi force-pushed the branch-cogroup-self-deduplication-bug branch from 16721f1 to cc5773c Compare June 30, 2023 08:04
EnricoMi commented Oct 9, 2023

Fixed in #41554.

@EnricoMi EnricoMi closed this Oct 9, 2023