Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
remove curry
  • Loading branch information
Hisoka-X committed Jul 19, 2023
commit f324892f34b05549c0d98d1737b7faaa4d1cb9ee
Original file line number Diff line number Diff line change
Expand Up @@ -102,61 +102,98 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
case p: LogicalPlan if p.isStreaming => (plan, false)

case m: MultiInstanceRelation =>
deduplicateAndRenew(existingRelations, m)(_.output.map(_.exprId.id))(node =>
node.newInstance().asInstanceOf[LogicalPlan with MultiInstanceRelation])
deduplicateAndRenew[LogicalPlan with MultiInstanceRelation](
existingRelations,
m,
_.output.map(_.exprId.id),
node => node.newInstance().asInstanceOf[LogicalPlan with MultiInstanceRelation])

case p: Project =>
deduplicateAndRenew(existingRelations, p)(newProject => findAliases(newProject.projectList)
.map(_.exprId.id).toSeq)(newProject => newProject.copy(newAliases(newProject.projectList)))
deduplicateAndRenew[Project](
existingRelations,
p,
newProject => findAliases(newProject.projectList).map(_.exprId.id).toSeq,
newProject => newProject.copy(newAliases(newProject.projectList)))

case s: SerializeFromObject =>
deduplicateAndRenew(existingRelations, s)(_.serializer.map(_.exprId.id))(newSer =>
newSer.copy(newSer.serializer.map(_.newInstance())))
deduplicateAndRenew[SerializeFromObject](
existingRelations,
s,
_.serializer.map(_.exprId.id),
newSer => newSer.copy(newSer.serializer.map(_.newInstance())))

case f: FlatMapGroupsInPandas =>
deduplicateAndRenew(existingRelations, f)(_.output.map(_.exprId.id))(newFlatMap =>
newFlatMap.copy(output = newFlatMap.output.map(_.newInstance())))
deduplicateAndRenew[FlatMapGroupsInPandas](
existingRelations,
f,
_.output.map(_.exprId.id),
newFlatMap => newFlatMap.copy(output = newFlatMap.output.map(_.newInstance())))

case f: FlatMapCoGroupsInPandas =>
deduplicateAndRenew(existingRelations, f)(_.output.map(_.exprId.id))(newFlatMap =>
newFlatMap.copy(output = newFlatMap.output.map(_.newInstance())))
deduplicateAndRenew[FlatMapCoGroupsInPandas](
existingRelations,
f,
_.output.map(_.exprId.id),
newFlatMap => newFlatMap.copy(output = newFlatMap.output.map(_.newInstance())))

case m: MapInPandas =>
deduplicateAndRenew(existingRelations, m)(_.output.map(_.exprId.id))(newMap =>
newMap.copy(output = newMap.output.map(_.newInstance())))
deduplicateAndRenew[MapInPandas](
existingRelations,
m,
_.output.map(_.exprId.id),
newMap => newMap.copy(output = newMap.output.map(_.newInstance())))

case p: PythonMapInArrow =>
deduplicateAndRenew(existingRelations, p)(_.output.map(_.exprId.id))(newMap =>
newMap.copy(output = newMap.output.map(_.newInstance())))
deduplicateAndRenew[PythonMapInArrow](
existingRelations,
p,
_.output.map(_.exprId.id),
newMap => newMap.copy(output = newMap.output.map(_.newInstance())))

case a: AttachDistributedSequence =>
deduplicateAndRenew(existingRelations, a)(_.producedAttributes.map(_.exprId.id).toSeq)(
deduplicateAndRenew[AttachDistributedSequence](
existingRelations,
a,
_.producedAttributes.map(_.exprId.id).toSeq,
newAttach => newAttach.copy(sequenceAttr = newAttach.producedAttributes
.map(_.newInstance()).head))

case g: Generate =>
deduplicateAndRenew(existingRelations, g)(_.generatorOutput.map(_.exprId.id))(newGenerate =>
newGenerate.copy(generatorOutput = newGenerate.generatorOutput.map(_.newInstance())))
deduplicateAndRenew[Generate](
existingRelations,
g,
_.generatorOutput.map(_.exprId.id), newGenerate =>
newGenerate.copy(generatorOutput = newGenerate.generatorOutput.map(_.newInstance())))

case e: Expand =>
deduplicateAndRenew(existingRelations, e)(_.producedAttributes.map(_.exprId.id).toSeq)(
deduplicateAndRenew[Expand](
existingRelations,
e,
_.producedAttributes.map(_.exprId.id).toSeq,
newExpand => newExpand.copy(output = newExpand.output.map(_.newInstance())))

case w: Window =>
deduplicateAndRenew(existingRelations, w)(_.windowExpressions
.map(_.exprId.id))(newWindow => newWindow.copy(windowExpressions =
newWindow.windowExpressions.map(_.newInstance())))
deduplicateAndRenew[Window](
existingRelations,
w,
_.windowExpressions.map(_.exprId.id),
newWindow => newWindow.copy(windowExpressions =
newWindow.windowExpressions.map(_.newInstance())))

case s: ScriptTransformation =>
deduplicateAndRenew(existingRelations, s)(_.output.map(_.exprId.id))(
deduplicateAndRenew[ScriptTransformation](
existingRelations,
s,
_.output.map(_.exprId.id),
newScript => newScript.copy(output = newScript.output.map(_.newInstance())))

case plan: LogicalPlan =>
deduplicate(existingRelations, plan)
}

private def deduplicate(existingRelations: mutable.HashSet[RelationWrapper], plan: LogicalPlan):
(LogicalPlan, Boolean) = {
private def deduplicate(
existingRelations: mutable.HashSet[RelationWrapper],
plan: LogicalPlan): (LogicalPlan, Boolean) = {
var planChanged = false
val newPlan = if (plan.children.nonEmpty) {
val newChildren = mutable.ArrayBuffer.empty[LogicalPlan]
Expand Down Expand Up @@ -198,9 +235,9 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
}

private def deduplicateAndRenew[T <: LogicalPlan](
existingRelations: mutable.HashSet[RelationWrapper], plan: T)
(getExprIds: T => Seq[Long])
(copyNewPlan: T => T): (LogicalPlan, Boolean) = {
existingRelations: mutable.HashSet[RelationWrapper], plan: T,
getExprIds: T => Seq[Long],
copyNewPlan: T => T): (LogicalPlan, Boolean) = {
var (newPlan, planChanged) = deduplicate(existingRelations, plan)
if (newPlan.resolved) {
val exprIds = getExprIds(newPlan.asInstanceOf[T])
Expand Down