@@ -85,7 +85,7 @@ class FileScanRDD(
      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
      // task and in the same thread, in which case we need to avoid override values written by
      // previous partitions (SPARK-13071).
-     private def updateBytesRead(): Unit = {
+     private def incTaskInputMetricsBytesRead(): Unit = {
        inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
      }
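
As an aside, the SPARK-13071 comment above is easier to see with a small self-contained sketch. The names below (InputMetricsLike, BytesReadSketch, the local callback) are hypothetical stand-ins for the real TaskMetrics/Hadoop statistics plumbing; the sketch only illustrates why the method adds the bytes already recorded (existingBytesRead) to the callback value instead of overwriting the metric:

// Minimal sketch, not the real Spark classes: InputMetricsLike stands in for
// TaskMetrics.inputMetrics, and getBytesReadCallback for a callback reporting
// only the current partition's bytes.
class InputMetricsLike {
  private var bytes = 0L
  def bytesRead: Long = bytes
  def setBytesRead(v: Long): Unit = { bytes = v }
}

object BytesReadSketch extends App {
  val inputMetrics = new InputMetricsLike

  // A coalesced task computes partition 1 first and records 100 bytes.
  inputMetrics.setBytesRead(100L)

  // Partition 2 starts in the same task and thread: snapshot what is already
  // recorded, and let the callback report only this partition's bytes.
  val existingBytesRead = inputMetrics.bytesRead
  var partitionBytes = 0L
  val getBytesReadCallback: () => Long = () => partitionBytes

  partitionBytes = 40L
  // setBytesRead(existing + callback) preserves partition 1's 100 bytes;
  // setBytesRead(callback) alone would reset the metric to 40.
  inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
  assert(inputMetrics.bytesRead == 140L)
}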

@@ -106,15 +106,16 @@ class FileScanRDD(
        // don't need to run this `if` for every record.
-       val preNumRecordsRead = inputMetrics.recordsRead
        if (nextElement.isInstanceOf[ColumnarBatch]) {
+         incTaskInputMetricsBytesRead()
Member:
I see, so always update when processing ColumnarBatch, but use the previous logic otherwise. That seems OK. It should still address the original problem.

Member:
... I guess the only possible drawback is that if the number of records in a ColumnarBatch is pretty small, then this could cause it to update bytes read a lot more frequently than before. But if the number of records is large (>100) then this won't matter.

Member Author:
4096 is the default batch size of the batch readers in both ORC and Parquet. If users set the conf to a much smaller number, they will face a perf regression due to the extra overhead in many places. I do not think end users will do this.

Member:
Makes sense. In that case the behavior should be the same before and after this change, so it's fine.

Contributor:
Considering that the default value of spark.sql.parquet.columnarReaderBatchSize is 4096, this change is better.
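
For reference, the 4096 default discussed above comes from the vectorized reader batch-size configs, spark.sql.parquet.columnarReaderBatchSize and spark.sql.orc.columnarReaderBatchSize. Below is an illustrative sketch, not part of this PR, of shrinking that value (the object name, app name, 512 setting, and input path are placeholders); a smaller batch means next() returns batches more often, so the per-batch bytes-read refresh added above would fire more frequently:

// Illustrative only: lowering the Parquet vectorized reader batch size below
// its 4096 default; the input path is a placeholder.
import org.apache.spark.sql.SparkSession

object BatchSizeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("columnar-batch-size-demo")
      .master("local[*]")
      // Each ColumnarBatch returned by the Parquet vectorized reader now holds
      // at most 512 rows, so bytes-read is refreshed roughly 8x as often.
      .config("spark.sql.parquet.columnarReaderBatchSize", "512")
      .getOrCreate()

    println(spark.read.parquet("/tmp/example.parquet").count())
    spark.stop()
  }
}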

          inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
        } else {
+         // too costly to update every record
+         if (inputMetrics.recordsRead %
+             SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+           incTaskInputMetricsBytesRead()
+         }
          inputMetrics.incRecordsRead(1)
        }
-       // The records may be incremented by more than 1 at a time.
-       if (preNumRecordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS !=
-           inputMetrics.recordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS) {
-         updateBytesRead()
-       }
        nextElement
      }

@@ -201,7 +202,7 @@ class FileScanRDD(
      }

      override def close(): Unit = {
-       updateBytesRead()
+       incTaskInputMetricsBytesRead()
        InputFileBlockHolder.unset()
      }
    }