@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources

import java.io.{FileNotFoundException, IOException}
import java.io.{Closeable, FileNotFoundException, IOException}

import org.apache.parquet.io.ParquetDecodingException

@@ -85,6 +85,17 @@ class FileScanRDD(
private[this] var currentFile: PartitionedFile = null
private[this] var currentIterator: Iterator[Object] = null

private def resetCurrentIterator(): Unit = {
currentIterator match {
case iter: NextIterator[_] =>
iter.closeIfNeeded()
case iter: Closeable =>
iter.close()
case _ => // do nothing
[Review thread]
dongjoon-hyun (Member), Oct 22, 2021:
When does this happen? Only when currentIterator is null?

ankurdave (Contributor, Author), Oct 22, 2021:
There are currently two cases aside from null:
  • OrcFileFormat produces an ordinary non-Closeable Iterator due to unwrapOrcStructs().
  • The user can create a FileScanRDD with an arbitrary readFunction that does not return a Closeable Iterator.
It would be ideal if we could disallow these cases and require the iterator to be Closeable, but it seems that would require changing public APIs.
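A minimal standalone sketch (not part of the PR; all names are illustrative) of how a Closeable reader can end up behind a non-Closeable Iterator: a plain Iterator.map drops the Closeable interface, so the case _ branch above has to be a silent no-op. The same fact appears to be why this PR also overrides RecordReaderIterator.map further down.

import java.io.Closeable

// Illustrative stand-in for a reader-backed iterator that owns resources.
class ReaderBackedIterator(rows: Iterator[String]) extends Iterator[String] with Closeable {
  override def hasNext: Boolean = rows.hasNext
  override def next(): String = rows.next()
  override def close(): Unit = println("reader resources released")
}

object NonCloseableIteratorDemo extends App {
  val reader: Iterator[String] with Closeable = new ReaderBackedIterator(Iterator("a", "b"))

  // The default Iterator.map returns a generic wrapper with no close() method.
  val mapped: Iterator[String] = reader.map(_.toUpperCase)
  println(mapped.isInstanceOf[Closeable])   // false

  // The defensive dispatch that resetCurrentIterator performs on such iterators:
  mapped match {
    case c: Closeable => c.close()
    case _            => ()                 // nothing reachable to close here
  }
}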

}
currentIterator = null
}

def hasNext: Boolean = {
// Kill the task in case it has been marked as killed. This logic is from
// InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
@@ -128,15 +139,21 @@ class FileScanRDD(
// Sets InputFileBlockHolder for the file block's information
InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)

resetCurrentIterator()
if (ignoreMissingFiles || ignoreCorruptFiles) {
currentIterator = new NextIterator[Object] {
// The readFunction may read some bytes before consuming the iterator, e.g.,
// vectorized Parquet reader. Here we use lazy val to delay the creation of
// iterator so that we will throw exception in `getNext`.
private lazy val internalIter = readCurrentFile()
// vectorized Parquet reader. Here we use a lazily initialized variable to delay the
// creation of iterator so that we will throw exception in `getNext`.
private var internalIter: Iterator[InternalRow] = null
[Review thread]
Member:
hm why is this change necessary?

ankurdave (Contributor, Author), Oct 22, 2021:
If the downstream operator never pulls any rows from the iterator, then the first time we access internalIter will be when close() is called. If internalIter is a lazy val, this will trigger a call to readCurrentFile(), which is unnecessary and may throw. Changing internalIter from a lazy val to a var lets us avoid this unnecessary call.

Several tests fail without this change, including AvroV1Suite.

Member:
Got it. Thanks
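A minimal standalone sketch (not from the PR; names are illustrative) of the pitfall described above: when close() is the first thing to touch a lazy val, the initializer still runs, so an unread file would be opened (and could throw) during cleanup, whereas a nullable var lets close() skip initialization entirely.

object LazyValCloseDemo extends App {
  def readCurrentFile(): Iterator[Int] = {
    println("readCurrentFile() called")   // expensive, and may throw
    Iterator(1, 2, 3)
  }

  // With a lazy val, close() is the first access and therefore forces the read.
  class LazyValReader {
    private lazy val internalIter: Iterator[Int] = readCurrentFile()
    def close(): Unit = internalIter match {
      case c: java.io.Closeable => c.close()
      case _                    => ()
    }
  }

  // With a nullable var, close() can skip an iterator that was never created.
  class VarReader {
    private var internalIter: Iterator[Int] = null
    def close(): Unit = internalIter match {
      case c: java.io.Closeable => c.close()
      case _                    => ()      // null falls through here: no read is triggered
    }
  }

  new LazyValReader().close()   // prints "readCurrentFile() called" although no rows were consumed
  new VarReader().close()       // prints nothing
}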


override def getNext(): AnyRef = {
try {
// Initialize `internalIter` lazily.
if (internalIter == null) {
internalIter = readCurrentFile()
}

if (internalIter.hasNext) {
internalIter.next()
} else {
@@ -158,7 +175,13 @@
}
}

override def close(): Unit = {}
override def close(): Unit = {
internalIter match {
case iter: Closeable =>
iter.close()
case _ => // do nothing
}
}
}
} else {
currentIterator = readCurrentFile()
@@ -188,6 +211,7 @@ class FileScanRDD(
override def close(): Unit = {
incTaskInputMetricsBytesRead()
InputFileBlockHolder.unset()
resetCurrentIterator()
}
}

@@ -56,6 +56,13 @@ class RecordReaderIterator[T](
rowReader.getCurrentValue
}

override def map[B](f: (T) => B): Iterator[B] with Closeable =
new Iterator[B] with Closeable {
override def hasNext: Boolean = RecordReaderIterator.this.hasNext
override def next(): B = f(RecordReaderIterator.this.next())
override def close(): Unit = RecordReaderIterator.this.close()
}

override def close(): Unit = {
if (rowReader != null) {
try {
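A hypothetical, self-contained re-creation (illustrative names, not the PR code itself) of what the map override above provides: a mapped iterator keeps the Closeable interface, so a caller such as FileScanRDD's resetCurrentIterator can still close the underlying reader through the projected iterator it receives from ParquetFileFormat's iter.map(...) below.

import java.io.Closeable

// Minimal stand-in for RecordReaderIterator: a Closeable iterator whose map()
// is overridden to keep Closeable on the wrapper.
class CloseableIterator[T](underlying: Iterator[T], onClose: () => Unit)
    extends Iterator[T] with Closeable {
  override def hasNext: Boolean = underlying.hasNext
  override def next(): T = underlying.next()
  override def close(): Unit = onClose()

  override def map[B](f: T => B): Iterator[B] with Closeable =
    new Iterator[B] with Closeable {
      override def hasNext: Boolean = CloseableIterator.this.hasNext
      override def next(): B = f(CloseableIterator.this.next())
      override def close(): Unit = CloseableIterator.this.close()
    }
}

object CloseablePreservingMapDemo extends App {
  var closed = false
  val rows = new CloseableIterator[Int](Iterator(1, 2, 3), () => closed = true)

  // The "projection" step, analogous to iter.map(unsafeProjection) in ParquetFileFormat.
  val projected: Iterator[Int] = rows.map(_ * 10)

  // The caller can still close the underlying reader through the mapped iterator.
  projected match {
    case c: Closeable => c.close()
    case _            => ()
  }
  println(closed)   // true
}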
@@ -327,18 +327,31 @@ class ParquetFileFormat
int96RebaseMode.toString,
enableOffHeapColumnVector && taskContext.isDefined,
capacity)
// SPARK-37089: We cannot register a task completion listener to close this iterator here
// because downstream exec nodes have already registered their listeners. Since listeners
// are executed in reverse order of registration, a listener registered here would close the
// iterator while downstream exec nodes are still running. When off-heap column vectors are
// enabled, this can cause a use-after-free bug leading to a segfault.
//
// Instead, we use FileScanRDD's task completion listener to close this iterator.
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
try {
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}

// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
iter.asInstanceOf[Iterator[InternalRow]]
// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
iter.asInstanceOf[Iterator[InternalRow]]
} catch {
case e: Throwable =>
// SPARK-23457: In case there is an exception in initialization, close the iterator to
// avoid leaking resources.
iter.close()
throw e
}

[Review thread]
Contributor:
shall we let the caller FileScanRDD close the iterator when hitting errors?

ankurdave (Contributor, Author), Oct 22, 2021:
In general, I think FileScanRDD does close the iterator when hitting exceptions, because it uses a task completion listener to do so. The only case where it will not close the iterator is when the exception prevents FileScanRDD from getting a reference to the iterator, as is the case here.

Contributor:
ah I see!
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns InternalRow
@@ -354,19 +367,25 @@ class ParquetFileFormat
new ParquetRecordReader[InternalRow](readSupport)
}
val iter = new RecordReaderIterator[InternalRow](reader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
reader.initialize(split, hadoopAttemptContext)

val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

if (partitionSchema.length == 0) {
// There is no partition columns
iter.map(unsafeProjection)
} else {
val joinedRow = new JoinedRow()
iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
try {
reader.initialize(split, hadoopAttemptContext)

val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

if (partitionSchema.length == 0) {
// There is no partition columns
iter.map(unsafeProjection)
} else {
val joinedRow = new JoinedRow()
iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
}
} catch {
case e: Throwable =>
// SPARK-23457: In case there is an exception in initialization, close the iterator to
// avoid leaking resources.
iter.close()
throw e
}
}
}
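A standalone simulation (plain Scala, no Spark APIs; everything here is illustrative) of the ordering problem the SPARK-37089 comment in this diff describes: completion listeners run in reverse order of registration, so a close listener registered by the file format, after the downstream exec nodes have registered theirs, would fire before those nodes are done with the column vectors.

object ListenerOrderDemo extends App {
  // Toy task context: run completion callbacks in reverse registration order,
  // mirroring the behavior described in the SPARK-37089 comment.
  val listeners = scala.collection.mutable.ArrayBuffer.empty[() => Unit]
  def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
  def completeTask(): Unit = listeners.reverseIterator.foreach(f => f())

  // Downstream exec nodes register first...
  addTaskCompletionListener(() => println("downstream exec node: finish consuming column vectors"))
  // ...so a close listener registered by the file format afterwards runs before them.
  addTaskCompletionListener(() => println("file format: close vectorized reader (too early)"))

  completeTask()
  // Prints:
  //   file format: close vectorized reader (too early)
  //   downstream exec node: finish consuming column vectors
}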