Closed
Changes from 1 commit (24 commits in this pull request)
6e41081  [SPARK-4502][SQL] Parquet nested column pruning (Jun 24, 2016)
0d0e8a0  Refactor SelectedFieldSuite to make its tests simpler and more (mallman, Jun 4, 2018)
44e78cb  Remove test "select function over nested data" of unknown origin and (mallman, Jun 4, 2018)
9488cb5  Improve readability of ParquetSchemaPruning and (mallman, Jun 4, 2018)
f3735b1  Don't handle non-data-field partition column names specially when (mallman, Jun 4, 2018)
2120ab5  Add test coverage for ParquetSchemaPruning for partitioned tables whose (mallman, Jun 4, 2018)
8d53bbd  Remove the ColumnarFileFormat type to put it in another PR (mallman, Jun 12, 2018)
e213471  Add test coverage for the enhancements to "is not null" constraint (mallman, Jun 12, 2018)
9e6ef5f  Revert changes to QueryPlanConstraints.scala and basicPhysicalOperato… (mallman, Jun 24, 2018)
e6ea9c2  Revert a whitespace change in DataSourceScanExec.scala (mallman, Jun 24, 2018)
2d02ab3  Remove modifications to ParquetFileFormat.scala and (mallman, Jul 21, 2018)
cfffc95  PR review: simplify some syntax and add a code doc (mallman, Jul 21, 2018)
2779351  When creating a pruned schema by merging an array of root structs, sort (mallman, Jul 28, 2018)
9329f77  Re-enable ignored test in ParquetSchemaPruningSuite.scala that is (mallman, Aug 4, 2018)
ec313c1  Enable schema pruning by default (mallman, Aug 4, 2018)
42aff39  Revert "Enable schema pruning by default" (mallman, Aug 5, 2018)
71f4c7b  Add a method to not only check a query's scan schemata, but verify th… (mallman, Aug 5, 2018)
0e5594b  Revert "Revert "Enable schema pruning by default"" (mallman, Aug 9, 2018)
1573ae8  Revert changes to ParquetTest.scala. I'm sure they were useful at some (mallman, Aug 17, 2018)
09dd655  Refactor code based on code review comments (ajacques, Aug 2, 2018)
61c7937  Update terminology for parquet-mr reader in (mallman, Aug 20, 2018)
97b3a51  Handle differences in letter case in columns and fields between query (mallman, Aug 21, 2018)
2711746  Add test permutations to the "testMixedCasePruning" method to test (mallman, Aug 21, 2018)
e6baf68  Disable SQL schema pruning by default and revert changes to (mallman, Aug 23, 2018)
[SPARK-4502][SQL] Parquet nested column pruning
Michael Allman authored and mallman committed Aug 9, 2018
commit 6e41081e98be40fde7f9099670c4ef3a84531cf0
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.planning

import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField}
import org.apache.spark.sql.types.StructField

/**
* A Scala extractor that extracts the child expression and struct field from a [[GetStructField]].
* This is in contrast to the [[GetStructField]] case class extractor which returns the field
* ordinal instead of the field itself.
*/
private[planning] object GetStructFieldObject {
  def unapply(getStructField: GetStructField): Option[(Expression, StructField)] =
    Some((
      getStructField.child,
      getStructField.childSchema(getStructField.ordinal)))
}
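As a rough illustration (not part of the patch), a pattern match over some complex type extractor expression `expr` might use this extractor to bind the child expression and the selected StructField rather than the field ordinal:

// `expr` is assumed to be an arbitrary Catalyst Expression.
expr match {
  case GetStructFieldObject(child, field) =>
    println(s"accesses field '${field.name}: ${field.dataType}' of $child")
  case _ =>
    println("not a struct field access")
}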
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.planning

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

/**
* A Scala extractor that projects an expression over a given schema. Data types,
* field indexes and field counts of complex type extractors and attributes
* are adjusted to fit the schema. All other expressions are left as-is. This
* class is motivated by columnar nested schema pruning.
*/
case class ProjectionOverSchema(schema: StructType) {

Member: This still looks weird that we place this under catalyst since we currently only use it under execution.

Contributor Author: Okay. So...

Member (@HyukjinKwon, Jul 25, 2018): can we move it to execution?

Contributor: We can move this to sql.execution if we move all three classes: ProjectionOverSchema, GetStructFieldObject, and SelectedField. Is there a difference in the catalyst.planning vs the execution packages?

  private val fieldNames = schema.fieldNames.toSet

  def unapply(expr: Expression): Option[Expression] = getProjection(expr)

  private def getProjection(expr: Expression): Option[Expression] =
    expr match {
      case a @ AttributeReference(name, _, _, _) if (fieldNames.contains(name)) =>
        Some(a.copy(dataType = schema(name).dataType)(a.exprId, a.qualifier))
      case GetArrayItem(child, arrayItemOrdinal) =>
        getProjection(child).map {
          case projection =>
            GetArrayItem(projection, arrayItemOrdinal)

Member: nit:

getProjection(child).map { projection => GetArrayItem(projection, arrayItemOrdinal) }

        }
      case GetArrayStructFields(child, StructField(name, _, _, _), _, numFields, containsNull) =>

Member: I would avoid a pattern like StructField(name, _, _, _), _, to be honest, when it looks excessive (per https://github.com/databricks/scala-style-guide#pattern-matching).

        getProjection(child).map(p => (p, p.dataType)).map {
          case (projection, ArrayType(projSchema @ StructType(_), _)) =>
            GetArrayStructFields(projection,
              projSchema(name), projSchema.fieldIndex(name), projSchema.size, containsNull)
        }
      case GetMapValue(child, key) =>
        getProjection(child).map {
          case projection =>

Member: nit:

getProjection(child).map { projection => GetMapValue(projection, key) }

            GetMapValue(projection, key)
        }
      case GetStructFieldObject(child, StructField(name, _, _, _)) =>
        getProjection(child).map(p => (p, p.dataType)).map {
          case (projection, projSchema @ StructType(_)) =>
            GetStructField(projection, projSchema.fieldIndex(name))
        }
      case _ =>
        None
    }
}
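A minimal sketch of how this extractor is meant to be applied (hypothetical names; the rule that actually drives it is the Parquet schema pruning rule added elsewhere in this PR). Given a pruned schema and an expression built against the original relation output, the expression is rewritten top-down so that field ordinals, field counts and data types line up with the pruned schema:

import org.apache.spark.sql.types._

// A pruned schema keeping only `name.first`.
val prunedSchema = StructType(Seq(
  StructField("name", StructType(Seq(StructField("first", StringType))))))
val projectionOverSchema = ProjectionOverSchema(prunedSchema)

// `expr` is assumed to be an expression over the unpruned relation output.
val rewritten = expr.transformDown {
  case projectionOverSchema(adjusted) => adjusted
}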
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.planning

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

/**
* A Scala extractor that builds a [[org.apache.spark.sql.types.StructField]] from a Catalyst
* complex type extractor. For example, consider a relation with the following schema:
*
* {{{
* root
* |-- name: struct (nullable = true)
* | |-- first: string (nullable = true)
* | |-- last: string (nullable = true)
* }}}
*
* Further, suppose we take the select expression `name.first`. This will parse into an
* `Alias(child, "first")`. Ignoring the alias, `child` matches the following pattern:
*
* {{{
* GetStructFieldObject(
* AttributeReference("name", StructType(_), _, _),
* StructField("first", StringType, _, _))
* }}}
*
* [[SelectedField]] converts that expression into
*
* {{{
* StructField("name", StructType(Array(StructField("first", StringType))))
* }}}
*
* by mapping each complex type extractor to a [[org.apache.spark.sql.types.StructField]] with the
* same name as its child (or "parent" going right to left in the select expression) and a data
* type appropriate to the complex type extractor. In our example, the name of the child expression
* is "name" and its data type is a [[org.apache.spark.sql.types.StructType]] with a single string
* field named "first".
*
* @param expr the top-level complex type extractor
*/
object SelectedField {
  def unapply(expr: Expression): Option[StructField] = {

Contributor: nit: Expression -> ExtractValue?

Contributor Author: The code does not compile with that change.

Contributor:

Error:(61, 12) constructor cannot be instantiated to expected type;
 found   : org.apache.spark.sql.catalyst.expressions.Alias
 required: org.apache.spark.sql.catalyst.expressions.ExtractValue
      case Alias(child, _) => child

Alias takes: Alias(child: Expression, name: String)

    // If this expression is an alias, work on its child instead
    val unaliased = expr match {
      case Alias(child, _) => child
      case expr => expr
    }
    selectField(unaliased, None)
  }

  private def selectField(expr: Expression, fieldOpt: Option[StructField]): Option[StructField] = {
    expr match {
      // No children. Returns a StructField with the attribute name or None if fieldOpt is None.
      case AttributeReference(name, dataType, nullable, metadata) =>
        fieldOpt.map(field =>
          StructField(name, wrapStructType(dataType, field), nullable, metadata))
      // Handles case "expr0.field[n]", where "expr0" is of struct type and "expr0.field" is of
      // array type.
      case GetArrayItem(x @ GetStructFieldObject(child, field @ StructField(name,
          dataType, nullable, metadata)), _) =>
        val childField = fieldOpt.map(field => StructField(name,
          wrapStructType(dataType, field), nullable, metadata)).getOrElse(field)
        selectField(child, Some(childField))
      // Handles case "expr0.field[n]", where "expr0.field" is of array type.
      case GetArrayItem(child, _) =>
        selectField(child, fieldOpt)
      // Handles case "expr0.field.subfield", where "expr0" and "expr0.field" are of array type.
      case GetArrayStructFields(child: GetArrayStructFields,
          field @ StructField(name, dataType, nullable, metadata), _, _, _) =>
        val childField = fieldOpt.map(field => StructField(name,
          wrapStructType(dataType, field),
          nullable, metadata)).getOrElse(field)
        selectField(child, Some(childField))
      // Handles case "expr0.field", where "expr0" is of array type.
      case GetArrayStructFields(child,
          field @ StructField(name, dataType, nullable, metadata), _, _, containsNull) =>
        val childField =
          fieldOpt.map(field => StructField(name,
            wrapStructType(dataType, field),
            nullable, metadata)).getOrElse(field)
        selectField(child, Some(childField))
      // Handles case "expr0.field[key]", where "expr0" is of struct type and "expr0.field" is of
      // map type.
      case GetMapValue(x @ GetStructFieldObject(child, field @ StructField(name,
          dataType,
          nullable, metadata)), _) =>
        val childField = fieldOpt.map(field => StructField(name,
          wrapStructType(dataType, field),
          nullable, metadata)).getOrElse(field)
        selectField(child, Some(childField))
      // Handles case "expr0.field[key]", where "expr0.field" is of map type.
      case GetMapValue(child, _) =>
        selectField(child, fieldOpt)
      // Handles case "expr0.field", where expr0 is of struct type.
      case GetStructFieldObject(child,
          field @ StructField(name, dataType, nullable, metadata)) =>
        val childField = fieldOpt.map(field => StructField(name,
          wrapStructType(dataType, field),
          nullable, metadata)).getOrElse(field)
        selectField(child, Some(childField))
      case _ =>
        None
    }
  }

  // Constructs a composition of complex types with a StructType(Array(field)) at its core. Returns
  // a StructType for a StructType, an ArrayType for an ArrayType and a MapType for a MapType.
  private def wrapStructType(dataType: DataType, field: StructField): DataType = {
    dataType match {
      case _: StructType =>
        StructType(Array(field))
      case ArrayType(elementType, containsNull) =>
        ArrayType(wrapStructType(elementType, field), containsNull)
      case MapType(keyType, valueType, valueContainsNull) =>
        MapType(keyType, wrapStructType(valueType, field), valueContainsNull)
    }
  }
}
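A small sketch mirroring the example in the doc comment above (hypothetical; `expr` is assumed to be the parsed, possibly aliased expression for the select item `name.first`):

val prunedField: Option[StructField] = expr match {
  // field == StructField("name", StructType(Array(StructField("first", StringType))))
  case SelectedField(field) => Some(field)
  case _ => None
}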
@@ -81,8 +81,9 @@ trait ConstraintHelper

  /**
   * Infers a set of `isNotNull` constraints from null intolerant expressions as well as
-  * non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this
-  * returns a constraint of the form `isNotNull(a)`
+  * non-nullable attributes and complex type extractors. For example, if an expression is of the
+  * form (`a > 5`), this returns a constraint of the form `isNotNull(a)`. For an expression of the
+  * form (`a.b > 5`), this returns the more precise constraint `isNotNull(a.b)`.
   */
  def constructIsNotNullConstraints(
      constraints: Set[Expression],
@@ -99,27 +100,28 @@
  }

  /**
-  * Infer the Attribute-specific IsNotNull constraints from the null intolerant child expressions
-  * of constraints.
+  * Infer the Attribute and ExtractValue-specific IsNotNull constraints from the null intolerant
+  * child expressions of constraints.
   */
  private def inferIsNotNullConstraints(constraint: Expression): Seq[Expression] =
    constraint match {
      // When the root is IsNotNull, we can push IsNotNull through the child null intolerant
      // expressions
-     case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_))
+     case IsNotNull(expr) => scanNullIntolerantField(expr).map(IsNotNull(_))
      // Constraints always return true for all the inputs. That means, null will never be returned.
      // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child
      // null intolerant expressions.
-     case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
+     case _ => scanNullIntolerantField(constraint).map(IsNotNull(_))
    }

  /**
-  * Recursively explores the expressions which are null intolerant and returns all attributes
-  * in these expressions.
+  * Recursively explores the expressions which are null intolerant and returns all attributes and
+  * complex type extractors in these expressions.
   */
- private def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = expr match {
+ private def scanNullIntolerantField(expr: Expression): Seq[Expression] = expr match {
+   case ev: ExtractValue => Seq(ev)

Member: For this improvement, can we do it in a separate PR? The corresponding unit test cases are needed in InferFiltersFromConstraintsSuite instead of ParquetSchemaPruningSuite.

Contributor Author (@mallman, May 24, 2018): I agree adding more direct and independent test coverage for this change is a good idea. However, omitting this change will weaken the capabilities of this PR. It would also imply the removal of the failing test case in ParquetSchemaPruningSuite, which would imply two follow-on PRs. The first would be to add this specific change plus the right test coverage. The next would be to restore the test case removed from ParquetSchemaPruningSuite.

Let me suggest an alternative. As this change is a valuable enhancement for this PR, let me try adding an appropriate test case in InferFiltersFromConstraintsSuite as part of this PR. That will eliminate the requirement for two more follow-on PRs.

Member: Because of this change, we also need to change the code in basicPhysicalOperators.scala. I do not think this is the right solution. More importantly, the changes in basicPhysicalOperators.scala might break the others. We need a separate PR for these changes. Please remove the changes made in basicPhysicalOperators.scala and QueryPlanConstraints.scala.

    case a: Attribute => Seq(a)
-   case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantAttribute)
+   case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantField)
    case _ => Seq.empty[Attribute]
  }
}
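For intuition, a rough sketch of the intended effect (hypothetical table and column names; this is not code from the patch):

// Assume a table `people` with a column `person: struct<name: string, age: int>`.
import spark.implicits._
val q = spark.table("people").filter($"person.age" > 21)
println(q.queryExecution.optimizedPlan)
// With complex type extractors treated as null-intolerant fields, the Filter
// carries a constraint on person.age rather than only on person, which is what
// lets a columnar scan satisfy the filter by reading just the nested leaf.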
@@ -1381,8 +1381,18 @@ object SQLConf {
"issues. Turn on this config to insert a local sort before actually doing repartition " +
"to generate consistent repartition results. The performance of repartition() may go " +
"down since we insert extra local sort before it.")
.booleanConf
.createWithDefault(true)

val NESTED_SCHEMA_PRUNING_ENABLED =
buildConf("spark.sql.nestedSchemaPruning.enabled")
.internal()
.doc("Prune nested fields from a logical relation's output which are unnecessary in " +
"satisfying a query. This optimization allows columnar file format readers to avoid " +
"reading unnecessary nested column data. Currently Parquet is the only data source that " +

Member: How about ORC?

cc @dongjoon-hyun Do you know whether it is also doable in the latest ORC version?

Member (@dongjoon-hyun): Thank you for pinging me, @gatorsmile. Let me check it.

Contributor Author: ORC should be able to support this capability as well, but this PR does not address that.

"implements this optimization.")
.booleanConf
.createWithDefault(true)
.createWithDefault(false)

@DaimonPl (Jun 15, 2018): how about enabling it as default? there should be enough time to find any unexpected problems with 2.4.0

additionally nested column pruning would be enabled during all other automatic tests

Contributor: +1

Contributor Author: I'm against enabling this feature by default with a known failing test case. For example, https://github.com/apache/spark/pull/21320/files#diff-0c6c7481232e9637b91c179f1005426aR71.


  val TOP_K_SORT_FALLBACK_THRESHOLD =
    buildConf("spark.sql.execution.topKSortFallbackThreshold")
@@ -1863,6 +1873,8 @@ class SQLConf extends Serializable with Logging {
  def partitionOverwriteMode: PartitionOverwriteMode.Value =
    PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))

  def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)

  def csvColumnPruning: Boolean = getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING)

  def legacySizeOfNull: Boolean = getConf(SQLConf.LEGACY_SIZE_OF_NULL)
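For reference, a minimal sketch of exercising the new flag from a Spark session (hypothetical path; the flag is internal and its default changed back and forth during review):

// Enable nested schema pruning for the current session.
spark.conf.set("spark.sql.nestedSchemaPruning.enabled", "true")

// With pruning on, selecting a single struct leaf should narrow the Parquet
// read schema to just that leaf, which is visible in the scan's ReadSchema.
spark.read.parquet("/tmp/contacts").select("name.first").explain()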
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst

import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.internal.SQLConf.NESTED_SCHEMA_PRUNING_ENABLED

/**
* A PlanTest that ensures that all tests in this suite are run with nested schema pruning enabled.
* Remove this trait once the default value of SQLConf.NESTED_SCHEMA_PRUNING_ENABLED is set to true.
*/
private[sql] trait SchemaPruningTest extends PlanTest with BeforeAndAfterAll {
  private var originalConfSchemaPruningEnabled = false

  override protected def beforeAll(): Unit = {
    // Call `withSQLConf` eagerly because some subtypes of `PlanTest` (I'm looking at you,
    // `SQLTestUtils`) override `withSQLConf` to reset the existing `SQLConf` with a new one without
    // copying existing settings first. This here is an awful, ugly way to get around that behavior
    // by initializing the "real" `SQLConf` with a noop call to `withSQLConf`. I don't want to risk
    // "fixing" the downstream behavior, breaking everything else that's expecting these semantics.
    // Oh well...
    withSQLConf()(())
    originalConfSchemaPruningEnabled = conf.nestedSchemaPruningEnabled
    conf.setConf(NESTED_SCHEMA_PRUNING_ENABLED, true)
    super.beforeAll()
  }

  override protected def afterAll(): Unit = {
    try {
      super.afterAll()
    } finally {
      conf.setConf(NESTED_SCHEMA_PRUNING_ENABLED, originalConfSchemaPruningEnabled)
    }
  }
}
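A sketch of how a suite might opt in to this trait (hypothetical suite name and test; assumes the usual Spark SQL test mixins):

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.SchemaPruningTest
import org.apache.spark.sql.functions.struct
import org.apache.spark.sql.test.SharedSQLContext

class ExamplePruningSuite extends QueryTest with SharedSQLContext with SchemaPruningTest {
  import testImplicits._

  test("nested field reads correctly with pruning forced on") {
    val df = Seq(("Jane", "Doe")).toDF("first", "last")
      .select(struct($"first", $"last").as("name"))
    checkAnswer(df.select("name.first"), Row("Jane"))
  }
}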