Merged
Changes from 1 commit
Commits
52 commits
82fd38d
[SPARK-5200] Disable web UI in Hive ThriftServer tests
JoshRosen Jan 12, 2015
ef9224e
[SPARK-5102][Core]subclass of MapStatus needs to be registered with Kryo
lianhuiwang Jan 12, 2015
13e610b
SPARK-4159 [BUILD] Addendum: improve running of single test after ena…
srowen Jan 12, 2015
a3978f3
[SPARK-5078] Optionally read from SPARK_LOCAL_HOSTNAME
marmbrus Jan 12, 2015
aff49a3
SPARK-5172 [BUILD] spark-examples-***.jar shades a wrong Hadoop distr…
srowen Jan 12, 2015
3aed305
[SPARK-4999][Streaming] Change storeInBlockManager to false by default
jerryshao Jan 12, 2015
5d9fa55
[SPARK-5049][SQL] Fix ordering of partition columns in ParquetTableScan
marmbrus Jan 12, 2015
1e42e96
[SPARK-5138][SQL] Ensure schema can be inferred from a namedtuple
mulby Jan 13, 2015
f7741a9
[SPARK-5006][Deploy]spark.port.maxRetries doesn't work
WangTaoTheTonic Jan 13, 2015
9dea64e
[SPARK-4697][YARN]System properties should override environment varia…
WangTaoTheTonic Jan 13, 2015
39e333e
[SPARK-5131][Streaming][DOC]: There is a discrepancy in WAL implement…
uncleGen Jan 13, 2015
8ead999
[SPARK-5223] [MLlib] [PySpark] fix MapConverter and ListConverter in …
Jan 13, 2015
6463e0b
[SPARK-4912][SQL] Persistent tables for the Spark SQL data sources api
yhuai Jan 13, 2015
14e3f11
[SPARK-5168] Make SQLConf a field rather than mixin in SQLContext
rxin Jan 13, 2015
f996909
[SPARK-5123][SQL] Reconcile Java/Scala API for data types.
rxin Jan 14, 2015
d5eeb35
[SPARK-5167][SQL] Move Row into sql package and make it usable for Java.
rxin Jan 14, 2015
a3f7421
[SPARK-5248] [SQL] move sql.types.decimal.Decimal to sql.types.Decimal
adrian-wang Jan 14, 2015
81f72a0
[SPARK-5211][SQL]Restore HiveMetastoreTypes.toDataType
yhuai Jan 14, 2015
38bdc99
[SQL] some comments fix for GROUPING SETS
adrian-wang Jan 14, 2015
5840f54
[SPARK-2909] [MLlib] [PySpark] SparseVector in pyspark now supports i…
MechCoder Jan 14, 2015
9d4449c
[SPARK-5228][WebUI] Hide tables for "Active Jobs/Completed Jobs/Faile…
sarutak Jan 14, 2015
259936b
[SPARK-4014] Add TaskContext.attemptNumber and deprecate TaskContext.…
JoshRosen Jan 14, 2015
2fd7f72
[SPARK-5235] Make SQLConf Serializable
alexbaretta Jan 14, 2015
76389c5
[SPARK-5234][ml]examples for ml don't have sparkContext.stop
Jan 14, 2015
13d2406
[SPARK-5254][MLLIB] Update the user guide to position spark.ml better
mengxr Jan 15, 2015
cfa397c
[SPARK-5193][SQL] Tighten up SQLContext API
rxin Jan 15, 2015
6abc45e
[SPARK-5254][MLLIB] remove developers section from spark.ml guide
mengxr Jan 15, 2015
4b325c7
[SPARK-5193][SQL] Tighten up HiveContext API
rxin Jan 15, 2015
3c8650c
[SPARK-5224] [PySpark] improve performance of parallelize list/ndarray
Jan 15, 2015
1881431
[SPARK-5274][SQL] Reconcile Java and Scala UDFRegistration.
rxin Jan 16, 2015
65858ba
[Minor] Fix tiny typo in BlockManager
sarutak Jan 16, 2015
96c2c71
[SPARK-4857] [CORE] Adds Executor membership events to SparkListener
Jan 16, 2015
a79a9f9
[SPARK-4092] [CORE] Fix InputMetrics for coalesce'd Rdds
Jan 16, 2015
2be82b1
[SPARK-1507][YARN]specify # cores for ApplicationMaster
WangTaoTheTonic Jan 16, 2015
e200ac8
[SPARK-5201][CORE] deal with int overflow in the ParallelCollectionRD…
advancedxy Jan 16, 2015
f6b852a
[DOCS] Fix typo in return type of cogroup
srowen Jan 16, 2015
e8422c5
[SPARK-5231][WebUI] History Server shows wrong job submission time.
sarutak Jan 16, 2015
ecf943d
[WebUI] Fix collapse of WebUI layout
sarutak Jan 16, 2015
d05c9ee
[SPARK-4923][REPL] Add Developer API to REPL to allow re-publishing t…
Jan 16, 2015
fd3a8a1
[SPARK-733] Add documentation on use of accumulators in lazy transfor…
Jan 16, 2015
ee1c1f3
[SPARK-4937][SQL] Adding optimization to simplify the And, Or condit…
scwf Jan 16, 2015
61b427d
[SPARK-5193][SQL] Remove Spark SQL Java-specific API.
rxin Jan 17, 2015
f3bfc76
[SQL][minor] Improved Row documentation.
rxin Jan 17, 2015
c1f3c27
[SPARK-4937][SQL] Comment for the newly optimization rules in `Boolea…
scwf Jan 17, 2015
6999910
[SPARK-5096] Use sbt tasks instead of vals to get hadoop version
marmbrus Jan 18, 2015
e7884bc
[SQL][Minor] Added comments and examples to explain BooleanSimplifica…
rxin Jan 18, 2015
e12b5b6
MAINTENANCE: Automated closing of pull requests.
pwendell Jan 18, 2015
ad16da1
[HOTFIX]: Minor clean up regarding skipped artifacts in build files.
pwendell Jan 18, 2015
1727e08
[SPARK-5279][SQL] Use java.math.BigDecimal as the exposed Decimal type.
rxin Jan 18, 2015
1a200a3
[SQL][Minor] Update sql doc according to data type APIs changes
scwf Jan 18, 2015
1955645
[SQL][minor] Put DataTypes.java in java dir.
rxin Jan 19, 2015
7dbf1fd
[SQL] fix typo in class description
Jan 19, 2015
[SPARK-4912][SQL] Persistent tables for the Spark SQL data sources api
With the changes in this PR, users can persist the metadata of tables created with the data source API in the metastore through DDL statements.

Author: Yin Huai <[email protected]>
Author: Michael Armbrust <[email protected]>

Closes apache#3960 from yhuai/persistantTablesWithSchema2 and squashes the following commits:

069c235 [Yin Huai] Make exception messages user friendly.
c07cbc6 [Yin Huai] Get the location of test file in a correct way.
4456e98 [Yin Huai] Test data.
5315dfc [Yin Huai] rxin's comments.
7fc4b56 [Yin Huai] Add DDLStrategy and HiveDDLStrategy to plan DDLs based on the data source API.
aeaf4b3 [Yin Huai] Add comments.
06f9b0c [Yin Huai] Revert unnecessary changes.
feb88aa [Yin Huai] Merge remote-tracking branch 'apache/master' into persistantTablesWithSchema2
172db80 [Yin Huai] Fix unit test.
49bf1ac [Yin Huai] Unit tests.
8f8f1a1 [Yin Huai] [SPARK-4574][SQL] Adding support for defining schema in foreign DDL commands. apache#3431
f47fda1 [Yin Huai] Unit tests.
2b59723 [Michael Armbrust] Set external when creating tables
c00bb1b [Michael Armbrust] Don't use reflection to read options
1ea6e7b [Michael Armbrust] Don't fail when trying to uncache a table that doesn't exist
6edc710 [Michael Armbrust] Add tests.
d7da491 [Michael Armbrust] First draft of persistent tables.
yhuai authored and marmbrus committed Jan 13, 2015
commit 6463e0b9e8067cce70602c5c9006a2546856a9d6
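A minimal usage sketch of the DDL path this commit adds, assuming a running HiveContext (hiveContext), a SQLContext (sqlContext), and an Avro data source on the classpath; the provider, path, and table names are placeholders borrowed from the parser's doc comment in ddl.scala below, not part of the diff itself.

// Persistent table: a non-TEMPORARY CREATE TABLE issued through a HiveContext is
// planned by HiveDDLStrategy, and its provider/schema/options are recorded in the
// Hive metastore, so the table definition survives across sessions.
hiveContext.sql(
  """CREATE TABLE episodes
    |USING org.apache.spark.sql.avro
    |OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
  """.stripMargin)

// Temporary table: the pre-existing behavior, planned by DDLStrategy and registered
// only in the current context; the TEMPORARY keyword is now optional in the grammar.
sqlContext.sql(
  """CREATE TEMPORARY TABLE episodesTemp
    |USING org.apache.spark.sql.avro
    |OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
  """.stripMargin)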
@@ -330,6 +330,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def strategies: Seq[Strategy] =
extraStrategies ++ (
DataSourceStrategy ::
DDLStrategy ::
TakeOrdered ::
HashAggregation ::
LeftSemiJoin ::
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution

import org.apache.spark.sql.sources.{CreateTempTableUsing, CreateTableUsing}
import org.apache.spark.sql.{SQLContext, Strategy, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
@@ -310,4 +311,17 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case _ => Nil
}
}

object DDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, options) =>
ExecutedCommand(
CreateTempTableUsing(tableName, userSpecifiedSchema, provider, options)) :: Nil

case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")

case _ => Nil
}
}
}
@@ -177,7 +177,6 @@ case class DescribeCommand(
override val output: Seq[Attribute]) extends RunnableCommand {

override def run(sqlContext: SQLContext) = {
Row("# Registered as a temporary table", null, null) +:
child.output.map(field => Row(field.name, field.dataType.toString, null))
child.output.map(field => Row(field.name, field.dataType.toString, null))
}
}
53 changes: 37 additions & 16 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -92,21 +92,21 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
protected lazy val ddl: Parser[LogicalPlan] = createTable

/**
* `CREATE TEMPORARY TABLE avroTable
* `CREATE [TEMPORARY] TABLE avroTable
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
* `CREATE TEMPORARY TABLE avroTable(intField int, stringField string...)
* `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...)
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
*/
protected lazy val createTable: Parser[LogicalPlan] =
(
CREATE ~ TEMPORARY ~ TABLE ~> ident
(CREATE ~> TEMPORARY.? <~ TABLE) ~ ident
~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
case tableName ~ columns ~ provider ~ opts =>
case temp ~ tableName ~ columns ~ provider ~ opts =>
val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
CreateTableUsing(tableName, userSpecifiedSchema, provider, opts)
CreateTableUsing(tableName, userSpecifiedSchema, provider, temp.isDefined, opts)
}
)

@@ -175,13 +175,12 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
primitiveType
}

private[sql] case class CreateTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String]) extends RunnableCommand {

def run(sqlContext: SQLContext) = {
object ResolvedDataSource {
def apply(
sqlContext: SQLContext,
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String]): ResolvedDataSource = {
val loader = Utils.getContextOrSparkClassLoader
val clazz: Class[_] = try loader.loadClass(provider) catch {
case cnf: java.lang.ClassNotFoundException =>
@@ -199,22 +198,44 @@ private[sql] case class CreateTableUsing(
.asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
case _ =>
sys.error(s"${clazz.getCanonicalName} should extend SchemaRelationProvider.")
sys.error(s"${clazz.getCanonicalName} does not allow user-specified schemas.")
}
}
case None => {
clazz.newInstance match {
case dataSource: org.apache.spark.sql.sources.RelationProvider =>
case dataSource: org.apache.spark.sql.sources.RelationProvider =>
dataSource
.asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
.createRelation(sqlContext, new CaseInsensitiveMap(options))
case _ =>
sys.error(s"${clazz.getCanonicalName} should extend RelationProvider.")
sys.error(s"A schema needs to be specified when using ${clazz.getCanonicalName}.")
}
}
}

sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
new ResolvedDataSource(clazz, relation)
}
}

private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)

private[sql] case class CreateTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
temporary: Boolean,
options: Map[String, String]) extends Command

private [sql] case class CreateTempTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String]) extends RunnableCommand {

def run(sqlContext: SQLContext) = {
val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)

sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
Seq.empty
}
}
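As a hedged illustration of the refactoring above: ResolvedDataSource can now be used on its own, both by CreateTempTableUsing and by the Hive metastore catalog later in this PR. The snippet mirrors CreateTempTableUsing.run; the JSON provider, path, and table name are assumptions for the example, and ResolvedDataSource is private[sql], so this only applies inside the sql package.

// Resolve a data source by provider name. If "org.apache.spark.sql.json" itself
// cannot be loaded, the loader falls back to "org.apache.spark.sql.json.DefaultSource".
val resolved = ResolvedDataSource(
  sqlContext,
  userSpecifiedSchema = None,  // no user schema, so the provider must be a RelationProvider
  provider = "org.apache.spark.sql.json",
  options = Map("path" -> "examples/src/main/resources/people.json"))

// Same registration step CreateTempTableUsing performs:
sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable("people")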
@@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}
/**
* ::DeveloperApi::
* Implemented by objects that produce relations for a specific kind of data source. When
* Spark SQL is given a DDL operation with a USING clause specified, this interface is used to
* pass in the parameters specified by a user.
* Spark SQL is given a DDL operation with a USING clause specified (to specify the implemented
* RelationProvider), this interface is used to pass in the parameters specified by a user.
*
* Users may specify the fully qualified class name of a given data source. When that class is
* not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
@@ -46,17 +46,22 @@ trait RelationProvider {

/**
* ::DeveloperApi::
* Implemented by objects that produce relations for a specific kind of data source. When
* Spark SQL is given a DDL operation with
* 1. USING clause: to specify the implemented SchemaRelationProvider
* 2. User defined schema: users can define schema optionally when create table
* Implemented by objects that produce relations for a specific kind of data source
* with a given schema. When Spark SQL is given a DDL operation with a USING clause specified (
* to specify the implemented SchemaRelationProvider) and a user defined schema, this interface
* is used to pass in the parameters specified by a user.
*
* Users may specify the fully qualified class name of a given data source. When that class is
* not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
* less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
* data source 'org.apache.spark.sql.json.DefaultSource'
*
A new instance of this class will be instantiated each time a DDL call is made.
*
* The difference between a [[RelationProvider]] and a [[SchemaRelationProvider]] is that
* users need to provide a schema when using a SchemaRelationProvider.
A relation provider can inherit both [[RelationProvider]] and [[SchemaRelationProvider]]
* if it can support both schema inference and user-specified schemas.
*/
@DeveloperApi
trait SchemaRelationProvider {
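To make the RelationProvider / SchemaRelationProvider contract concrete, here is a hypothetical data source that supports both schema inference and user-specified schemas, as the updated docs describe. The package, class body, fallback schema, and bare relation are illustrative assumptions (a real source would also implement one of the scan interfaces in this file), the createRelation signatures are taken from the surrounding Spark sources, and the imports follow this commit, where the type classes still live under catalyst.types.

package com.example.dummy

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.types.{StringType, StructField, StructType}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}

// `CREATE [TEMPORARY] TABLE t USING com.example.dummy ...` resolves to this
// DefaultSource class, per the class-name resolution described above.
class DefaultSource extends RelationProvider with SchemaRelationProvider {

  // RelationProvider path: no schema in the DDL, so the source supplies one
  // (a fixed single-column schema here, purely for the sketch).
  override def createRelation(
      ctx: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    createRelation(ctx, parameters,
      StructType(StructField("value", StringType, nullable = true) :: Nil))

  // SchemaRelationProvider path: honour the column list given in the DDL.
  override def createRelation(
      ctx: SQLContext,
      parameters: Map[String, String],
      userSchema: StructType): BaseRelation =
    new BaseRelation {
      override val sqlContext = ctx
      override val schema = userSchema
    }
}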
@@ -314,4 +314,34 @@ class TableScanSuite extends DataSourceTest {
sql("SELECT * FROM oneToTenDef"),
(1 to 10).map(Row(_)).toSeq)
}

test("exceptions") {
// Make sure we throw the correct exception when users use a relation provider that
// only implements RelationProvider or SchemaRelationProvider.
val schemaNotAllowed = intercept[Exception] {
sql(
"""
|CREATE TEMPORARY TABLE relationProvierWithSchema (i int)
|USING org.apache.spark.sql.sources.SimpleScanSource
|OPTIONS (
| From '1',
| To '10'
|)
""".stripMargin)
}
assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas"))

val schemaNeeded = intercept[Exception] {
sql(
"""
|CREATE TEMPORARY TABLE schemaRelationProvierWithoutSchema
|USING org.apache.spark.sql.sources.AllDataTypesScanSource
|OPTIONS (
| From '1',
| To '10'
|)
""".stripMargin)
}
assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using"))
}
}
@@ -115,6 +115,16 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
}

def refreshTable(tableName: String): Unit = {
// TODO: Database support...
catalog.refreshTable("default", tableName)
}

protected[hive] def invalidateTable(tableName: String): Unit = {
// TODO: Database support...
catalog.invalidateTable("default", tableName)
}
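A brief usage note on the methods just added: refreshTable eagerly reloads the cached data source entry for a table, while invalidateTable (visible only within the hive package) drops it so it is rebuilt on the next lookup. The table name below is a placeholder.

// If the data behind a persisted data source table changes outside of Spark,
// rebuild its cached relation so new queries see the change. Database support is
// still a TODO, so the "default" database is assumed.
hiveContext.refreshTable("episodes")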

/**
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
@@ -340,6 +350,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
DataSourceStrategy,
HiveCommandStrategy(self),
HiveDDLStrategy,
DDLStrategy,
TakeOrdered,
ParquetOperations,
InMemoryScans,
@@ -20,10 +20,11 @@ package org.apache.spark.sql.hive
import java.io.IOException
import java.util.{List => JList}

import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder}

import org.apache.hadoop.util.ReflectionUtils
import org.apache.hadoop.hive.metastore.TableType
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, FieldSchema}
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
import org.apache.hadoop.hive.ql.metadata.InvalidTableException
import org.apache.hadoop.hive.ql.plan.CreateTableDesc
@@ -39,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.util.Utils

/* Implicit conversions */
@@ -50,8 +52,76 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
/** Connection to hive metastore. Usages should lock on `this`. */
protected[hive] val client = Hive.get(hive.hiveconf)

// TODO: Use this everywhere instead of passing around (databaseName, tableName) tuples.
/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String) {
def toLowerCase = QualifiedTableName(database.toLowerCase, name.toLowerCase)
}

/** A cache of Spark SQL data source tables that have been accessed. */
protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
override def load(in: QualifiedTableName): LogicalPlan = {
logDebug(s"Creating new cached data source for $in")
val table = client.getTable(in.database, in.name)
val schemaString = table.getProperty("spark.sql.sources.schema")
val userSpecifiedSchema =
if (schemaString == null) {
None
} else {
Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
}
// It does not appear that the ql client for the metastore has a way to enumerate all the
// SerDe properties directly...
val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap

val resolvedRelation =
ResolvedDataSource(
hive,
userSpecifiedSchema,
table.getProperty("spark.sql.sources.provider"),
options)

LogicalRelation(resolvedRelation.relation)
}
}

CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
}

def refreshTable(databaseName: String, tableName: String): Unit = {
cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
}

def invalidateTable(databaseName: String, tableName: String): Unit = {
cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase)
}

val caseSensitive: Boolean = false

def createDataSourceTable(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String]) = {
val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
val tbl = new Table(dbName, tblName)

tbl.setProperty("spark.sql.sources.provider", provider)
if (userSpecifiedSchema.isDefined) {
tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json)
}
options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }

tbl.setProperty("EXTERNAL", "TRUE")
tbl.setTableType(TableType.EXTERNAL_TABLE)

// create the table
synchronized {
client.createTable(tbl, false)
}
}
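A small self-contained sketch of the schema round trip used by createDataSourceTable above and by the cache loader earlier in this file; the two-column schema is made up for illustration, and the import path matches this commit (later commits in this PR relocate the type classes).

import org.apache.spark.sql.catalyst.types.{DataType, StringType, StructField, StructType}

// Writing: the user-specified schema is serialized to JSON and stored in the
// "spark.sql.sources.schema" table property (userSpecifiedSchema.get.json above).
val schema = StructType(Seq(
  StructField("title", StringType, nullable = true),
  StructField("air_date", StringType, nullable = true)))
val serialized = schema.json

// Reading: the cache loader parses the property back into a StructType.
val restored = DataType.fromJson(serialized).asInstanceOf[StructType]
assert(restored == schema)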

def tableExists(tableIdentifier: Seq[String]): Boolean = {
val tableIdent = processTableIdentifier(tableIdentifier)
val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
@@ -72,7 +142,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
hive.sessionState.getCurrentDatabase)
val tblName = tableIdent.last
val table = client.getTable(databaseName, tblName)
if (table.isView) {

if (table.getProperty("spark.sql.sources.provider") != null) {
cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
} else if (table.isView) {
// if the unresolved relation is from a hive view,
// parse the view text into a logical plan node.
HiveQl.createPlanForView(table, alias)
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.sources.CreateTableUsing
import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy}

import scala.collection.JavaConversions._
@@ -208,6 +209,16 @@ private[hive] trait HiveStrategies {
}
}

object HiveDDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
ExecutedCommand(
CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil

case _ => Nil
}
}

case class HiveCommandStrategy(context: HiveContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case describe: DescribeCommand =>
@@ -395,6 +395,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {

clearCache()
loadedTables.clear()
catalog.cachedDataSourceTables.invalidateAll()
catalog.client.getAllTables("default").foreach { t =>
logDebug(s"Deleting table $t")
val table = catalog.client.getTable("default", t)