Merged
@@ -35,7 +35,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.sql.utils.{SparkArrowUtil, SparkSchemaUtil, SparkVectorUtil}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.util.Utils
import org.apache.spark.util.{SparkVersionUtil, Utils}

import org.apache.arrow.vector.{VectorLoader, VectorSchemaRoot}
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
@@ -49,12 +49,12 @@ import scala.collection.mutable.ArrayBuffer
class ColumnarArrowPythonRunner(
funcs: Seq[(ChainedPythonFunctions, Long)],
evalType: Int,
argOffsets: Array[Array[Int]],
argMetas: Array[Array[(Int, Option[String])]],
schema: StructType,
timeZoneId: String,
conf: Map[String, String],
pythonMetrics: Map[String, SQLMetric])
extends BasePythonRunnerShim(funcs, evalType, argOffsets, pythonMetrics) {
extends BasePythonRunnerShim(funcs, evalType, argMetas, pythonMetrics) {

override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback

@@ -149,7 +149,7 @@ class ColumnarArrowPythonRunner(
PythonRDD.writeUTF(k, dataOut)
PythonRDD.writeUTF(v, dataOut)
}
ColumnarArrowPythonRunner.this.writeUdf(dataOut, argOffsets)
ColumnarArrowPythonRunner.this.writeUdf(dataOut, argMetas)
}

// For Spark earlier than 4.0. It overrides the corresponding abstract method
@@ -165,6 +165,12 @@ class ColumnarArrowPythonRunner(
}

def writeToStreamHelper(dataOut: DataOutputStream): Boolean = {
if (!inputIterator.hasNext) {
// See https://issues.apache.org/jira/browse/SPARK-44705:
// Starting from Spark 4.0, we must return false once the iterator is drained;
// otherwise Spark keeps calling this method repeatedly.
return false
}
var numRows: Long = 0
val arrowSchema = SparkSchemaUtil.toArrowSchema(schema, timeZoneId)
val allocator = ArrowBufferAllocators.contextInstance()
@@ -264,7 +270,7 @@ case class ColumnarArrowEvalPythonExec(

protected def evaluateColumnar(
funcs: Seq[(ChainedPythonFunctions, Long)],
argOffsets: Array[Array[Int]],
argMetas: Array[Array[(Int, Option[String])]],
iter: Iterator[ColumnarBatch],
schema: StructType,
context: TaskContext): Iterator[ColumnarBatch] = {
@@ -274,7 +280,7 @@ case class ColumnarArrowEvalPythonExec(
val columnarBatchIter = new ColumnarArrowPythonRunner(
funcs,
evalType,
argOffsets,
argMetas,
schema,
sessionLocalTimeZone,
pythonRunnerConf,
@@ -306,22 +312,51 @@ case class ColumnarArrowEvalPythonExec(
val allInputs = new ArrayBuffer[Expression]
val dataTypes = new ArrayBuffer[DataType]
val originalOffsets = new ArrayBuffer[Int]
val argOffsets = inputs.map {
input =>
input.map {
e =>
if (allInputs.exists(_.semanticEquals(e))) {
allInputs.indexWhere(_.semanticEquals(e))
} else {
val offset = child.output.indexWhere(
_.exprId.equals(e.asInstanceOf[AttributeReference].exprId))
originalOffsets += offset
allInputs += e
dataTypes += e.dataType
allInputs.length - 1
}
}.toArray
}.toArray
val argMetas: Array[Array[(Int, Option[String])]] = if (SparkVersionUtil.gteSpark40) {
// Spark 4.0 requires ArgumentMetadata rather than plain integer-based offsets.
// See https://issues.apache.org/jira/browse/SPARK-44918.
inputs.map {
input =>
input.map {
e =>
val (key, value) = e match {
case EvalPythonExecBase.NamedArgumentExpressionShim(key, value) =>
(Some(key), value)
case _ =>
(None, e)
}
val pair: (Int, Option[String]) = if (allInputs.exists(_.semanticEquals(value))) {
allInputs.indexWhere(_.semanticEquals(value)) -> key
} else {
val offset = child.output.indexWhere(
_.exprId.equals(value.asInstanceOf[AttributeReference].exprId))
originalOffsets += offset
allInputs += value
dataTypes += value.dataType
(allInputs.length - 1) -> key
}
pair
}.toArray
}.toArray
} else {
inputs.map {
input =>
input.map {
e =>
val pair: (Int, Option[String]) = if (allInputs.exists(_.semanticEquals(e))) {
allInputs.indexWhere(_.semanticEquals(e)) -> None
} else {
val offset = child.output.indexWhere(
_.exprId.equals(e.asInstanceOf[AttributeReference].exprId))
originalOffsets += offset
allInputs += e
dataTypes += e.dataType
(allInputs.length - 1) -> None
}
pair
}.toArray
}.toArray
}
val schema = StructType(dataTypes.zipWithIndex.map {
case (dt, i) =>
StructField(s"_$i", dt)
@@ -339,15 +374,15 @@ case class ColumnarArrowEvalPythonExec(
inputCbCache += inputCb
numInputRows += inputCb.numRows
// We only need to pass the referred cols data to python worker for evaluation.
var colsForEval = new ArrayBuffer[ColumnVector]()
val colsForEval = new ArrayBuffer[ColumnVector]()
for (i <- originalOffsets) {
colsForEval += inputCb.column(i)
}
new ColumnarBatch(colsForEval.toArray, inputCb.numRows())
}

val outputColumnarBatchIterator =
evaluateColumnar(pyFuncs, argOffsets, inputBatchIter, schema, context)
evaluateColumnar(pyFuncs, argMetas, inputBatchIter, schema, context)
val res =
outputColumnarBatchIterator.zipWithIndex.map {
case (outputCb, batchId) =>
@@ -39,8 +39,7 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
.set("spark.executor.cores", "1")
}

// TODO: fix on spark-4.0
testWithMaxSparkVersion("arrow_udf test: without projection", "3.5") {
test("arrow_udf test: without projection") {
lazy val base =
Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0))
.toDF("a", "b")
@@ -60,8 +59,7 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
checkAnswer(df2, expected)
}

// TODO: fix on spark-4.0
testWithMaxSparkVersion("arrow_udf test: with unrelated projection", "3.5") {
test("arrow_udf test: with unrelated projection") {
lazy val base =
Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0))
.toDF("a", "b")
@@ -81,7 +79,7 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
checkAnswer(df, expected)
}

// TODO: fix on spark-4.0
// A fix is needed for the Spark 4.0 change in https://github.com/apache/spark/pull/42864.
testWithMaxSparkVersion("arrow_udf test: with preprojection", "3.5") {
lazy val base =
Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0))
Comment on lines +82 to 85

@zhztheplayer (Member, Author) commented on Dec 16, 2025:
This case remains excluded in 4.0 due to the following error:

[DATATYPE_MISMATCH.CAST_WITHOUT_SUGGESTION] Cannot resolve "pyarrowUDF((b * 2))" due to data type mismatch: cannot cast "STRING" to "VOID". SQLSTATE: 42K09;
'Project [a#9, b#10, d_b#11, p_a#13, cast(pyarrowUDF(cast((b#10 * 2) as string)) as void) AS p_b#15]
+- Project [a#9, b#10, d_b#11, cast(pyarrowUDF(cast(a#9 as string)) as string) AS p_a#13]
   +- Project [a#9, b#10, (b#10 * 2) AS d_b#11]
      +- Project [_1#2 AS a#9, _2#3 AS b#10]
         +- LocalRelation [_1#2, _2#3]

org.apache.spark.sql.AnalysisException: [DATATYPE_MISMATCH.CAST_WITHOUT_SUGGESTION] Cannot resolve "pyarrowUDF((b * 2))" due to data type mismatch: cannot cast "STRING" to "VOID". SQLSTATE: 42K09;
'Project [a#9, b#10, d_b#11, p_a#13, cast(pyarrowUDF(cast((b#10 * 2) as string)) as void) AS p_b#15]
+- Project [a#9, b#10, d_b#11, cast(pyarrowUDF(cast(a#9 as string)) as string) AS p_a#13]
   +- Project [a#9, b#10, (b#10 * 2) AS d_b#11]
      +- Project [_1#2 AS a#9, _2#3 AS b#10]
         +- LocalRelation [_1#2, _2#3]

	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.dataTypeMismatch(package.scala:77)
	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.dataTypeMismatch(package.scala:70)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$7(CheckAnalysis.scala:420)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$7$adapted(CheckAnalysis.scala:402)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:252)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:251)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:251)
	at scala.collection.immutable.Vector.foreach(Vector.scala:2125)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:251)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$6(CheckAnalysis.scala:402)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$6$adapted(CheckAnalysis.scala:402)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:402)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:284)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:252)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:284)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:255)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:299)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:244)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:231)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:299)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$resolveInFixedPoint$1(HybridAnalyzer.scala:192)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:192)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:330)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:330)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:121)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:80)
	at org.apache.spark.sql.classic.Dataset$.$anonfun$ofRows$1(Dataset.scala:115)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.classic.Dataset$.ofRows(Dataset.scala:113)
	at org.apache.spark.sql.classic.Dataset.withPlan(Dataset.scala:2263)
	at org.apache.spark.sql.classic.Dataset.withColumns(Dataset.scala:1283)
	at org.apache.spark.sql.classic.Dataset.withColumns(Dataset.scala:232)
	at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:2187)
	at org.apache.spark.sql.classic.Dataset.withColumn(Dataset.scala:1819)
	at org.apache.gluten.execution.python.ArrowEvalPythonExecSuite.$anonfun$new$5(ArrowEvalPythonExecSuite.scala:100)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
	at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
	at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
	at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
	at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
	at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
	at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
	at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
	at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
	at org.scalatest.Suite.run(Suite.scala:1114)
	at org.scalatest.Suite.run$(Suite.scala:1096)
	at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
	at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
	at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)
	Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
		at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.dataTypeMismatch(package.scala:77)
		at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.dataTypeMismatch(package.scala:70)
		at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$7(CheckAnalysis.scala:420)
		at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$7$adapted(CheckAnalysis.scala:402)
		at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:252)
		at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:251)
		at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:251)
		at scala.collection.immutable.Vector.foreach(Vector.scala:2125)
		at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:251)
		at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$6(CheckAnalysis.scala:402)
		at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$6$adapted(CheckAnalysis.scala:402)
		at scala.collection.immutable.List.foreach(List.scala:334)
		at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:402)
		at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:284)
		at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:252)
		at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:284)
		at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:255)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:299)
		at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:244)
		at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:231)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:299)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$resolveInFixedPoint$1(HybridAnalyzer.scala:192)
		at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
		at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:192)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:330)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:330)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110)
		at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278)
		at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278)
		at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
		at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110)
		at scala.util.Try$.apply(Try.scala:217)
		at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
		at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
		at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
		... 69 more

The error is related to apache/spark#42864. It is not a bug in Gluten; the test code itself needs to be updated in a follow-up.
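For context, a minimal sketch of the DataFrame construction that triggers this failure, reconstructed from the analyzed plan above (the UDF and column names follow the plan, not the suite source, so treat it as an approximation):

// Sketch only: the "with preprojection" pattern that Spark 4.0 rejects at analysis time.
// `pyarrowTestUDF` stands in for the suite's test UDF.
val failing = base
  .withColumn("d_b", base("b") * 2)
  .withColumn("p_a", pyarrowTestUDF(base("a")))
  .withColumn("p_b", pyarrowTestUDF(base("b") * 2))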

@@ -28,9 +28,12 @@ import java.net.Socket
abstract class BasePythonRunnerShim(
funcs: Seq[(ChainedPythonFunctions, Long)],
evalType: Int,
argOffsets: Array[Array[Int]],
argMetas: Array[Array[(Int, Option[String])]],
pythonMetrics: Map[String, SQLMetric])
extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs.map(_._1), evalType, argOffsets) {
extends BasePythonRunner[ColumnarBatch, ColumnarBatch](
funcs.map(_._1),
evalType,
argMetas.map(_.map(_._1))) {
// The type aliases below provide consistent type names in child classes,
// ensuring code compatibility with both Spark 4.0 and earlier versions.
type Writer = WriterThread
@@ -43,8 +46,10 @@ abstract class BasePythonRunnerShim(
partitionIndex: Int,
context: TaskContext): Writer

protected def writeUdf(dataOut: DataOutputStream, argOffsets: Array[Array[Int]]): Unit = {
PythonUDFRunner.writeUDFs(dataOut, funcs.map(_._1), argOffsets)
protected def writeUdf(
dataOut: DataOutputStream,
argMetas: Array[Array[(Int, Option[String])]]): Unit = {
PythonUDFRunner.writeUDFs(dataOut, funcs.map(_._1), argMetas.map(_.map(_._1)))
}

override protected def newWriterThread(
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.python
import org.apache.spark.TaskContext
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType

abstract class EvalPythonExecBase extends EvalPythonExec {
@@ -32,3 +33,9 @@ abstract class EvalPythonExecBase extends EvalPythonExec {
throw new IllegalStateException("EvalPythonExecTransformer doesn't support evaluate")
}
}

object EvalPythonExecBase {
object NamedArgumentExpressionShim {
def unapply(expr: Expression): Option[(String, Expression)] = None
}
}
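For reference, a minimal usage sketch of this shim (a hypothetical helper mirroring the match in ColumnarArrowEvalPythonExec above, assuming Expression is imported): on Spark releases before 4.0 the extractor never matches, so every UDF argument falls through to the plain positional branch with no argument name.

// Sketch only: how callers are expected to split an argument via the shim.
// On Spark < 4.0, unapply always returns None, so the second case always applies.
def splitArg(e: Expression): (Option[String], Expression) = e match {
  case EvalPythonExecBase.NamedArgumentExpressionShim(key, value) => (Some(key), value)
  case other => (None, other)
}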
@@ -28,9 +28,12 @@ import java.net.Socket
abstract class BasePythonRunnerShim(
funcs: Seq[(ChainedPythonFunctions, Long)],
evalType: Int,
argOffsets: Array[Array[Int]],
argMetas: Array[Array[(Int, Option[String])]],
pythonMetrics: Map[String, SQLMetric])
extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs.map(_._1), evalType, argOffsets) {
extends BasePythonRunner[ColumnarBatch, ColumnarBatch](
funcs.map(_._1),
evalType,
argMetas.map(_.map(_._1))) {
// The type aliases below provide consistent type names in child classes,
// ensuring code compatibility with both Spark 4.0 and earlier versions.
type Writer = WriterThread
@@ -43,8 +46,10 @@ abstract class BasePythonRunnerShim(
partitionIndex: Int,
context: TaskContext): Writer

protected def writeUdf(dataOut: DataOutputStream, argOffsets: Array[Array[Int]]): Unit = {
PythonUDFRunner.writeUDFs(dataOut, funcs.map(_._1), argOffsets)
protected def writeUdf(
dataOut: DataOutputStream,
argMetas: Array[Array[(Int, Option[String])]]): Unit = {
PythonUDFRunner.writeUDFs(dataOut, funcs.map(_._1), argMetas.map(_.map(_._1)))
}

override protected def newWriterThread(
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.python
import org.apache.spark.TaskContext
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType

abstract class EvalPythonExecBase extends EvalPythonExec {
@@ -32,3 +33,9 @@ abstract class EvalPythonExecBase extends EvalPythonExec {
throw new IllegalStateException("EvalPythonExecTransformer doesn't support evaluate")
}
}

object EvalPythonExecBase {
object NamedArgumentExpressionShim {
def unapply(expr: Expression): Option[(String, Expression)] = None
}
}
@@ -28,9 +28,12 @@ import java.net.Socket
abstract class BasePythonRunnerShim(
funcs: Seq[(ChainedPythonFunctions, Long)],
evalType: Int,
argOffsets: Array[Array[Int]],
argMetas: Array[Array[(Int, Option[String])]],
pythonMetrics: Map[String, SQLMetric])
extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs.map(_._1), evalType, argOffsets) {
extends BasePythonRunner[ColumnarBatch, ColumnarBatch](
funcs.map(_._1),
evalType,
argMetas.map(_.map(_._1))) {
// The type aliases below provide consistent type names in child classes,
// ensuring code compatibility with both Spark 4.0 and earlier versions.
type Writer = WriterThread
@@ -43,8 +46,10 @@ abstract class BasePythonRunnerShim(
partitionIndex: Int,
context: TaskContext): Writer

protected def writeUdf(dataOut: DataOutputStream, argOffsets: Array[Array[Int]]): Unit = {
PythonUDFRunner.writeUDFs(dataOut, funcs.map(_._1), argOffsets)
protected def writeUdf(
dataOut: DataOutputStream,
argMetas: Array[Array[(Int, Option[String])]]): Unit = {
PythonUDFRunner.writeUDFs(dataOut, funcs.map(_._1), argMetas.map(_.map(_._1)))
}

override protected def newWriterThread(
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.python
import org.apache.spark.TaskContext
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType

abstract class EvalPythonExecBase extends EvalPythonExec {
@@ -32,3 +33,9 @@ abstract class EvalPythonExecBase extends EvalPythonExec {
throw new IllegalStateException("EvalPythonExecTransformer doesn't support evaluate")
}
}

object EvalPythonExecBase {
object NamedArgumentExpressionShim {
def unapply(expr: Expression): Option[(String, Expression)] = None
}
}
@@ -28,12 +28,12 @@ import java.net.Socket
abstract class BasePythonRunnerShim(
funcs: Seq[(ChainedPythonFunctions, Long)],
evalType: Int,
argOffsets: Array[Array[Int]],
argMetas: Array[Array[(Int, Option[String])]],
pythonMetrics: Map[String, SQLMetric])
extends BasePythonRunner[ColumnarBatch, ColumnarBatch](
funcs.map(_._1),
evalType,
argOffsets,
argMetas.map(_.map(_._1)),
None) {
// The type aliases below provide consistent type names in child classes,
// ensuring code compatibility with both Spark 4.0 and earlier versions.
@@ -47,8 +47,10 @@ abstract class BasePythonRunnerShim(
partitionIndex: Int,
context: TaskContext): Writer

protected def writeUdf(dataOut: DataOutputStream, argOffsets: Array[Array[Int]]): Unit = {
PythonUDFRunner.writeUDFs(dataOut, funcs.map(_._1), argOffsets)
protected def writeUdf(
dataOut: DataOutputStream,
argMetas: Array[Array[(Int, Option[String])]]): Unit = {
PythonUDFRunner.writeUDFs(dataOut, funcs.map(_._1), argMetas.map(_.map(_._1)))
}

override protected def newWriterThread(
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.python
import org.apache.spark.TaskContext
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedArgumentExpression}
import org.apache.spark.sql.types.StructType

abstract class EvalPythonExecBase extends EvalPythonExec {
@@ -32,3 +33,12 @@ abstract class EvalPythonExecBase extends EvalPythonExec {
throw new IllegalStateException("EvalPythonExecTransformer doesn't support evaluate")
}
}

object EvalPythonExecBase {
object NamedArgumentExpressionShim {
def unapply(expr: Expression): Option[(String, Expression)] = expr match {
case NamedArgumentExpression(key, value) => Some((key, value))
case _ => None
}
}
}
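This Spark 4.0 variant delegates to the real NamedArgumentExpression extractor, so named UDF arguments introduced by SPARK-44918 are split into a name and a value. A hypothetical illustration (not part of this PR), assuming the analyzer has produced a NamedArgumentExpression for a call such as my_py_udf(x => a):

// Sketch only: unwrapping a named argument through the shim.
import org.apache.spark.sql.catalyst.expressions.{Literal, NamedArgumentExpression}

val named = NamedArgumentExpression("x", Literal(1))
val split = named match {
  case EvalPythonExecBase.NamedArgumentExpressionShim(key, value) => Some(key -> value)
  case _ => None
}
// split == Some(("x", Literal(1)))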