Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
6e41081
[SPARK-4502][SQL] Parquet nested column pruning
Jun 24, 2016
0d0e8a0
Refactor SelectedFieldSuite to make its tests simpler and more
mallman Jun 4, 2018
44e78cb
Remove test "select function over nested data" of unknown origin and
mallman Jun 4, 2018
9488cb5
Improve readability of ParquetSchemaPruning and
mallman Jun 4, 2018
f3735b1
Don't handle non-data-field partition column names specially when
mallman Jun 4, 2018
2120ab5
Add test coverage for ParquetSchemaPruning for partitioned tables whose
mallman Jun 4, 2018
8d53bbd
Remove the ColumnarFileFormat type to put it in another PR
mallman Jun 12, 2018
e213471
Add test coverage for the enhancements to "is not null" constraint
mallman Jun 12, 2018
9e6ef5f
Revert changes to QueryPlanConstraints.scala and basicPhysicalOperato…
mallman Jun 24, 2018
e6ea9c2
Revert a whitespace change in DataSourceScanExec.scala
mallman Jun 24, 2018
2d02ab3
Remove modifications to ParquetFileFormat.scala and
mallman Jul 21, 2018
cfffc95
PR review: simplify some syntax and add a code doc
mallman Jul 21, 2018
2779351
When creating a pruned schema by merging an array of root structs, sort
mallman Jul 28, 2018
9329f77
Re-enable ignored test in ParquetSchemaPruningSuite.scala that is
mallman Aug 4, 2018
ec313c1
Enable schema pruning by default
mallman Aug 4, 2018
42aff39
Revert "Enable schema pruning by default"
mallman Aug 5, 2018
71f4c7b
Add a method to not only check a query's scan schemata, but verify th…
mallman Aug 5, 2018
0e5594b
Revert "Revert "Enable schema pruning by default""
mallman Aug 9, 2018
1573ae8
Revert changes to ParquetTest.scala. I'm sure they were useful at some
mallman Aug 17, 2018
09dd655
Refactor code based on code review comments
ajacques Aug 2, 2018
61c7937
Update terminology for parquet-mr reader in
mallman Aug 20, 2018
97b3a51
Handle differences in letter case in columns and fields between query
mallman Aug 21, 2018
2711746
Add test permutations to the "testMixedCasePruning" method to test
mallman Aug 21, 2018
e6baf68
Disable SQL schema pruning by default and revert changes to
mallman Aug 23, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Handle differences in letter case in columns and fields between query
projections and filters, and the underlying parquet file schema
  • Loading branch information
mallman committed Aug 21, 2018
commit 97b3a51d478f19890ded73aa78d94c055a9f144c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -45,7 +45,9 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
case op @ PhysicalOperation(projects, filters,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also honestly this logic looks convoluted.

l @ LogicalRelation(hadoopFsRelation: HadoopFsRelation, _, _, _))
if canPruneRelation(hadoopFsRelation) =>
val requestedRootFields = identifyRootFields(projects, filters)
val (normalizedProjects, normalizedFilters) =
normalizeAttributeRefNames(l, projects, filters)
val requestedRootFields = identifyRootFields(normalizedProjects, normalizedFilters)

// If requestedRootFields includes a nested field, continue. Otherwise,
// return op
Expand All @@ -64,7 +66,8 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
val prunedRelation = buildPrunedRelation(l, prunedParquetRelation)
val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)

buildNewProjection(projects, filters, prunedRelation, projectionOverSchema)
buildNewProjection(normalizedProjects, normalizedFilters, prunedRelation,
projectionOverSchema)
} else {
op
}
Expand All @@ -79,6 +82,27 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
private def canPruneRelation(fsRelation: HadoopFsRelation) =
  fsRelation.fileFormat match {
    // Only relations whose file format is ParquetFileFormat qualify for pruning.
    case _: ParquetFileFormat => true
    case _ => false
  }

/**
 * Renames the attribute references found in the given projects and filters so that each one
 * carries the name of the logical relation output attribute sharing its expression id.
 * References whose expression id does not appear in the relation output are left untouched.
 * Afterwards attributes and fields can be compared by name. The result is the pair
 * (normalized projects, normalized filters).
 */
private def normalizeAttributeRefNames(
    logicalRelation: LogicalRelation,
    projects: Seq[NamedExpression],
    filters: Seq[Expression]): (Seq[NamedExpression], Seq[Expression]) = {
  // Names of the relation's output attributes, keyed by expression id.
  val relationNameById = logicalRelation.output.map(out => out.exprId -> out.name).toMap

  // One rewrite rule shared by projects and filters: give a matching reference the
  // relation's spelling of its name.
  val useRelationName: PartialFunction[Expression, Expression] = {
    case ref: AttributeReference if relationNameById.contains(ref.exprId) =>
      ref.withName(relationNameById(ref.exprId))
  }

  val renamedFilters = filters.map(_.transform(useRelationName))
  // transform returns a plain Expression, so narrow each result back to NamedExpression.
  val renamedProjects = projects
    .map(_.transform(useRelationName))
    .map { case named: NamedExpression => named }

  (renamedProjects, renamedFilters)
}

/**
* Returns the set of fields from the Parquet file that the query plan needs.
*/
Expand Down Expand Up @@ -142,23 +166,27 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
sortLeftFieldsByRight(mergedDataSchema, fileDataSchema).asInstanceOf[StructType]
}

/**
 * Builds a pruned logical relation by copying the given output relation, substituting the
 * pruned base relation and an output derived from the pruned base relation's schema.
 * Output attributes whose names match an attribute of the original relation's output keep
 * that attribute's expression id; all other attributes are used as produced by the schema.
 */
private def buildPrunedRelation(
outputRelation: LogicalRelation,
parquetRelation: HadoopFsRelation) = {
prunedBaseRelation: HadoopFsRelation) = {
// We need to replace the expression ids of the pruned relation output attributes
// with the expression ids of the original relation output attributes so that
// references to the original relation's output are not broken
val outputIdMap = outputRelation.output.map(att => (att.name, att.exprId)).toMap
val prunedRelationOutput =
parquetRelation
prunedBaseRelation
.schema
.toAttributes
.map {
case att if outputIdMap.contains(att.name) =>
att.withExprId(outputIdMap(att.name))
case att => att
}
outputRelation.copy(relation = parquetRelation, output = prunedRelationOutput)
outputRelation.copy(relation = prunedBaseRelation, output = prunedRelationOutput)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,69 @@ class ParquetSchemaPruningSuite
}
}

// Row types for the mixed-case tests: the field names mix upper and lower case
// (`B`, `CoL1`, `coL2`).
case class MixedCaseColumn(a: String, B: Int)
case class MixedCase(id: Int, CoL1: String, coL2: MixedCaseColumn)

// Two fixture rows, with ids 0 and 1, used as the contents of the "mixedcase" table.
private val mixedCaseData =
  List(
    MixedCase(0, "r0c1", MixedCaseColumn("abc", 1)),
    MixedCase(1, "r1c1", MixedCaseColumn("123", 2)))

// Column names spelled exactly as declared in MixedCase / MixedCaseColumn.
testMixedCasePruning("select with exact column names") {
  val query = sql("select CoL1, coL2.B from mixedcase")
  checkScan(query, "struct<CoL1:string,coL2:struct<B:int>>")
  val expectedRows = Seq(Row("r0c1", 1), Row("r1c1", 2))
  checkAnswer(query.orderBy("id"), expectedRows)
}

// Column names spelled entirely in lower case; the expected scan schema keeps the declared case.
testMixedCasePruning("select with lowercase column names") {
  val query = sql("select col1, col2.b from mixedcase")
  checkScan(query, "struct<CoL1:string,coL2:struct<B:int>>")
  val expectedRows = Seq(Row("r0c1", 1), Row("r1c1", 2))
  checkAnswer(query.orderBy("id"), expectedRows)
}

// Column names spelled with a letter case that matches neither the declaration nor lower case.
testMixedCasePruning("select with different-case column names") {
  val query = sql("select cOL1, cOl2.b from mixedcase")
  checkScan(query, "struct<CoL1:string,coL2:struct<B:int>>")
  val expectedRows = Seq(Row("r0c1", 1), Row("r1c1", 2))
  checkAnswer(query.orderBy("id"), expectedRows)
}

// A filter on a nested field whose spelling differs in case from the declaration.
testMixedCasePruning("filter with different-case column names") {
  val query = sql("select id from mixedcase where Col2.b = 2")
  // Pruning with filters is not supported yet, so the file reader reads the id column and the
  // whole coL2 struct. Uncomment the following check once pruning with filters is implemented.
  // checkScan(query, "struct<id:int,coL2:struct<B:int>>")
  val expectedRows = Seq(Row(1))
  checkAnswer(query.orderBy("id"), expectedRows)
}

/**
 * Registers two tests for the given thunk, both run against the mixed-case data: one with
 * the vectorized Parquet reader enabled and one with it disabled.
 *
 * @param testName  suffix appended to the reader-specific test name prefix
 * @param testThunk test body, evaluated once per registered test
 */
private def testMixedCasePruning(testName: String)(testThunk: => Unit): Unit = {
  // withSQLConf must be applied inside the test body. `test(...)` only registers the body for
  // later execution, so wrapping the registration call would set and restore the conf before
  // the body ever runs, and both tests would execute with the session's default setting.
  test(s"Spark vectorized reader - mixed-case schema - $testName") {
    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
      withMixedCaseData(testThunk)
    }
  }
  test(s"Parquet-mr reader - mixed-case schema - $testName") {
    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
      withMixedCaseData(testThunk)
    }
  }
}

// Evaluates the thunk inside withParquetTable, passing mixedCaseData as the data and
// "mixedcase" as the table name.
private def withMixedCaseData(testThunk: => Unit): Unit =
  withParquetTable(mixedCaseData, "mixedcase")(testThunk)

private val schemaEquality = new Equality[StructType] {
override def areEqual(a: StructType, b: Any): Boolean =
b match {
Expand Down