Closed
Changes from 1 commit
213 commits
e1e09e0
SPARK-977 Added Python RDD.zip function
Mar 10, 2014
f551898
[SPARK-972] Added detailed callsite info for ValueError in context.py…
jyotiska Mar 10, 2014
a59419c
SPARK-1168, Added foldByKey to pyspark.
ScrapCodes Mar 10, 2014
2a51617
SPARK-1205: Clean up callSite/origin/generator.
pwendell Mar 10, 2014
2a2c964
SPARK-1211. In ApplicationMaster, set spark.master system property to…
sryza Mar 11, 2014
16788a6
SPARK-1167: Remove metrics-ganglia from default build due to LGPL iss…
pwendell Mar 11, 2014
2409af9
SPARK-1064
sryza Mar 12, 2014
af7f2f1
Spark-1163, Added missing Python RDD functions
Mar 12, 2014
c8c59b3
[SPARK-1232] Fix the hadoop 0.23 yarn build
tgravescs Mar 12, 2014
b5162f4
[SPARK-1233] Fix running hadoop 0.23 due to java.lang.NoSuchFieldExce…
tgravescs Mar 12, 2014
5d1ec64
Fix #SPARK-1149 Bad partitioners can cause Spark to hang
Mar 12, 2014
b8afe30
SPARK-1162 Added top in python.
ScrapCodes Mar 12, 2014
9032f7c
SPARK-1160: Deprecate toArray in RDD
CodingCat Mar 13, 2014
31a7040
Fix example bug: compile error
Mar 13, 2014
6bd2eaa
hot fix for PR105 - change to Java annotation
CodingCat Mar 13, 2014
4ea23db
SPARK-1019: pyspark RDD take() throws an NPE
pwendell Mar 13, 2014
e4e8d8f
[SPARK-1237, 1238] Improve the computation of YtY for implicit ALS
mengxr Mar 13, 2014
6983732
SPARK-1183. Don't use "worker" to mean executor
sryza Mar 13, 2014
ca4bf8c
SPARK-1236 - Upgrade Jetty to 9.1.3.v20140225.
rxin Mar 13, 2014
181b130
[bugfix] wrong client arg, should use executor-cores
tsdeng Mar 14, 2014
e19044c
Fix serialization of MutablePair. Also provide an interface for easy …
marmbrus Mar 14, 2014
97e4459
SPARK-1254. Consolidate, order, and harmonize repository declarations…
srowen Mar 15, 2014
f5486e9
SPARK-1255: Allow user to pass Serializer object instead of class nam…
rxin Mar 16, 2014
dc96546
SPARK-1240: handle the case of empty RDD when takeSample
CodingCat Mar 17, 2014
796977a
SPARK-1244: Throw exception if map output status exceeds frame size
pwendell Mar 17, 2014
087eedc
[Spark-1261] add instructions for running python examples to doc over…
Mar 18, 2014
e3681f2
Spark 1246 add min max to stat counter
dwmclary Mar 18, 2014
e7423d4
Revert "SPARK-1236 - Upgrade Jetty to 9.1.3.v20140225."
pwendell Mar 18, 2014
2fa26ec
SPARK-1102: Create a saveAsNewAPIHadoopDataset method
CodingCat Mar 18, 2014
79e547f
Update copyright year in NOTICE to 2014
mateiz Mar 18, 2014
e108b9a
[SPARK-1260]: faster construction of features with intercept
mengxr Mar 18, 2014
f9d8a83
[SPARK-1266] persist factors in implicit ALS
mengxr Mar 19, 2014
cc2655a
Fix SPARK-1256: Master web UI and Worker web UI returns a 404 error
witgo Mar 19, 2014
a18ea00
Bundle tachyon: SPARK-1269
nicklan Mar 19, 2014
d55ec86
bugfix: Wrong "Duration" in "Active Stages" in stages page
BlackNiuza Mar 19, 2014
6112270
SPARK-1203 fix saving to hdfs from yarn
tgravescs Mar 19, 2014
ab747d3
Bugfixes/improvements to scheduler
mridulm Mar 19, 2014
79d07d6
[SPARK-1132] Persisting Web UI through refactoring the SparkListener …
andrewor14 Mar 19, 2014
67fa71c
Added doctest for map function in rdd.py
jyotiska Mar 19, 2014
1678931
SPARK-1099:Spark's local mode should probably respect spark.cores.max…
Mar 19, 2014
ffe272d
Revert "SPARK-1099:Spark's local mode should probably respect spark.c…
aarondav Mar 20, 2014
66a03e5
Principal Component Analysis
rezazadeh Mar 20, 2014
ca76423
[Hot Fix #42] Do not stop SparkUI if bind() is not called
andrewor14 Mar 20, 2014
9aadcff
SPARK-1251 Support for optimizing and executing structured queries
marmbrus Mar 21, 2014
e09139d
Fix maven jenkins: Add explicit init for required tables in SQLQueryS…
marmbrus Mar 21, 2014
7e17fe6
Add hive test files to repository. Remove download script.
marmbrus Mar 21, 2014
2c0aa22
SPARK-1279: Fix improper use of SimpleDateFormat
zsxwing Mar 21, 2014
dab5439
Make SQL keywords case-insensitive
mateiz Mar 21, 2014
d780983
Add asCode function for dumping raw tree representations.
marmbrus Mar 21, 2014
646e554
Fix to Stage UI to display numbers on progress bar
emtiazahmed Mar 22, 2014
abf6714
SPARK-1254. Supplemental fix for HTTPS on Maven Central
srowen Mar 23, 2014
57a4379
[SPARK-1292] In-memory columnar representation for Spark SQL
liancheng Mar 23, 2014
8265dc7
Fixed coding style issues in Spark SQL
liancheng Mar 23, 2014
80c2968
[SPARK-1212] Adding sparse data support and update KMeans
mengxr Mar 24, 2014
21109fb
SPARK-1144 Added license and RAT to check licenses.
ScrapCodes Mar 24, 2014
56db8a2
HOT FIX: Exclude test files from RAT
pwendell Mar 24, 2014
8043b7b
SPARK-1294 Fix resolution of uppercase field names using a HiveContext.
marmbrus Mar 25, 2014
dc126f2
SPARK-1094 Support MiMa for reporting binary compatibility accross ve…
pwendell Mar 25, 2014
5140598
SPARK-1128: set hadoop task properties when constructing HadoopRDD
CodingCat Mar 25, 2014
b637f2d
Unify the logic for column pruning, projection, and filtering of tabl…
marmbrus Mar 25, 2014
007a733
SPARK-1286: Make usage of spark-env.sh idempotent
aarondav Mar 25, 2014
134ace7
Add more hive compatability tests to whitelist
marmbrus Mar 25, 2014
71d4ed2
SPARK-1316. Remove use of Commons IO
srowen Mar 25, 2014
f8111ea
SPARK-1319: Fix scheduler to account for tasks using > 1 CPUs.
shivaram Mar 25, 2014
8237df8
Avoid Option while generating call site
witgo Mar 25, 2014
4f7d547
Initial experimentation with Travis CI configuration
marmbrus Mar 26, 2014
b859853
SPARK-1321 Use Guava's top k implementation rather than our BoundedPr…
rxin Mar 26, 2014
a0853a3
SPARK-1322, top in pyspark should sort result in descending order.
ScrapCodes Mar 26, 2014
345825d
Unified package definition format in Spark SQL
liancheng Mar 26, 2014
32cbdfd
[SQL] Un-ignore a test that is now passing.
marmbrus Mar 27, 2014
e15e574
[SQL] Add a custom serializer for maps since they do not have a no-ar…
marmbrus Mar 27, 2014
be6d96c
SPARK-1324: SparkUI Should Not Bind to SPARK_PUBLIC_DNS
pwendell Mar 27, 2014
3e63d98
Spark 1095 : Adding explicit return types to all public methods
NirmalReddy Mar 27, 2014
1fa48d9
SPARK-1325. The maven build error for Spark Tools
srowen Mar 27, 2014
d679843
[SPARK-1327] GLM needs to check addIntercept for intercept and weights
mengxr Mar 27, 2014
5b2d863
Cut down the granularity of travis tests.
marmbrus Mar 27, 2014
426042a
SPARK-1330 removed extra echo from comput_classpath.sh
tgravescs Mar 27, 2014
53953d0
SPARK-1335. Also increase perm gen / code cache for scalatest when in…
srowen Mar 27, 2014
6f986f0
[SPARK-1268] Adding XOR and AND-NOT operations to spark.util.collecti…
Mar 27, 2014
3d89043
[SPARK-1210] Prevent ContextClassLoader of Actor from becoming ClassL…
ueshin Mar 28, 2014
632c322
Make sed do -i '' on OSX
nicklan Mar 28, 2014
60abc25
SPARK-1096, a space after comment start style checker.
ScrapCodes Mar 28, 2014
75d46be
fix path for jar, make sed actually work on OSX
nicklan Mar 28, 2014
56cc7fb
First cut implementation of Streaming UI.
tdas Mar 28, 2014
3738f24
SPARK-1345 adding missing dependency on avro for hadoop 0.23 to the n…
tgravescs Mar 29, 2014
1617816
SPARK-1126. spark-app preliminary
sryza Mar 29, 2014
af3746c
Implement the RLike & Like in catalyst
chenghao-intel Mar 29, 2014
fda86d8
[SPARK-1186] : Enrich the Spark Shell to support additional arguments.
berngp Mar 30, 2014
92b8395
Don't swallow all kryo errors, only those that indicate we are out of…
marmbrus Mar 30, 2014
2861b07
[SQL] SPARK-1354 Fix self-joins of parquet relations
marmbrus Mar 30, 2014
df1b9f7
SPARK-1336 Reducing the output of run-tests script.
ScrapCodes Mar 30, 2014
95d7d2a
[SPARK-1354][SQL] Add tableName as a qualifier for SimpleCatelogy
jerryshao Mar 30, 2014
d666053
SPARK-1352 - Comment style single space before ending */ check.
ScrapCodes Mar 30, 2014
841721e
SPARK-1352: Improve robustness of spark-submit script
pwendell Mar 31, 2014
5731af5
[SQL] Rewrite join implementation to allow streaming of one relation.
marmbrus Mar 31, 2014
33b3c2a
SPARK-1365 [HOTFIX] Fix RateLimitedOutputStream test
pwendell Mar 31, 2014
93f1c69
Added network receiver information to the Streaming UI.
tdas Mar 31, 2014
564f1c1
SPARK-1376. In the yarn-cluster submitter, rename "args" option to "arg"
sryza Apr 1, 2014
94fe7fd
[SPARK-1377] Upgrade Jetty to 8.1.14v20131031
andrewor14 Apr 1, 2014
ada310a
[Hot Fix #42] Persisted RDD disappears on storage page if re-used
andrewor14 Apr 1, 2014
4d86e98
Added basic stats to the StreamingUI and refactored the UI to a Page …
tdas Apr 1, 2014
db27bad
Added last batch processing time to StreamingUI.
tdas Apr 1, 2014
f5c418d
[SQL] SPARK-1372 Support for caching and uncaching tables in a SQLCon…
marmbrus Apr 1, 2014
aef4dd5
Added Apache licenses.
tdas Apr 1, 2014
764353d
[SPARK-1342] Scala 2.10.4
markhamstra Apr 2, 2014
afb5ea6
[Spark-1134] only call ipython if no arguments are given; remove IPYT…
Apr 2, 2014
45df912
Revert "[Spark-1134] only call ipython if no arguments are given; rem…
mateiz Apr 2, 2014
8b3045c
MLI-1 Decision Trees
manishamde Apr 2, 2014
ea9de65
Remove * from test case golden filename.
marmbrus Apr 2, 2014
11973a7
Renamed stageIdToActiveJob to jobIdToActiveJob.
kayousterhout Apr 2, 2014
de8eefa
[SPARK-1385] Use existing code for JSON de/serialization of BlockId
andrewor14 Apr 2, 2014
7823633
Do not re-use objects in the EdgePartition/EdgeTriplet iterators.
darabos Apr 2, 2014
1faa579
[SPARK-1371][WIP] Compression support for Spark SQL in-memory columna…
liancheng Apr 2, 2014
ed730c9
StopAfter / TopK related changes
rxin Apr 2, 2014
9c65fa7
[SPARK-1212, Part II] Support sparse data in MLlib
mengxr Apr 2, 2014
7d57444
Refactoring the UI interface to add flexibility
andrewor14 Apr 2, 2014
47ebea5
[SQL] SPARK-1364 Improve datatype and test coverage for ScalaReflecti…
marmbrus Apr 3, 2014
cd000b0
Merge github.com:apache/spark into ui-refactor
andrewor14 Apr 3, 2014
a37ad4f
Comments, imports and formatting (minor)
andrewor14 Apr 3, 2014
ed25dfc
Generalize SparkUI header to display tabs dynamically
andrewor14 Apr 3, 2014
92a86b2
[SPARK-1398] Removed findbugs jsr305 dependency
markhamstra Apr 3, 2014
fbebaed
Spark parquet improvements
AndreSchumacher Apr 3, 2014
5d1feda
[SPARK-1360] Add Timestamp Support for SQL
chenghao-intel Apr 3, 2014
53be2c5
Minor style updates.
tdas Apr 3, 2014
61358e3
Merge remote-tracking branch 'apache-github/master' into streaming-we…
tdas Apr 3, 2014
c1ea3af
Spark 1162 Implemented takeOrdered in pyspark.
ScrapCodes Apr 3, 2014
b8f5341
[SQL] SPARK-1333 First draft of java API
marmbrus Apr 3, 2014
a599e43
[SPARK-1134] Fix and document passing of arguments to IPython
Apr 3, 2014
9a48fa1
Allow adding tabs to SparkUI dynamically + add example
andrewor14 Apr 3, 2014
0d61ee8
Merge branch 'streaming-web-ui' of github.com:tdas/spark into ui-refa…
andrewor14 Apr 3, 2014
d94826b
[BUILD FIX] Fix compilation of Spark SQL Java API.
marmbrus Apr 3, 2014
8f7323b
End of file new lines, indentation, and imports (minor)
andrewor14 Apr 3, 2014
9231b01
Fix jenkins from giving the green light to builds that don't compile.
marmbrus Apr 3, 2014
33e6361
Revert "[SPARK-1398] Removed findbugs jsr305 dependency"
pwendell Apr 4, 2014
ee6e9e7
SPARK-1337: Application web UI garbage collects newest stages
pwendell Apr 4, 2014
7f32fd4
SPARK-1350. Always use JAVA_HOME to run executor container JVMs.
sryza Apr 4, 2014
01cf4c4
SPARK-1404: Always upgrade spark-env.sh vars to environment vars
aarondav Apr 4, 2014
f1fa617
[SPARK-1133] Add whole text files reader in MLlib
yinxusen Apr 4, 2014
16b8308
SPARK-1375. Additional spark-submit cleanup
sryza Apr 4, 2014
a02b535
Don't create SparkContext in JobProgressListenerSuite.
pwendell Apr 4, 2014
198892f
[SPARK-1198] Allow pipes tasks to run in different sub-directories
tgravescs Apr 5, 2014
d956cc2
[SQL] Minor fixes.
marmbrus Apr 5, 2014
60e18ce
SPARK-1414. Python API for SparkContext.wholeTextFiles
mateiz Apr 5, 2014
5f3c1bb
Add test utility for generating Jar files with compiled classes.
pwendell Apr 5, 2014
1347ebd
[SPARK-1419] Bumped parent POM to apache 14
markhamstra Apr 5, 2014
b50ddfd
SPARK-1305: Support persisting RDD's directly to Tachyon
haoyuan Apr 5, 2014
8de038e
[SQL] SPARK-1366 Consistent sql function across different types of SQ…
marmbrus Apr 5, 2014
0acc7a0
small fix ( proogram -> program )
prabeesh Apr 5, 2014
7c18428
HOTFIX for broken CI, by SPARK-1336
ScrapCodes Apr 5, 2014
2d0150c
Remove the getStageInfo() method from SparkContext.
kayousterhout Apr 5, 2014
6e88583
[SPARK-1371] fix computePreferredLocations signature to not depend on…
Apr 5, 2014
890d63b
Fix for PR #195 for Java 6
srowen Apr 6, 2014
0b85516
SPARK-1421. Make MLlib work on Python 2.6
mateiz Apr 6, 2014
7012ffa
Fix SPARK-1420 The maven build error for Spark Catalyst
witgo Apr 6, 2014
e258e50
[SPARK-1259] Make RDD locally iterable
epahomov Apr 6, 2014
856c50f
SPARK-1387. Update build plugins, avoid plugin version warning, centr…
srowen Apr 7, 2014
7ce52c4
SPARK-1349: spark-shell gets its own command history
aarondav Apr 7, 2014
4106558
SPARK-1314: Use SPARK_HIVE to determine if we include Hive in packaging
aarondav Apr 7, 2014
1440154
SPARK-1154: Clean up app folders in worker nodes
Apr 7, 2014
87d0928
SPARK-1431: Allow merging conflicting pull requests
pwendell Apr 7, 2014
accd099
[SQL] SPARK-1371 Hash Aggregation Improvements
marmbrus Apr 7, 2014
b5bae84
[SQL] SPARK-1427 Fix toString for SchemaRDD NativeCommands.
marmbrus Apr 7, 2014
a3c51c6
SPARK-1432: Make sure that all metadata fields are properly cleaned
Apr 7, 2014
83f2a2f
[sql] Rename Expression.apply to eval for better readability.
rxin Apr 7, 2014
9dd8b91
SPARK-1252. On YARN, use container-log4j.properties for executors
sryza Apr 7, 2014
2a2ca48
HOTFIX: Disable actor input stream test.
pwendell Apr 7, 2014
0307db0
SPARK-1099: Introduce local[*] mode to infer number of cores
aarondav Apr 7, 2014
c78c92d
Remove outdated comment
andrewor14 Apr 7, 2014
14c9238
[sql] Rename execution/aggregates.scala Aggregate.scala, and added a …
rxin Apr 8, 2014
55dfd5d
Removed the default eval implementation from Expression, and added a …
rxin Apr 8, 2014
31e6fff
Added eval for Rand (without any support for user-defined seed).
rxin Apr 8, 2014
f27e56a
Change timestamp cast semantics. When cast to numeric types, return t…
rxin Apr 8, 2014
0d0493f
[SPARK-1402] Added 3 more compression schemes
liancheng Apr 8, 2014
11eabbe
[SPARK-1103] Automatic garbage collection of RDD, shuffle and broadca…
tdas Apr 8, 2014
83ac9a4
[SPARK-1331] Added graceful shutdown to Spark Streaming
tdas Apr 8, 2014
6dc5f58
[SPARK-1396] Properly cleanup DAGScheduler on job cancellation.
kayousterhout Apr 8, 2014
3bc0548
Remove extra semicolon in import statement and unused import in Appli…
hsaputra Apr 8, 2014
a8d86b0
SPARK-1348 binding Master, Worker, and App Web UI to all interfaces
kanzhang Apr 8, 2014
e25b593
SPARK-1445: compute-classpath should not print error if lib_managed n…
aarondav Apr 8, 2014
fac6085
[SPARK-1397] Notify SparkListeners when stages fail or are cancelled.
kayousterhout Apr 8, 2014
12c077d
SPARK-1433: Upgrade Mesos dependency to 0.17.0
techaddict Apr 8, 2014
ce8ec54
Spark 1271: Co-Group and Group-By should pass Iterable[X]
holdenk Apr 9, 2014
b9e0c93
[SPARK-1434] [MLLIB] change labelParser from anonymous function to trait
mengxr Apr 9, 2014
fa0524f
Spark-939: allow user jars to take precedence over spark jars
holdenk Apr 9, 2014
9689b66
[SPARK-1390] Refactoring of matrices backed by RDDs
mengxr Apr 9, 2014
87bd1f9
SPARK-1093: Annotate developer and experimental API's
pwendell Apr 9, 2014
bde9cc1
[SPARK-1357] [MLLIB] Annotate developer and experimental APIs
mengxr Apr 9, 2014
eb5f2b6
SPARK-1407 drain event queue before stopping event logger
kanzhang Apr 9, 2014
0adc932
[SPARK-1357 (fix)] remove empty line after :: DeveloperApi/Experiment…
mengxr Apr 10, 2014
8ca3b2b
SPARK-729: Closures not always serialized at capture time
willb Apr 10, 2014
3e986f8
Merge remote-tracking branch 'apache/master' into streaming-web-ui
tdas Apr 10, 2014
168fe86
Merge pull request #2 from andrewor14/ui-refactor
tdas Apr 10, 2014
827e81a
Merge branch 'streaming-web-ui' of github.com:tdas/spark into streami…
tdas Apr 10, 2014
1af239b
Changed streaming UI to attach itself as a tab with the Spark UI.
tdas Apr 10, 2014
1c0bcef
Refactored streaming UI into two files.
tdas Apr 10, 2014
fa760fe
Fixed long line.
tdas Apr 10, 2014
e55cc4b
SPARK-1446: Spark examples should not do a System.exit
techaddict Apr 10, 2014
e6d4a74
Revert "SPARK-729: Closures not always serialized at capture time"
pwendell Apr 10, 2014
a74fbbb
Fix SPARK-1413: Parquet messes up stdout and stdin when used in Spark…
witgo Apr 10, 2014
79820fe
[SPARK-1276] Add a HistoryServer to render persisted UI
andrewor14 Apr 10, 2014
3bd3129
SPARK-1428: MLlib should convert non-float64 NumPy arrays to float64 …
techaddict Apr 10, 2014
ee6543f
Minor changes based on Andrew's comments.
tdas Apr 10, 2014
6de06b0
Merge remote-tracking branch 'apache/master' into streaming-web-ui
tdas Apr 10, 2014
548c98c
Wide refactoring of WebUI, UITab, and UIPage (see commit message)
andrewor14 Apr 11, 2014
914b8ff
Moved utils functions to UIUtils.
tdas Apr 11, 2014
585cd65
Merge pull request #5 from andrewor14/ui-refactor
tdas Apr 11, 2014
caa5e05
Merge branch 'streaming-web-ui' of github.com:tdas/spark into streami…
tdas Apr 11, 2014
f8e1053
Added Spark and Streaming UI unit tests.
tdas Apr 11, 2014
aa396d4
Rename tabs and pages (No more IndexPage.scala)
andrewor14 Apr 11, 2014
2fc09c8
Added binary check exclusions
tdas Apr 11, 2014
72fe256
Merge pull request #6 from andrewor14/ui-refactor
tdas Apr 11, 2014
89dae36
Merge branch 'streaming-web-ui' of github.com:tdas/spark into streami…
tdas Apr 11, 2014
90feb8d
Address Patrick's comments
andrewor14 Apr 11, 2014
Unify the logic for column pruning, projection, and filtering of table scans.

This removes duplicated logic, dead code, and unnecessary casting when planning Parquet table scans and Hive table scans.

Other changes:
 - Fix tests now that column pruning is more thorough: because pruning predicates are applied before any tuples are scanned, the columns they reference no longer need to appear in the scan's output unless they are also part of the final output of this logical plan fragment.
 - Add a rule to simplify trivial filters (a short sketch follows the commit metadata below). This was required to keep `WHERE false` from being pushed into table scans, since `HiveTableScan` (reasonably) refuses to apply partition pruning predicates to non-partitioned tables.

Author: Michael Armbrust <[email protected]>

Closes apache#213 from marmbrus/strategyCleanup and squashes the following commits:

48ce403 [Michael Armbrust] Move one more bit of parquet stuff into the core SQLContext.
834ce08 [Michael Armbrust] Address comments.
0f2c6f5 [Michael Armbrust] Unify the logic for column pruning, projection, and filtering of table scans for both Hive and Parquet relations.  Fix tests now that we are doing a better job of column pruning.
marmbrus authored and pwendell committed Mar 25, 2014
commit b637f2d91ab4d3d5bf13e8d959c919ebd776f6af
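
Before the diff itself, a minimal illustrative sketch of what the new `SimplifyFilters` rule (added in Optimizer.scala below) does with a trivially false predicate. This is not part of the patch; the import paths and constructors are assumed from the 2014-era Catalyst layout visible elsewhere in this diff, and the attribute name is invented:

```scala
// Illustrative sketch only: exercising the SimplifyFilters rule added below.
// Package and constructor details are assumptions based on the surrounding code.
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal}
import org.apache.spark.sql.catalyst.optimizer.SimplifyFilters
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}
import org.apache.spark.sql.catalyst.types.{BooleanType, IntegerType}

// A stand-in relation with a single (invented) column.
val relation = LocalRelation(AttributeReference("key", IntegerType)() :: Nil)

// A plan fragment equivalent to `SELECT * FROM relation WHERE false`.
val plan = Filter(Literal(false, BooleanType), relation)

// The rule rewrites the fragment to an empty LocalRelation, so the `false`
// predicate is never handed to HiveTableScan as a partition pruning predicate.
val simplified = SimplifyFilters(plan)
// simplified == LocalRelation(relation.output)
```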
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -30,6 +30,7 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
Batch("ConstantFolding", Once,
ConstantFolding,
BooleanSimplification,
SimplifyFilters,
SimplifyCasts) ::
Batch("Filter Pushdown", Once,
CombineFilters,
@@ -90,6 +91,22 @@ object CombineFilters extends Rule[LogicalPlan] {
}
}

/**
* Removes filters that can be evaluated trivially. This is done either by eliding the filter for
* cases where it will always evaluate to `true`, or substituting a dummy empty relation when the
* filter will always evaluate to `false`.
*/
object SimplifyFilters extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Filter(Literal(true, BooleanType), child) =>
child
case Filter(Literal(null, _), child) =>
LocalRelation(child.output)
case Filter(Literal(false, BooleanType), child) =>
LocalRelation(child.output)
}
}

/**
* Pushes [[catalyst.plans.logical.Filter Filter]] operators through
* [[catalyst.plans.logical.Project Project]] operators, in-lining any
38 changes: 38 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -118,9 +118,47 @@ class SQLContext(@transient val sparkContext: SparkContext)
TopK ::
PartialAggregation ::
SparkEquiInnerJoin ::
ParquetOperations ::
BasicOperators ::
CartesianProduct ::
BroadcastNestedLoopJoin :: Nil

/**
* Used to build table scan operators where complex projection and filtering are done using
* separate physical operators. This function returns the given scan operator with Project and
* Filter nodes added only when needed. For example, a Project operator is only used when the
* final desired output requires complex expressions to be evaluated or when columns can be
* further eliminated out after filtering has been done.
*
* The required attributes for both filtering and expression evaluation are passed to the
* provided `scanBuilder` function so that it can avoid unnecessary column materialization.
*/
def pruneFilterProject(
projectList: Seq[NamedExpression],
filterPredicates: Seq[Expression],
scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {

val projectSet = projectList.flatMap(_.references).toSet
val filterSet = filterPredicates.flatMap(_.references).toSet
val filterCondition = filterPredicates.reduceLeftOption(And)

// Right now we still use a projection even if the only evaluation is applying an alias
// to a column. Since this is a no-op, it could be avoided. However, using this
// optimization with the current implementation would change the output schema.
// TODO: Decouple final output schema from expression evaluation so this copy can be
// avoided safely.

if (projectList.toSet == projectSet && filterSet.subsetOf(projectSet)) {
// When it is possible to just use column pruning to get the right projection and
// when the columns of this projection are enough to evaluate all filter conditions,
// just do a scan followed by a filter, with no extra project.
val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]])
filterCondition.map(Filter(_, scan)).getOrElse(scan)
} else {
val scan = scanBuilder((projectSet ++ filterSet).toSeq)
Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
}
}
}

@transient
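
As a rough sketch of how a planning strategy is meant to call the new `pruneFilterProject` helper, mirroring the `ParquetOperations` strategy in the next file. `MyRelation` and `MyTableScan` are invented placeholders, not real Spark classes, and the object would live inside the `SparkPlanner` alongside the other strategies:

```scala
// Hypothetical strategy written against the planner API shown in this diff.
// Only pruneFilterProject and PhysicalOperation come from the real code base.
object MyTableScans extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalOperation(projectList, filters, relation: MyRelation) =>
      pruneFilterProject(
        projectList,
        filters,
        // Only the columns needed by the projection and the filters are passed
        // in, so the scan never materializes unused columns.
        requiredColumns => MyTableScan(requiredColumns, relation)) :: Nil
    case _ => Nil
  }
}
```

The helper then decides whether a separate Project node is needed: when the project list is attribute-only and the filter columns are a subset of the projected columns, it emits just the scan plus an optional Filter; otherwise it plans the scan over the union of both column sets and adds a Project on top.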
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -18,19 +18,15 @@
package org.apache.spark.sql
package execution

import org.apache.spark.SparkContext

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.parquet.InsertIntoParquetTable
import org.apache.spark.sql.parquet._

abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

val sparkContext: SparkContext
self: SQLContext#SparkPlanner =>

object SparkEquiInnerJoin extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -170,6 +166,25 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}

object ParquetOperations extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// TODO: need to support writing to other types of files. Unify the below code paths.
case logical.WriteToFile(path, child) =>
val relation =
ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, None)
InsertIntoParquetTable(relation, planLater(child))(sparkContext) :: Nil
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
InsertIntoParquetTable(table, planLater(child))(sparkContext) :: Nil
case PhysicalOperation(projectList, filters, relation: parquet.ParquetRelation) =>
// TODO: Should be pushing down filters as well.
pruneFilterProject(
projectList,
filters,
ParquetTableScan(_, relation, None)(sparkContext)) :: Nil
case _ => Nil
}
}

// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
// TODO: Set
@@ -185,14 +200,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
case logical.Project(projectList, r: ParquetRelation)
if projectList.forall(_.isInstanceOf[Attribute]) =>

// simple projection of data loaded from Parquet file
parquet.ParquetTableScan(
projectList.asInstanceOf[Seq[Attribute]],
r,
None)(sparkContext) :: Nil
case logical.Project(projectList, child) =>
execution.Project(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
@@ -216,12 +223,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.ExistingRdd(Nil, singleRowRdd) :: Nil
case logical.Repartition(expressions, child) =>
execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
case logical.WriteToFile(path, child) =>
val relation =
ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, None)
InsertIntoParquetTable(relation, planLater(child))(sparkContext) :: Nil
case p: parquet.ParquetRelation =>
parquet.ParquetTableScan(p.output, p, None)(sparkContext) :: Nil
case SparkLogicalPlan(existingPlan) => existingPlan :: Nil
case _ => Nil
}
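
For readers of the strategies above (and of `HiveTableScans` below), an illustrative trace of what the `PhysicalOperation` extractor binds; this is my reading of the extractor, not part of the patch:

```scala
// Illustrative trace only. Given a stack of adjacent projections and filters, e.g.
//
//   Project(key)
//     Filter(value > 10)
//       relation(key, value)
//
// the pattern
//
//   case PhysicalOperation(projectList, filters, child) => ...
//
// binds roughly to
//
//   projectList = Seq(key)
//   filters     = Seq(value > 10)
//   child       = relation(key, value)
//
// i.e. the extractor flattens the Project/Filter chain so a single strategy can plan
// projection, filtering and the table scan together via pruneFilterProject.
```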
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -133,8 +133,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
results
}

// TODO: Move this.

SessionState.start(sessionState)

/**
@@ -191,8 +189,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {

override val strategies: Seq[Strategy] = Seq(
TopK,
ColumnPrunings,
PartitionPrunings,
ParquetOperations,
HiveTableScans,
DataSinks,
Scripts,
@@ -217,7 +214,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override lazy val optimizedPlan =
optimizer(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))

// TODO: We are loosing schema here.
override lazy val toRdd: RDD[Row] =
analyzed match {
case NativeCommand(cmd) =>
113 changes: 11 additions & 102 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -21,9 +21,8 @@ package hive
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.parquet.{InsertIntoParquetTable, ParquetRelation, ParquetTableScan}

trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
@@ -43,121 +42,31 @@ trait HiveStrategies {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
InsertIntoParquetTable(table, planLater(child))(hiveContext.sparkContext) :: Nil
case _ => Nil
}
}

object HiveTableScans extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// Push attributes into table scan when possible.
case p @ logical.Project(projectList, m: MetastoreRelation) if isSimpleProject(projectList) =>
HiveTableScan(projectList.asInstanceOf[Seq[Attribute]], m, None)(hiveContext) :: Nil
case m: MetastoreRelation =>
HiveTableScan(m.output, m, None)(hiveContext) :: Nil
case _ => Nil
}
}

/**
* A strategy used to detect filtering predicates on top of a partitioned relation to help
* partition pruning.
*
* This strategy itself doesn't perform partition pruning, it just collects and combines all the
* partition pruning predicates and pass them down to the underlying [[HiveTableScan]] operator,
* which does the actual pruning work.
* Retrieves data using a HiveTableScan. Partition pruning predicates are also detected and
* applied.
*/
object PartitionPrunings extends Strategy {
object HiveTableScans extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case p @ FilteredOperation(predicates, relation: MetastoreRelation)
if relation.isPartitioned =>

case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) =>
// Filter out all predicates that only deal with partition keys, these are given to the
// hive table scan operator to be used for partition pruning.
val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet

// Filter out all predicates that only deal with partition keys
val (pruningPredicates, otherPredicates) = predicates.partition {
_.references.map(_.exprId).subsetOf(partitionKeyIds)
}

val scan = HiveTableScan(
relation.output, relation, pruningPredicates.reduceLeftOption(And))(hiveContext)

otherPredicates
.reduceLeftOption(And)
.map(Filter(_, scan))
.getOrElse(scan) :: Nil

case _ =>
Nil
}
}

/**
* A strategy that detects projects and filters over some relation and applies column pruning if
* possible. Partition pruning is applied first if the relation is partitioned.
*/
object ColumnPrunings extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// TODO(andre): the current mix of HiveRelation and ParquetRelation
// here appears artificial; try to refactor to break it into two
case PhysicalOperation(projectList, predicates, relation: BaseRelation) =>
val predicateOpt = predicates.reduceOption(And)
val predicateRefs = predicateOpt.map(_.references).getOrElse(Set.empty)
val projectRefs = projectList.flatMap(_.references)

// To figure out what columns to preserve after column pruning, we need to consider:
//
// 1. Columns referenced by the project list (order preserved)
// 2. Columns referenced by filtering predicates but not by project list
// 3. Relation output
//
// Then the final result is ((1 union 2) intersect 3)
val prunedCols = (projectRefs ++ (predicateRefs -- projectRefs)).intersect(relation.output)

val filteredScans =
if (relation.isPartitioned) { // from here on relation must be a [[MetaStoreRelation]]
// Applies partition pruning first for partitioned table
val filteredRelation = predicateOpt.map(logical.Filter(_, relation)).getOrElse(relation)
PartitionPrunings(filteredRelation).view.map(_.transform {
case scan: HiveTableScan =>
scan.copy(attributes = prunedCols)(hiveContext)
})
} else {
val scan = relation match {
case MetastoreRelation(_, _, _) => {
HiveTableScan(
prunedCols,
relation.asInstanceOf[MetastoreRelation],
None)(hiveContext)
}
case ParquetRelation(_, _) => {
ParquetTableScan(
relation.output,
relation.asInstanceOf[ParquetRelation],
None)(hiveContext.sparkContext)
.pruneColumns(prunedCols)
}
}
predicateOpt.map(execution.Filter(_, scan)).getOrElse(scan) :: Nil
}

if (isSimpleProject(projectList) && prunedCols == projectRefs) {
filteredScans
} else {
filteredScans.view.map(execution.Project(projectList, _))
}

pruneFilterProject(
projectList,
otherPredicates,
HiveTableScan(_, relation, pruningPredicates.reduceLeftOption(And))(hiveContext)) :: Nil
case _ =>
Nil
}
}

/**
* Returns true if `projectList` only performs column pruning and does not evaluate other
* complex expressions.
*/
def isSimpleProject(projectList: Seq[NamedExpression]) = {
projectList.forall(_.isInstanceOf[Attribute])
}
}
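
To make the partition-pruning split above concrete, a small self-contained sketch; plain Scala collections stand in for Catalyst expressions and the column names are invented:

```scala
// Toy model of the predicate split performed by the new HiveTableScans strategy.
// Each predicate is represented by its text plus the set of columns it references.
val partitionKeys = Set("ds", "hr") // partition columns of a table like srcpart

val predicates: Seq[(String, Set[String])] = Seq(
  "ds = '2008-04-08'" -> Set("ds"), // references only partition keys
  "key < 10"          -> Set("key")) // references a regular data column

val (pruningPredicates, otherPredicates) =
  predicates.partition { case (_, refs) => refs.subsetOf(partitionKeys) }

// pruningPredicates == Seq("ds = '2008-04-08'" -> Set("ds"))
//   -> handed to HiveTableScan so entire partitions can be skipped.
// otherPredicates   == Seq("key < 10" -> Set("key"))
//   -> left for pruneFilterProject, which plans them as a Filter above the scan.
```

The real strategy performs the same split, except that it compares each predicate's referenced expression IDs against the relation's partition-key expression IDs.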
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -33,7 +33,7 @@ class PruningSuite extends HiveComparisonTest {
createPruningTest("Column pruning: with partitioned table",
"SELECT key FROM srcpart WHERE ds = '2008-04-08' LIMIT 3",
Seq("key"),
Seq("key", "ds"),
Seq("key"),
Seq(
Seq("2008-04-08", "11"),
Seq("2008-04-08", "12")))
@@ -97,7 +97,7 @@ class PruningSuite extends HiveComparisonTest {
createPruningTest("Partition pruning: with filter on string partition key",
"SELECT value, hr FROM srcpart1 WHERE ds = '2008-04-08'",
Seq("value", "hr"),
Seq("value", "hr", "ds"),
Seq("value", "hr"),
Seq(
Seq("2008-04-08", "11"),
Seq("2008-04-08", "12")))
@@ -113,14 +113,14 @@ class PruningSuite extends HiveComparisonTest {
createPruningTest("Partition pruning: left only 1 partition",
"SELECT value, hr FROM srcpart1 WHERE ds = '2008-04-08' AND hr < 12",
Seq("value", "hr"),
Seq("value", "hr", "ds"),
Seq("value", "hr"),
Seq(
Seq("2008-04-08", "11")))

createPruningTest("Partition pruning: all partitions pruned",
"SELECT value, hr FROM srcpart1 WHERE ds = '2014-01-27' AND hr = 11",
Seq("value", "hr"),
Seq("value", "hr", "ds"),
Seq("value", "hr"),
Seq.empty)

createPruningTest("Partition pruning: pruning with both column key and partition key",
@@ -147,8 +147,8 @@ class PruningSuite extends HiveComparisonTest {
(columnNames, partValues)
}.head

assert(actualOutputColumns sameElements expectedOutputColumns, "Output columns mismatch")
assert(actualScannedColumns sameElements expectedScannedColumns, "Scanned columns mismatch")
assert(actualOutputColumns === expectedOutputColumns, "Output columns mismatch")
assert(actualScannedColumns === expectedScannedColumns, "Scanned columns mismatch")

assert(
actualPartValues.length === expectedPartValues.length,