
Commit 33de7df

gatorsmile authored and holdenk committed
[SPARK-26327][SQL][FOLLOW-UP] Refactor the code and restore the metrics name
## What changes were proposed in this pull request?

- The original comment about `updateDriverMetrics` is not right.
- Refactor the code to ensure `selectedPartitions` has been set before sending the driver-side metrics.
- Restore the original name, which is more general and extendable.

## How was this patch tested?

The existing tests.

Closes apache#23328 from gatorsmile/followupSpark-26142.

Authored-by: gatorsmile <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
1 parent 90c9bd5 commit 33de7df
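
The change buffers driver-side metric values in a `HashMap` while the lazy `selectedPartitions` is being computed, and posts them only after that value has been forced. The following standalone Scala sketch models just that ordering; `Metric` and `DriverMetricsSketch` are hypothetical stand-ins for illustration, not Spark's actual `SQLMetric` machinery.

```scala
import scala.collection.mutable.HashMap

object DriverMetricsSketch {
  // Hypothetical stand-in for an accumulator-style metric.
  final class Metric(val name: String) {
    var value = 0L
    def add(v: Long): Unit = value += v
  }

  val metrics: Map[String, Metric] = Map(
    "numFiles" -> new Metric("number of files"),
    "metadataTime" -> new Metric("metadata time"))

  // Buffer for driver-side updates, mirroring FileSourceScanExec.driverMetrics.
  val driverMetrics: HashMap[String, Long] = HashMap.empty

  // The buffer is populated as a side effect of forcing this lazy val,
  // just as listing files fills numFiles/metadataTime in the real code.
  lazy val selectedPartitions: Seq[String] = {
    val start = System.nanoTime()
    val ret = Seq("part=0", "part=1")
    driverMetrics("numFiles") = ret.size.toLong
    driverMetrics("metadataTime") = (System.nanoTime() - start) / 1000 / 1000
    ret
  }

  // Flush only after selectedPartitions has been forced, so the buffer is complete.
  def sendDriverMetrics(): Unit =
    driverMetrics.foreach { case (k, v) => metrics(k).add(v) }

  def main(args: Array[String]): Unit = {
    selectedPartitions   // force the lazy val first
    sendDriverMetrics()  // then publish the buffered values
    metrics.values.foreach(m => println(s"${m.name}: ${m.value}"))
  }
}
```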

File tree: 1 file changed (+19 −20 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 19 additions & 20 deletions
@@ -17,7 +17,7 @@
  */
 package org.apache.spark.sql.execution
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
@@ -167,14 +167,26 @@ case class FileSourceScanExec(
       partitionSchema = relation.partitionSchema,
       relation.sparkSession.sessionState.conf)
 
-  private var fileListingTime = 0L
+  val driverMetrics: HashMap[String, Long] = HashMap.empty
+
+  /**
+   * Send the driver-side metrics. Before calling this function, selectedPartitions has
+   * been initialized. See SPARK-26327 for more details.
+   */
+  private def sendDriverMetrics(): Unit = {
+    driverMetrics.foreach(e => metrics(e._1).add(e._2))
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
+  }
 
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
+    driverMetrics("numFiles") = ret.map(_.files.size.toLong).sum
     val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
-    fileListingTime = timeTakenMs
+    driverMetrics("metadataTime") = timeTakenMs
     ret
   }
 
@@ -286,8 +298,6 @@ case class FileSourceScanExec(
   }
 
   private lazy val inputRDD: RDD[InternalRow] = {
-    // Update metrics for taking effect in both code generation node and normal node.
-    updateDriverMetrics()
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -298,12 +308,14 @@ case class FileSourceScanExec(
         options = relation.options,
         hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
 
-    relation.bucketSpec match {
+    val readRDD = relation.bucketSpec match {
       case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
         createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
       case _ =>
         createNonBucketedReadRDD(readFile, selectedPartitions, relation)
     }
+    sendDriverMetrics()
+    readRDD
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
@@ -313,7 +325,7 @@
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
-      "fileListingTime" -> SQLMetrics.createMetric(sparkContext, "file listing time (ms)"),
+      "metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -504,19 +516,6 @@ case class FileSourceScanExec(
     }
   }
 
-  /**
-   * Send the updated metrics to driver, while this function calling, selectedPartitions has
-   * been initialized. See SPARK-26327 for more detail.
-   */
-  private def updateDriverMetrics() = {
-    metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
-    metrics("fileListingTime").add(fileListingTime)
-
-    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      metrics("numFiles") :: metrics("fileListingTime") :: Nil)
-  }
-
   override def doCanonicalize(): FileSourceScanExec = {
     FileSourceScanExec(
       relation,

0 commit comments