diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index c0fa4e777b49..322ffffca564 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
@@ -167,14 +167,26 @@ case class FileSourceScanExec(
       partitionSchema = relation.partitionSchema,
       relation.sparkSession.sessionState.conf)
 
-  private var fileListingTime = 0L
+  val driverMetrics: HashMap[String, Long] = HashMap.empty
+
+  /**
+   * Send the driver-side metrics. Before calling this function, selectedPartitions has
+   * been initialized. See SPARK-26327 for more details.
+   */
+  private def sendDriverMetrics(): Unit = {
+    driverMetrics.foreach(e => metrics(e._1).add(e._2))
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
+  }
 
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
+    driverMetrics("numFiles") = ret.map(_.files.size.toLong).sum
     val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
-    fileListingTime = timeTakenMs
+    driverMetrics("metadataTime") = timeTakenMs
     ret
   }
 
@@ -286,8 +298,6 @@ case class FileSourceScanExec(
   }
 
   private lazy val inputRDD: RDD[InternalRow] = {
-    // Update metrics for taking effect in both code generation node and normal node.
-    updateDriverMetrics()
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -298,12 +308,14 @@ case class FileSourceScanExec(
         options = relation.options,
         hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
 
-    relation.bucketSpec match {
+    val readRDD = relation.bucketSpec match {
       case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
         createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
       case _ =>
         createNonBucketedReadRDD(readFile, selectedPartitions, relation)
     }
+    sendDriverMetrics()
+    readRDD
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
@@ -313,7 +325,7 @@ case class FileSourceScanExec(
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
-      "fileListingTime" -> SQLMetrics.createMetric(sparkContext, "file listing time (ms)"),
+      "metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -504,19 +516,6 @@ case class FileSourceScanExec(
     }
   }
 
-  /**
-   * Send the updated metrics to driver, while this function calling, selectedPartitions has
-   * been initialized. See SPARK-26327 for more detail.
-   */
-  private def updateDriverMetrics() = {
-    metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
-    metrics("fileListingTime").add(fileListingTime)
-
-    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      metrics("numFiles") :: metrics("fileListingTime") :: Nil)
-  }
-
   override def doCanonicalize(): FileSourceScanExec = {
     FileSourceScanExec(
      relation,
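
A minimal way to observe the renamed metric after applying this patch (a sketch, not part of the diff; the output path, app name, and local master are illustrative only):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.FileSourceScanExec

val spark = SparkSession.builder().master("local[*]").appName("SPARK-26327-check").getOrCreate()

// Write a small Parquet dataset so the scan has real files to list.
spark.range(10).write.mode("overwrite").parquet("/tmp/spark_26327_check")

val df = spark.read.parquet("/tmp/spark_26327_check")
df.collect()  // Building inputRDD now calls sendDriverMetrics() after listing files.

// Grab the scan node from the executed plan and read its driver-side metrics.
val scan = df.queryExecution.executedPlan.collectFirst {
  case s: FileSourceScanExec => s
}.get
println(s"numFiles     = ${scan.metrics("numFiles").value}")      // number of data files scanned
println(s"metadataTime = ${scan.metrics("metadataTime").value}")  // file listing time in ms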