Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,14 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
// Schema evolution is not supported yet. Here we only pick a single random sample file to
// figure out the schema of the whole dataset.
val sampleFile =
if (AvroFileFormat.ignoreFilesWithoutExtensions(conf)) {
files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
throw new FileNotFoundException(
"No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " +
" is set to true. Do all input files have \".avro\" extension?"
)
if (AvroFileFormat.ignoreExtension(conf, options)) {
files.headOption.getOrElse {
throw new FileNotFoundException("Files for schema inferring have been not found.")
}
} else {
files.headOption.getOrElse {
throw new FileNotFoundException("No Avro files found.")
files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
throw new FileNotFoundException(
"No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
}
}

Expand Down Expand Up @@ -170,9 +168,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
// Doing input file filtering is improper because we may generate empty tasks that process no
// input files but stress the scheduler. We should probably add a more general input file
// filtering mechanism for `FileFormat` data sources. See SPARK-16317.
if (AvroFileFormat.ignoreFilesWithoutExtensions(conf) && !file.filePath.endsWith(".avro")) {
Iterator.empty
} else {
if (AvroFileFormat.ignoreExtension(conf, options) || file.filePath.endsWith(".avro")) {
val reader = {
val in = new FsInput(new Path(new URI(file.filePath)), conf)
try {
Expand Down Expand Up @@ -227,6 +223,8 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
deserializer.deserialize(record).asInstanceOf[InternalRow]
}
}
} else {
Iterator.empty
}
}
}
Expand Down Expand Up @@ -276,10 +274,15 @@ private[avro] object AvroFileFormat {
}
}

def ignoreFilesWithoutExtensions(conf: Configuration): Boolean = {
// Files without .avro extensions are not ignored by default
val defaultValue = false
/**
 * Decides whether files without the ".avro" extension should still be read.
 *
 * Resolution order: the per-read DataSource option "ignoreExtension" wins when
 * present; otherwise the legacy Hadoop property
 * `AvroFileFormat.IgnoreFilesWithoutExtensionProperty` is consulted (note the
 * inverted semantics: ignoring extensions == NOT ignoring files without them).
 *
 * @param conf Hadoop configuration carrying the legacy property.
 * @param options per-read options passed to the data source.
 * @return true if files lacking the ".avro" extension must be processed.
 */
def ignoreExtension(conf: Configuration, options: Map[String, String]): Boolean = {
  // Legacy Hadoop-level switch: when true, files without the ".avro"
  // extension are skipped. Files are NOT skipped by default.
  val ignoreFilesWithoutExtensionByDefault = false
  val ignoreFilesWithoutExtension = conf.getBoolean(
    AvroFileFormat.IgnoreFilesWithoutExtensionProperty,
    ignoreFilesWithoutExtensionByDefault)

  // The DataSource option overrides the Hadoop property. The negation flips
  // between the two opposite conventions ("ignore extension" vs "ignore files
  // without extension"). NOTE(review): `_.toBoolean` throws
  // IllegalArgumentException for values other than "true"/"false".
  options
    .get("ignoreExtension")
    .map(_.toBoolean)
    .getOrElse(!ignoreFilesWithoutExtension)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -628,10 +628,21 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
spark.read.avro(dir.toString)
} finally {
hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty) }
hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
}
}
}

intercept[FileNotFoundException] {
withTempPath { dir =>
FileUtils.touch(new File(dir, "test"))

spark
.read
.option("ignoreExtension", false)
.avro(dir.toString)
}
}
}

test("SQL test insert overwrite") {
Expand Down Expand Up @@ -700,7 +711,6 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
} finally {
hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
}

assert(count == 8)
}
}
Expand Down Expand Up @@ -836,4 +846,45 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
assert(df2.count == 8)
}
}

// Verifies that setting ignoreExtension=false makes the reader consider only
// "*.avro" files: a stray extensionless file in the directory must not change
// the row count of the original 8-row dataset.
test("SPARK-24836: checking the ignoreExtension option") {
withTempPath { tempDir =>
// Baseline: the sample dataset has exactly 8 rows.
val df = spark.read.avro(episodesAvro)
assert(df.count == 8)

val tempSaveDir = s"$tempDir/save/"
df.write.avro(tempSaveDir)

// Plant a non-Avro file; with ignoreExtension=false it must be skipped.
Files.createFile(new File(tempSaveDir, "non-avro").toPath)

val newDf = spark
.read
.option("ignoreExtension", false)
.avro(tempSaveDir)

// Count unchanged: the extensionless file was ignored.
assert(newDf.count == 8)
}
}

// Verifies precedence: the per-read "ignoreExtension" option must win over the
// Hadoop property. The file is copied WITHOUT the ".avro" extension and the
// Hadoop property is set to skip such files; ignoreExtension=true must still
// read all 8 rows.
test("SPARK-24836: ignoreExtension must override hadoop's config") {
withTempDir { dir =>
// Copy the sample data to a path with no ".avro" extension.
Files.copy(
Paths.get(new URL(episodesAvro).toURI),
Paths.get(dir.getCanonicalPath, "episodes"))

val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
val count = try {
// Hadoop property says: skip files without the ".avro" extension.
hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
val newDf = spark
.read
.option("ignoreExtension", "true")
.avro(s"${dir.getCanonicalPath}/episodes")
newDf.count()
} finally {
// Restore global Hadoop config so other tests are unaffected.
hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
}

// The option overrode the property: all rows were read.
assert(count == 8)
}
}
}