
Commit 4c55671

jiake authored and Jackey Lee committed
[SPARK-26316][SPARK-21052] Revert hash join metrics that cause performance degradation
## What changes were proposed in this pull request?

The flawed implementation of the hash join metrics introduced in [SPARK-21052](https://issues.apache.org/jira/browse/SPARK-21052) causes significant performance degradation in TPC-DS; the results at TPC-DS 1 TB scale are available [here](https://docs.google.com/spreadsheets/d/18a5BdOlmm8euTaRodyeWum9yu92mbWWu6JbhGXtr7yE/edit#gid=0). We therefore partially revert SPARK-21052 for now.

**Cluster info:**

|   | Master Node | Worker Nodes |
| -- | -- | -- |
| Node | 1x | 4x |
| Processor | Intel(R) Xeon(R) Platinum 8170 CPU @ 2.10GHz | Intel(R) Xeon(R) Platinum 8180 CPU @ 2.50GHz |
| Memory | 192 GB | 384 GB |
| Storage Main | 8 x 960G SSD | 8 x 960G SSD |
| Network | 10GbE |   |
| Role | CM Management, NameNode, Secondary NameNode, Resource Manager, Hive Metastore Server | DataNode, NodeManager |
| OS Version | CentOS 7.2 | CentOS 7.2 |
| Hadoop | Apache Hadoop 2.7.5 | Apache Hadoop 2.7.5 |
| Hive | Apache Hive 2.2.0 |   |
| Spark | Apache Spark 2.1.0 & Apache Spark 2.3.0 |   |
| JDK version | 1.8.0_112 | 1.8.0_112 |

**Related parameter settings:**

| Component | Parameter | Value |
| -- | -- | -- |
| Yarn Resource Manager | yarn.scheduler.maximum-allocation-mb | 120GB |
|   | yarn.scheduler.minimum-allocation-mb | 1GB |
|   | yarn.scheduler.maximum-allocation-vcores | 121 |
|   | yarn.resourcemanager.scheduler.class | Fair Scheduler |
| Yarn Node Manager | yarn.nodemanager.resource.memory-mb | 120GB |
|   | yarn.nodemanager.resource.cpu-vcores | 121 |
| Spark | spark.executor.memory | 110GB |
|   | spark.executor.cores | 50 |

## How was this patch tested?

N/A

Closes apache#23269 from JkSelf/partial-revert-21052.

Authored-by: jiake <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
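For context, the reverted "avg hash probe" metric was computed by incrementing two counters (`numKeyLookups`, `numProbes`) inside the lookup hot path of the hash relation, as the HashedRelation.scala diff below shows. The following is a minimal, hypothetical sketch of that pattern, not Spark's actual `LongToUnsafeRowMap` (the class and method names here are invented for illustration); it only shows where the per-probe bookkeeping sits relative to the probing work itself:

```scala
// Hypothetical open-addressing map with probe counting, loosely modeled on
// the pattern removed by this commit; not actual Spark code.
final class ProbeCountingMap(capacity: Int) {
  private val keys = new Array[Long](capacity)
  private val values = new Array[Long](capacity)
  private val occupied = new Array[Boolean](capacity)

  // The kind of per-lookup bookkeeping the commit removes from the hot path.
  private var numKeyLookups = 0L
  private var numProbes = 0L

  private def slot(key: Long): Int = (((key % capacity) + capacity) % capacity).toInt

  // Assumes the map never becomes completely full.
  def put(key: Long, value: Long): Unit = {
    var pos = slot(key)
    while (occupied(pos) && keys(pos) != key) pos = (pos + 1) % capacity
    keys(pos) = key
    values(pos) = value
    occupied(pos) = true
  }

  def get(key: Long): Option[Long] = {
    numKeyLookups += 1            // extra work on every lookup ...
    numProbes += 1
    var pos = slot(key)
    while (occupied(pos) && keys(pos) != key) {
      pos = (pos + 1) % capacity
      numProbes += 1              // ... and on every collision probe
    }
    if (occupied(pos) && keys(pos) == key) Some(values(pos)) else None
  }

  // The removed getAverageProbesPerLookup was derived from counters like these.
  def averageProbesPerLookup: Double = numProbes.toDouble / numKeyLookups
}
```

The diffs below drop this bookkeeping from every lookup, remove the extra fields from serialization, and remove the task-completion listeners that published the average; query results are unaffected, only the metric disappears from the SQL UI.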
1 parent a69552d commit 4c55671

5 files changed: +6, -165 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala

Lines changed: 2 additions & 26 deletions
```diff
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Dist
 import org.apache.spark.sql.execution.{BinaryExecNode, CodegenSupport, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.{BooleanType, LongType}
-import org.apache.spark.util.TaskCompletionListener
 
 /**
  * Performs an inner hash join of two child relations. When the output RDD of this operator is
@@ -48,8 +47,7 @@ case class BroadcastHashJoinExec(
   extends BinaryExecNode with HashJoin with CodegenSupport {
 
   override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash probe"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def requiredChildDistribution: Seq[Distribution] = {
     val mode = HashedRelationBroadcastMode(buildKeys)
@@ -63,13 +61,12 @@ case class BroadcastHashJoinExec(
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
-    val avgHashProbe = longMetric("avgHashProbe")
 
     val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
     streamedPlan.execute().mapPartitions { streamedIter =>
       val hashed = broadcastRelation.value.asReadOnlyCopy()
       TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
-      join(streamedIter, hashed, numOutputRows, avgHashProbe)
+      join(streamedIter, hashed, numOutputRows)
     }
   }
 
@@ -111,23 +108,6 @@ case class BroadcastHashJoinExec(
     }
   }
 
-  /**
-   * Returns the codes used to add a task completion listener to update avg hash probe
-   * at the end of the task.
-   */
-  private def genTaskListener(avgHashProbe: String, relationTerm: String): String = {
-    val listenerClass = classOf[TaskCompletionListener].getName
-    val taskContextClass = classOf[TaskContext].getName
-    s"""
-       | $taskContextClass$$.MODULE$$.get().addTaskCompletionListener(new $listenerClass() {
-       |   @Override
-       |   public void onTaskCompletion($taskContextClass context) {
-       |     $avgHashProbe.set($relationTerm.getAverageProbesPerLookup());
-       |   }
-       | });
-     """.stripMargin
-  }
-
   /**
    * Returns a tuple of Broadcast of HashedRelation and the variable name for it.
    */
@@ -137,15 +117,11 @@ case class BroadcastHashJoinExec(
     val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
     val clsName = broadcastRelation.value.getClass.getName
 
-    // At the end of the task, we update the avg hash probe.
-    val avgHashProbe = metricTerm(ctx, "avgHashProbe")
-
     // Inline mutable state since not many join operations in a task
     val relationTerm = ctx.addMutableState(clsName, "relation",
       v => s"""
         | $v = (($clsName) $broadcast.value()).asReadOnlyCopy();
         | incPeakExecutionMemory($v.estimatedSize());
-        | ${genTaskListener(avgHashProbe, v)}
       """.stripMargin, forceInline = true)
     (broadcastRelation, relationTerm)
   }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala

Lines changed: 1 addition & 7 deletions
```diff
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.joins
 
-import org.apache.spark.TaskContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
@@ -194,8 +193,7 @@ trait HashJoin {
   protected def join(
       streamedIter: Iterator[InternalRow],
       hashed: HashedRelation,
-      numOutputRows: SQLMetric,
-      avgHashProbe: SQLMetric): Iterator[InternalRow] = {
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
 
     val joinedIter = joinType match {
       case _: InnerLike =>
@@ -213,10 +211,6 @@ trait HashJoin {
         s"BroadcastHashJoin should not take $x as the JoinType")
     }
 
-    // At the end of the task, we update the avg hash probe.
-    TaskContext.get().addTaskCompletionListener[Unit](_ =>
-      avgHashProbe.set(hashed.getAverageProbesPerLookup))
-
     val resultProj = createResultProjection
     joinedIter.map { r =>
       numOutputRows += 1
```

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala

Lines changed: 0 additions & 35 deletions
```diff
@@ -80,11 +80,6 @@ private[execution] sealed trait HashedRelation extends KnownSizeEstimation {
    * Release any used resources.
    */
   def close(): Unit
-
-  /**
-   * Returns the average number of probes per key lookup.
-   */
-  def getAverageProbesPerLookup: Double
 }
 
 private[execution] object HashedRelation {
@@ -279,8 +274,6 @@ private[joins] class UnsafeHashedRelation(
   override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
     read(() => in.readInt(), () => in.readLong(), in.readBytes)
   }
-
-  override def getAverageProbesPerLookup: Double = binaryMap.getAverageProbesPerLookup
 }
 
 private[joins] object UnsafeHashedRelation {
@@ -395,10 +388,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   // The number of unique keys.
   private var numKeys = 0L
 
-  // Tracking average number of probes per key lookup.
-  private var numKeyLookups = 0L
-  private var numProbes = 0L
-
   // needed by serializer
   def this() = {
     this(
@@ -483,8 +472,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
    */
   def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = {
     if (isDense) {
-      numKeyLookups += 1
-      numProbes += 1
       if (key >= minKey && key <= maxKey) {
         val value = array((key - minKey).toInt)
         if (value > 0) {
@@ -493,14 +480,11 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
       }
     } else {
       var pos = firstSlot(key)
-      numKeyLookups += 1
-      numProbes += 1
       while (array(pos + 1) != 0) {
         if (array(pos) == key) {
           return getRow(array(pos + 1), resultRow)
         }
         pos = nextSlot(pos)
-        numProbes += 1
       }
     }
     null
@@ -528,8 +512,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
    */
   def get(key: Long, resultRow: UnsafeRow): Iterator[UnsafeRow] = {
     if (isDense) {
-      numKeyLookups += 1
-      numProbes += 1
       if (key >= minKey && key <= maxKey) {
         val value = array((key - minKey).toInt)
         if (value > 0) {
@@ -538,14 +520,11 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
       }
     } else {
       var pos = firstSlot(key)
-      numKeyLookups += 1
-      numProbes += 1
       while (array(pos + 1) != 0) {
         if (array(pos) == key) {
           return valueIter(array(pos + 1), resultRow)
         }
         pos = nextSlot(pos)
-        numProbes += 1
       }
     }
     null
@@ -585,11 +564,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   private def updateIndex(key: Long, address: Long): Unit = {
     var pos = firstSlot(key)
     assert(numKeys < array.length / 2)
-    numKeyLookups += 1
-    numProbes += 1
     while (array(pos) != key && array(pos + 1) != 0) {
       pos = nextSlot(pos)
-      numProbes += 1
     }
     if (array(pos + 1) == 0) {
       // this is the first value for this key, put the address in array.
@@ -721,8 +697,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     writeLong(maxKey)
     writeLong(numKeys)
     writeLong(numValues)
-    writeLong(numKeyLookups)
-    writeLong(numProbes)
 
     writeLong(array.length)
     writeLongArray(writeBuffer, array, array.length)
@@ -764,8 +738,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     maxKey = readLong()
     numKeys = readLong()
     numValues = readLong()
-    numKeyLookups = readLong()
-    numProbes = readLong()
 
     val length = readLong().toInt
     mask = length - 2
@@ -783,11 +755,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   override def read(kryo: Kryo, in: Input): Unit = {
     read(() => in.readBoolean(), () => in.readLong(), in.readBytes)
   }
-
-  /**
-   * Returns the average number of probes per key lookup.
-   */
-  def getAverageProbesPerLookup: Double = numProbes.toDouble / numKeyLookups
 }
 
 private[joins] class LongHashedRelation(
@@ -839,8 +806,6 @@ private[joins] class LongHashedRelation(
     resultRow = new UnsafeRow(nFields)
     map = in.readObject().asInstanceOf[LongToUnsafeRowMap]
   }
-
-  override def getAverageProbesPerLookup: Double = map.getAverageProbesPerLookup
 }
 
 /**
```

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala

Lines changed: 2 additions & 4 deletions
```diff
@@ -42,8 +42,7 @@ case class ShuffledHashJoinExec(
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
-    "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"),
-    "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash probe"))
+    "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))
 
   override def requiredChildDistribution: Seq[Distribution] =
     HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
@@ -63,10 +62,9 @@ case class ShuffledHashJoinExec(
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
-    val avgHashProbe = longMetric("avgHashProbe")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows, avgHashProbe)
+      join(streamIter, hashed, numOutputRows)
     }
   }
 }
```

sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala

Lines changed: 1 addition & 93 deletions
```diff
@@ -261,50 +261,6 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     )
   }
 
-  test("BroadcastHashJoin metrics: track avg probe") {
-    // The executed plan looks like:
-    // Project [a#210, b#211, b#221]
-    // +- BroadcastHashJoin [a#210], [a#220], Inner, BuildRight
-    //    :- Project [_1#207 AS a#210, _2#208 AS b#211]
-    //    :  +- Filter isnotnull(_1#207)
-    //    :     +- LocalTableScan [_1#207, _2#208]
-    //    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, binary, true]))
-    //       +- Project [_1#217 AS a#220, _2#218 AS b#221]
-    //          +- Filter isnotnull(_1#217)
-    //             +- LocalTableScan [_1#217, _2#218]
-    //
-    // Assume the execution plan with node id is
-    // WholeStageCodegen disabled:
-    // Project(nodeId = 0)
-    //   BroadcastHashJoin(nodeId = 1)
-    // ...(ignored)
-    //
-    // WholeStageCodegen enabled:
-    // WholeStageCodegen(nodeId = 0)
-    //   Project(nodeId = 1)
-    //     BroadcastHashJoin(nodeId = 2)
-    //       Project(nodeId = 3)
-    //         Filter(nodeId = 4)
-    // ...(ignored)
-    Seq(true, false).foreach { enableWholeStage =>
-      val df1 = generateRandomBytesDF()
-      val df2 = generateRandomBytesDF()
-      val df = df1.join(broadcast(df2), "a")
-      val nodeIds = if (enableWholeStage) {
-        Set(2L)
-      } else {
-        Set(1L)
-      }
-      val metrics = getSparkPlanMetrics(df, 2, nodeIds, enableWholeStage).get
-      nodeIds.foreach { nodeId =>
-        val probes = metrics(nodeId)._2("avg hash probe (min, med, max)")
-        probes.toString.stripPrefix("\n(").stripSuffix(")").split(", ").foreach { probe =>
-          assert(probe.toDouble > 1.0)
-        }
-      }
-    }
-  }
-
   test("ShuffledHashJoin metrics") {
     withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "40",
       "spark.sql.shuffle.partitions" -> "2",
@@ -323,8 +279,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       val df = df1.join(df2, "key")
       testSparkPlanMetrics(df, 1, Map(
         1L -> (("ShuffledHashJoin", Map(
-          "number of output rows" -> 2L,
-          "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))),
+          "number of output rows" -> 2L))),
         2L -> (("Exchange", Map(
           "shuffle records written" -> 2L,
           "records read" -> 2L))),
@@ -335,53 +290,6 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     }
   }
 
-  test("ShuffledHashJoin metrics: track avg probe") {
-    // The executed plan looks like:
-    // Project [a#308, b#309, b#319]
-    // +- ShuffledHashJoin [a#308], [a#318], Inner, BuildRight
-    //    :- Exchange hashpartitioning(a#308, 2)
-    //    :  +- Project [_1#305 AS a#308, _2#306 AS b#309]
-    //    :     +- Filter isnotnull(_1#305)
-    //    :        +- LocalTableScan [_1#305, _2#306]
-    //    +- Exchange hashpartitioning(a#318, 2)
-    //       +- Project [_1#315 AS a#318, _2#316 AS b#319]
-    //          +- Filter isnotnull(_1#315)
-    //             +- LocalTableScan [_1#315, _2#316]
-    //
-    // Assume the execution plan with node id is
-    // WholeStageCodegen disabled:
-    // Project(nodeId = 0)
-    //   ShuffledHashJoin(nodeId = 1)
-    // ...(ignored)
-    //
-    // WholeStageCodegen enabled:
-    // WholeStageCodegen(nodeId = 0)
-    //   Project(nodeId = 1)
-    //     ShuffledHashJoin(nodeId = 2)
-    // ...(ignored)
-    withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "5000000",
-      "spark.sql.shuffle.partitions" -> "2",
-      "spark.sql.join.preferSortMergeJoin" -> "false") {
-      Seq(true, false).foreach { enableWholeStage =>
-        val df1 = generateRandomBytesDF(65535 * 5)
-        val df2 = generateRandomBytesDF(65535)
-        val df = df1.join(df2, "a")
-        val nodeIds = if (enableWholeStage) {
-          Set(2L)
-        } else {
-          Set(1L)
-        }
-        val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
-        nodeIds.foreach { nodeId =>
-          val probes = metrics(nodeId)._2("avg hash probe (min, med, max)")
-          probes.toString.stripPrefix("\n(").stripSuffix(")").split(", ").foreach { probe =>
-            assert(probe.toDouble > 1.0)
-          }
-        }
-      }
-    }
-  }
-
   test("BroadcastHashJoin(outer) metrics") {
     val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
     val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
```
