Changes from 2 commits
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
@@ -167,14 +167,26 @@ case class FileSourceScanExec(
partitionSchema = relation.partitionSchema,
relation.sparkSession.sessionState.conf)

private var fileListingTime = 0L
val driverMetrics: HashMap[String, Long] = HashMap.empty

/**
* Send the driver-side metrics. Before calling this function, selectedPartitions has
* been initialized. See SPARK-26327 for more details.
*/
private def sendDriverMetrics(): Unit = {
driverMetrics.foreach(e => metrics(e._1).add(e._2))
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
}
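As an aside (not part of the patch), a minimal, self-contained sketch of the pattern sendDriverMetrics relies on, with hypothetical stand-ins: Counter plays the role of SQLMetric and post the role of SQLMetrics.postDriverMetricUpdates. Driver-side values are folded into the matching metric objects, and only the metrics that were actually updated on the driver are posted.

import scala.collection.mutable.HashMap

object SendDriverMetricsSketch {
  // Stand-in for SQLMetric: a named, additive long counter.
  final class Counter(val name: String) { var value = 0L; def add(x: Long): Unit = value += x }

  // Stand-in for SQLMetrics.postDriverMetricUpdates.
  def post(updates: Seq[Counter]): Unit = updates.foreach(c => println(s"${c.name} = ${c.value}"))

  def main(args: Array[String]): Unit = {
    val metrics = Map(
      "numOutputRows" -> new Counter("numOutputRows"),  // executor-side, never posted here
      "numFiles"      -> new Counter("numFiles"),
      "metadataTime"  -> new Counter("metadataTime"))
    val driverMetrics: HashMap[String, Long] = HashMap("numFiles" -> 42L, "metadataTime" -> 17L)

    // Fold the driver-side values into the matching metric objects...
    driverMetrics.foreach(e => metrics(e._1).add(e._2))
    // ...then post only the metrics that were actually updated on the driver.
    post(metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
  }
}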

@transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
val startTime = System.nanoTime()
val ret = relation.location.listFiles(partitionFilters, dataFilters)
driverMetrics("filesNum") = ret.map(_.files.size.toLong).sum
Member:
Hi, @gatorsmile. It looks like a typo of numFiles.
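To make the point concrete, a minimal illustration (not part of the patch, not Spark code) of what the mismatched key would cost at runtime: sendDriverMetrics above looks every driverMetrics key up in the metrics map, and "filesNum" is not a key there, so the equivalent lookup fails.

// Run in the Scala REPL: the metrics map only knows "numFiles".
val metrics = Map("numFiles" -> 0L, "metadataTime" -> 0L)
metrics("filesNum")   // throws java.util.NoSuchElementException: key not found: filesNum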

val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
fileListingTime = timeTakenMs
driverMetrics("metadataTime") = timeTakenMs
ret
}

@@ -286,8 +298,6 @@
}

private lazy val inputRDD: RDD[InternalRow] = {
// Update metrics for taking effect in both code generation node and normal node.
updateDriverMetrics()
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
@@ -298,12 +308,14 @@
options = relation.options,
hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

relation.bucketSpec match {
val readRDD = relation.bucketSpec match {
case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
case _ =>
createNonBucketedReadRDD(readFile, selectedPartitions, relation)
}
sendDriverMetrics()
Member Author:
Lines 313 and 315 both call selectedPartitions, so it is safer to say that selectedPartitions has been initialized before we send the driver-side metrics.
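A minimal sketch of that ordering argument (not part of the patch; LazyInitSketch and its members are hypothetical): the first read of a Scala lazy val runs its initializer exactly once, so whichever branch reads selectedPartitions forces it before the metrics are sent.

object LazyInitSketch {
  var timesInitialized = 0

  lazy val selectedPartitions: Seq[String] = {
    timesInitialized += 1
    Seq("part=1", "part=2")
  }

  def sendDriverMetrics(): Unit =
    println(s"initialized $timesInitialized time(s), ${selectedPartitions.size} partitions")

  def main(args: Array[String]): Unit = {
    // Both branches read selectedPartitions, so it is initialized before the call below.
    val readRDD =
      if (scala.util.Random.nextBoolean()) selectedPartitions.map(_.toUpperCase)
      else selectedPartitions.map(_.reverse)
    sendDriverMetrics()   // prints: initialized 1 time(s), 2 partitions
    println(readRDD.mkString(", "))
  }
}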

readRDD
}

override def inputRDDs(): Seq[RDD[InternalRow]] = {
@@ -313,7 +325,7 @@
override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
"fileListingTime" -> SQLMetrics.createMetric(sparkContext, "file listing time (ms)"),
"metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time"),
@gatorsmile (Member Author), Dec 17, 2018:
The original name is more straightforward to end users who have no idea about file listing.

Contributor:
I'm not sure we will add more metadata operations and reuse this metric. Anyway, it was not a good idea to do the renaming in a bug-fix PR. @xuanyuanking, can you create a ticket and send a new PR for the renaming?

Member:
Copy that. The original thinking was that this bug fix is part of https://issues.apache.org/jira/browse/SPARK-26222.

"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))

protected override def doExecute(): RDD[InternalRow] = {
@@ -504,19 +516,6 @@
}
}

/**
* Send the updated metrics to driver, while this function calling, selectedPartitions has
Member Author:
This is wrong.

* been initialized. See SPARK-26327 for more detail.
*/
private def updateDriverMetrics() = {
metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
metrics("fileListingTime").add(fileListingTime)

val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
metrics("numFiles") :: metrics("fileListingTime") :: Nil)
}

override def doCanonicalize(): FileSourceScanExec = {
FileSourceScanExec(
relation,