Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
2a1cd27
optimization + test
bogdanrdc Aug 23, 2018
421ee20
debug benchmark + early batch
bogdanrdc Aug 23, 2018
d7e49e7
revert benchmark
bogdanrdc Aug 23, 2018
ba6d91e
Merge remote-tracking branch 'upstream/master' into local-relation-fi…
bogdanrdc Aug 24, 2018
326e5d7
test fix
bogdanrdc Aug 24, 2018
4263bd2
[SPARK-25073][YARN] AM and Executor Memory validation message is not …
sujith71955 Aug 24, 2018
f84d256
[SPARK-25214][SS] Fix the issue that Kafka v2 source may return dupli…
zsxwing Aug 24, 2018
c721895
[SPARK-25174][YARN] Limit the size of diagnostic message for am to un…
yaooqinn Aug 24, 2018
f8536e3
[SPARK-25234][SPARKR] avoid integer overflow in parallelize
mengxr Aug 24, 2018
af6a91e
Correct missing punctuation in the documentation
Aug 25, 2018
c613c6b
[MINOR] Fix Scala 2.12 build
dbtsai Aug 25, 2018
ee1c0e8
[SPARK-24688][EXAMPLES] Modify the comments about LabeledPoint
huangweizhe123 Aug 25, 2018
77fb55e
[SPARK-25214][SS][FOLLOWUP] Fix the issue that Kafka v2 source may re…
zsxwing Aug 25, 2018
b00824c
[SPARK-23792][DOCS] Documentation improvements for datetime functions
abradbury Aug 26, 2018
ee6cb6c
[SPARK-23698][PYTHON][FOLLOWUP] Resolve undefiend names in setup.py
HyukjinKwon Aug 27, 2018
c129176
[SPARK-19355][SQL][FOLLOWUP] Remove the child.outputOrdering check in…
viirya Aug 27, 2018
368b42f
[SPARK-24978][SQL] Add spark.sql.fast.hash.aggregate.row.max.capacity…
heary-cao Aug 27, 2018
0378b1f
[SPARK-25249][CORE][TEST] add a unit test for OpenHashMap
10110346 Aug 27, 2018
d5a953a
[SPARK-24882][FOLLOWUP] Fix flaky synchronization in Kafka tests.
jose-torres Aug 27, 2018
3598483
[SPARK-24149][YARN][FOLLOW-UP] Only get the delegation tokens of the …
wangyum Aug 27, 2018
397fa62
[SPARK-24090][K8S] Update running-on-kubernetes.md
liyinan926 Aug 27, 2018
dcd001b
[SPARK-24721][SQL] Exclude Python UDFs filters in FileSourceStrategy
icexelloss Aug 28, 2018
b23538b
[SPARK-25218][CORE] Fix potential resource leaks in TransportServer a…
zsxwing Aug 28, 2018
f769a94
[SPARK-25005][SS] Support non-consecutive offsets for Kafka
zsxwing Aug 28, 2018
68c41ff
comment
bogdanrdc Aug 28, 2018
dad6a7f
Merge remote-tracking branch 'upstream/master' into local-relation-fi…
bogdanrdc Aug 28, 2018
cb067c3
Merge remote-tracking branch 'upstream/master' into local-relation-fi…
bogdanrdc Aug 28, 2018
d552cc1
space
bogdanrdc Aug 28, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
optimization + test
  • Loading branch information
bogdanrdc committed Aug 23, 2018
commit 2a1cd27d7f188b51429553718cac91d87d667bbd
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,12 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {

case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
LocalRelation(output, data.take(limit), isStreaming)

case Filter(condition, LocalRelation(output, data, isStreaming))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if !hasUnevaluableExpr(condition) =>
Copy link
Member

@gatorsmile gatorsmile Aug 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the condition is non-deterministic, the values will be always the same after the plans are optimized. The DataFrame with non-deterministic filters will always return the same result.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is OK, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We blocks all these optimization in the other optimization rules, e.g., ConstantFolding. All the non-deterministic expressions are not foldable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConvertToLocalRelation was already doing this for Project so I assumed it's OK. what I did is exactly how Project(LocalRelation) works.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine about introducing this change since we already did it in Project(LocalRelation).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose it is fine in this case. The only thing is that it violates the contract of the optimizer: it should not change the results of a query.

val predicate = InterpretedPredicate.create(condition, output)
predicate.initialize(0)
LocalRelation(output, data.filter(row => predicate.eval(row)), isStreaming)
}

private def hasUnevaluableExpr(expr: Expression): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{LessThan, Literal}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
Expand Down Expand Up @@ -52,4 +53,21 @@ class ConvertToLocalRelationSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("Filter on LocalRelation should be turned into a single LocalRelation") {
val testRelation = LocalRelation(
LocalRelation('a.int, 'b.int).output,
InternalRow(1, 2) :: InternalRow(4, 5) :: Nil)

val correctAnswer = LocalRelation(
LocalRelation('a1.int, 'b1.int).output,
InternalRow(1, 3) :: Nil)

val filterAndProjectOnLocal = testRelation
.select(UnresolvedAttribute("a").as("a1"), (UnresolvedAttribute("b") + 1).as("b1"))
.where(LessThan(UnresolvedAttribute("b1"), Literal.create(6)))

val optimized = Optimize.execute(filterAndProjectOnLocal.analyze)

comparePlans(optimized, correctAnswer)
}
}