Changes from 1 commit (of 34 commits):
9130563  Squashed commit of the following: (Mar 16, 2016)
aa80f9c  Refactor SQLContext etc. to take in ExternalCatalog (Mar 17, 2016)
1f1dd00  Attempt to remove old catalog from SessionState (Mar 17, 2016)
5daa696  Merge branch 'master' of github.com:apache/spark into use-session-cat… (Mar 17, 2016)
71a01e0  Fix style (Mar 17, 2016)
9f5154f  Replace all usages of analysis.Catalog (Mar 17, 2016)
78cbcbd  Fix tests (Mar 18, 2016)
5e16480  Fix tests round 2 (Mar 18, 2016)
57c8c29  Fix MiMa (Mar 18, 2016)
c439280  Merge branch 'master' of github.com:apache/spark into use-session-cat… (Mar 18, 2016)
a3c6bf7  Minor fixes (Mar 18, 2016)
193d93c  sessionState.sessionCatalog -> sessionState.catalog (Mar 18, 2016)
f089e2b  Fix tests round 3 (small round) (Mar 18, 2016)
9cd89f8  Merge branch 'master' of github.com:apache/spark into use-session-cat… (Mar 19, 2016)
f41346b  Don't bother sessionizing HiveCatalog (Mar 19, 2016)
4b37d7a  Fix tests (round 4) - ignored test in CliSuite (Mar 19, 2016)
1e72b0a  Merge branch 'master' of github.com:apache/spark into use-session-cat… (Mar 21, 2016)
52e0273  Clear temp tables after each suite (Mar 21, 2016)
19750d7  Require DB exists before showing tables on them (Mar 21, 2016)
561ca3c  Fix tests (Mar 21, 2016)
b9de78c  Fix MultiDatabaseSuite (Mar 21, 2016)
536cea2  Merge branch 'master' of github.com:apache/spark into use-session-cat… (Mar 22, 2016)
4133d3f  Fix HiveUDFSuite + add tests (Mar 22, 2016)
159e51c  Fix HiveCompatibilitySuite? (Mar 22, 2016)
542283c  Fix CliSuite (Mar 22, 2016)
98751cc  Merge branch 'master' of github.com:apache/spark into use-session-cat… (Mar 22, 2016)
16a54ba  Fix HiveQuerySuite? (Mar 22, 2016)
3439dc2  Ignore new test for now... (Mar 22, 2016)
e552558  Fix HiveContextSuite? (Mar 23, 2016)
5ea8469  Revert "Fix HiveContextSuite?" (Mar 23, 2016)
9519cd8  Merge branch 'master' of github.com:apache/spark into use-session-cat… (Mar 23, 2016)
e5f82b2  Use default as the db. (yhuai, Mar 23, 2016)
c53f483  Merge branch 'master' of github.com:apache/spark into use-session-cat… (Mar 23, 2016)
350bd2e  Fix R test? (Mar 23, 2016)
sessionState.sessionCatalog -> sessionState.catalog
Andrew Or committed Mar 18, 2016
commit 193d93c670538a3fb7b64ea372a42c96d603de03
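
This commit is a mechanical rename: the session-scoped catalog field introduced earlier in the branch drops its redundant prefix, so every call site moves from sessionState.sessionCatalog to sessionState.catalog, presumably restoring the shorter `catalog` name that callers of the removed analysis.Catalog were used to. The self-contained Scala sketch below illustrates the shape of the change with hypothetical, simplified stand-ins for ExternalCatalog, SessionCatalog, and SessionState (the real Spark classes take more parameters and expose a much larger API):

// Minimal, self-contained sketch of the rename (hypothetical types,
// not the real Spark signatures).
trait ExternalCatalog {
  def listTables(db: String): Seq[String]
}

class SessionCatalog(external: ExternalCatalog) {
  private var currentDb: String = "default"
  def getCurrentDatabase: String = currentDb
  def setCurrentDatabase(db: String): Unit = { currentDb = db }
  def listTables(db: String): Seq[String] = external.listTables(db)
}

class SessionState(external: ExternalCatalog) {
  // Renamed in this commit: was `lazy val sessionCatalog`.
  lazy val catalog: SessionCatalog = new SessionCatalog(external)
}

object RenameSketch extends App {
  val external = new ExternalCatalog {
    override def listTables(db: String): Seq[String] = Seq("src", "tmp")
  }
  val state = new SessionState(external)
  // Before: state.sessionCatalog.listTables(...)
  // After:
  println(state.catalog.listTables(state.catalog.getCurrentDatabase))
}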
@@ -395,7 +395,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    */
   def table(tableName: String): DataFrame = {
     Dataset.newDataFrame(sqlContext,
-      sqlContext.sessionState.sessionCatalog.lookupRelation(
+      sqlContext.sessionState.catalog.lookupRelation(
         sqlContext.sessionState.sqlParser.parseTableIdentifier(tableName)))
   }
 
@@ -367,7 +367,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    val tableExists = df.sqlContext.sessionState.sessionCatalog.tableExists(tableIdent)
+    val tableExists = df.sqlContext.sessionState.catalog.tableExists(tableIdent)
 
     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>
10 changes: 5 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -698,7 +698,7 @@ class SQLContext private[sql](
    * only during the lifetime of this instance of SQLContext.
    */
   private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
-    sessionState.sessionCatalog.createTempTable(tableName, df.logicalPlan, ignoreIfExists = true)
+    sessionState.catalog.createTempTable(tableName, df.logicalPlan, ignoreIfExists = true)
   }
 
   /**
@@ -711,7 +711,7 @@
    */
   def dropTempTable(tableName: String): Unit = {
     cacheManager.tryUncacheQuery(table(tableName))
-    sessionState.sessionCatalog.dropTable(TableIdentifier(tableName), ignoreIfNotExists = true)
+    sessionState.catalog.dropTable(TableIdentifier(tableName), ignoreIfNotExists = true)
   }
 
   /**
@@ -796,7 +796,7 @@
   }
 
   private def table(tableIdent: TableIdentifier): DataFrame = {
-    Dataset.newDataFrame(this, sessionState.sessionCatalog.lookupRelation(tableIdent))
+    Dataset.newDataFrame(this, sessionState.catalog.lookupRelation(tableIdent))
   }
 
   /**
@@ -838,7 +838,7 @@
    * @since 1.3.0
    */
   def tableNames(): Array[String] = {
-    tableNames(sessionState.sessionCatalog.getCurrentDatabase)
+    tableNames(sessionState.catalog.getCurrentDatabase)
   }
 
   /**
@@ -848,7 +848,7 @@
    * @since 1.3.0
    */
   def tableNames(databaseName: String): Array[String] = {
-    sessionState.sessionCatalog.listTables(databaseName).map(_.unquotedString).toArray
+    sessionState.catalog.listTables(databaseName).map(_.unquotedString).toArray
   }
 
   @transient
@@ -308,7 +308,7 @@ case class DescribeCommand(
   extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val relation = sqlContext.sessionState.sessionCatalog.lookupRelation(table)
+    val relation = sqlContext.sessionState.catalog.lookupRelation(table)
     relation.schema.fields.map { field =>
       val cmtKey = "comment"
       val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else ""
@@ -339,7 +339,7 @@ case class ShowTablesCommand(databaseName: Option[String]) extends RunnableCommand
   override def run(sqlContext: SQLContext): Seq[Row] = {
     // Since we need to return a Seq of rows, we will call getTables directly
     // instead of calling tables in sqlContext.
-    val catalog = sqlContext.sessionState.sessionCatalog
+    val catalog = sqlContext.sessionState.catalog
     val db = databaseName.getOrElse(catalog.getCurrentDatabase)
     val rows = catalog.listTables(db).map { t =>
       val isTemp = t.database.isEmpty
@@ -428,7 +428,7 @@ case class DescribeFunction(
 case class SetDatabaseCommand(databaseName: String) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.sessionState.sessionCatalog.setCurrentDatabase(databaseName)
+    sqlContext.sessionState.catalog.setCurrentDatabase(databaseName)
     Seq.empty[Row]
   }
 
@@ -104,7 +104,7 @@ case class CreateTempTableUsing(
       userSpecifiedSchema = userSpecifiedSchema,
       className = provider,
       options = options)
-    sqlContext.sessionState.sessionCatalog.createTempTable(
+    sqlContext.sessionState.catalog.createTempTable(
       tableIdent.table,
       Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
       ignoreIfExists = true)
@@ -135,7 +135,7 @@ case class CreateTempTableUsingAsSelect(
       bucketSpec = None,
       options = options)
     val result = dataSource.write(mode, df)
-    sqlContext.sessionState.sessionCatalog.createTempTable(
+    sqlContext.sessionState.catalog.createTempTable(
       tableIdent.table,
       Dataset.newDataFrame(sqlContext, LogicalRelation(result)).logicalPlan,
       ignoreIfExists = true)
@@ -149,11 +149,11 @@ case class RefreshTable(tableIdent: TableIdentifier)
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     // Refresh the given table's metadata first.
-    sqlContext.sessionState.sessionCatalog.refreshTable(tableIdent)
+    sqlContext.sessionState.catalog.refreshTable(tableIdent)
 
     // If this table is cached as a InMemoryColumnarRelation, drop the original
     // cached version and make the new version cached lazily.
-    val logicalPlan = sqlContext.sessionState.sessionCatalog.lookupRelation(tableIdent)
+    val logicalPlan = sqlContext.sessionState.catalog.lookupRelation(tableIdent)
     // Use lookupCachedData directly since RefreshTable also takes databaseName.
     val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
     if (isCached) {
@@ -46,7 +46,7 @@ private[sql] class SessionState(ctx: SQLContext) {
   /**
    * Internal catalog for managing table and database states.
    */
-  lazy val sessionCatalog = new SessionCatalog(ctx.externalCatalog, conf)
+  lazy val catalog = new SessionCatalog(ctx.externalCatalog, conf)
 
   /**
    * Internal catalog for managing functions registered by the user.
@@ -62,14 +62,14 @@
    * Logical query plan analyzer for resolving unresolved attributes and relations.
    */
   lazy val analyzer: Analyzer = {
-    new Analyzer(sessionCatalog, functionRegistry, conf) {
+    new Analyzer(catalog, functionRegistry, conf) {
       override val extendedResolutionRules =
         python.ExtractPythonUDFs ::
         PreInsertCastAndRename ::
         DataSourceAnalysis ::
         (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
 
-      override val extendedCheckRules = Seq(datasources.PreWriteCheck(conf, sessionCatalog))
+      override val extendedCheckRules = Seq(datasources.PreWriteCheck(conf, catalog))
     }
   }
 
@@ -33,7 +33,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext
   }
 
   after {
-    sqlContext.sessionState.sessionCatalog.dropTable(
+    sqlContext.sessionState.catalog.dropTable(
       TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
   }
 
@@ -46,7 +46,7 @@
       sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"),
       Row("ListTablesSuiteTable", true))
 
-    sqlContext.sessionState.sessionCatalog.dropTable(
+    sqlContext.sessionState.catalog.dropTable(
       TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
     assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
   }
@@ -60,7 +60,7 @@
       sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"),
       Row("ListTablesSuiteTable", true))
 
-    sqlContext.sessionState.sessionCatalog.dropTable(
+    sqlContext.sessionState.catalog.dropTable(
       TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
     assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
   }
@@ -51,7 +51,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       sql("INSERT INTO TABLE t SELECT * FROM tmp")
       checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple))
     }
-    sqlContext.sessionState.sessionCatalog.dropTable(
+    sqlContext.sessionState.catalog.dropTable(
       TableIdentifier("tmp"), ignoreIfNotExists = true)
   }
 
@@ -62,7 +62,7 @@
       sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
       checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple))
     }
-    sqlContext.sessionState.sessionCatalog.dropTable(
+    sqlContext.sessionState.catalog.dropTable(
       TableIdentifier("tmp"), ignoreIfNotExists = true)
   }
 
@@ -69,7 +69,7 @@ private[hive] case class CurrentDatabase(ctx: HiveContext)
   override def foldable: Boolean = true
   override def nullable: Boolean = false
   override def eval(input: InternalRow): Any = {
-    UTF8String.fromString(ctx.sessionState.sessionCatalog.getCurrentDatabase)
+    UTF8String.fromString(ctx.sessionState.catalog.getCurrentDatabase)
   }
 }
 
@@ -210,12 +210,12 @@ class HiveContext private[hive](
    */
   def refreshTable(tableName: String): Unit = {
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    sessionState.sessionCatalog.refreshTable(tableIdent)
+    sessionState.catalog.refreshTable(tableIdent)
   }
 
   protected[hive] def invalidateTable(tableName: String): Unit = {
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    sessionState.sessionCatalog.invalidateTable(tableIdent)
+    sessionState.catalog.invalidateTable(tableIdent)
   }
 
   /**
@@ -229,7 +229,7 @@
    */
   def analyze(tableName: String) {
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    val relation = EliminateSubqueryAliases(sessionState.sessionCatalog.lookupRelation(tableIdent))
+    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
 
     relation match {
       case relation: MetastoreRelation =>
@@ -290,7 +290,7 @@
       // recorded in the Hive metastore.
       // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
       if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-        sessionState.sessionCatalog.alterTable(
+        sessionState.catalog.alterTable(
           relation.table.copy(
             properties = relation.table.properties +
               (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
@@ -113,7 +113,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext)
   case class QualifiedTableName(database: String, name: String)
 
   private def getCurrentDatabase: String = {
-    hive.sessionState.sessionCatalog.getCurrentDatabase
+    hive.sessionState.catalog.getCurrentDatabase
   }
 
   def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
@@ -37,7 +37,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
   /**
    * Internal catalog for managing table and database states.
    */
-  override lazy val sessionCatalog = {
+  override lazy val catalog = {
     new HiveSessionCatalog(ctx.hiveCatalog, ctx.metadataHive, ctx, conf)
   }
 
@@ -53,17 +53,17 @@
    * An analyzer that uses the Hive metastore.
    */
   override lazy val analyzer: Analyzer = {
-    new Analyzer(sessionCatalog, functionRegistry, conf) {
+    new Analyzer(catalog, functionRegistry, conf) {
       override val extendedResolutionRules =
-        sessionCatalog.ParquetConversions ::
-        sessionCatalog.CreateTables ::
-        sessionCatalog.PreInsertionCasts ::
+        catalog.ParquetConversions ::
+        catalog.CreateTables ::
+        catalog.PreInsertionCasts ::
         python.ExtractPythonUDFs ::
         PreInsertCastAndRename ::
         DataSourceAnalysis ::
         (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
 
-      override val extendedCheckRules = Seq(PreWriteCheck(conf, sessionCatalog))
+      override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
     }
   }
 
@@ -69,17 +69,17 @@ case class CreateTableAsSelect(
         withFormat
       }
 
-      hiveContext.sessionState.sessionCatalog.createTable(withSchema, ignoreIfExists = false)
+      hiveContext.sessionState.catalog.createTable(withSchema, ignoreIfExists = false)
 
       // Get the Metastore Relation
-      hiveContext.sessionState.sessionCatalog.lookupRelation(tableIdentifier) match {
+      hiveContext.sessionState.catalog.lookupRelation(tableIdentifier) match {
         case r: MetastoreRelation => r
       }
     }
     // TODO ideally, we should get the output data ready first and then
     // add the relation into catalog, just in case of failure occurs while data
     // processing.
-    if (hiveContext.sessionState.sessionCatalog.tableExists(tableIdentifier)) {
+    if (hiveContext.sessionState.catalog.tableExists(tableIdentifier)) {
       if (allowExisting) {
         // table already exists, will do nothing, to keep consistent with Hive
       } else {
@@ -49,7 +49,7 @@ private[hive] case class CreateViewAsSelect(
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
 
-    hiveContext.sessionState.sessionCatalog.tableExists(tableIdentifier) match {
+    hiveContext.sessionState.catalog.tableExists(tableIdentifier) match {
       case true if allowExisting =>
         // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
        // already exists.
@@ -43,7 +43,7 @@ case class DescribeHiveTableCommand(
     // For other tables, delegate to DescribeCommand.
 
     // In the future, we will consolidate the two and simply report what the catalog reports.
-    sqlContext.sessionState.sessionCatalog.lookupRelation(tableId) match {
+    sqlContext.sessionState.catalog.lookupRelation(tableId) match {
       case table: MetastoreRelation =>
         // Trying to mimic the format of Hive's output. But not exactly the same.
         var results: Seq[(String, String, String)] = Nil
@@ -71,7 +71,7 @@ case class DropTable(
     }
     hiveContext.invalidateTable(tableName)
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
-    hiveContext.sessionState.sessionCatalog.dropTable(
+    hiveContext.sessionState.catalog.dropTable(
       TableIdentifier(tableName), ignoreIfNotExists = true)
     Seq.empty[Row]
   }
@@ -131,7 +131,7 @@ case class CreateMetastoreDataSource(
     val tableName = tableIdent.unquotedString
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
 
-    if (hiveContext.sessionState.sessionCatalog.tableExists(tableIdent)) {
+    if (hiveContext.sessionState.catalog.tableExists(tableIdent)) {
       if (allowExisting) {
         return Seq.empty[Row]
       } else {
@@ -144,7 +144,7 @@
       if (!options.contains("path") && managedIfNoPath) {
         isExternal = false
         options + ("path" ->
-          hiveContext.sessionState.sessionCatalog.hiveDefaultTableFilePath(tableIdent))
+          hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
       } else {
         options
       }
@@ -157,7 +157,7 @@
       bucketSpec = None,
       options = optionsWithPath).resolveRelation()
 
-    hiveContext.sessionState.sessionCatalog.createDataSourceTable(
+    hiveContext.sessionState.catalog.createDataSourceTable(
       tableIdent,
       userSpecifiedSchema,
       Array.empty[String],
@@ -203,13 +203,13 @@ case class CreateMetastoreDataSourceAsSelect(
       if (!options.contains("path")) {
         isExternal = false
         options + ("path" ->
-          hiveContext.sessionState.sessionCatalog.hiveDefaultTableFilePath(tableIdent))
+          hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
       } else {
         options
       }
 
     var existingSchema = None: Option[StructType]
-    if (sqlContext.sessionState.sessionCatalog.tableExists(tableIdent)) {
+    if (sqlContext.sessionState.catalog.tableExists(tableIdent)) {
       // Check if we need to throw an exception or just return.
       mode match {
         case SaveMode.ErrorIfExists =>
@@ -234,7 +234,7 @@
           // inserting into (i.e. using the same compression).
 
           EliminateSubqueryAliases(
-            sqlContext.sessionState.sessionCatalog.lookupRelation(tableIdent)) match {
+            sqlContext.sessionState.catalog.lookupRelation(tableIdent)) match {
             case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
               existingSchema = Some(l.schema)
             case o =>
@@ -271,7 +271,7 @@
     // We will use the schema of resolved.relation as the schema of the table (instead of
     // the schema of df). It is important since the nullability may be changed by the relation
     // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
-    hiveContext.sessionState.sessionCatalog.createDataSourceTable(
+    hiveContext.sessionState.catalog.createDataSourceTable(
       tableIdent,
      Some(result.schema),
       partitionColumns,
@@ -282,7 +282,7 @@
     }
 
     // Refresh the cache of the table in the catalog.
-    hiveContext.sessionState.sessionCatalog.refreshTable(tableIdent)
+    hiveContext.sessionState.catalog.refreshTable(tableIdent)
     Seq.empty[Row]
   }
 }