diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
index 0397d15aed924..8230dbaf8ea6c 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
@@ -46,7 +46,7 @@ import org.apache.spark.util.SerializableConfiguration
  * @param dataSchema Schema of AVRO files.
  * @param readDataSchema Required data schema of AVRO files.
  * @param partitionSchema Schema of partitions.
- * @param options Options for parsing AVRO files.
+ * @param parsedOptions Options for parsing AVRO files.
  */
 case class AvroPartitionReaderFactory(
     sqlConf: SQLConf,
@@ -54,11 +54,10 @@ case class AvroPartitionReaderFactory(
     dataSchema: StructType,
     readDataSchema: StructType,
     partitionSchema: StructType,
-    options: Map[String, String]) extends FilePartitionReaderFactory with Logging {
+    parsedOptions: AvroOptions) extends FilePartitionReaderFactory with Logging {
 
   override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
     val conf = broadcastedConf.value.value
-    val parsedOptions = new AvroOptions(options, conf)
     val userProvidedSchema = parsedOptions.schema.map(new Schema.Parser().parse)
 
     if (parsedOptions.ignoreExtension || partitionedFile.filePath.endsWith(".avro")) {
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala
index bb840e69d99a3..d5a29124a276e 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.AvroOptions
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.connector.read.PartitionReaderFactory
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
@@ -45,10 +46,11 @@ case class AvroScan(
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
     val broadcastedConf = sparkSession.sparkContext.broadcast(
       new SerializableConfiguration(hadoopConf))
+    val parsedOptions = new AvroOptions(caseSensitiveMap, hadoopConf)
     // The partition values are already truncated in `FileScan.partitions`.
     // We should use `readPartitionSchema` as the partition schema here.
     AvroPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
-      dataSchema, readDataSchema, readPartitionSchema, caseSensitiveMap)
+      dataSchema, readDataSchema, readPartitionSchema, parsedOptions)
   }
 
   override def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan =
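
The net effect of the diff: the raw option `Map[String, String]` used to be re-parsed into an `AvroOptions` inside `buildReader`, i.e. once per partition; after the change, `AvroScan.createReaderFactory` parses it once and hands the typed `AvroOptions` to `AvroPartitionReaderFactory`. Below is a minimal, self-contained Scala sketch of that parse-once pattern; the `DemoOptions` and factory names are illustrative stand-ins, not Spark's actual classes.

```scala
// Sketch of the pattern this diff applies: parse an option map into a
// typed options object once, at planning time, instead of re-parsing the
// raw Map inside every per-partition reader. All names here are
// hypothetical; only the shape mirrors AvroScan/AvroPartitionReaderFactory.

final case class DemoOptions(ignoreExtension: Boolean, userSchema: Option[String])

object DemoOptions {
  def parse(raw: Map[String, String]): DemoOptions =
    DemoOptions(
      ignoreExtension = raw.get("ignoreExtension").exists(_.toBoolean),
      userSchema = raw.get("avroSchema"))
}

// Before: the factory carried the raw map and parsed it on every buildReader call.
final case class RawMapReaderFactory(raw: Map[String, String]) {
  def buildReader(filePath: String): Unit = {
    val opts = DemoOptions.parse(raw) // repeated for each partition
    if (opts.ignoreExtension || filePath.endsWith(".avro")) println(s"reading $filePath")
  }
}

// After: options are parsed once where the scan is planned, and the typed
// object is passed in, so buildReader does no parsing at all.
final case class ParsedOptionsReaderFactory(opts: DemoOptions) {
  def buildReader(filePath: String): Unit =
    if (opts.ignoreExtension || filePath.endsWith(".avro")) println(s"reading $filePath")
}

object Demo extends App {
  val raw = Map("ignoreExtension" -> "true")
  val factory = ParsedOptionsReaderFactory(DemoOptions.parse(raw)) // parse once
  Seq("part-00000.avro", "part-00001.dat").foreach(factory.buildReader)
}
```

One consequence the real change has to satisfy, though it is not visible in these hunks: the reader factory is serialized and shipped to executors, so `AvroOptions` must be serializable once it becomes a constructor argument of `AvroPartitionReaderFactory` instead of being rebuilt executor-side from the raw map.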