Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
init
  • Loading branch information
LuciferYang committed Jun 13, 2024
commit e7e78be65f6153e66db99bd818bf7dab94811917
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,14 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
}
}

@scala.annotation.nowarn("cat=deprecation")
def recursiveTransform(arg: Any): AnyRef = arg match {
case e: Expression => transformExpression(e)
case Some(value) => Some(recursiveTransform(value))
case m: Map[_, _] => m
case d: DataType => d // Avoid unpacking Structs
case stream: LazyList[_] => stream.map(recursiveTransform).force
case stream: Stream[_] => stream.map(recursiveTransform).force
case lazyList: LazyList[_] => lazyList.map(recursiveTransform).force
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @JoshRosen Did I understand your suggestion correctly? Thanks ~

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LuciferYang, yes, this is exactly what I had in mind.

case seq: Iterable[_] => seq.map(recursiveTransform)
case other: AnyRef => other
case null => null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.trees

import java.util.UUID

import scala.annotation.nowarn
import scala.collection.{mutable, Map}
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
Expand Down Expand Up @@ -356,6 +357,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
* Returns a copy of this node with the children replaced.
* TODO: Validate somewhere (in debug mode?) that children are ordered correctly.
*/
@nowarn("cat=deprecation")
protected final def legacyWithNewChildren(newChildren: Seq[BaseType]): BaseType = {
assert(newChildren.size == children.size, "Incorrect number of children")
var changed = false
Expand All @@ -381,9 +383,12 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
val newArgs = mapProductIterator {
case s: StructType => s // Don't convert struct types to some other type of Seq[StructField]
// Handle Seq[TreeNode] in TreeNode parameters.
case s: LazyList[_] =>
// LazyList is lazy so we need to force materialization
case s: Stream[_] =>
// Stream is lazy so we need to force materialization
s.map(mapChild).force
case l: LazyList[_] =>
// LazyList is lazy so we need to force materialization
l.map(mapChild).force
case s: Seq[_] =>
s.map(mapChild)
case m: Map[_, _] =>
Expand Down Expand Up @@ -781,6 +786,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
}
}

@nowarn("cat=deprecation")
override def clone(): BaseType = {
def mapChild(child: Any): Any = child match {
case arg: TreeNode[_] if containsChild(arg) =>
Expand Down Expand Up @@ -813,7 +819,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
case (_, other) => other
}
case d: DataType => d // Avoid unpacking Structs
case args: LazyList[_] => args.map(mapChild).force // Force materialization on stream
case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
case args: LazyList[_] => args.map(mapChild).force // Force materialization on LazyList
case args: Iterable[_] => args.map(mapChild)
case nonChild: AnyRef => nonChild
case null => null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ trait CodegenSupport extends SparkPlan {
*
* Note that `outputVars` and `row` can't both be null.
*/
@scala.annotation.nowarn("cat=deprecation")
final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = {
val inputVarsCandidate =
if (outputVars != null) {
Expand All @@ -166,7 +167,8 @@ trait CodegenSupport extends SparkPlan {
}

val inputVars = inputVarsCandidate match {
case stream: LazyList[ExprCode] => stream.force
case stream: Stream[ExprCode] => stream.force
case lazyList: LazyList[ExprCode] => lazyList.force
case other => other
}

Expand Down