Skip to content
Closed
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ class OrcFileFormat
if (enableVectorizedReader) {
val batchReader = new OrcColumnarBatchReader(
enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
val iter = new RecordReaderIterator(batchReader)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add comment why we put this registration here with SPARK-23399. Since we would forget this investigation in the future :), this comment will help us and will remind to run the test case manually.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure!


batchReader.initialize(fileSplit, taskAttemptContext)
Copy link
Member Author

@dongjoon-hyun dongjoon-hyun Feb 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the reported case, the ORC file is opened here.
But, it seems that the task is killed, TaskKilled (Stage cancelled), during initBatch before registering its listener . For a case throwing Exception at initBatch, this PR prevents the open file leakage.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan and @gatorsmile . Could you take a look this?
For ORC library, it looks okay when we call close correctly.

Copy link
Member

@viirya viirya Feb 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongjoon-hyun Thanks for this fix! My question is how do we know if close is not called before and is called now? Have you verified it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I tried to verify it manually in local, seems close is called before this change. Maybe I miss something or this is environment depending.

batchReader.initBatch(
reader.getSchema,
Expand All @@ -196,8 +199,6 @@ class OrcFileFormat
partitionSchema,
file.partitionValues)

val iter = new RecordReaderIterator(batchReader)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
iter.asInstanceOf[Iterator[InternalRow]]
} else {
val orcRecordReader = new OrcInputFormat[OrcStruct]
Expand Down