@@ -188,6 +188,9 @@ class OrcFileFormat
if (enableVectorizedReader) {
val batchReader = new OrcColumnarBatchReader(
enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
val iter = new RecordReaderIterator(batchReader)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
Member:
Could you please add a comment explaining why we put this registration here, referencing SPARK-23399? Since we will forget this investigation in the future :), the comment will help us and remind us to run the test case manually.

Member Author:
Sure!
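
A possible shape for that inline comment (illustrative wording only, not a quote of the final commit):

```scala
// SPARK-23399: Register the task completion listener first, i.e. before
// initialize()/initBatch(). If the task is killed or initBatch() throws,
// the listener still closes `iter`, so the opened ORC file does not leak.
// See the ignored test in OrcSourceSuite, which must be run manually.
```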


batchReader.initialize(fileSplit, taskAttemptContext)
Member Author @dongjoon-hyun (Feb 12, 2018):
According to the reported case, the ORC file is opened here.
But it seems that the task is killed (TaskKilled (Stage cancelled)) during initBatch, before its listener is registered. For a case that throws an exception in initBatch, this PR prevents the open-file leak.
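
As a minimal sketch of the general pattern (simplified names; `safeOpen` is a hypothetical helper, not part of this PR):

```scala
import org.apache.spark.TaskContext

// Register the completion listener BETWEEN acquiring the resource and the
// failure-prone initialization: if init is interrupted or throws, the
// listener still closes the resource when the task completes.
def safeOpen[T <: java.io.Closeable](acquire: () => T)(init: T => Unit): T = {
  val resource = acquire()                       // file handle is now open
  Option(TaskContext.get())
    .foreach(_.addTaskCompletionListener(_ => resource.close()))
  init(resource)                                 // may throw or be killed here
  resource
}
```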

Member Author:
@cloud-fan and @gatorsmile, could you take a look at this?
For the ORC library, it looks okay as long as we call close correctly.
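
A minimal sketch of that ORC-side contract, using the ORC core API directly (the path is a placeholder):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.OrcFile

// As long as close() is eventually called on the row reader, the underlying
// input stream is released and no filesystem connection leaks; the listener
// added in this PR exists to guarantee that call on every completion path.
val reader = OrcFile.createReader(
  new Path("/tmp/example.orc"), OrcFile.readerOptions(new Configuration()))
val rows = reader.rows()         // opens the data stream
try {
  // ... consume rows ...
} finally {
  rows.close()                   // releases the stream
}
```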

Member @viirya (Feb 13, 2018):

@dongjoon-hyun Thanks for this fix! My question is: how do we know that close was not called before this change and is called now? Have you verified it?

Member:
I tried to verify it manually on my local machine, and it seems close is called even before this change. Maybe I am missing something, or it is environment-dependent.

batchReader.initBatch(
reader.getSchema,
@@ -196,8 +199,6 @@ class OrcFileFormat
partitionSchema,
file.partitionValues)

val iter = new RecordReaderIterator(batchReader)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
iter.asInstanceOf[Iterator[InternalRow]]
} else {
val orcRecordReader = new OrcInputFormat[OrcStruct]
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.datasources.orc
import java.io.File
import java.util.Locale

import org.apache.hadoop.fs.Path
import org.apache.orc.OrcConf.COMPRESS
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.SparkException
import org.apache.spark.sql.Row
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -160,6 +162,25 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
}
}
}

// This should be tested manually because it intentionally raises an OOM
// in order to cause a `Leaked filesystem connection` warning. The test suite dies, too.
Contributor:

Ah, nice trick to fail the reader midway!

But it's a little weird to have it as a unit test; shall we just put it in the PR description and say it's manually tested? This test needs to be run manually anyway...

Member Author:

Sure!

ignore("SPARK-23399 Register a task completion listner first for OrcColumnarBatchReader") {
withSQLConf(SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
Seq(0).toDF("a").write.format("orc").save(new Path(basePath, "first").toString)
Seq(1).toDF("a").write.format("orc").save(new Path(basePath, "second").toString)
val df = spark.read.orc(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString)
val e = intercept[SparkException] {
df.collect()
}
assert(e.getCause.isInstanceOf[OutOfMemoryError])
}
}
}
Member Author @dongjoon-hyun (Feb 13, 2018):

Hi, all.
The above test case reproduces the same leakage reported in the JIRA, and this PR fixes it. Please try this test case in IntelliJ against the master branch.
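
For reference, a rough spark-shell equivalent of the ignored test above (paths are placeholders; the `ORC_IMPLEMENTATION` setting is an extra assumption, in case the native reader is not the default in your build):

```scala
// Without this PR, the OutOfMemoryError raised while allocating the
// Int.MaxValue-sized batch in initBatch leaves the already-opened ORC
// file leaked; with it, the completion listener closes the file.
import org.apache.spark.sql.internal.SQLConf
spark.conf.set(SQLConf.ORC_IMPLEMENTATION.key, "native")
spark.conf.set(SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE.key, Int.MaxValue.toString)
Seq(0).toDF("a").write.format("orc").save("/tmp/spark-23399/first")
Seq(1).toDF("a").write.format("orc").save("/tmp/spark-23399/second")
spark.read.orc("/tmp/spark-23399/first", "/tmp/spark-23399/second").collect() // expect OOM
```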

}

class OrcSourceSuite extends OrcSuite with SharedSQLContext {