Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Merge remote-tracking branch 'upstream/master' into SPARK-32201
  • Loading branch information
LantaoJin committed Jul 21, 2020
commit 5bed68ceed90c89457ff04af6da62f2222c795b7
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,6 @@ case class SortMergeJoinExec(
}
}

// Output partitioning of the sort-merge join, derived from the join type.
// A skew join re-arranges shuffle partitions, so no partitioning guarantee survives it.
override def outputPartitioning: Partitioning = {
  if (isSkewJoin) {
    UnknownPartitioning(0)
  } else {
    joinType match {
      // Inner-like joins preserve the partitioning of both inputs on the join keys.
      case _: InnerLike =>
        PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
      // For left and right outer joins, the output is partitioned by the streamed input's join keys.
      case LeftOuter => left.outputPartitioning
      case RightOuter => right.outputPartitioning
      // A full outer join pads both sides with nulls, so only the partition count is kept.
      case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
      // Semi/anti/existence joins only emit rows from the left input.
      case LeftExistence(_) => left.outputPartitioning
      case x =>
        throw new IllegalArgumentException(
          s"${getClass.getSimpleName} should not take $x as the JoinType")
    }
  }
}

override def requiredChildDistribution: Seq[Distribution] = {
if (isSkewJoin) {
// We re-arrange the shuffle partitions to deal with skew join, and the new children
Expand All @@ -96,6 +82,14 @@ case class SortMergeJoinExec(
}
}

// When skew-join handling has re-split the shuffle partitions, the inherited
// partitioning guarantee no longer holds; otherwise defer to the parent's rule.
override def outputPartitioning: Partitioning =
  if (isSkewJoin) UnknownPartitioning(0) else super.outputPartitioning

override def outputOrdering: Seq[SortOrder] = joinType match {
// For inner join, orders of both sides keys should be kept.
case _: InnerLike =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ class AdaptiveQueryExecSuite
val result = dfAdaptive.collect()
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
val df = sql(query)
QueryTest.sameRows(result.toSeq, df.collect().toSeq).foreach {
error => fail(error)
}
checkAnswer(df, result)
}
val planAfter = dfAdaptive.queryExecution.executedPlan
assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.