@@ -79,9 +79,10 @@ object OrcUtils extends Logging {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConf()
     // TODO: We need to support merge schema. Please see SPARK-11412.
-    files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
-      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+    files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(schema) =>
+        logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
[Member]
This might be a behavior change.

Previously, when SQLConf.IGNORE_CORRUPT_FILES was false, the ORC source would throw an exception while reading corrupt files.

Now, once the ORC source reads the first valid schema, it doesn't read the other ORC files further, so the corrupt files are ignored even when SQLConf.IGNORE_CORRUPT_FILES is false.
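
(For illustration only: a minimal, self-contained sketch of why the new code stops early. The readSchemaFromFile helper and file names are hypothetical stand-ins, not Spark APIs. A Seq's flatMap is eager and touches every file, while an Iterator with collectFirst stops at the first hit.)

    // Hypothetical stand-in for readSchema: Some(schema) for readable files,
    // None for corrupt ones when ignoreCorruptFiles is true.
    def readSchemaFromFile(path: String): Option[String] = {
      println(s"opening $path")
      if (path.contains("corrupt")) None else Some("struct<a:bigint>")
    }

    val files = Seq("part-0.orc", "corrupt-1.orc", "corrupt-2.orc")

    // Old shape: flatMap on a Seq is eager, so all three files are opened.
    val eager = files.flatMap(readSchemaFromFile).headOption

    // New shape: the iterator is lazy, so collectFirst stops after the first
    // Some -- only "part-0.orc" is opened.
    val lazyFirst = files.toIterator.map(readSchemaFromFile).collectFirst {
      case Some(schema) => schema
    }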

[Contributor]
BTW, I think we'd have to create a reader for each file when implementing schema merging like Parquet, right?

[Member] @viirya, Aug 23, 2018

Yeah, I think so. But in Parquet, schema merging is done in parallel, so it doesn't create all the readers in one place.
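
(A rough sketch of what Parquet-style parallel merging could look like here. The readSingleFileSchema and mergeTwo helpers are hypothetical placeholders, since Spark's own StructType merging is not a public API.)

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.StructType

    // Hypothetical per-file footer reader, executed on the executors.
    def readSingleFileSchema(path: String): StructType =
      StructType(Nil) // placeholder: a real version would read the ORC footer

    // Naive pairwise merge by field name; a real merge must reconcile types too.
    def mergeTwo(left: StructType, right: StructType): StructType = {
      val extra = right.fields.filterNot(f => left.fieldNames.contains(f.name))
      StructType(left.fields ++ extra)
    }

    def mergeSchemasInParallel(spark: SparkSession, paths: Seq[String]): StructType =
      spark.sparkContext
        .parallelize(paths, math.min(math.max(paths.size, 1), 64))
        .map(readSingleFileSchema)
        .reduce(mergeTwo) // each task reads only its own files' footers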

[Member]
@viirya, the corrupt files are not ignored; Spark will throw a SparkException while reading the content.

> Now, once the ORC source reads the first valid schema, it doesn't read the other ORC files further, so the corrupt files are ignored even when SQLConf.IGNORE_CORRUPT_FILES is false.

[Member]
Yeah, it is only ignored while reading the schema.

The change is in the timing at which corrupt files are detected: it is now postponed until the file contents are actually read.

That might not be a big deal, though for user experience it is better to throw such an exception early.
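
(To make the timing concrete: a hypothetical session against a directory containing one valid and one corrupt ORC file; the path is made up.)

    // ignoreCorruptFiles is false (the default).
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "false")

    // Schema inference now succeeds, since only the first readable file is
    // opened, so no exception is thrown at this point anymore.
    val df = spark.read.orc("/data/mixed")

    // The corrupt file is only hit once the contents are actually scanned.
    df.count() // throws SparkException here instead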

     }
   }

@@ -562,20 +562,57 @@ abstract class OrcQueryTest extends OrcTest {
     }
   }

+    def testAllCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
+    def testAllCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }

     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
       testIgnoreCorruptFilesWithoutSchemaInfer()
+      val m1 = intercept[AnalysisException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Unable to infer schema for ORC"))
+      testAllCorruptFilesWithoutSchemaInfer()
     }

     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
       val m1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }.getMessage
-      assert(m1.contains("Could not read footer for file"))
+      assert(m1.contains("Malformed ORC file"))
[Contributor]

Why did the error message change?

[Member]

I think this is because of the behavior change #22157 (comment).

Previously, the ORC source read the third file, which is corrupt, and threw the "could not read footer for file" exception.

Now the ORC source reads the first file for a valid schema and skips the other two files. When it then uses that schema to read the second ORC file, the schema is inconsistent, so the "Malformed ORC file" exception is thrown instead.

[Member]

Let's make sure we don't backport it ... then I think it's fine. It sounds more like a bug to read and validate all schemas (which is inconsistent with Parquet) when we only need to pick up a single file. I don't think we make any guarantee about the picking order.

The possible behaviour change is when we only read the schema: the previous code would throw an exception, but after this PR it wouldn't.

The previous behaviour is something we should expect when the mergeSchema option is implemented on the ORC side, as you guys discussed below.
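
(For context, the Parquet-style usage such an ORC option would presumably mirror; ORC did not support mergeSchema at the time of this PR, and the paths are made up.)

    // Parquet today: per-file schemas are read and merged in parallel.
    val merged = spark.read.option("mergeSchema", "true").parquet("/data/evolving")

    // The equivalent ORC call this discussion anticipates.
    val mergedOrc = spark.read.option("mergeSchema", "true").orc("/data/evolving")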

[Member]
I agree with this take

[Member]
Ok. It's reasonable.

       val m2 = intercept[SparkException] {
         testIgnoreCorruptFilesWithoutSchemaInfer()
       }.getMessage
       assert(m2.contains("Malformed ORC file"))
+      val m3 = intercept[SparkException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m3.contains("Could not read footer for file"))
+      val m4 = intercept[SparkException] {
+        testAllCorruptFilesWithoutSchemaInfer()
+      }.getMessage
+      assert(m4.contains("Malformed ORC file"))
     }
   }
 }
@@ -92,11 +92,12 @@ private[hive] object OrcFileOperator extends Logging {
       : Option[StructType] = {
     // Take the first file where we can open a valid reader if we can find one. Otherwise just
     // return None to indicate we can't infer the schema.
-    paths.flatMap(getFileReader(_, conf, ignoreCorruptFiles)).headOption.map { reader =>
-      val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
-      val schema = readerInspector.getTypeName
-      logDebug(s"Reading schema from file $paths, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema).asInstanceOf[StructType]
+    paths.toIterator.map(getFileReader(_, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(reader) =>
+        val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
+        val schema = readerInspector.getTypeName
+        logDebug(s"Reading schema from file $paths, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema).asInstanceOf[StructType]
     }
   }
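
(For reference, the Hive-type-string-to-Catalyst step used in both hunks can be tried standalone; the schema string below is made up.)

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    import org.apache.spark.sql.types.StructType

    // A Hive-style type string like the one readerInspector.getTypeName returns.
    val hiveSchema = "struct<a:bigint,b:string>"

    val parsed = CatalystSqlParser.parseDataType(hiveSchema).asInstanceOf[StructType]
    assert(parsed.fieldNames.sameElements(Array("a", "b")))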
