Skip to content

Commit d600cf7

Browse files
committed
use expressionencoder
1 parent 84e7a7f commit d600cf7

File tree

7 files changed

+348
-421
lines changed

7 files changed

+348
-421
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.spark.sql.catalyst
1919

20-
import java.lang.reflect.Modifier
21-
2220
import org.apache.commons.lang3.reflect.ConstructorUtils
2321

2422
import org.apache.spark.internal.Logging
@@ -580,18 +578,14 @@ object ScalaReflection extends ScalaReflection {
580578
/**
581579
* Returns the parameter names and types for the primary constructor of this class.
582580
*
583-
* Note that it only works for scala classes with primary constructor.
581+
* Note that it only works for scala classes with primary constructor, and currently doesn't
582+
* support inner class.
584583
*/
585-
def getConstructorParameters(cls: Class[_]): Seq[Class[_]] = {
584+
def getConstructorParameters(cls: Class[_]): Seq[(String, Type)] = {
586585
val m = runtimeMirror(cls.getClassLoader)
587586
val classSymbol = m.staticClass(cls.getName)
588587
val t = classSymbol.selfType
589-
val dropHead = if (cls.isMemberClass && !Modifier.isStatic(cls.getModifiers)) {
590-
1
591-
} else {
592-
0
593-
}
594-
getConstructorParameters(t).drop(dropHead).map { case (_, tpe) => getClassFromType(tpe) }
588+
getConstructorParameters(t)
595589
}
596590

597591
/**
@@ -617,15 +611,6 @@ object ScalaReflection extends ScalaReflection {
617611
}
618612
}
619613

620-
def getClassForCaseClass[T: TypeTag]: Option[Class[_]] = {
621-
val tpe = localTypeOf[T]
622-
if (isSubtype(tpe.dealias, localTypeOf[Product])) {
623-
Some(getClassFromType(tpe))
624-
} else {
625-
None
626-
}
627-
}
628-
629614
/*
630615
* Retrieves the runtime class corresponding to the provided type.
631616
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -256,9 +256,6 @@ class Analyzer(
256256
Batch("Nondeterministic", Once,
257257
PullOutNondeterministic),
258258
Batch("UDF", Once,
259-
ResolveCaseClassForUDF,
260-
// `ResolveCaseClassForUDF` may generates `NewInstance` so we need to resolve it
261-
ResolveNewInstance,
262259
HandleNullInputsForUDF),
263260
Batch("UpdateNullability", Once,
264261
UpdateAttributeNullability),
@@ -2744,39 +2741,6 @@ class Analyzer(
27442741
}
27452742
}
27462743

2747-
object ResolveCaseClassForUDF extends Rule[LogicalPlan] {
2748-
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
2749-
case p if !p.resolved => p // Skip unresolved nodes.
2750-
2751-
case p => p transformExpressionsUp {
2752-
2753-
case udf @ ScalaUDF(_, _, inputs, _, inputCaseClass, _, _, _, _) =>
2754-
if (inputCaseClass.exists(_.isDefined)) {
2755-
assert(inputs.size == inputCaseClass.size)
2756-
val newInputs = inputs.zip(inputCaseClass).map {
2757-
case (input, clazzOpt) =>
2758-
if (clazzOpt.isDefined) {
2759-
val clazz = clazzOpt.get
2760-
assert(input.dataType.isInstanceOf[StructType],
2761-
s"expects StructType, but got ${input.dataType}")
2762-
val dataType = input.dataType.asInstanceOf[StructType]
2763-
val args = dataType.toAttributes.zipWithIndex.map { case (a, i) =>
2764-
GetStructField(input, i, Some(a.name))
2765-
}
2766-
NewInstance(clazz, args, ObjectType(clazz))
2767-
} else {
2768-
input
2769-
}
2770-
}
2771-
// assign Nil inputCaseClass to avoid applying this rule for multiple times
2772-
udf.copy(children = newInputs, inputCaseClass = Nil)
2773-
} else {
2774-
udf
2775-
}
2776-
}
2777-
}
2778-
}
2779-
27802744
/**
27812745
* Check and add proper window frames for all window functions.
27822746
*/

0 commit comments

Comments
 (0)