ShuffledRowRDD.scala
@@ -22,6 +22,7 @@ import java.util.Arrays
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleMetricsReporter}

/**
* The [[Partition]] used by [[ShuffledRowRDD]]. A post-shuffle partition
@@ -112,6 +113,7 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A
*/
class ShuffledRowRDD(
var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
metrics: Map[String, SQLMetric],
specifiedPartitionStartIndices: Option[Array[Int]] = None)
extends RDD[InternalRow](dependency.rdd.context, Nil) {

@@ -154,7 +156,10 @@ class ShuffledRowRDD(

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
val metrics = context.taskMetrics().createTempShuffleReadMetrics()
val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
// `SQLShuffleMetricsReporter` will update its own metrics for the SQL exchange operator,
// as well as the `tempMetrics` for basic shuffle metrics.
val sqlMetricsReporter = new SQLShuffleMetricsReporter(tempMetrics, metrics)
// The range of pre-shuffle partitions that we are fetching at here is
// [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].
val reader =
@@ -163,7 +168,7 @@
shuffledRowPartition.startPreShufflePartitionIndex,
shuffledRowPartition.endPreShufflePartitionIndex,
context,
metrics)
sqlMetricsReporter)
reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
}

ShuffleExchangeExec.scala
@@ -48,7 +48,8 @@ case class ShuffleExchangeExec(
// e.g. it can be null on the Executor side

override lazy val metrics = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
) ++ SQLMetrics.getShuffleReadMetrics(sparkContext)

override def nodeName: String = {
val extraInfo = coordinator match {
@@ -108,7 +109,7 @@ case class ShuffleExchangeExec(
assert(newPartitioning.isInstanceOf[HashPartitioning])
newPartitioning = UnknownPartitioning(indices.length)
}
new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
new ShuffledRowRDD(shuffleDependency, metrics, specifiedPartitionStartIndices)
}

/**
limit.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetrics

/**
* Take the first `limit` elements and collect them to a single partition.
@@ -37,11 +38,13 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
override def outputPartitioning: Partitioning = SinglePartition
override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
override lazy val metrics = SQLMetrics.getShuffleReadMetrics(sparkContext)
protected override def doExecute(): RDD[InternalRow] = {
val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
val shuffled = new ShuffledRowRDD(
ShuffleExchangeExec.prepareShuffleDependency(
locallyLimited, child.output, SinglePartition, serializer))
locallyLimited, child.output, SinglePartition, serializer),
metrics)
shuffled.mapPartitionsInternal(_.take(limit))
}
}
@@ -151,6 +154,8 @@ case class TakeOrderedAndProjectExec(

private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)

override lazy val metrics = SQLMetrics.getShuffleReadMetrics(sparkContext)

protected override def doExecute(): RDD[InternalRow] = {
val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
val localTopK: RDD[InternalRow] = {
@@ -160,7 +165,8 @@
}
val shuffled = new ShuffledRowRDD(
ShuffleExchangeExec.prepareShuffleDependency(
localTopK, child.output, SinglePartition, serializer))
localTopK, child.output, SinglePartition, serializer),
metrics)
shuffled.mapPartitions { iter =>
val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
if (projectList != child.output) {
SQLMetrics.scala
@@ -82,6 +82,14 @@ object SQLMetrics {

private val baseForAvgMetric: Int = 10

val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
Review comment (Contributor): Rather than putting this list and the getShuffleReadMetrics function here, we should move it into SQLShuffleMetricsReporter. Otherwise, when someone adds another metric in the future, they are likely to forget to update SQLShuffleMetricsReporter.

val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
val REMOTE_BYTES_READ = "remoteBytesRead"
val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk"
val LOCAL_BYTES_READ = "localBytesRead"
val FETCH_WAIT_TIME = "fetchWaitTime"
val RECORDS_READ = "recordsRead"

/**
* Converts a double value to long value by multiplying a base integer, so we can store it in
* `SQLMetrics`. It only works for average metrics. When showing the metrics on UI, we restore
@@ -194,4 +202,16 @@
SparkListenerDriverAccumUpdates(executionId.toLong, metrics.map(m => m.id -> m.value)))
}
}

/**
* Create all shuffle read related metrics and return the Map.
*/
def getShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
Review comment (Contributor): I'd prefer to name this create rather than get, to imply we are creating a new set rather than just returning some existing one.
Reply (@xuanyuanking, Member, Author, Nov 29, 2018): Thanks, renamed it to createShuffleReadMetrics and moved it to SQLShuffleMetricsReporter. Done in #23175.

REMOTE_BLOCKS_FETCHED -> createMetric(sc, "remote blocks fetched"),
LOCAL_BLOCKS_FETCHED -> createMetric(sc, "local blocks fetched"),
REMOTE_BYTES_READ -> createSizeMetric(sc, "remote bytes read"),
REMOTE_BYTES_READ_TO_DISK -> createSizeMetric(sc, "remote bytes read to disk"),
LOCAL_BYTES_READ -> createSizeMetric(sc, "local bytes read"),
FETCH_WAIT_TIME -> createTimingMetric(sc, "fetch wait time"),
RECORDS_READ -> createMetric(sc, "records read"))
}
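
For orientation, here is a minimal sketch (not part of this diff) of the refactoring suggested in the review thread above: the metric keys and a createShuffleReadMetrics factory living next to the reporter, so a new metric cannot be added without the reporter seeing it. The reply says this was done in #23175; the exact shape there may differ, and the object below is only an illustration of a companion object added to the new reporter file. The method and layout are assumptions, not code from this PR.

package org.apache.spark.sql.execution.metric

import org.apache.spark.SparkContext

// Illustrative companion object for SQLShuffleMetricsReporter (it would live in the same
// file as the class shown in the next section); names follow the review reply above.
object SQLShuffleMetricsReporter {
  val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
  val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
  // Remote/local bytes read, fetch wait time and records read would follow the same pattern.

  /** Create all shuffle read related metrics and return the Map. */
  def createShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
    REMOTE_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "remote blocks fetched"),
    LOCAL_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "local blocks fetched"))
}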
SQLShuffleMetricsReporter.scala (new file)
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.metric

import org.apache.spark.executor.TempShuffleReadMetrics

/**
* A shuffle metrics reporter for SQL exchange operators.
* @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext.
* @param metrics All metrics in the current SparkPlan. This param should not be empty and
* should contain all shuffle metrics defined in [[SQLMetrics.getShuffleReadMetrics]].
*/
private[spark] class SQLShuffleMetricsReporter(
tempMetrics: TempShuffleReadMetrics,
Review comment (Contributor): 4-space indent.
Reply (Member, Author): Thanks, done in #23175.

metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
private[this] val _remoteBlocksFetched = metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED)
private[this] val _localBlocksFetched = metrics(SQLMetrics.LOCAL_BLOCKS_FETCHED)
private[this] val _remoteBytesRead = metrics(SQLMetrics.REMOTE_BYTES_READ)
private[this] val _remoteBytesReadToDisk = metrics(SQLMetrics.REMOTE_BYTES_READ_TO_DISK)
private[this] val _localBytesRead = metrics(SQLMetrics.LOCAL_BYTES_READ)
private[this] val _fetchWaitTime = metrics(SQLMetrics.FETCH_WAIT_TIME)
private[this] val _recordsRead = metrics(SQLMetrics.RECORDS_READ)

override def incRemoteBlocksFetched(v: Long): Unit = {
_remoteBlocksFetched.add(v)
tempMetrics.incRemoteBlocksFetched(v)
}
override def incLocalBlocksFetched(v: Long): Unit = {
_localBlocksFetched.add(v)
tempMetrics.incLocalBlocksFetched(v)
}
override def incRemoteBytesRead(v: Long): Unit = {
_remoteBytesRead.add(v)
tempMetrics.incRemoteBytesRead(v)
}
override def incRemoteBytesReadToDisk(v: Long): Unit = {
_remoteBytesReadToDisk.add(v)
tempMetrics.incRemoteBytesReadToDisk(v)
}
override def incLocalBytesRead(v: Long): Unit = {
_localBytesRead.add(v)
tempMetrics.incLocalBytesRead(v)
}
override def incFetchWaitTime(v: Long): Unit = {
_fetchWaitTime.add(v)
tempMetrics.incFetchWaitTime(v)
}
override def incRecordsRead(v: Long): Unit = {
_recordsRead.add(v)
tempMetrics.incRecordsRead(v)
}
}
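
A minimal usage sketch (not part of this diff), mirroring the wiring in ShuffledRowRDD.compute() above. The object name, helper name and the metricsFromPlan parameter are illustrative only; the point is that one increment on the returned reporter updates both the per-operator SQL metrics and the task-level shuffle read metrics.

package org.apache.spark.sql.execution.metric

import org.apache.spark.TaskContext

// Illustrative helper only; `metricsFromPlan` stands in for an operator's metrics map,
// e.g. one built with SQLMetrics.getShuffleReadMetrics(sparkContext) as in ShuffleExchangeExec.
private[spark] object ShuffleReadReporterExample {
  def buildReporter(
      context: TaskContext,
      metricsFromPlan: Map[String, SQLMetric]): SQLShuffleMetricsReporter = {
    val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
    // One increment, two sinks: the SQL metric shown on the exchange node in the web UI,
    // and the task's shuffle read metrics.
    new SQLShuffleMetricsReporter(tempMetrics, metricsFromPlan)
  }
}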
UnsafeRowSerializerSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types._
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.collection.ExternalSorter
@@ -137,7 +138,9 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
rowsRDD,
new PartitionIdPassthrough(2),
new UnsafeRowSerializer(2))
val shuffled = new ShuffledRowRDD(dependency)
val shuffled = new ShuffledRowRDD(
dependency,
SQLMetrics.getShuffleReadMetrics(spark.sparkContext))
shuffled.count()
}
}
SQLMetricsSuite.scala
@@ -94,8 +94,13 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
"avg hash probe (min, med, max)" -> "\n(1, 1, 1)"),
Map("number of output rows" -> 1L,
"avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
val shuffleExpected1 = Map(
"records read" -> 2L,
"local blocks fetched" -> 2L,
"remote blocks fetched" -> 0L)
testSparkPlanMetrics(df, 1, Map(
2L -> (("HashAggregate", expected1(0))),
1L -> (("Exchange", shuffleExpected1)),
0L -> (("HashAggregate", expected1(1))))
)

@@ -106,8 +111,13 @@
"avg hash probe (min, med, max)" -> "\n(1, 1, 1)"),
Map("number of output rows" -> 3L,
"avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
val shuffleExpected2 = Map(
"records read" -> 4L,
"local blocks fetched" -> 4L,
"remote blocks fetched" -> 0L)
testSparkPlanMetrics(df2, 1, Map(
2L -> (("HashAggregate", expected2(0))),
1L -> (("Exchange", shuffleExpected2)),
0L -> (("HashAggregate", expected2(1))))
)
}
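
Outside the test harness, one quick way to see the new Exchange metrics is to inspect the executed plan directly. The following is a sketch, not code from this PR, assuming a spark-shell session where `spark` is predefined; the "recordsRead" key matches RECORDS_READ defined in SQLMetrics above.

// Sketch for spark-shell: run a query with a shuffle, then read the shuffle metrics
// recorded on the ShuffleExchangeExec node of the executed plan.
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

val df = spark.range(0, 100, 1, 4).groupBy().count()
df.collect()  // run the job so the metric accumulators get updated
val exchange = df.queryExecution.executedPlan.collect {
  case e: ShuffleExchangeExec => e
}.head
// Value of the "records read" metric shown for this Exchange node in the SQL tab.
println(exchange.metrics("recordsRead").value)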
@@ -191,7 +201,11 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
testSparkPlanMetrics(df, 1, Map(
0L -> (("SortMergeJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
"number of output rows" -> 4L))))
"number of output rows" -> 4L))),
2L -> (("Exchange", Map(
"records read" -> 4L,
"local blocks fetched" -> 2L,
"remote blocks fetched" -> 0L))))
)
}
}
@@ -208,15 +222,15 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
"SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
testSparkPlanMetrics(df, 1, Map(
0L -> (("SortMergeJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
// It's 8 because we read 6 rows on the left side and 2 rows on the right one
"number of output rows" -> 8L))))
)

val df2 = spark.sql(
"SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a")
testSparkPlanMetrics(df2, 1, Map(
0L -> (("SortMergeJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
// It's 8 because we read 6 rows on the left side and 2 rows on the right one
"number of output rows" -> 8L))))
)
}
@@ -287,7 +301,6 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
// Assume the execution plan is
// ... -> ShuffledHashJoin(nodeId = 1) -> Project(nodeId = 0)
val df = df1.join(df2, "key")
val metrics = getSparkPlanMetrics(df, 1, Set(1L))
testSparkPlanMetrics(df, 1, Map(
1L -> (("ShuffledHashJoin", Map(
"number of output rows" -> 2L,