Add a manual test case.
dongjoon-hyun committed Feb 13, 2018
commit 3b8cb0a1def32924afd3e4b9e4fc702e1d53d36a
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.datasources.orc
import java.io.File
import java.util.Locale

import org.apache.hadoop.fs.Path
import org.apache.orc.OrcConf.COMPRESS
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.SparkException
import org.apache.spark.sql.Row
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -160,6 +162,25 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
}
}
}

  // This should be tested manually because it raises OOM intentionally
  // in order to cause `Leaked filesystem connection`. The test suite dies, too.
Contributor:
ah, nice trick to fail the reader midway!

But it's a little weird to have it as a unit test. Shall we just put it in the PR description and say it's manually tested? This test needs to be run manually anyway...

Member Author:

Sure!

  ignore("SPARK-23399 Register a task completion listner first for OrcColumnarBatchReader") {
    withSQLConf(SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        Seq(0).toDF("a").write.format("orc").save(new Path(basePath, "first").toString)
        Seq(1).toDF("a").write.format("orc").save(new Path(basePath, "second").toString)
        val df = spark.read.orc(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString)
        val e = intercept[SparkException] {
          df.collect()
        }
        assert(e.getCause.isInstanceOf[OutOfMemoryError])
      }
    }
  }
@dongjoon-hyun (Member Author), Feb 13, 2018:

Hi, all.
The above test case reproduces the same leakage reported in JIRA,
and this PR fixes it. Please try this test case in IntelliJ against the master branch.
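The fix the PR title describes — registering the task completion listener *before* the reader initialization that may throw — can be illustrated with a minimal, self-contained sketch. This is plain Scala with no Spark dependency; the names `Reader`, `runTask`, and the listener buffer are hypothetical stand-ins for `OrcColumnarBatchReader` and Spark's `TaskContext` listener machinery, not the actual Spark code:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical resource standing in for OrcColumnarBatchReader.
class Reader {
  var closed = false
  def close(): Unit = closed = true
  // Simulates allocating a vectorized batch; an absurd size triggers OOM,
  // mirroring the Int.MaxValue batch size used in the manual test above.
  def initialize(batchSize: Int): Unit = {
    if (batchSize == Int.MaxValue) throw new OutOfMemoryError("cannot allocate batch")
  }
}

object ListenerFirst {
  // Simulates one task; returns true if the reader was closed despite the OOM.
  def runTask(): Boolean = {
    val completionListeners = ArrayBuffer.empty[() => Unit]
    val reader = new Reader
    // The fix: register the cleanup listener FIRST ...
    completionListeners += (() => reader.close())
    try {
      reader.initialize(Int.MaxValue) // ... so this failure cannot leak the reader
    } catch {
      case _: OutOfMemoryError =>
        // Task completion still fires the listeners, closing the reader.
        completionListeners.foreach(_.apply())
    }
    reader.closed
  }

  def main(args: Array[String]): Unit =
    println(s"reader closed after failed init: ${runTask()}")
}
```

If the listener were registered only after a successful `initialize`, the OOM would leave the reader (and its filesystem connection) open, which is the leak the test reproduces.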

}

class OrcSourceSuite extends OrcSuite with SharedSQLContext {