diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index b404cfa61f41e..ac062fdc092ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -79,9 +79,10 @@ object OrcUtils extends Logging {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConf()
     // TODO: We need to support merge schema. Please see SPARK-11412.
-    files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
-      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+    files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(schema) =>
+        logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index f58c331f33ca8..e9dccbf2e261c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -562,20 +562,57 @@ abstract class OrcQueryTest extends OrcTest {
       }
     }
 
+    def testAllCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
+    def testAllCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
       testIgnoreCorruptFilesWithoutSchemaInfer()
+      val m1 = intercept[AnalysisException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Unable to infer schema for ORC"))
+      testAllCorruptFilesWithoutSchemaInfer()
     }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
       val m1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }.getMessage
-      assert(m1.contains("Could not read footer for file"))
+      assert(m1.contains("Malformed ORC file"))
       val m2 = intercept[SparkException] {
         testIgnoreCorruptFilesWithoutSchemaInfer()
       }.getMessage
       assert(m2.contains("Malformed ORC file"))
+      val m3 = intercept[SparkException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m3.contains("Could not read footer for file"))
+      val m4 = intercept[SparkException] {
+        testAllCorruptFilesWithoutSchemaInfer()
+      }.getMessage
+      assert(m4.contains("Malformed ORC file"))
     }
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 80e44ca504356..713b70f252b6a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -92,11 +92,12 @@ private[hive] object OrcFileOperator extends Logging {
       : Option[StructType] = {
     // Take the first file where we can open a valid reader if we can find one. Otherwise just
     // return None to indicate we can't infer the schema.
-    paths.flatMap(getFileReader(_, conf, ignoreCorruptFiles)).headOption.map { reader =>
-      val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
-      val schema = readerInspector.getTypeName
-      logDebug(s"Reading schema from file $paths, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema).asInstanceOf[StructType]
+    paths.toIterator.map(getFileReader(_, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(reader) =>
+        val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
+        val schema = readerInspector.getTypeName
+        logDebug(s"Reading schema from file $paths, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema).asInstanceOf[StructType]
     }
   }
 
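The functional change in both OrcUtils.readSchema and OrcFileOperator.readSchema above is the switch from the strict Seq#flatMap(...).headOption to the lazy Iterator#map(...).collectFirst. Below is a small self-contained Scala sketch, separate from the patch, using a made-up tryRead helper and dummy paths, that illustrates the difference: the iterator version stops evaluating as soon as the first Some is produced, while the strict version evaluates every element before headOption picks the first result.

// Standalone sketch, not Spark code: tryRead is a made-up stand-in for
// readSchema / getFileReader, returning None for files it cannot read.
object CollectFirstSketch {
  def tryRead(path: String): Option[String] = {
    println(s"opening $path")
    if (path.startsWith("corrupt")) None else Some(s"schema-of-$path")
  }

  def main(args: Array[String]): Unit = {
    val paths = Seq("corrupt-0", "good-1", "good-2")

    // Lazy: Iterator#map evaluates elements on demand and collectFirst returns
    // as soon as the partial function matches, so "good-2" is never opened.
    val lazyFirst = paths.toIterator.map(tryRead).collectFirst { case Some(s) => s }
    println(lazyFirst)   // opens corrupt-0 and good-1 only, then Some(schema-of-good-1)

    // Strict: Seq#flatMap materializes the whole intermediate collection first,
    // so all three paths are opened before headOption takes the first result.
    val strictFirst = paths.flatMap(tryRead).headOption
    println(strictFirst) // same value, but every path was opened
  }
}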