@@ -115,6 +115,7 @@ object UnionPushdown extends Rule[LogicalPlan] {
* - Aggregate
* - Project <- Join
* - LeftSemiJoin
* - Generate
* - Collapse adjacent projections, performing alias substitution.
*/
object ColumnPruning extends Rule[LogicalPlan] {
@@ -171,6 +172,10 @@ object ColumnPruning extends Rule[LogicalPlan] {

Project(substitutedProjection, child)

// Push a Project below a Generate that is blocking column pruning
case p @ pushBelowGenerate(newChild) =>
Contributor:
Just look for Project on top of generate instead of attempting to walk the query tree. You can assume other rules will push projects down to you.
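
A minimal sketch of that simpler approach (not the PR's code; it assumes the references/outputSet helpers behave as they do elsewhere in ColumnPruning) could be a single case over Project-on-Generate:

// Sketch: prune the Generate's child when a Project sits directly on top of it and
// some child columns are needed neither by the Project nor by the generator itself.
case p @ Project(_, g: Generate)
    if (g.child.outputSet -- (p.references ++ g.references)).nonEmpty =>
  val requiredFromChild = (p.references ++ g.references).filter(g.child.outputSet.contains)
  p.copy(child = g.copy(child = Project(requiredFromChild.toSeq, g.child)))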

Author:
Generate blocks collectProjectsAndFilters in PhysicalOperation. Besides a Project directly on top of a Generate, a Filter can also sit on top of the Generate. How should that case be handled?
Project
Filter
Generate(explode)
Can there be several Filters between the Project and the Generate?
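
For illustration, that shape can be built with the Catalyst test DSL (a sketch only; testRelationWithArrayType is the fixture used in FilterPushdownSuite further down):

// Project <- Filter <- Generate, as described above.
val planWithFilterOverGenerate = testRelationWithArrayType
  .generate(Explode(Seq("c"), 'c_arr), true, false, Some("arr"))
  .where('b > 5)   // the Filter sits between the outer Project and the Generate
  .select('a)
  .analyze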

Contributor:
It is possible that we need several rules to accomplish all of the various optimizations. However, as it's written now, this is trying to do too much and as a result is too hard to follow.

p.copy(child = newChild)

// Eliminate no-op Projects
case Project(projectList, child) if child.output == projectList => child
}
@@ -182,6 +187,32 @@ object ColumnPruning extends Rule[LogicalPlan] {
} else {
c
}

object pushBelowGenerate {
// Generate blocks Project pushdown, so insert a Project below the Generate that keeps
// only the attributes referenced above it.
def collectRefersUntilGen(refers: AttributeSet, plan: LogicalPlan): LogicalPlan = {
val collectRefers = refers ++ plan.references
plan match {
case filter @ Filter(_, c) =>
val newChild = collectRefersUntilGen(collectRefers, c)
// null indicates the child was not changed
if (newChild != null) filter.copy(child = newChild) else null
case gen @ Generate(_, _, _, _, c) =>
if ((c.outputSet -- collectRefers.filter(c.outputSet.contains)).nonEmpty) {
gen.copy(child = Project(collectRefers.filter(c.outputSet.contains).toSeq, c))
} else {
null
}
case _ => null
}
}

def unapply(plan: Project): Option[LogicalPlan] = {
val newChild = collectRefersUntilGen(plan.references, plan.child)
if (newChild != null) Some(newChild) else None
}
}
}

/**
@@ -55,6 +55,7 @@ case class Generate(
child: LogicalPlan)
extends UnaryNode {


Contributor:
This mutable state is no longer needed.

protected def generatorOutput: Seq[Attribute] = {
val output = alias
.map(a => generator.output.map(_.withQualifiers(a :: Nil)))
@@ -502,7 +502,11 @@ class FilterPushdownSuite extends PlanTest {
.where(('c > 6) || ('b > 5)).analyze
}
val optimized = Optimize(originalQuery)

comparePlans(optimized, originalQuery)
val correctAnswer = {
testRelationWithArrayType
.generate(Explode(Seq("c"), 'c_arr), true, false, Some("arr"))
.where(('c > 6) || ('b > 5)).analyze
}
comparePlans(optimized, correctAnswer)
Contributor:
What is going on here?

}
}
@@ -56,7 +56,8 @@ case class Generate(
val boundGenerator = BindReferences.bindReference(generator, child.output)

override def execute(): RDD[Row] = {
if (join) {
// #SPARK-6489 do not join when the child has no output
if (join && child.output.nonEmpty) {
Contributor:
This seems unrelated. Also, how is it possible to run a generator when there is no output?

Author:
SELECT one FROM person LATERAL VIEW explode(1) AS one;
The Project does not need any columns from the table; it only needs the table's row count.
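
The intent of the guard can be sketched in plain Scala, independent of Spark (names invented for illustration):

// When the child row carries no columns, prepending it to each generated row is a
// no-op, so the generated values can be emitted on their own.
object JoinGuardSketch extends App {
  def generate(childRow: Seq[Any], generated: Seq[Seq[Any]], join: Boolean): Seq[Seq[Any]] =
    if (join && childRow.nonEmpty) generated.map(childRow ++ _) else generated

  println(generate(Seq("A", 20), Seq(Seq(10), Seq(12)), join = true)) // List(List(A, 20, 10), List(A, 20, 12))
  println(generate(Seq.empty, Seq(Seq(1)), join = true))              // List(List(1))
}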

Contributor:
We should not be trying to optimize degenerate queries such as this one by putting random hacks into the execution engine.

child.execute().mapPartitions { iter =>
val nullValues = Seq.fill(generator.output.size)(Literal(null))
// Used to produce rows with no matches when outer = true.
@@ -367,7 +367,13 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
INSERT OVERWRITE TABLE episodes_part PARTITION (doctor_pt=1)
SELECT title, air_date, doctor FROM episodes
""".cmd
)
),
TestTable("person",
("CREATE TABLE person(name string, age int, data array<INT>) " +
"ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' " +
"COLLECTION ITEMS TERMINATED BY ':'").cmd,
s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/person.txt")}' INTO TABLE person".cmd
)
)

hiveQTestUtilTables.foreach(registerTestTable)
5 changes: 5 additions & 0 deletions sql/hive/src/test/resources/data/files/person.txt
@@ -0,0 +1,5 @@
A, 20, 10:12:19
B, 25, 7:8:4
C, 19, 12:4:232
D, 73, 243:53:7835
E, 88, 1345:23:532532:353
@@ -87,6 +87,27 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
Seq("key"),
Seq.empty)

createPruningTest("Column pruning - explode with aggregate",
"SELECT name, sum(d) AS sumd FROM person LATERAL VIEW explode(data) d AS d GROUP BY name",
Seq("name", "sumd"),
Seq("name", "data"),
Seq.empty)
Contributor:
I don't think we need to go all the way to hive to create an integration test. Instead create some unit tests in ...catalyst.optimizer.
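
Such a test might look roughly like the following, modeled on the FilterPushdownSuite changes in this PR (the exact expected plan is an assumption):

test("column pruning pushes a Project below Generate") {
  val originalQuery = testRelationWithArrayType
    .generate(Explode(Seq("c"), 'c_arr), true, false, Some("arr"))
    .select('c)
    .analyze

  val optimized = Optimize(originalQuery)

  // Expected (assumed): only the exploded column is read from the relation.
  val correctAnswer = testRelationWithArrayType
    .select('c_arr)
    .generate(Explode(Seq("c"), 'c_arr), true, false, Some("arr"))
    .select('c)
    .analyze

  comparePlans(optimized, correctAnswer)
}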


createPruningTest("Column pruning - outer explode with limit",
"SELECT name FROM person LATERAL VIEW OUTER explode(data) outd AS d" +
" where name < \"C\" limit 3",
Seq("name"),
Seq("name", "data"),
Seq.empty)

createPruningTest("Column pruning - select all without explode optimize - query test",
"SELECT * FROM person LATERAL VIEW OUTER explode(data) outd AS d WHERE 20 < age",
Seq("name", "age", "data", "d"),
Seq("name", "age", "data"),
Seq.empty)



// Partition pruning tests

createPruningTest("Partition pruning - non-partitioned, non-trivial project",