Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
extract method
  • Loading branch information
Hisoka-X committed Jul 18, 2023
commit d265a28e045d8b2b2a41b1098e840d819151a306
Original file line number Diff line number Diff line change
Expand Up @@ -102,211 +102,65 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
case p: LogicalPlan if p.isStreaming => (plan, false)

case m: MultiInstanceRelation =>
val planWrapper = RelationWrapper(m.getClass, m.output.map(_.exprId.id))
if (existingRelations.contains(planWrapper)) {
val newNode = m.newInstance()
newNode.copyTagsFrom(m)
(newNode, true)
} else {
existingRelations.add(planWrapper)
(m, false)
}
deduplicate(existingRelations, m)(Some(_.output.map(_.exprId.id)))(node => node.newInstance()
.asInstanceOf[LogicalPlan with MultiInstanceRelation])

case p: Project =>
val (newPlan, planChanged) = deduplicate(existingRelations, p)
var newProject = newPlan.asInstanceOf[Project]
if (newProject.resolved) {
val aliasAttrs = findAliases(newProject.projectList)
if (aliasAttrs.nonEmpty) {
val planWrapper = RelationWrapper(p.getClass, aliasAttrs.map(_.exprId.id).toSeq)
if (existDuplicatedExprId(existingRelations, planWrapper)) {
newProject = newProject.copy(newAliases(newProject.projectList))
newProject.copyTagsFrom(p)
(newProject, true)
} else {
existingRelations.add(planWrapper)
(newProject, planChanged)
}
} else {
(newProject, planChanged)
}
} else {
(newProject, planChanged)
}
deduplicate(existingRelations, p)(Some(newProject => findAliases(newProject.projectList)
.map(_.exprId.id).toSeq))(newProject => newProject.copy(newAliases(newProject.projectList)))

case s @ SerializeFromObject(_, child) if s.resolved =>
val (renewed, changed) = renewDuplicatedRelations(existingRelations, child)
val (newPlan, planChanged) = getNewPlanWithNewChildren(existingRelations, Array(renewed),
plan, changed)
var newSer = newPlan.asInstanceOf[SerializeFromObject]
val planWrapper = RelationWrapper(newSer.getClass, newSer.serializer.map(_.exprId.id))
if (existDuplicatedExprId(existingRelations, planWrapper)) {
newSer = newSer.copy(newSer.serializer.map(_.newInstance()))
newSer.copyTagsFrom(s)
(newSer, true)
} else {
existingRelations.add(planWrapper)
(newSer, planChanged)
}
case s: SerializeFromObject =>
deduplicate(existingRelations, s)(Some(_.serializer.map(_.exprId.id)))(newSer =>
newSer.copy(newSer.serializer.map(_.newInstance())))

case f @ FlatMapGroupsInPandas(_, _, _, child) if f.resolved =>
val (renewed, changed) = renewDuplicatedRelations(existingRelations, child)
val (newPlan, planChanged) = getNewPlanWithNewChildren(existingRelations, Array(renewed),
plan, changed)
var newFlatMap = newPlan.asInstanceOf[FlatMapGroupsInPandas]
val planWrapper = RelationWrapper(newFlatMap.getClass,
newFlatMap.output.map(_.exprId.id))
if (existDuplicatedExprId(existingRelations, planWrapper)) {
newFlatMap = newFlatMap.copy(output = newFlatMap.output.map(_.newInstance()))
newFlatMap.copyTagsFrom(plan)
(newFlatMap, true)
} else {
existingRelations.add(planWrapper)
(newFlatMap, planChanged)
}
case f: FlatMapGroupsInPandas =>
deduplicate(existingRelations, f)(Some(_.output.map(_.exprId.id)))(newFlatMap =>
newFlatMap.copy(output = newFlatMap.output.map(_.newInstance())))

case f @ FlatMapCoGroupsInPandas(_, _, _, _, left, right) if f.resolved =>
val (leftRenew, leftChanged) = renewDuplicatedRelations(existingRelations, left)
val (rightRenew, rightChanged) = renewDuplicatedRelations(existingRelations, right)
val (newPlan, planChanged) = getNewPlanWithNewChildren(existingRelations, Array(leftRenew,
rightRenew), plan, leftChanged || rightChanged)

var newFlatMap = newPlan.asInstanceOf[FlatMapCoGroupsInPandas]
val planWrapper = RelationWrapper(newFlatMap.getClass,
newFlatMap.output.map(_.exprId.id))
if (existDuplicatedExprId(existingRelations, planWrapper)) {
newFlatMap = newFlatMap.copy(output = newFlatMap.output.map(_.newInstance()))
newFlatMap.copyTagsFrom(plan)
(newFlatMap, true)
} else {
existingRelations.add(planWrapper)
(newFlatMap, planChanged)
}
case f: FlatMapCoGroupsInPandas =>
deduplicate(existingRelations, f)(Some(_.output.map(_.exprId.id)))(newFlatMap =>
newFlatMap.copy(output = newFlatMap.output.map(_.newInstance())))

case m @ MapInPandas(_, _, child, _) if m.resolved =>
val (renewed, changed) = renewDuplicatedRelations(existingRelations, child)
val (newPlan, planChanged) = getNewPlanWithNewChildren(existingRelations, Array(renewed),
plan, changed)
var newMap = newPlan.asInstanceOf[MapInPandas]
val planWrapper = RelationWrapper(newMap.getClass,
newMap.output.map(_.exprId.id))
if (existDuplicatedExprId(existingRelations, planWrapper)) {
newMap = newMap.copy(output = newMap.output.map(_.newInstance()))
newMap.copyTagsFrom(plan)
(newMap, true)
} else {
existingRelations.add(planWrapper)
(newMap, planChanged)
}
case m: MapInPandas =>
deduplicate(existingRelations, m)(Some(_.output.map(_.exprId.id)))(newMap =>
newMap.copy(output = newMap.output.map(_.newInstance())))

case p @ PythonMapInArrow(_, _, child, _) if p.resolved =>
val (renewed, changed) = renewDuplicatedRelations(existingRelations, child)
val (newPlan, planChanged) = getNewPlanWithNewChildren(existingRelations, Array(renewed),
plan, changed)
var newMap = newPlan.asInstanceOf[PythonMapInArrow]
val planWrapper = RelationWrapper(newMap.getClass,
newMap.output.map(_.exprId.id))
if (existDuplicatedExprId(existingRelations, planWrapper)) {
newMap = newMap.copy(output = newMap.output.map(_.newInstance()))
newMap.copyTagsFrom(plan)
(newMap, true)
} else {
existingRelations.add(planWrapper)
(newMap, planChanged)
}
case p: PythonMapInArrow =>
deduplicate(existingRelations, p)(Some(_.output.map(_.exprId.id)))(newMap =>
newMap.copy(output = newMap.output.map(_.newInstance())))

case a @ AttachDistributedSequence(_, child) if a.resolved =>
val (renewed, changed) = renewDuplicatedRelations(existingRelations, child)
val (newPlan, planChanged) = getNewPlanWithNewChildren(existingRelations, Array(renewed),
plan, changed)
var newAttach = newPlan.asInstanceOf[AttachDistributedSequence]
val planWrapper = RelationWrapper(newAttach.getClass,
newAttach.producedAttributes.map(_.exprId.id).toSeq)
if (existDuplicatedExprId(existingRelations, planWrapper)) {
newAttach = newAttach.copy(sequenceAttr = newAttach.producedAttributes.map(_.newInstance())
.head)
newAttach.copyTagsFrom(plan)
(newAttach, true)
} else {
existingRelations.add(planWrapper)
(newAttach, planChanged)
}
case a: AttachDistributedSequence =>
deduplicate(existingRelations, a)(Some(_.producedAttributes.map(_.exprId.id).toSeq))(
newAttach => newAttach.copy(sequenceAttr = newAttach.producedAttributes
.map(_.newInstance()).head))

case g @ Generate(_, _, _, _, _, child) if g.resolved =>
val (renewed, changed) = renewDuplicatedRelations(existingRelations, child)
val (newPlan, planChanged) = getNewPlanWithNewChildren(existingRelations, Array(renewed),
plan, changed)
var newGenerate = newPlan.asInstanceOf[Generate]
val planWrapper = RelationWrapper(newGenerate.getClass,
newGenerate.generatorOutput.map(_.exprId.id))
if (existDuplicatedExprId(existingRelations, planWrapper)) {
newGenerate = newGenerate.copy(generatorOutput = newGenerate.generatorOutput.map(
_.newInstance()))
newGenerate.copyTagsFrom(plan)
(newGenerate, true)
} else {
existingRelations.add(planWrapper)
(newGenerate, planChanged)
}
case g: Generate =>
deduplicate(existingRelations, g)(Some(_.generatorOutput.map(_.exprId.id)))(newGenerate =>
newGenerate.copy(generatorOutput = newGenerate.generatorOutput.map(_.newInstance())))

case e @ Expand(_, _, child) if e.resolved =>
val (renewed, changed) = renewDuplicatedRelations(existingRelations, child)
val (newPlan, planChanged) = getNewPlanWithNewChildren(existingRelations, Array(renewed),
plan, changed)
var newExpand = newPlan.asInstanceOf[Expand]
val planWrapper = RelationWrapper(newExpand.getClass,
newExpand.producedAttributes.map(_.exprId.id).toSeq)
if (existDuplicatedExprId(existingRelations, planWrapper)) {
newExpand = newExpand.copy(output = newExpand.output.map(_.newInstance()))
newExpand.copyTagsFrom(plan)
(newExpand, true)
} else {
existingRelations.add(planWrapper)
(newExpand, planChanged)
}
case e: Expand =>
deduplicate(existingRelations, e)(Some(_.producedAttributes.map(_.exprId.id).toSeq))(
newExpand => newExpand.copy(output = newExpand.output.map(_.newInstance())))

case w @ Window(_, _, _, child) if w.resolved =>
val (renewed, changed) = renewDuplicatedRelations(existingRelations, child)
val (newPlan, planChanged) = getNewPlanWithNewChildren(existingRelations, Array(renewed),
plan, changed)
var newWindow = newPlan.asInstanceOf[Window]
val planWrapper = RelationWrapper(newWindow.getClass,
newWindow.windowExpressions.map(_.exprId.id))
if (existDuplicatedExprId(existingRelations, planWrapper)) {
newWindow = newWindow.copy(windowExpressions =
newWindow.windowExpressions.map(_.newInstance()))
newWindow.copyTagsFrom(plan)
(newWindow, true)
} else {
existingRelations.add(planWrapper)
(newWindow, planChanged)
}
case w: Window =>
deduplicate(existingRelations, w)(Some(_.windowExpressions
.map(_.exprId.id)))(newWindow => newWindow.copy(windowExpressions =
newWindow.windowExpressions.map(_.newInstance())))

case s @ ScriptTransformation(_, _, child, _) if s.resolved =>
val (renewed, changed) = renewDuplicatedRelations(existingRelations, child)
val (newPlan, planChanged) = getNewPlanWithNewChildren(existingRelations, Array(renewed),
plan, changed)
var newScript = newPlan.asInstanceOf[ScriptTransformation]
val planWrapper = RelationWrapper(newScript.getClass,
newScript.output.map(_.exprId.id))
if (existDuplicatedExprId(existingRelations, planWrapper)) {
newScript = newScript.copy(output = newScript.output.map(_.newInstance()))
newScript.copyTagsFrom(plan)
(newScript, true)
} else {
existingRelations.add(planWrapper)
(newScript, planChanged)
}
case s: ScriptTransformation =>
deduplicate(existingRelations, s)(Some(_.output.map(_.exprId.id)))(
newScript => newScript.copy(output = newScript.output.map(_.newInstance())))

case plan: LogicalPlan =>
deduplicate(existingRelations, plan)
deduplicate(existingRelations, plan)()(p => p)
}

private def deduplicate(
existingRelations: mutable.HashSet[RelationWrapper],
plan: LogicalPlan): (LogicalPlan, Boolean) = {
private def deduplicate[T <: LogicalPlan](
existingRelations: mutable.HashSet[RelationWrapper], plan: T)
(getExprIds: Option[T => Seq[Long]] = Option.empty)
(copyNewPlan: T => T): (LogicalPlan, Boolean) = {
var planChanged = false
val newPlan = if (plan.children.nonEmpty) {
var newPlan = if (plan.children.nonEmpty) {
val newChildren = mutable.ArrayBuffer.empty[LogicalPlan]
for (c <- plan.children) {
val (renewed, changed) = renewDuplicatedRelations(existingRelations, c)
Expand All @@ -326,12 +180,8 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
if (planChanged) {
if (planWithNewSubquery.childrenResolved) {
val planWithNewChildren = planWithNewSubquery.withNewChildren(newChildren.toSeq)
val attrMap = AttributeMap(
plan
.children
.flatMap(_.output).zip(newChildren.flatMap(_.output))
.filter { case (a1, a2) => a1.exprId != a2.exprId }
)
val attrMap = AttributeMap(plan.children.flatMap(_.output)
.zip(newChildren.flatMap(_.output)).filter { case (a1, a2) => a1.exprId != a2.exprId })
if (attrMap.isEmpty) {
planWithNewChildren
} else {
Expand All @@ -346,42 +196,24 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
} else {
plan
}
(newPlan, planChanged)
}

private def getNewPlanWithNewChildren(
existingRelations: mutable.HashSet[RelationWrapper],
newChildren: Array[LogicalPlan],
plan: LogicalPlan, changed: Boolean): (LogicalPlan, Boolean) = {
var planChanged = changed
val planWithNewSubquery = plan.transformExpressions {
case subquery: SubqueryExpression =>
val (renewed, changed) = renewDuplicatedRelations(existingRelations, subquery.plan)
if (changed) planChanged = true
subquery.withNewPlan(renewed)
}

val newPlan = if (planChanged) {
if (planWithNewSubquery.childrenResolved) {
val planWithNewChildren = planWithNewSubquery.withNewChildren(newChildren.toSeq)
val attrMap = AttributeMap(
plan
.children
.flatMap(_.output).zip(newChildren.flatMap(_.output))
.filter { case (a1, a2) => a1.exprId != a2.exprId }
)
if (attrMap.isEmpty) {
planWithNewChildren
if (newPlan.resolved && getExprIds.isDefined) {
val exprIds = getExprIds.get(newPlan.asInstanceOf[T])
if (exprIds.nonEmpty) {
val planWrapper = RelationWrapper(newPlan.getClass, exprIds)
if (existDuplicatedExprId(existingRelations, planWrapper)) {
newPlan = copyNewPlan(newPlan.asInstanceOf[T])
newPlan.copyTagsFrom(plan)
(newPlan, true)
} else {
planWithNewChildren.rewriteAttrs(attrMap)
existingRelations.add(planWrapper)
(newPlan, planChanged)
}
} else {
planWithNewSubquery.withNewChildren(newChildren.toSeq)
(newPlan, planChanged)
}
} else {
plan
(newPlan, planChanged)
}
(newPlan, planChanged)
}

/**
Expand Down