Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Output log warning
  • Loading branch information
MaxGekk committed Jan 11, 2020
commit 4e1065a3c877b301fce7a5686bd0fc7597e01963
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class AvroOptions(
*/
@deprecated("Use the general data source option pathGlobFilter for filtering file names", "3.0")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove this if it's really deprecated? I get that it will remove some compiler warnings, but, that's not super important, or can be worked around as you do elsewhere by deprecating the test methods too?

val ignoreExtension: Boolean = {
def warn(s: String): Unit = logWarning(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we define a separate method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, to reuse the same code in 2 places.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel strongly but I think it's fine to don't do it ...

s"$s is deprecated, and it will be not use by Avro datasource in the future releases. " +
"Use the general data source option pathGlobFilter for filtering file names.")
val ignoreFilesWithoutExtensionByDefault = false
val ignoreFilesWithoutExtension = conf.getBoolean(
AvroFileFormat.IgnoreFilesWithoutExtensionProperty,
Expand All @@ -78,7 +81,17 @@ class AvroOptions(
parameters
.get(AvroOptions.ignoreExtensionKey)
.map(_.toBoolean)
.getOrElse(!ignoreFilesWithoutExtension)
.map { ignoreExtensionOption =>
if (ignoreExtensionOption != !ignoreFilesWithoutExtensionByDefault) {
warn(s"The Avro option '${AvroOptions.ignoreExtensionKey}'")
Copy link
Member

@HyukjinKwon HyukjinKwon Jan 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MaxGekk, from a cursory look, this warning can be shown for every file which I think is noisy:

abstract class FilePartitionReaderFactory extends PartitionReaderFactory {
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
assert(partition.isInstanceOf[FilePartition])
val filePartition = partition.asInstanceOf[FilePartition]
val iter = filePartition.files.toIterator.map { file =>
PartitionedFileReader(file, buildReader(file))
}

Do you mind if I ask double check this?

Copy link
Member Author

@MaxGekk MaxGekk Jan 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HyukjinKwon I will check that but general thoughts are:

  1. The log warning is printed only if an user sets non-default config values
  2. I don't think AvroOptions should be created (initialized from scratch) per-each file if it is created in current implementation. I would say it is not necessary to initialize AvroOptions again and again. After all, AvroOptions should be the same for all files/partitions.
  3. And the noise in logs will force people to avoid using of the deprecated options ;-)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HyukjinKwon you are right, it prints warnings per each partition. I have confirmed that by the test:

  test("count deprecation log events") {
    val partitionNum = 3
    val logAppender = new AppenderSkeleton {
      val loggingEvents = new ArrayBuffer[LoggingEvent]()

      override def append(loggingEvent: LoggingEvent): Unit = loggingEvents.append(loggingEvent)
      override def close(): Unit = {}
      override def requiresLayout(): Boolean = false
    }
    withTempPath { dir =>
      Seq(("a", 1, 2), ("b", 1, 2), ("c", 2, 1), ("d", 2, 1))
        .toDF("value", "p1", "p2")
        .repartition(partitionNum)
        .write
        .format("avro")
        .option("header", true)
        .save(dir.getCanonicalPath)
      withLogAppender(logAppender) {
        val df = spark
          .read
          .format("avro")
          .schema("value STRING, p1 INTEGER, p2 INTEGER")
          .option(AvroOptions.ignoreExtensionKey, false)
          .option("header", true)
          .load(dir.getCanonicalPath)
        df.count()
      }
      val deprecatedEvents = logAppender.loggingEvents
        .map(_.getRenderedMessage)
        .filter(_.contains(AvroOptions.ignoreExtensionKey))
      assert(deprecatedEvents.size === partitionNum)
    }
  }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I moved instantiation of AvroOptions out of buildReader():
Screen Shot 2020-01-12 at 20 14 06
The warning is printed always 2 times. It means AvroPartitionReaderFactory is constructed twice.
And both times from

override def supportsColumnar: Boolean = {
require(partitions.forall(readerFactory.supportColumnarReads) ||
!partitions.exists(readerFactory.supportColumnarReads),
"Cannot mix row-based and columnar input partitions.")
partitions.exists(readerFactory.supportColumnarReads)
}
which is called:
First time from
Screen Shot 2020-01-12 at 20 32 02
The second time from:
Screen Shot 2020-01-12 at 20 33 36

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is interesting that rewriting supportsColumnar as:

  override val supportsColumnar: Boolean = {
    val factory = readerFactory
    require(partitions.forall(factory.supportColumnarReads) ||
      !partitions.exists(factory.supportColumnarReads),
      "Cannot mix row-based and columnar input partitions.")

    partitions.exists(factory.supportColumnarReads)
  }

does not help too because DataSourceV2ScanExecBase is initialized twice from:
First time:
Screen Shot 2020-01-12 at 20 52 49
Second time in TreeNode.makeCopy:
Screen Shot 2020-01-12 at 21 06 40
Making supportsColumnar as lazy val doesn't help as well because supportsColumnar is invoked twice for different objects.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is not nice that we construct some classes twice when it is not necessary. WDYT? /cc @cloud-fan @dongjoon-hyun

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea we shouldn't instantiate twice, but not a big problem. I'm more worried about we instantiate it for every partition.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MaxGekk, even if we fix this, it will still show the warning twice for schema inference and reading path at the very least. It's okay as long as we show the warning and document. Let's just go simple in this PR. This warning will be removed very soon, too.

}
ignoreExtensionOption
}.getOrElse {
if (ignoreFilesWithoutExtension != ignoreFilesWithoutExtensionByDefault) {
warn(s"The Hadoop's config '${AvroFileFormat.IgnoreFilesWithoutExtensionProperty}'")
}
!ignoreFilesWithoutExtension
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import java.sql.{Date, Timestamp}
import java.util.{Locale, TimeZone, UUID}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.language.reflectiveCalls

import org.apache.avro.Schema
import org.apache.avro.Schema.{Field, Type}
Expand All @@ -32,6 +34,8 @@ import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
import org.apache.commons.io.FileUtils
import org.apache.log4j.AppenderSkeleton
import org.apache.log4j.spi.LoggingEvent

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql._
Expand Down Expand Up @@ -1497,6 +1501,35 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
|}
""".stripMargin)
}


test("deprecation warning for ignoreExtension") {
val logAppender = new AppenderSkeleton {
val loggingEvents = new ArrayBuffer[LoggingEvent]()

override def append(loggingEvent: LoggingEvent): Unit = loggingEvents.append(loggingEvent)
override def close(): Unit = {}
override def requiresLayout(): Boolean = false
}
def check(key: String): Unit = {
assert(logAppender.loggingEvents.exists(
_.getRenderedMessage.contains(s"'$key' is deprecated")))
}

withLogAppender(logAppender) {
withSQLConf(AvroFileFormat.IgnoreFilesWithoutExtensionProperty -> "true") {
spark.read.format("avro").load(testAvro).collect()
}
}
check(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)

withLogAppender(logAppender) {
spark.read
.option(AvroOptions.ignoreExtensionKey, false)
.format("avro").load(testAvro).collect()
}
check(AvroOptions.ignoreExtensionKey)
}
}

class AvroV1Suite extends AvroSuite {
Expand Down