Changes from 1 commit
Commits
160 commits
841721e
SPARK-1352: Improve robustness of spark-submit script
pwendell Mar 31, 2014
5731af5
[SQL] Rewrite join implementation to allow streaming of one relation.
marmbrus Mar 31, 2014
33b3c2a
SPARK-1365 [HOTFIX] Fix RateLimitedOutputStream test
pwendell Mar 31, 2014
564f1c1
SPARK-1376. In the yarn-cluster submitter, rename "args" option to "arg"
sryza Apr 1, 2014
94fe7fd
[SPARK-1377] Upgrade Jetty to 8.1.14v20131031
andrewor14 Apr 1, 2014
ada310a
[Hot Fix #42] Persisted RDD disappears on storage page if re-used
andrewor14 Apr 1, 2014
f5c418d
[SQL] SPARK-1372 Support for caching and uncaching tables in a SQLCon…
marmbrus Apr 1, 2014
764353d
[SPARK-1342] Scala 2.10.4
markhamstra Apr 2, 2014
afb5ea6
[Spark-1134] only call ipython if no arguments are given; remove IPYT…
Apr 2, 2014
45df912
Revert "[Spark-1134] only call ipython if no arguments are given; rem…
mateiz Apr 2, 2014
8b3045c
MLI-1 Decision Trees
manishamde Apr 2, 2014
ea9de65
Remove * from test case golden filename.
marmbrus Apr 2, 2014
11973a7
Renamed stageIdToActiveJob to jobIdToActiveJob.
kayousterhout Apr 2, 2014
de8eefa
[SPARK-1385] Use existing code for JSON de/serialization of BlockId
andrewor14 Apr 2, 2014
7823633
Do not re-use objects in the EdgePartition/EdgeTriplet iterators.
darabos Apr 2, 2014
1faa579
[SPARK-1371][WIP] Compression support for Spark SQL in-memory columna…
liancheng Apr 2, 2014
ed730c9
StopAfter / TopK related changes
rxin Apr 2, 2014
9c65fa7
[SPARK-1212, Part II] Support sparse data in MLlib
mengxr Apr 2, 2014
47ebea5
[SQL] SPARK-1364 Improve datatype and test coverage for ScalaReflecti…
marmbrus Apr 3, 2014
92a86b2
[SPARK-1398] Removed findbugs jsr305 dependency
markhamstra Apr 3, 2014
fbebaed
Spark parquet improvements
AndreSchumacher Apr 3, 2014
5d1feda
[SPARK-1360] Add Timestamp Support for SQL
chenghao-intel Apr 3, 2014
c1ea3af
Spark 1162 Implemented takeOrdered in pyspark.
ScrapCodes Apr 3, 2014
b8f5341
[SQL] SPARK-1333 First draft of java API
marmbrus Apr 3, 2014
a599e43
[SPARK-1134] Fix and document passing of arguments to IPython
Apr 3, 2014
d94826b
[BUILD FIX] Fix compilation of Spark SQL Java API.
marmbrus Apr 3, 2014
9231b01
Fix jenkins from giving the green light to builds that don't compile.
marmbrus Apr 3, 2014
33e6361
Revert "[SPARK-1398] Removed findbugs jsr305 dependency"
pwendell Apr 4, 2014
ee6e9e7
SPARK-1337: Application web UI garbage collects newest stages
pwendell Apr 4, 2014
7f32fd4
SPARK-1350. Always use JAVA_HOME to run executor container JVMs.
sryza Apr 4, 2014
01cf4c4
SPARK-1404: Always upgrade spark-env.sh vars to environment vars
aarondav Apr 4, 2014
f1fa617
[SPARK-1133] Add whole text files reader in MLlib
yinxusen Apr 4, 2014
16b8308
SPARK-1375. Additional spark-submit cleanup
sryza Apr 4, 2014
a02b535
Don't create SparkContext in JobProgressListenerSuite.
pwendell Apr 4, 2014
198892f
[SPARK-1198] Allow pipes tasks to run in different sub-directories
tgravescs Apr 5, 2014
d956cc2
[SQL] Minor fixes.
marmbrus Apr 5, 2014
60e18ce
SPARK-1414. Python API for SparkContext.wholeTextFiles
mateiz Apr 5, 2014
5f3c1bb
Add test utility for generating Jar files with compiled classes.
pwendell Apr 5, 2014
1347ebd
[SPARK-1419] Bumped parent POM to apache 14
markhamstra Apr 5, 2014
b50ddfd
SPARK-1305: Support persisting RDD's directly to Tachyon
haoyuan Apr 5, 2014
8de038e
[SQL] SPARK-1366 Consistent sql function across different types of SQ…
marmbrus Apr 5, 2014
0acc7a0
small fix ( proogram -> program )
prabeesh Apr 5, 2014
7c18428
HOTFIX for broken CI, by SPARK-1336
ScrapCodes Apr 5, 2014
2d0150c
Remove the getStageInfo() method from SparkContext.
kayousterhout Apr 5, 2014
6e88583
[SPARK-1371] fix computePreferredLocations signature to not depend on…
Apr 5, 2014
890d63b
Fix for PR #195 for Java 6
srowen Apr 6, 2014
0b85516
SPARK-1421. Make MLlib work on Python 2.6
mateiz Apr 6, 2014
7012ffa
Fix SPARK-1420 The maven build error for Spark Catalyst
witgo Apr 6, 2014
e258e50
[SPARK-1259] Make RDD locally iterable
epahomov Apr 6, 2014
856c50f
SPARK-1387. Update build plugins, avoid plugin version warning, centr…
srowen Apr 7, 2014
7ce52c4
SPARK-1349: spark-shell gets its own command history
aarondav Apr 7, 2014
4106558
SPARK-1314: Use SPARK_HIVE to determine if we include Hive in packaging
aarondav Apr 7, 2014
1440154
SPARK-1154: Clean up app folders in worker nodes
Apr 7, 2014
87d0928
SPARK-1431: Allow merging conflicting pull requests
pwendell Apr 7, 2014
accd099
[SQL] SPARK-1371 Hash Aggregation Improvements
marmbrus Apr 7, 2014
b5bae84
[SQL] SPARK-1427 Fix toString for SchemaRDD NativeCommands.
marmbrus Apr 7, 2014
a3c51c6
SPARK-1432: Make sure that all metadata fields are properly cleaned
Apr 7, 2014
83f2a2f
[sql] Rename Expression.apply to eval for better readability.
rxin Apr 7, 2014
9dd8b91
SPARK-1252. On YARN, use container-log4j.properties for executors
sryza Apr 7, 2014
2a2ca48
HOTFIX: Disable actor input stream test.
pwendell Apr 7, 2014
0307db0
SPARK-1099: Introduce local[*] mode to infer number of cores
aarondav Apr 7, 2014
14c9238
[sql] Rename execution/aggregates.scala Aggregate.scala, and added a …
rxin Apr 8, 2014
55dfd5d
Removed the default eval implementation from Expression, and added a …
rxin Apr 8, 2014
31e6fff
Added eval for Rand (without any support for user-defined seed).
rxin Apr 8, 2014
f27e56a
Change timestamp cast semantics. When cast to numeric types, return t…
rxin Apr 8, 2014
0d0493f
[SPARK-1402] Added 3 more compression schemes
liancheng Apr 8, 2014
11eabbe
[SPARK-1103] Automatic garbage collection of RDD, shuffle and broadca…
tdas Apr 8, 2014
83ac9a4
[SPARK-1331] Added graceful shutdown to Spark Streaming
tdas Apr 8, 2014
6dc5f58
[SPARK-1396] Properly cleanup DAGScheduler on job cancellation.
kayousterhout Apr 8, 2014
3bc0548
Remove extra semicolon in import statement and unused import in Appli…
hsaputra Apr 8, 2014
a8d86b0
SPARK-1348 binding Master, Worker, and App Web UI to all interfaces
kanzhang Apr 8, 2014
e25b593
SPARK-1445: compute-classpath should not print error if lib_managed n…
aarondav Apr 8, 2014
fac6085
[SPARK-1397] Notify SparkListeners when stages fail or are cancelled.
kayousterhout Apr 8, 2014
12c077d
SPARK-1433: Upgrade Mesos dependency to 0.17.0
techaddict Apr 8, 2014
ce8ec54
Spark 1271: Co-Group and Group-By should pass Iterable[X]
holdenk Apr 9, 2014
b9e0c93
[SPARK-1434] [MLLIB] change labelParser from anonymous function to trait
mengxr Apr 9, 2014
fa0524f
Spark-939: allow user jars to take precedence over spark jars
holdenk Apr 9, 2014
9689b66
[SPARK-1390] Refactoring of matrices backed by RDDs
mengxr Apr 9, 2014
87bd1f9
SPARK-1093: Annotate developer and experimental API's
pwendell Apr 9, 2014
bde9cc1
[SPARK-1357] [MLLIB] Annotate developer and experimental APIs
mengxr Apr 9, 2014
eb5f2b6
SPARK-1407 drain event queue before stopping event logger
kanzhang Apr 9, 2014
0adc932
[SPARK-1357 (fix)] remove empty line after :: DeveloperApi/Experiment…
mengxr Apr 10, 2014
8ca3b2b
SPARK-729: Closures not always serialized at capture time
willb Apr 10, 2014
e55cc4b
SPARK-1446: Spark examples should not do a System.exit
techaddict Apr 10, 2014
e6d4a74
Revert "SPARK-729: Closures not always serialized at capture time"
pwendell Apr 10, 2014
a74fbbb
Fix SPARK-1413: Parquet messes up stdout and stdin when used in Spark…
witgo Apr 10, 2014
79820fe
[SPARK-1276] Add a HistoryServer to render persisted UI
andrewor14 Apr 10, 2014
3bd3129
SPARK-1428: MLlib should convert non-float64 NumPy arrays to float64 …
techaddict Apr 10, 2014
7b52b66
Revert "SPARK-1433: Upgrade Mesos dependency to 0.17.0"
pwendell Apr 10, 2014
f046662
Update tuning.md
ash211 Apr 10, 2014
930b70f
Remove Unnecessary Whitespace's
techaddict Apr 10, 2014
f99401a
[SQL] Improve column pruning in the optimizer.
marmbrus Apr 10, 2014
2c55783
SPARK-1202 - Add a "cancel" button in the UI for stages
Apr 11, 2014
5cd11d5
Set spark.executor.uri from environment variable (needed by Mesos)
ivanwick Apr 11, 2014
7b4203a
Add Spark v0.9.1 to ec2 launch script and use it as the default
harveyfeng Apr 11, 2014
44f654e
SPARK-1202: Improvements to task killing in the UI.
pwendell Apr 11, 2014
446bb34
SPARK-1417: Spark on Yarn - spark UI link from resourcemanager is broken
tgravescs Apr 11, 2014
98225a6
Some clean up in build/docs
pwendell Apr 11, 2014
f5ace8d
[SPARK-1225, 1241] [MLLIB] Add AreaUnderCurve and BinaryClassificatio…
mengxr Apr 11, 2014
6a0f8e3
HOTFIX: Ignore python metastore files in RAT checks.
pwendell Apr 11, 2014
7038b00
[FIX] make coalesce test deterministic in RDDSuite
mengxr Apr 12, 2014
fdfb45e
[WIP] [SPARK-1328] Add vector statistics
yinxusen Apr 12, 2014
aa8bb11
Update WindowedDStream.scala
baishuo Apr 12, 2014
165e06a
SPARK-1057 (alternative) Remove fastutil
srowen Apr 12, 2014
6aa08c3
[SPARK-1386] Web UI for Spark Streaming
tdas Apr 12, 2014
c2d160f
[Fix #204] Update out-dated comments
andrewor14 Apr 12, 2014
ca11919
[SPARK-1403] Move the class loader creation back to where it was in 0…
Apr 13, 2014
4bc07ee
SPARK-1480: Clean up use of classloaders
pwendell Apr 13, 2014
037fe4d
[SPARK-1415] Hadoop min split for wholeTextFiles()
yinxusen Apr 13, 2014
7dbca68
[BUGFIX] In-memory columnar storage bug fixes
liancheng Apr 14, 2014
268b535
HOTFIX: Use file name and not paths for excludes
pwendell Apr 14, 2014
0247b5c
SPARK-1488. Resolve scalac feature warnings during build
srowen Apr 15, 2014
c99bcb7
SPARK-1374: PySpark API for SparkSQL
ahirreddy Apr 15, 2014
df36091
SPARK-1426: Make MLlib work with NumPy versions older than 1.7
techaddict Apr 15, 2014
2580a3b
SPARK-1501: Ensure assertions in Graph.apply are asserted.
willb Apr 15, 2014
6843d63
[SPARK-1157][MLlib] L-BFGS Optimizer based on Breeze's implementation.
Apr 15, 2014
07d72fe
Decision Tree documentation for MLlib programming guide
manishamde Apr 15, 2014
5aaf983
SPARK-1455: Better isolation for unit tests.
pwendell Apr 16, 2014
8517911
[FIX] update sbt-idea to version 1.6.0
mengxr Apr 16, 2014
63ca581
[WIP] SPARK-1430: Support sparse data in Python MLlib
mateiz Apr 16, 2014
273c2fd
[SQL] SPARK-1424 Generalize insertIntoTable functions on SchemaRDDs
marmbrus Apr 16, 2014
6a10d80
[SPARK-959] Updated SBT from 0.13.1 to 0.13.2
liancheng Apr 16, 2014
c0273d8
Make "spark logo" link refer to "/".
Apr 16, 2014
fec462c
Loads test tables when running "sbt hive/console" without HIVE_DEV_HOME
liancheng Apr 16, 2014
9edd887
update spark.default.parallelism
CrazyJvm Apr 16, 2014
c3527a3
SPARK-1310: Start adding k-fold cross validation to MLLib [adds kFold…
holdenk Apr 16, 2014
77f8367
SPARK-1497. Fix scalastyle warnings in YARN, Hive code
srowen Apr 16, 2014
82349fb
Minor addition to SPARK-1497
pwendell Apr 16, 2014
e269c24
SPARK-1469: Scheduler mode should accept lower-case definitions and h…
techaddict Apr 16, 2014
725925c
SPARK-1465: Spark compilation is broken with the latest hadoop-2.4.0 …
Apr 16, 2014
10b1c59
[SPARK-1511] use Files.move instead of renameTo in TestUtils.scala
advancedxy Apr 16, 2014
987760e
Add clean to build
pwendell Apr 16, 2014
235a47c
Rebuild routing table after Graph.reverse
ankurdave Apr 17, 2014
17d3234
SPARK-1329: Create pid2vid with correct number of partitions
ankurdave Apr 17, 2014
016a877
remove unnecessary brace and semicolon in 'putBlockInfo.synchronize' …
CrazyJvm Apr 17, 2014
38877cc
Fixing a race condition in event listener unit test
kanzhang Apr 17, 2014
9c40b9e
misleading task number of groupByKey
CrazyJvm Apr 17, 2014
07b7ad3
Update ReducedWindowedDStream.scala
baishuo Apr 17, 2014
d4916a8
Include stack trace for exceptions thrown by user code.
marmbrus Apr 17, 2014
6ad4c54
SPARK-1462: Examples of ML algorithms are using deprecated APIs
techaddict Apr 17, 2014
bb76eae
[python alternative] pyspark require Python2, failing if system defau…
abhishekkr Apr 17, 2014
6904750
[SPARK-1395] Allow "local:" URIs to work on Yarn.
Apr 17, 2014
0058b5d
SPARK-1408 Modify Spark on Yarn to point to the history server when a…
tgravescs Apr 17, 2014
6c746ba
FIX: Don't build Hive in assembly unless running Hive tests.
pwendell Apr 18, 2014
7863ecc
HOTFIX: Ignore streaming UI test
pwendell Apr 18, 2014
e31c8ff
SPARK-1483: Rename minSplits to minPartitions in public APIs
CodingCat Apr 18, 2014
89f4743
Reuses Row object in ExistingRdd.productToRowRdd()
liancheng Apr 18, 2014
aa17f02
[SPARK-1520] remove fastutil from dependencies
mengxr Apr 18, 2014
8aa1f4c
SPARK-1357 (addendum). More Experimental items in MLlib
srowen Apr 18, 2014
3c7a9ba
SPARK-1523: improve the readability of code in AkkaUtil
CodingCat Apr 18, 2014
81a152c
Fixed broken pyspark shell.
rxin Apr 18, 2014
c399baa
SPARK-1456 Remove view bounds on Ordered in favor of a context bound …
marmbrus Apr 18, 2014
2089e0e
SPARK-1482: Fix potential resource leaks in saveAsHadoopDataset and s…
zsxwing Apr 19, 2014
28238c8
README update
rxin Apr 19, 2014
5d0f58b
Use scala deprecation instead of java.
marmbrus Apr 19, 2014
10d0421
Add insertInto and saveAsTable to Python API.
marmbrus Apr 19, 2014
25fc318
[SPARK-1535] ALS: Avoid the garbage-creating ctor of DoubleMatrix
tmyklebu Apr 19, 2014
3a390bf
REPL cleanup.
marmbrus Apr 20, 2014
42238b6
Fix org.scala-lang: * inconsistent versions for maven
witgo Apr 21, 2014
b434ec0
remove exclusion scalap
witgo Apr 21, 2014
[SQL] SPARK-1424 Generalize insertIntoTable functions on SchemaRDDs
This makes it possible to create tables and insert into them using the DSL and SQL for the Scala and Java APIs.

Author: Michael Armbrust <[email protected]>

Closes #354 from marmbrus/insertIntoTable and squashes the following commits:

6c6f227 [Michael Armbrust] Create random temporary files in python parquet unit tests.
f5e6d5c [Michael Armbrust] Merge remote-tracking branch 'origin/master' into insertIntoTable
765c506 [Michael Armbrust] Add to JavaAPI.
77b512c [Michael Armbrust] typos.
5c3ef95 [Michael Armbrust] use names for boolean args.
882afdf [Michael Armbrust] Change createTableAs to saveAsTable.  Clean up api annotations.
d07d94b [Michael Armbrust] Add tests, support for creating parquet files and hive tables.
fa3fe81 [Michael Armbrust] Make insertInto available on JavaSchemaRDD as well.  Add createTableAs function.
marmbrus authored and rxin committed Apr 16, 2014
commit 273c2fd08deb49e970ec471c857dcf0b2953f922
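For reviewers, a minimal end-to-end sketch of the workflow this commit enables, adapted from the scaladoc examples added below. It assumes an existing SparkContext `sc`; the `Person` class, file path, and table name are placeholders.

// Sketch only: create a parquet-backed table, then add rows to it via SQL and the DSL.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

case class Person(name: String, age: Int)

// New in this commit: create an empty parquet file with Person's schema and register it as a table.
createParquetFile[Person]("path/to/people.parquet").registerAsTable("people")

// Rows can now be inserted with SQL...
sql("INSERT INTO people SELECT 'michael', 29")

// ...or appended from another SchemaRDD using the new insertInto call.
val morePeople = sc.parallelize(Person("andy", 30) :: Nil)
morePeople.insertInto("people")
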
14 changes: 10 additions & 4 deletions python/pyspark/sql.py
@@ -106,9 +106,12 @@ def parquetFile(self, path):
"""
Loads a Parquet file, returning the result as a L{SchemaRDD}.

>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.saveAsParquetFile("/tmp/tmp.parquet")
>>> srdd2 = sqlCtx.parquetFile("/tmp/tmp.parquet")
>>> srdd.saveAsParquetFile(parquetFile)
>>> srdd2 = sqlCtx.parquetFile(parquetFile)
>>> srdd.collect() == srdd2.collect()
True
"""
@@ -278,9 +281,12 @@ def saveAsParquetFile(self, path):
that are written out using this method can be read back in as a SchemaRDD using the
L{SQLContext.parquetFile} method.

>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.saveAsParquetFile("/tmp/test.parquet")
>>> srdd2 = sqlCtx.parquetFile("/tmp/test.parquet")
>>> srdd.saveAsParquetFile(parquetFile)
>>> srdd2 = sqlCtx.parquetFile(parquetFile)
>>> srdd2.collect() == srdd.collect()
True
"""
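The updated doctests above use tempfile.mkdtemp() followed by shutil.rmtree() so each run writes to a unique path that does not yet exist, instead of a fixed location under /tmp. A rough Scala equivalent of the same pattern, assuming `srdd` and `sqlContext` from the surrounding tests:

import java.nio.file.Files

// Reserve a unique temporary path, then remove it so saveAsParquetFile can create the directory itself.
val dir = Files.createTempDirectory("parquet-test").toFile
dir.delete()
val path = dir.getAbsolutePath

srdd.saveAsParquetFile(path)              // write the SchemaRDD out as parquet
val srdd2 = sqlContext.parquetFile(path)  // read it back in
assert(srdd2.count() == srdd.count())
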
57 changes: 50 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -20,18 +20,26 @@ package org.apache.spark.sql
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkContext
import org.apache.spark.annotation.{AlphaComponent, Experimental}
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.rdd.RDD

import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl
import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor

import org.apache.spark.sql.columnar.InMemoryColumnarTableScan

import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.SparkStrategies

import org.apache.spark.sql.parquet.ParquetRelation

/**
* :: AlphaComponent ::
@@ -65,12 +73,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
new this.QueryExecution { val logical = plan }

/**
* :: Experimental ::
* :: DeveloperApi ::
* Allows catalyst LogicalPlans to be executed as a SchemaRDD. Note that the LogicalPlan
* interface is considered internal, and thus not guranteed to be stable. As a result, using
* them directly is not reccomended.
* interface is considered internal, and thus not guaranteed to be stable. As a result, using
* them directly is not recommended.
*/
@Experimental
@DeveloperApi
implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan)

/**
@@ -89,6 +97,39 @@ class SQLContext(@transient val sparkContext: SparkContext)
def parquetFile(path: String): SchemaRDD =
new SchemaRDD(this, parquet.ParquetRelation(path))

/**
* :: Experimental ::
* Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
* This registered table can be used as the target of future `insertInto` operations.
*
* {{{
* val sqlContext = new SQLContext(...)
* import sqlContext._
*
* case class Person(name: String, age: Int)
* createParquetFile[Person]("path/to/file.parquet").registerAsTable("people")
* sql("INSERT INTO people SELECT 'michael', 29")
* }}}
*
* @tparam A A case class type that describes the desired schema of the parquet file to be
* created.
* @param path The path where the directory containing parquet metadata should be created.
* Data inserted into this table will also be stored at this location.
* @param allowExisting When false, an exception will be thrown if this directory already exists.
* @param conf A Hadoop configuration object that can be used to specify options to the parquet
* output format.
*
* @group userf
*/
@Experimental
def createParquetFile[A <: Product : TypeTag](
path: String,
allowExisting: Boolean = true,
conf: Configuration = new Configuration()): SchemaRDD = {
new SchemaRDD(
this,
ParquetRelation.createEmpty(path, ScalaReflection.attributesFor[A], allowExisting, conf))
}

/**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
@@ -208,9 +249,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
}

/**
* :: DeveloperApi ::
* The primary workflow for executing relational queries using Spark. Designed to allow easy
* access to the intermediate phases of query execution for developers.
*/
@DeveloperApi
protected abstract class QueryExecution {
def logical: LogicalPlan

@@ -231,7 +274,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
override def toString: String =
s"""== Logical Plan ==
|${stringOrError(analyzed)}
|== Optimized Logical Plan
|== Optimized Logical Plan ==
|${stringOrError(optimizedPlan)}
|== Physical Plan ==
|${stringOrError(executedPlan)}
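The new createParquetFile takes allowExisting and a Hadoop Configuration as optional arguments. A hedged sketch with the options spelled out as named arguments, reusing the placeholder `Person` class from earlier; the compression key is illustrative only, not something this commit configures.

import org.apache.hadoop.conf.Configuration

val hadoopConf = new Configuration()
hadoopConf.set("parquet.compression", "GZIP")   // illustrative option passed through to the parquet output format

// Fail fast if the directory already exists, rather than silently reusing it.
createParquetFile[Person](
  path = "path/to/people.parquet",
  allowExisting = false,
  conf = hadoopConf
).registerAsTable("people")
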
28 changes: 5 additions & 23 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import net.razorvine.pickle.Pickler

import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
import org.apache.spark.annotation.{AlphaComponent, Experimental}
import org.apache.spark.annotation.{AlphaComponent, Experimental, DeveloperApi}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
@@ -83,8 +83,6 @@ import java.util.{Map => JMap}
* rdd.where('key === 1).orderBy('value.asc).select('key).collect()
* }}}
*
* @todo There is currently no support for creating SchemaRDDs from either Java or Python RDDs.
*
* @groupname Query Language Integrated Queries
* @groupdesc Query Functions that create new queries from SchemaRDDs. The
* result of all query functions is also a SchemaRDD, allowing multiple operations to be
@@ -276,8 +274,8 @@ class SchemaRDD(
* an `OUTER JOIN` in SQL. When no output rows are produced by the generator for a
* given row, a single row will be output, with `NULL` values for each of the
* generated columns.
* @param alias an optional alias that can be used as qualif for the attributes that are produced
* by this generate operation.
* @param alias an optional alias that can be used as qualifier for the attributes that are
* produced by this generate operation.
*
* @group Query
*/
@@ -290,29 +288,13 @@ class SchemaRDD(
new SchemaRDD(sqlContext, Generate(generator, join, outer, None, logicalPlan))

/**
* :: Experimental ::
* Adds the rows from this RDD to the specified table. Note in a standard [[SQLContext]] there is
* no notion of persistent tables, and thus queries that contain this operator will fail to
* optimize. When working with an extension of a SQLContext that has a persistent catalog, such
* as a `HiveContext`, this operation will result in insertions to the table specified.
* Returns this RDD as a SchemaRDD. Intended primarily to force the invocation of the implicit
* conversion from a standard RDD to a SchemaRDD.
*
* @group schema
*/
@Experimental
def insertInto(tableName: String, overwrite: Boolean = false) =
new SchemaRDD(
sqlContext,
InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite))

/**
* Returns this RDD as a SchemaRDD.
* @group schema
*/
def toSchemaRDD = this

/** FOR INTERNAL USE ONLY */
def analyze = sqlContext.analyzer(logicalPlan)

private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
this.mapPartitions { iter =>
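With insertInto moved off of SchemaRDD, the retained toSchemaRDD is documented as a way to force the implicit RDD-to-SchemaRDD conversion. A small sketch, assuming `import sqlContext._` is in scope and reusing the placeholder `Person` class:

// `people` is a plain RDD[Person]; SchemaRDD methods come from an implicit conversion in SQLContext.
val people = sc.parallelize(Person("michael", 29) :: Nil)

// toSchemaRDD forces that conversion explicitly instead of relying on it at a later call site.
val peopleSchemaRdd = people.toSchemaRDD
peopleSchemaRdd.registerAsTable("people2")
sql("SELECT name FROM people2 WHERE age >= 18").collect().foreach(println)
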
59 changes: 54 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical._

/**
@@ -29,14 +31,24 @@ trait SchemaRDDLike {
private[sql] def baseSchemaRDD: SchemaRDD

/**
* :: DeveloperApi ::
* A lazily computed query execution workflow. All other RDD operations are passed
* through to the RDD that is produced by this workflow.
* through to the RDD that is produced by this workflow. This workflow is produced lazily because
* invoking the whole query optimization pipeline can be expensive.
*
* We want this to be lazy because invoking the whole query optimization pipeline can be
* expensive.
* The query execution is considered a Developer API as phases may be added or removed in future
* releases. This execution is only exposed to provide an interface for inspecting the various
* phases for debugging purposes. Applications should not depend on particular phases existing
* or producing any specific output, even for exactly the same query.
*
* Additionally, the RDD exposed by this execution is not designed for consumption by end users.
* In particular, it does not contain any schema information, and it reuses Row objects
* internally. This object reuse improves performance, but can make programming against the RDD
* more difficult. Instead end users should perform RDD operations on a SchemaRDD directly.
*/
@transient
protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan)
@DeveloperApi
lazy val queryExecution = sqlContext.executePlan(logicalPlan)

override def toString =
s"""${super.toString}
@@ -45,7 +57,8 @@ trait SchemaRDDLike {

/**
* Saves the contents of this `SchemaRDD` as a parquet file, preserving the schema. Files that
* are written out using this method can be read back in as a SchemaRDD using the ``function
* are written out using this method can be read back in as a SchemaRDD using the `parquetFile`
* function.
*
* @group schema
*/
@@ -62,4 +75,40 @@ trait SchemaRDDLike {
def registerAsTable(tableName: String): Unit = {
sqlContext.registerRDDAsTable(baseSchemaRDD, tableName)
}

/**
* :: Experimental ::
* Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
*
* @group schema
*/
@Experimental
def insertInto(tableName: String, overwrite: Boolean): Unit =
sqlContext.executePlan(
InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)).toRdd

/**
* :: Experimental ::
* Appends the rows from this RDD to the specified table.
*
* @group schema
*/
@Experimental
def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)

/**
* :: Experimental ::
* Creates a table from the contents of this SchemaRDD. This will fail if the table already
* exists.
*
* Note that this currently only works with SchemaRDDs that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* @group schema
*/
@Experimental
def saveAsTable(tableName: String): Unit =
sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
}
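A hedged sketch of the two user-visible additions to SchemaRDDLike: the now-public (but @DeveloperApi) queryExecution for inspecting query phases, and the new insertInto call. The "adults" table name is a placeholder and is assumed to have been registered already; saveAsTable is shown commented out since, per the scaladoc above, it needs a HiveContext.

val adults = sql("SELECT name, age FROM people WHERE age >= 18")

// For debugging only: print the logical, optimized, and physical plans of the query.
println(adults.queryExecution)

// Append the query result to an existing table.
adults.insertInto("adults")

// Create a new table from the result; only meaningful with a persistent catalog (HiveContext).
// adults.saveAsTable("adults_snapshot")
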
sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -17,8 +17,11 @@

package org.apache.spark.sql.api.java

import java.beans.{Introspector, PropertyDescriptor}
import java.beans.Introspector

import org.apache.hadoop.conf.Configuration

import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
@@ -45,29 +48,42 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
result
}

/**
* :: Experimental ::
* Creates an empty parquet file with the schema of class `beanClass`, which can be registered as
* a table. This registered table can be used as the target of future `insertInto` operations.
*
* {{{
* JavaSQLContext sqlCtx = new JavaSQLContext(...)
*
* sqlCtx.createParquetFile(Person.class, "path/to/file.parquet").registerAsTable("people")
* sqlCtx.sql("INSERT INTO people SELECT 'michael', 29")
* }}}
*
* @param beanClass A java bean class object that will be used to determine the schema of the
* parquet file.
* @param path The path where the directory containing parquet metadata should be created.
* Data inserted into this table will also be stored at this location.
* @param allowExisting When false, an exception will be thrown if this directory already exists.
* @param conf A Hadoop configuration object that can be used to specify options to the parquet
* output format.
*/
@Experimental
def createParquetFile(
beanClass: Class[_],
path: String,
allowExisting: Boolean = true,
conf: Configuration = new Configuration()): JavaSchemaRDD = {
new JavaSchemaRDD(
sqlContext,
ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf))
}

/**
* Applies a schema to an RDD of Java Beans.
*/
def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): JavaSchemaRDD = {
// TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
val beanInfo = Introspector.getBeanInfo(beanClass)

val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
val schema = fields.map { property =>
val dataType = property.getPropertyType match {
case c: Class[_] if c == classOf[java.lang.String] => StringType
case c: Class[_] if c == java.lang.Short.TYPE => ShortType
case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
case c: Class[_] if c == java.lang.Long.TYPE => LongType
case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
case c: Class[_] if c == java.lang.Float.TYPE => FloatType
case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
}

AttributeReference(property.getName, dataType, true)()
}

val schema = getSchema(beanClass)
val className = beanClass.getCanonicalName
val rowRdd = rdd.rdd.mapPartitions { iter =>
// BeanInfo is not serializable so we must rediscover it remotely for each partition.
@@ -97,4 +113,26 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
def registerRDDAsTable(rdd: JavaSchemaRDD, tableName: String): Unit = {
sqlContext.registerRDDAsTable(rdd.baseSchemaRDD, tableName)
}

/** Returns a Catalyst Schema for the given java bean class. */
protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = {
// TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
val beanInfo = Introspector.getBeanInfo(beanClass)

val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
fields.map { property =>
val dataType = property.getPropertyType match {
case c: Class[_] if c == classOf[java.lang.String] => StringType
case c: Class[_] if c == java.lang.Short.TYPE => ShortType
case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
case c: Class[_] if c == java.lang.Long.TYPE => LongType
case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
case c: Class[_] if c == java.lang.Float.TYPE => FloatType
case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
}
// TODO: Nullability could be stricter.
AttributeReference(property.getName, dataType, nullable = true)()
}
}
}