Remove right order from deduplication
EnricoMi committed Jun 30, 2023
commit 023c1993fc55a22c5e4435af6cc43c56d02745de
@@ -689,18 +689,16 @@ object CoGroup {
       attrs.map(attr => duplicateAttributes.getOrElse(attr, attr))
     }

-    val (dedupRightGroup, dedupRightAttr, dedupRightOrder, dedupRight) =
+    // rightOrder is resolved against right plan, so deduplication not needed
+    val (dedupRightGroup, dedupRightAttr, dedupRight) =
       if (duplicateAttributes.nonEmpty) {
Contributor

It's a bit weird to do dedup here. Can we update the DeduplicateRelations rule to handle CoGroup specially?

Contributor Author

@EnricoMi EnricoMi Apr 26, 2023

I don't really see how DeduplicateRelations can be modified so that it does not rewrite all attributes of CoGroup.

In the DeduplicateRelations.apply method, renewDuplicatedRelations is called, which calls rewriteAttrs(attrMap) on the CoGroup and thereby rewrites all of its attributes.
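
To make the blanket rewrite concrete, here is a minimal toy sketch. Attr and CoGroupNode are hypothetical stand-ins, not Spark's classes, and the model assumes only what is described above: one attribute map is applied to every attribute the node references, with no distinction between left-side and right-side references.

```scala
// Toy model only: Attr and CoGroupNode are hypothetical stand-ins, not Spark classes.
final case class Attr(name: String, exprId: Long)

final case class CoGroupNode(
    leftGroup: Seq[Attr],
    rightGroup: Seq[Attr],
    leftOrder: Seq[Attr],
    rightOrder: Seq[Attr]) {

  // Applies the same map to every attribute the node references,
  // regardless of which child the reference belongs to.
  def rewriteAttrs(attrMap: Map[Attr, Attr]): CoGroupNode = {
    def rewrite(attrs: Seq[Attr]): Seq[Attr] =
      attrs.map(a => attrMap.getOrElse(a, a))
    CoGroupNode(
      rewrite(leftGroup),
      rewrite(rightGroup),
      rewrite(leftOrder),
      rewrite(rightOrder))
  }
}

object RewriteAllDemo {
  def main(args: Array[String]): Unit = {
    // Both sides reference the same attribute, as when a dataset is cogrouped with itself.
    val id = Attr("id", 1L)
    // Assumed fresh instance produced for the right child by deduplication.
    val fresh = Attr("id", 2L)

    val node = CoGroupNode(Seq(id), Seq(id), Seq(id), Seq(id))
    val rewritten = node.rewriteAttrs(Map(id -> fresh))

    // In this model the left-side references are rewritten along with the right-side ones.
    println(rewritten)
  }
}
```

In this toy model, once the map has been applied the node no longer records which references were left-side and which were right-side, which is the difficulty with handling CoGroup only after the rewrite.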

If you are suggesting adding case cogroup @ CoGroup(...) => to

newPlan.resolveOperatorsUpWithPruning(
  _.containsAnyPattern(JOIN, LATERAL_JOIN, AS_OF_JOIN, INTERSECT, EXCEPT, UNION, COMMAND),
  ruleId) {
  case p: LogicalPlan if !p.childrenResolved => p
  // To resolve duplicate expression IDs for Join.
  case j @ Join(left, right, _, _, _) if !j.duplicateResolved =>
    j.copy(right = dedupRight(left, right))
  // Resolve duplicate output for LateralJoin.
  case j @ LateralJoin(left, right, _, _) if right.resolved && !j.duplicateResolved =>
    j.copy(right = right.withNewPlan(dedupRight(left, right.plan)))
  // Resolve duplicate output for AsOfJoin.
  case j @ AsOfJoin(left, right, _, _, _, _, _) if !j.duplicateResolved =>
    j.copy(right = dedupRight(left, right))
  // intersect/except will be rewritten to join at the beginning of optimizer. Here we need to
  // deduplicate the right side plan, so that we won't produce an invalid self-join later.
  case i @ Intersect(left, right, _) if !i.duplicateResolved =>
    i.copy(right = dedupRight(left, right))
  case e @ Except(left, right, _) if !e.duplicateResolved =>
    e.copy(right = dedupRight(left, right))
  // Only after we finish by-name resolution for Union
  case u: Union if !u.byName && !u.duplicateResolved =>
    // Use projection-based de-duplication for Union to avoid breaking the checkpoint sharing
    // feature in streaming.
    val newChildren = u.children.foldRight(Seq.empty[LogicalPlan]) { (head, tail) =>
      head +: tail.map {
        case child if head.outputSet.intersect(child.outputSet).isEmpty =>
          child
        case child =>
          val projectList = child.output.map { attr =>
            Alias(attr, attr.name)()
          }
          Project(projectList, child)
      }
    }
    u.copy(children = newChildren)
  case merge: MergeIntoTable if !merge.duplicateResolved =>
    merge.copy(sourceTable = dedupRight(merge.targetTable, merge.sourceTable))
}
}

then this won't work, because all attributes of CoGroup have already been rewritten by that point.

I'd appreciate some pointers or a sketch of a solution.

Contributor

@Hisoka-X is working on it: #41554

         (
           dedup(rightGroup).map(_.toAttribute),
           dedup(rightAttr).map(_.toAttribute),
-          rightOrder.map(_.transformDown {
-            case a: Attribute => duplicateAttributes.getOrElse(a, a)
-          }.asInstanceOf[SortOrder]),
           Project(dedup(right.output), right)
         )
       } else {
-        (rightGroup, rightAttr, rightOrder, right)
+        (rightGroup, rightAttr, right)
       }

     val cogrouped = CoGroup(
@@ -715,7 +713,7 @@ object CoGroup {
       leftAttr,
       dedupRightAttr,
       leftOrder,
-      dedupRightOrder,
+      rightOrder,
       CatalystSerde.generateObjAttr[OUT],
       left,
       dedupRight)