draft
scwf committed Apr 27, 2015
commit b40ad71e66046455edb1ac01ef74855b6dd43436
@@ -38,7 +38,7 @@ private[hive] class ExtendedHiveQlParser extends AbstractSparkSQLParser {

protected lazy val hiveQl: Parser[LogicalPlan] =
restInput ^^ {
-      case statement => HiveQl.createPlan(statement.trim)
+      case statement => HiveQlConverter.createPlan(statement.trim)
}

protected lazy val dfs: Parser[LogicalPlan] =
106 changes: 106 additions & 0 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveASTToken.scala
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive

private[hive] object HiveASTToken {
protected val nativeCommands = Seq(
"TOK_ALTERDATABASE_OWNER",
"TOK_ALTERDATABASE_PROPERTIES",
"TOK_ALTERINDEX_PROPERTIES",
"TOK_ALTERINDEX_REBUILD",
"TOK_ALTERTABLE_ADDCOLS",
"TOK_ALTERTABLE_ADDPARTS",
"TOK_ALTERTABLE_ALTERPARTS",
"TOK_ALTERTABLE_ARCHIVE",
"TOK_ALTERTABLE_CLUSTER_SORT",
"TOK_ALTERTABLE_DROPPARTS",
"TOK_ALTERTABLE_PARTITION",
"TOK_ALTERTABLE_PROPERTIES",
"TOK_ALTERTABLE_RENAME",
"TOK_ALTERTABLE_RENAMECOL",
"TOK_ALTERTABLE_REPLACECOLS",
"TOK_ALTERTABLE_SKEWED",
"TOK_ALTERTABLE_TOUCH",
"TOK_ALTERTABLE_UNARCHIVE",
"TOK_ALTERVIEW_ADDPARTS",
"TOK_ALTERVIEW_AS",
"TOK_ALTERVIEW_DROPPARTS",
"TOK_ALTERVIEW_PROPERTIES",
"TOK_ALTERVIEW_RENAME",

"TOK_CREATEDATABASE",
"TOK_CREATEFUNCTION",
"TOK_CREATEINDEX",
"TOK_CREATEROLE",
"TOK_CREATEVIEW",

"TOK_DESCDATABASE",
"TOK_DESCFUNCTION",

"TOK_DROPDATABASE",
"TOK_DROPFUNCTION",
"TOK_DROPINDEX",
"TOK_DROPROLE",
"TOK_DROPTABLE_PROPERTIES",
"TOK_DROPVIEW",
"TOK_DROPVIEW_PROPERTIES",

"TOK_EXPORT",

"TOK_GRANT",
"TOK_GRANT_ROLE",

"TOK_IMPORT",

"TOK_LOAD",

"TOK_LOCKTABLE",

"TOK_MSCK",

"TOK_REVOKE",

"TOK_SHOW_COMPACTIONS",
"TOK_SHOW_CREATETABLE",
"TOK_SHOW_GRANT",
"TOK_SHOW_ROLE_GRANT",
"TOK_SHOW_ROLE_PRINCIPALS",
"TOK_SHOW_ROLES",
"TOK_SHOW_SET_ROLE",
"TOK_SHOW_TABLESTATUS",
"TOK_SHOW_TBLPROPERTIES",
"TOK_SHOW_TRANSACTIONS",
"TOK_SHOWCOLUMNS",
"TOK_SHOWDATABASES",
"TOK_SHOWFUNCTIONS",
"TOK_SHOWINDEXES",
"TOK_SHOWLOCKS",
"TOK_SHOWPARTITIONS",

"TOK_SWITCHDATABASE",

"TOK_UNLOCKTABLE"
)

// Commands that we do not need to explain.
protected val noExplainCommands = Seq(
"TOK_DESCTABLE",
"TOK_SHOWTABLES",
"TOK_TRUNCATETABLE" // truncate table" is a NativeCommand, does not need to explain.
) ++ nativeCommands
}
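These tables drive a simple membership check: a statement whose root AST token appears in nativeCommands is shipped to Hive as-is rather than converted to a Catalyst plan, and noExplainCommands marks statements that need no EXPLAIN treatment. A minimal, self-contained sketch of that dispatch pattern (all names below are illustrative, not part of this diff):

// Sketch only: a tiny stand-in for the real token tables and dispatch.
object TokenDispatchSketch {
  val nativeCommands = Seq("TOK_CREATEDATABASE", "TOK_DROPDATABASE")
  val noExplainCommands = Seq("TOK_DESCTABLE") ++ nativeCommands

  // A statement rooted at a "native" token is passed through to Hive
  // verbatim; anything else would be converted to a Catalyst LogicalPlan.
  def plan(rootToken: String, sql: String): String =
    if (nativeCommands contains rootToken) s"HiveNativeCommand($sql)"
    else s"convertToLogicalPlan($sql)"

  def needsExplainPlan(rootToken: String): Boolean =
    !(noExplainCommands contains rootToken)

  def main(args: Array[String]): Unit = {
    println(plan("TOK_CREATEDATABASE", "CREATE DATABASE d")) // passed to Hive
    println(plan("TOK_QUERY", "SELECT 1"))                   // converted
  }
}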
59 changes: 30 additions & 29 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -81,20 +81,45 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
protected[sql] def convertCTAS: Boolean =
getConf("spark.sql.hive.convertCTAS", "false").toBoolean

-  override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
-    new this.QueryExecution(plan)
+  /* A catalyst metadata catalog that points to the Hive Metastore. */
+  @transient
+  override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog
scwf (Contributor, Author) commented:
  reorder to put catalog, functionRegistry, analyzer, and sqlParser together

+  // Note that HiveUDFs will be overridden by functions registered in this context.
scwf (Contributor, Author) commented:
  we do not need this: since we override sqlParser, we can inherit the ddlParser from SQLContext:

    protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))
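A minimal sketch of the fallback chain that comment relies on, assuming (per the quoted SQLContext wiring) that the inherited DDL parser tries its own grammar first and hands anything else to whichever sqlParser the subclass provides; everything below is illustrative, not the real API:

// Sketch only: the DDL-first, dialect-fallback chain.
object ParserChainSketch {
  // Stand-in for the overridden sqlParser (SparkSQLParser over ExtendedHiveQlParser).
  val sqlParser: String => String = sql => s"hive plan for: $sql"

  // Stand-in for SQLContext's ddlParser = new DDLParser(sqlParser.parse(_)):
  // try the DDL grammar first, otherwise delegate to the dialect parser.
  val ddlParser: String => String = sql =>
    if (sql.trim.toUpperCase.startsWith("CREATE TEMPORARY TABLE"))
      s"ddl plan for: $sql"
    else
      sqlParser(sql)

  def main(args: Array[String]): Unit = {
    println(ddlParser("CREATE TEMPORARY TABLE t USING json")) // handled by the DDL grammar
    println(ddlParser("SELECT * FROM src"))                   // falls through to HiveQL
  }
}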

  @transient
-  protected[sql] val ddlParserWithHiveQL = new DDLParser(HiveQl.parseSql(_))
+  override protected[sql] lazy val functionRegistry =
+    new HiveFunctionRegistry with OverrideFunctionRegistry {
+      def caseSensitive: Boolean = false
+    }

+  /* An analyzer that uses the Hive metastore. */
+  @transient
+  override protected[sql] lazy val analyzer =
+    new Analyzer(catalog, functionRegistry, caseSensitive = false) {
+      override val extendedResolutionRules =
+        catalog.ParquetConversions ::
+        catalog.CreateTables ::
+        catalog.PreInsertionCasts ::
+        ExtractPythonUdfs ::
+        sources.PreInsertCastAndRename ::
+        Nil
+    }
+
+  override protected[sql] val sqlParser = {
+    val fallback = new ExtendedHiveQlParser
+    new SparkSQLParser(fallback.parse(_))
+  }
+
+  override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
+    new this.QueryExecution(plan)

override def sql(sqlText: String): DataFrame = {
val substituted = new VariableSubstitution().substitute(hiveconf, sqlText)
// TODO: Create a framework for registering parsers instead of just hardcoding if statements.
if (conf.dialect == "sql") {
super.sql(substituted)
} else if (conf.dialect == "hiveql") {
-      val ddlPlan = ddlParserWithHiveQL.parse(sqlText, exceptionOnError = false)
-      DataFrame(this, ddlPlan.getOrElse(HiveQl.parseSql(substituted)))
+      DataFrame(this, parseSql(sqlText))
} else {
sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
}
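The TODO above asks for a parser-registration framework instead of hardcoded if statements; a minimal sketch of what such a registry might look like (the API is hypothetical, not part of this change):

// Sketch only: a hypothetical dialect registry replacing the if/else chain.
object DialectRegistrySketch {
  type ParseFn = String => String // stand-in for String => LogicalPlan

  private val parsers = scala.collection.mutable.Map.empty[String, ParseFn]

  def register(dialect: String, parse: ParseFn): Unit = parsers(dialect) = parse

  def sql(dialect: String, text: String): String =
    parsers.getOrElse(dialect, sys.error(s"Unsupported SQL dialect: $dialect"))(text)

  def main(args: Array[String]): Unit = {
    register("sql", text => s"spark sql plan for: $text")
    register("hiveql", text => s"hiveql plan for: $text")
    println(sql("hiveql", "SELECT 1"))
  }
}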
@@ -229,30 +254,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
runSqlHive(s"SET $key=$value")
}

-  /* A catalyst metadata catalog that points to the Hive Metastore. */
-  @transient
-  override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog
-
-  // Note that HiveUDFs will be overridden by functions registered in this context.
-  @transient
-  override protected[sql] lazy val functionRegistry =
-    new HiveFunctionRegistry with OverrideFunctionRegistry {
-      def caseSensitive: Boolean = false
-    }
-
-  /* An analyzer that uses the Hive metastore. */
-  @transient
-  override protected[sql] lazy val analyzer =
-    new Analyzer(catalog, functionRegistry, caseSensitive = false) {
-      override val extendedResolutionRules =
-        catalog.ParquetConversions ::
-        catalog.CreateTables ::
-        catalog.PreInsertionCasts ::
-        ExtractPythonUdfs ::
-        sources.PreInsertCastAndRename ::
-        Nil
-    }
-
override protected[sql] def createSession(): SQLSession = {
new this.SQLSession()
}
@@ -218,7 +218,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
} else if (table.isView) {
// if the unresolved relation is from hive view
// parse the text into logic node.
-      HiveQl.createPlanForView(table, alias)
+      createPlanForView(table, alias)
} else {
val partitions: Seq[Partition] =
if (table.isPartitioned) {
@@ -234,6 +234,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
}

+  /** Creates a LogicalPlan for a given VIEW. */
+  private def createPlanForView(view: Table, alias: Option[String]): Subquery = alias match {
+    // Because Hive uses generated names like `_c0` to build the expanded text,
+    // we currently cannot support views created with "create view v1(c1) as ...".
+    case None => Subquery(view.getTableName, createPlan(view.getViewExpandedText))
+    case Some(aliasText) => Subquery(aliasText, createPlan(view.getViewExpandedText))
+  }
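To make the `_c0` caveat concrete: Hive stores an "expanded text" for each view, and unaliased expressions receive generated column names in it, so a user-supplied column list like v1(c1) cannot be recovered from that text. A hedged illustration follows; the exact expanded text varies by Hive version and is shown here for illustration only, not as real output:

// Sketch only, assuming typical Hive behavior for unaliased view columns.
object ViewExpansionSketch {
  val originalText = "CREATE VIEW v1(c1) AS SELECT key + 1 FROM src"
  val expandedText = "SELECT `key` + 1 AS `_c0` FROM `default`.`src`" // illustrative
  // Re-parsing expandedText yields a column named _c0, not the declared c1,
  // which is why views with an explicit column list are not yet supported.
  def main(args: Array[String]): Unit = println(expandedText)
}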

private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging