Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Moving the ignoreExtension to AvroOptions
  • Loading branch information
MaxGekk committed Jul 19, 2018
commit 3bd3475f1b4ca67b3f98c25a480c670f4fbb7a43
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
// Schema evolution is not supported yet. Here we only pick a single random sample file to
// figure out the schema of the whole dataset.
val sampleFile =
if (AvroFileFormat.ignoreExtension(conf, options)) {
if (AvroFileFormat.ignoreExtension(conf, parsedOptions)) {
files.headOption.getOrElse {
throw new FileNotFoundException("Files for schema inferring have been not found.")
}
Expand Down Expand Up @@ -169,7 +169,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
// Doing input file filtering is improper because we may generate empty tasks that process no
// input files but stress the scheduler. We should probably add a more general input file
// filtering mechanism for `FileFormat` data sources. See SPARK-16317.
if (AvroFileFormat.ignoreExtension(conf, options) || file.filePath.endsWith(".avro")) {
if (AvroFileFormat.ignoreExtension(conf, parsedOptions) || file.filePath.endsWith(".avro")) {
val reader = {
val in = new FsInput(new Path(new URI(file.filePath)), conf)
try {
Expand Down Expand Up @@ -273,15 +273,12 @@ private[avro] object AvroFileFormat {
}
}

def ignoreExtension(conf: Configuration, options: Map[String, String]): Boolean = {
def ignoreExtension(conf: Configuration, options: AvroOptions): Boolean = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this to object AvroOptions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add a comment above this function to describe how we determine it?

val ignoreFilesWithoutExtensionByDefault = false
val ignoreFilesWithoutExtension = conf.getBoolean(
AvroFileFormat.IgnoreFilesWithoutExtensionProperty,
ignoreFilesWithoutExtensionByDefault)

options
.get("ignoreExtension")
.map(_.toBoolean)
.getOrElse(!ignoreFilesWithoutExtension)
options.ignoreExtension.getOrElse(!ignoreFilesWithoutExtension)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,12 @@ class AvroOptions(@transient val parameters: CaseInsensitiveMap[String])
* See Avro spec for details: https://avro.apache.org/docs/1.8.2/spec.html#schema_record .
*/
val recordNamespace: String = parameters.getOrElse("recordNamespace", "")

/**
* This option controls whether the `.avro` file extension is ignored when reading.
* If the option is enabled, all files (with and without the `.avro` extension) are loaded.
* If the option is not set, the Hadoop config `avro.mapred.ignore.inputs.without.extension`
* is taken into account. If that is not set either, file extensions are ignored by default.
*/
val ignoreExtension: Option[Boolean] = parameters.get("ignoreExtension").map(_.toBoolean)
}