Changes from 1 commit

rename
xuanyuanking committed Dec 11, 2018
commit deff6d5bdd91631d6a64ed223bd84be1ecd5da40

@@ -22,7 +22,7 @@ import java.util.Arrays
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleMetricsReporter}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}
 
 /**
  * The [[Partition]] used by [[ShuffledRowRDD]]. A post-shuffle partition
@@ -157,9 +157,9 @@ class ShuffledRowRDD(
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
     val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
-    // `SQLShuffleMetricsReporter` will update its own metrics for SQL exchange operator,
+    // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator,
     // as well as the `tempMetrics` for basic shuffle metrics.
-    val sqlMetricsReporter = new SQLShuffleMetricsReporter(tempMetrics, metrics)
+    val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
     // The range of pre-shuffle partitions that we are fetching at here is
     // [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].
     val reader =
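
The comment changed in the second hunk states the reporter's contract: one object receives the shuffle-read callbacks and fans each update out to two places, the task-level TempShuffleReadMetrics and the SQLMetric map shown on the SQL exchange operator. A minimal, self-contained sketch of that fan-out pattern (stand-in types invented for illustration, not the actual Spark classes):

  // Stand-in for SQLMetric / TempShuffleReadMetrics; the names here are made up for the sketch.
  class Counter { var value = 0L; def add(v: Long): Unit = value += v }

  class ReadMetricsFanOut(temp: Counter, sqlMetrics: Map[String, Counter]) {
    private val recordsRead = sqlMetrics("recordsRead")

    // Each callback updates the engine-level task metric and the SQL UI metric together.
    def incRecordsRead(v: Long): Unit = {
      temp.add(v)
      recordsRead.add(v)
    }
  }
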
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Uns
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleMetricsReporter, SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair
@@ -50,7 +50,7 @@ case class ShuffleExchangeExec(
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
   private lazy val readMetrics =
-    SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
   ) ++ readMetrics ++ writeMetrics
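
ShuffleExchangeExec above, and the CollectLimitExec / TakeOrderedAndProjectExec hunks that follow, all use the same recipe: build the shuffle read and write metric maps once per operator and merge them into the operator's metrics, which is why the rename only touches the name of the factory object. A small sketch of that recipe, assuming an active SparkContext named sc (the helper function itself is illustrative, not part of the patch):

  import org.apache.spark.SparkContext
  import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}

  // Hypothetical helper showing how the per-operator shuffle metric maps are assembled.
  def exchangeMetrics(sc: SparkContext): Map[String, SQLMetric] = {
    val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sc)
    val readMetrics = SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sc)
    Map("dataSize" -> SQLMetrics.createSizeMetric(sc, "data size")) ++ readMetrics ++ writeMetrics
  }
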
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-import org.apache.spark.sql.execution.metric.{SQLShuffleMetricsReporter, SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.execution.metric.{SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 
 /**
  * Take the first `limit` elements and collect them to a single partition.
@@ -41,7 +41,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
   private lazy val readMetrics =
-    SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics = readMetrics ++ writeMetrics
   protected override def doExecute(): RDD[InternalRow] = {
     val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
@@ -165,7 +165,7 @@ case class TakeOrderedAndProjectExec(
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
   private lazy val readMetrics =
-    SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics = readMetrics ++ writeMetrics
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -27,23 +27,23 @@ import org.apache.spark.shuffle.ShuffleWriteMetricsReporter
  * @param metrics All metrics in current SparkPlan. This param should not empty and
  *   contains all shuffle metrics defined in createShuffleReadMetrics.
  */
-private[spark] class SQLShuffleMetricsReporter(
+class SQLShuffleReadMetricsReporter(
     tempMetrics: TempShuffleReadMetrics,
     metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
   private[this] val _remoteBlocksFetched =
-    metrics(SQLShuffleMetricsReporter.REMOTE_BLOCKS_FETCHED)
+    metrics(SQLShuffleReadMetricsReporter.REMOTE_BLOCKS_FETCHED)
   private[this] val _localBlocksFetched =
-    metrics(SQLShuffleMetricsReporter.LOCAL_BLOCKS_FETCHED)
+    metrics(SQLShuffleReadMetricsReporter.LOCAL_BLOCKS_FETCHED)
   private[this] val _remoteBytesRead =
-    metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ)
+    metrics(SQLShuffleReadMetricsReporter.REMOTE_BYTES_READ)
   private[this] val _remoteBytesReadToDisk =
-    metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ_TO_DISK)
+    metrics(SQLShuffleReadMetricsReporter.REMOTE_BYTES_READ_TO_DISK)
   private[this] val _localBytesRead =
-    metrics(SQLShuffleMetricsReporter.LOCAL_BYTES_READ)
+    metrics(SQLShuffleReadMetricsReporter.LOCAL_BYTES_READ)
   private[this] val _fetchWaitTime =
-    metrics(SQLShuffleMetricsReporter.FETCH_WAIT_TIME)
+    metrics(SQLShuffleReadMetricsReporter.FETCH_WAIT_TIME)
   private[this] val _recordsRead =
-    metrics(SQLShuffleMetricsReporter.RECORDS_READ)
+    metrics(SQLShuffleReadMetricsReporter.RECORDS_READ)
 
   override def incRemoteBlocksFetched(v: Long): Unit = {
     _remoteBlocksFetched.add(v)
@@ -75,7 +75,7 @@ private[spark] class SQLShuffleMetricsReporter(
   }
 }
 
-private[spark] object SQLShuffleMetricsReporter {
+object SQLShuffleReadMetricsReporter {
   val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
   val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
   val REMOTE_BYTES_READ = "remoteBytesRead"
@@ -102,7 +102,7 @@ private[spark] object SQLShuffleMetricsReporter {
  * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
  * @param metrics Shuffle write metrics in current SparkPlan.
  */
-private[spark] class SQLShuffleWriteMetricsReporter(
+class SQLShuffleWriteMetricsReporter(
     metricsReporter: ShuffleWriteMetricsReporter,
     metrics: Map[String, SQLMetric]) extends ShuffleWriteMetricsReporter {
   private[this] val _bytesWritten =
@@ -112,29 +112,29 @@ private[spark] class SQLShuffleWriteMetricsReporter(
   private[this] val _writeTime =
     metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
 
-  override private[spark] def incBytesWritten(v: Long): Unit = {
+  override def incBytesWritten(v: Long): Unit = {
     metricsReporter.incBytesWritten(v)
     _bytesWritten.add(v)
   }
-  override private[spark] def decRecordsWritten(v: Long): Unit = {
+  override def decRecordsWritten(v: Long): Unit = {
     metricsReporter.decBytesWritten(v)
     _recordsWritten.set(_recordsWritten.value - v)
   }
-  override private[spark] def incRecordsWritten(v: Long): Unit = {
+  override def incRecordsWritten(v: Long): Unit = {
     metricsReporter.incRecordsWritten(v)
     _recordsWritten.add(v)
   }
-  override private[spark] def incWriteTime(v: Long): Unit = {
+  override def incWriteTime(v: Long): Unit = {
     metricsReporter.incWriteTime(v)
     _writeTime.add(v)
   }
-  override private[spark] def decBytesWritten(v: Long): Unit = {
+  override def decBytesWritten(v: Long): Unit = {
     metricsReporter.decBytesWritten(v)
     _bytesWritten.set(_bytesWritten.value - v)
   }
 }
 
-private[spark] object SQLShuffleWriteMetricsReporter {
+object SQLShuffleWriteMetricsReporter {
   val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
   val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
   val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
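
The write-side reporter mirrors the read-side wrapper, with one detail visible in the hunk above: the SQL metric type exposes add and set but no decrement, so the dec* callbacks are written as a re-set to value - v after delegating to the wrapped reporter. A stripped-down sketch of that shape (placeholder types, not the real Spark ones):

  // Placeholder metric type for the sketch; SQLMetric plays this role in Spark.
  class LongMetric { var value = 0L; def add(v: Long): Unit = value += v; def set(v: Long): Unit = value = v }

  // Placeholder for the wrapped engine-level reporter.
  trait Sink { def inc(v: Long): Unit; def dec(v: Long): Unit }

  class WriteMetricsWrapper(underlying: Sink, recordsWritten: LongMetric) {
    def incRecordsWritten(v: Long): Unit = {
      underlying.inc(v)          // keep the wrapped reporter in sync
      recordsWritten.add(v)
    }
    // The metric type has no dec, so the decrement is expressed as a re-set.
    def decRecordsWritten(v: Long): Unit = {
      underlying.dec(v)
      recordsWritten.set(recordsWritten.value - v)
    }
  }
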
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.execution.metric.SQLShuffleMetricsReporter
+import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.collection.ExternalSorter
@@ -140,7 +140,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
       new UnsafeRowSerializer(2))
     val shuffled = new ShuffledRowRDD(
       dependency,
-      SQLShuffleMetricsReporter.createShuffleReadMetrics(spark.sparkContext))
+      SQLShuffleReadMetricsReporter.createShuffleReadMetrics(spark.sparkContext))
     shuffled.count()
   }
 }
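
For orientation, the map passed into ShuffledRowRDD in this test is the same map the reporter fills in on the executors, keyed by the constants on the SQLShuffleReadMetricsReporter companion object. A hedged variation of the test body (the readMetrics val and the final inspection are illustrative additions, not part of this suite):

  val readMetrics = SQLShuffleReadMetricsReporter.createShuffleReadMetrics(spark.sparkContext)
  val shuffled = new ShuffledRowRDD(dependency, readMetrics)
  shuffled.count()
  // After the action, the driver-side accumulators backing the map can be inspected, e.g.:
  println(readMetrics(SQLShuffleReadMetricsReporter.RECORDS_READ).value)
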