Skip to content

Commit 5463bfc

Browse files
committed
[SPARK-49179][SQL][3.5] Fix v2 multi bucketed inner joins throw AssertionError
backport #47683 to branch-3.5 ### What changes were proposed in this pull request? For SMJ with inner join, it just wraps left and right output partitioning to `PartitioningCollection` so it may not satisfy the target required clustering. ### Why are the changes needed? Fix exception if the query contains multi bucketed inner joins ```sql SELECT * FROM testcat.ns.t1 JOIN testcat.ns.t2 ON t1.id = t2.id JOIN testcat.ns.t3 ON t1.id = t3.id ``` ``` Cause: java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:264) at org.apache.spark.sql.execution.exchange.EnsureRequirements.createKeyGroupedShuffleSpec(EnsureRequirements.scala:642) at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$checkKeyGroupCompatible$1(EnsureRequirements.scala:385) at scala.collection.immutable.List.map(List.scala:247) at scala.collection.immutable.List.map(List.scala:79) at org.apache.spark.sql.execution.exchange.EnsureRequirements.checkKeyGroupCompatible(EnsureRequirements.scala:382) at org.apache.spark.sql.execution.exchange.EnsureRequirements.checkKeyGroupCompatible(EnsureRequirements.scala:364) at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:166) at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:714) at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:689) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$4(TreeNode.scala:528) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:84) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:528) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:497) at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:689) at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:51) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.$anonfun$applyPhysicalRules$2(AdaptiveSparkPlanExec.scala:882) ``` ### Does this PR introduce _any_ user-facing change? yes, it's a bug fix ### How was this patch tested? add test ### Was this patch authored or co-authored using generative AI tooling? no Closes #47735 from ulysses-you/SPARK-49179-3.5. Authored-by: ulysses-you <[email protected]> Signed-off-by: youxiduo <[email protected]>
1 parent 6859ef9 commit 5463bfc

File tree

2 files changed

+25
-4
lines changed

2 files changed

+25
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ case class EnsureRequirements(
550550
private def createKeyGroupedShuffleSpec(
551551
partitioning: Partitioning,
552552
distribution: ClusteredDistribution): Option[KeyGroupedShuffleSpec] = {
553-
def check(partitioning: KeyGroupedPartitioning): Option[KeyGroupedShuffleSpec] = {
553+
def tryCreate(partitioning: KeyGroupedPartitioning): Option[KeyGroupedShuffleSpec] = {
554554
val attributes = partitioning.expressions.flatMap(_.collectLeaves())
555555
val clustering = distribution.clustering
556556

@@ -570,11 +570,10 @@ case class EnsureRequirements(
570570
}
571571

572572
partitioning match {
573-
case p: KeyGroupedPartitioning => check(p)
573+
case p: KeyGroupedPartitioning => tryCreate(p)
574574
case PartitioningCollection(partitionings) =>
575575
val specs = partitionings.map(p => createKeyGroupedShuffleSpec(p, distribution))
576-
assert(specs.forall(_.isEmpty) || specs.forall(_.isDefined))
577-
specs.head
576+
specs.filter(_.isDefined).map(_.get).headOption
578577
case _ => None
579578
}
580579
}

sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,28 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
330330
.add("price", FloatType)
331331
.add("time", TimestampType)
332332

333+
test("SPARK-49179: Fix v2 multi bucketed inner joins throw AssertionError") {
334+
val cols = new StructType()
335+
.add("id", LongType)
336+
.add("name", StringType)
337+
val buckets = Array(bucket(8, "id"))
338+
339+
withTable("t1", "t2", "t3") {
340+
Seq("t1", "t2", "t3").foreach { t =>
341+
createTable(t, cols, buckets)
342+
sql(s"INSERT INTO testcat.ns.$t VALUES (1, 'aa'), (2, 'bb'), (3, 'cc')")
343+
}
344+
val df = sql(
345+
"""
346+
|SELECT t1.id, t2.id, t3.name FROM testcat.ns.t1
347+
|JOIN testcat.ns.t2 ON t1.id = t2.id
348+
|JOIN testcat.ns.t3 ON t1.id = t3.id
349+
|""".stripMargin)
350+
checkAnswer(df, Seq(Row(1, 1, "aa"), Row(2, 2, "bb"), Row(3, 3, "cc")))
351+
assert(collectShuffles(df.queryExecution.executedPlan).isEmpty)
352+
}
353+
}
354+
333355
test("partitioned join: join with two partition keys and matching & sorted partitions") {
334356
val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
335357
createTable(items, items_schema, items_partitions)

0 commit comments

Comments
 (0)