use pushedFilters in ParquetPartitionReaderFactory
gengliangwang committed Jun 7, 2019
commit b530e5f15a9698b28b76ff49f3dae45264386cc7
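
For context: in the DataSource V2 read path, a scan builder that supports pushdown records the predicates it accepted, and the resulting Scan carries them down to the partition reader factory. This commit threads pushedFilters through that chain in place of the raw filters. The sketch below shows the builder-side shape in isolation; it is a simplified illustration, not the real implementation (the actual ParquetScanBuilder extends FileScanBuilder and implements the SupportsPushDownFilters mixin):

import org.apache.spark.sql.sources.Filter

// Simplified sketch of the pushdown bookkeeping (illustration only).
class SketchScanBuilder {
  private var _pushedFilters: Array[Filter] = Array.empty

  // Spark hands the source the query's data filters; the return value is the
  // subset the source cannot guarantee to evaluate fully. A Parquet-style
  // source conservatively returns them all, since file-level filtering is
  // best-effort.
  def pushFilters(filters: Array[Filter]): Array[Filter] = {
    _pushedFilters = filters
    filters
  }

  // The filters the source promised to push; build() hands these to the Scan,
  // which forwards them to the partition reader factory (the change below).
  def pushedFilters(): Array[Filter] = _pushedFilters
}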
ParquetPartitionReaderFactory.scala
@@ -51,7 +51,7 @@ import org.apache.spark.util.SerializableConfiguration
  * @param dataSchema Schema of Parquet files.
  * @param readDataSchema Required schema of Parquet files.
  * @param partitionSchema Schema of partitions.
- * @param filters Filters of the batch scan.
+ * @param filters Filters to be pushed down in the batch scan.
  */
 case class ParquetPartitionReaderFactory(
     sqlConf: SQLConf,
ParquetScan.scala
@@ -39,7 +39,6 @@ case class ParquetScan(
     dataSchema: StructType,
     readDataSchema: StructType,
     readPartitionSchema: StructType,
-    filters: Array[Filter],
     pushedFilters: Array[Filter],
     options: CaseInsensitiveStringMap)
   extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) {

Review comment from gengliangwang (Member, Author) on the pushedFilters line:
    pushedFilters is added here just for the end-to-end test. Not sure if this is appropriate.
@@ -77,14 +76,14 @@ case class ParquetScan(
     val broadcastedConf = sparkSession.sparkContext.broadcast(
       new SerializableConfiguration(hadoopConf))
     ParquetPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
-      dataSchema, readDataSchema, readPartitionSchema, filters)
+      dataSchema, readDataSchema, readPartitionSchema, pushedFilters)
   }

   override def equals(obj: Any): Boolean = obj match {
     case p: ParquetScan =>
       fileIndex == p.fileIndex && dataSchema == p.dataSchema &&
         readDataSchema == p.readDataSchema && readPartitionSchema == p.readPartitionSchema &&
-        options == p.options && equivalentFilters(filters, p.filters)
+        options == p.options && equivalentFilters(pushedFilters, p.pushedFilters)
     case _ => false
   }
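A rough sketch of the end-to-end check the review comment above alludes to: write a small Parquet table, filter it through the DataFrame API, and assert that the predicate surfaces in ParquetScan.pushedFilters. This is a hypothetical illustration, not the PR's actual test; BatchScanExec and the ParquetScan package path are assumptions about the Spark internals of that era, and depending on the build the V1 Parquet fallback may first need to be disabled (e.g. via spark.sql.sources.useV1SourceList).

import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan

// Hypothetical end-to-end check: push a predicate through the DataFrame API
// and assert it reaches the scan's pushed filters.
spark.range(10).write.parquet("/tmp/parquet_pushdown_demo")
val df = spark.read.parquet("/tmp/parquet_pushdown_demo").filter("id > 5")

// Collect the DSv2 scans from the executed plan and inspect their pushed filters.
val pushed = df.queryExecution.executedPlan.collect {
  case b: BatchScanExec => b.scan
}.collect { case p: ParquetScan => p.pushedFilters }

assert(pushed.exists(_.nonEmpty))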
ParquetScanBuilder.scala
@@ -70,6 +70,6 @@ case class ParquetScanBuilder(

   override def build(): Scan = {
     ParquetScan(sparkSession, hadoopConf, fileIndex, dataSchema, readDataSchema(),
-      readPartitionSchema(), filters, _pushedFilters, options)
+      readPartitionSchema(), _pushedFilters, options)
   }
 }
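
A note on the equals change above: scan equality now keys on the pushed filters, which matters wherever Spark reuses equivalent scans or exchanges within a plan. Assuming equivalentFilters compares the two arrays as sets (a guess at the helper's semantics; it is defined elsewhere in Spark's source tree), the comparison is insensitive to filter order:

import org.apache.spark.sql.sources.{Filter, GreaterThan, IsNotNull}

// Order-insensitive filter equivalence, under the assumed set semantics of
// Spark's equivalentFilters helper (illustration only).
val a: Array[Filter] = Array(IsNotNull("id"), GreaterThan("id", 5))
val b: Array[Filter] = Array(GreaterThan("id", 5), IsNotNull("id"))
assert(a.toSet == b.toSet) // equal as sets despite different ordering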