Changes from 17 commits
@@ -151,7 +151,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
        EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
        UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
      )
-   | insert | cache
+   | insert | cache | unCache
    )

  protected lazy val select: Parser[LogicalPlan] =
@@ -183,9 +183,17 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
    }

  protected lazy val cache: Parser[LogicalPlan] =
-   (CACHE ^^^ true | UNCACHE ^^^ false) ~ TABLE ~ ident ^^ {
-     case doCache ~ _ ~ tableName => CacheCommand(tableName, doCache)
+   CACHE ~ TABLE ~> ident ~ opt(AS ~ select) <~ opt(";") ^^ {
+     case tableName ~ None =>
+       CacheCommand(tableName, true)
+     case tableName ~ Some(as ~ plan) =>
+       CacheTableAsSelectCommand(tableName, plan)
    }

+ protected lazy val unCache: Parser[LogicalPlan] =
+   UNCACHE ~ TABLE ~> ident <~ opt(";") ^^ {
+     case tableName => CacheCommand(tableName, false)
+   }
+
  protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")

Comments on `case tableName ~ Some(as ~ plan) =>`:

Contributor: We can remove `as ~` if we use `opt(AS ~> select)` in line 186.

Contributor Author: Thank you for your suggestion. I have updated it as per your comment.
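For reference, the rule after that suggested change would look roughly like this (a sketch rather than the committed code; it assumes the surrounding SqlParser class, and uses `AS ~> select` so that only the select plan is kept and the `as` binding disappears from the pattern):

  protected lazy val cache: Parser[LogicalPlan] =
    CACHE ~ TABLE ~> ident ~ opt(AS ~> select) <~ opt(";") ^^ {
      case tableName ~ None => CacheCommand(tableName, true)
      case tableName ~ Some(plan) => CacheTableAsSelectCommand(tableName, plan)
    }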

@@ -75,3 +75,8 @@ case class DescribeCommand(
    AttributeReference("data_type", StringType, nullable = false)(),
    AttributeReference("comment", StringType, nullable = false)())
}

+/**
+ * Returned for the "CACHE TABLE tableName AS SELECT .." command.
+ */
+case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) extends Command
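For orientation, the SQL-to-plan mapping this node supports looks like the following sketch (hypothetical table names, and assuming SqlParser's apply(input: String) entry point):

  // "CACHE TABLE t AS SELECT * FROM src" parses into
  // CacheTableAsSelectCommand("t", <logical plan for SELECT * FROM src>);
  // the planner change below maps it to the physical command of the same name.
  val parser = new SqlParser
  val plan = parser("CACHE TABLE t AS SELECT * FROM src")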
@@ -305,6 +305,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
        Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context))
      case logical.CacheCommand(tableName, cache) =>
        Seq(execution.CacheCommand(tableName, cache)(context))
+     case logical.CacheTableAsSelectCommand(tableName, plan) =>
+       Seq(execution.CacheTableAsSelectCommand(tableName, plan))
      case _ => Nil
    }
  }
@@ -166,3 +166,22 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
    child.output.map(field => Row(field.name, field.dataType.toString, null))
  }
}

+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan)
+  extends LeafNode with Command {
+
+  override protected[sql] lazy val sideEffectResult = {
+    sqlContext.catalog.registerTable(None, tableName, sqlContext.executePlan(plan).analyzed)
+    sqlContext.cacheTable(tableName)
+    // Makes the caching eager by materializing the table once.
+    sqlContext.table(tableName).count
+    Seq.empty[Row]
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+
+}

Comments on the `registerTable` line:

Contributor: (Probably my final comment on this PR :) ) As described in PR #2382, we shouldn't store the analyzed logical plan when registering tables any more (see here). To prevent duplicated code, I'd suggest importing SQLContext._ so that we can leverage the implicit conversion from LogicalPlan to SchemaRDD, and then simply do this:

  sqlContext.executePlan(plan).logical.registerTempTable(tableName)

Contributor Author: Thank you for your comment. Importing sqlContext._ is a good idea, and with that import we can simplify the code as below. Please comment on it:

  import sqlContext._
  plan.registerTempTable(tableName)
  cacheTable(tableName)

Contributor: Ah, yes, you're right, we can use plan directly. And instead of importing sqlContext._, I'd import SQLContext._ in the import section at the beginning of this file:

  import org.apache.spark.sql.SQLContext._

Contributor Author: It seems we cannot use import org.apache.spark.sql.SQLContext._ at the beginning of the file to bring the implicits into scope: there is no SQLContext companion object, and the implicits are members of the SQLContext class, so they can only be imported from an instance, as in import sqlContext._. Please correct me if I am wrong.

Contributor: Sorry again, you're right, I mistook sqlContext._ for SparkContext._...

Contributor Author: Updated the code. Please review.
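Following this thread, the updated body would presumably end up close to the sketch below (not the committed code; it assumes the implicits imported from the sqlContext instance, as discussed above):

  override protected[sql] lazy val sideEffectResult = {
    import sqlContext._
    // Register the unanalyzed plan as a temporary table (per PR #2382), then cache it.
    plan.registerTempTable(tableName)
    cacheTable(tableName)
    // Materialize the table once so caching happens eagerly rather than on first use.
    table(tableName).count
    Seq.empty[Row]
  }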
@@ -119,4 +119,17 @@ class CachedTableSuite extends QueryTest {
    }
    assert(!TestSQLContext.isCached("testData"), "Table 'testData' should not be cached")
  }

test("CACHE TABLE tableName AS SELECT Star Table") {
TestSQLContext.sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
TestSQLContext.sql("SELECT * FROM testCacheTable WHERE key = 1").collect()
assert(TestSQLContext.isCached("testCacheTable"), "Table 'testCacheTable' should be cached")
TestSQLContext.uncacheTable("testCacheTable")
}

test("'CACHE TABLE tableName AS SELECT ..'") {
TestSQLContext.sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
assert(TestSQLContext.isCached("testCacheTable"), "Table 'testCacheTable' should be cached")
TestSQLContext.uncacheTable("testCacheTable")
}
}
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala (21 additions, 10 deletions)

@@ -229,7 +229,13 @@ private[hive] object HiveQl {
          SetCommand(Some(key), Some(value))
        }
      } else if (sql.trim.toLowerCase.startsWith("cache table")) {
-       CacheCommand(sql.trim.drop(12).trim, true)
+       sql.trim.drop(12).trim.split(" ").toSeq match {
+         case Seq(tableName) =>
+           CacheCommand(tableName, true)
+         case Seq(tableName,as, select@_*) =>
+           CacheTableAsSelectCommand(tableName,
+             createPlan(sql.trim.drop(12 + tableName.length() + as.length() + 2)))
+       }
      } else if (sql.trim.toLowerCase.startsWith("uncache table")) {
        CacheCommand(sql.trim.drop(14).trim, false)
      } else if (sql.trim.toLowerCase.startsWith("add jar")) {

Comments on `case Seq(tableName,as, select@_*) =>`:

Contributor: Space after `,`.

Contributor: I suggest moving the CACHE TABLE handling into the function nodeToPlan, just as we did for the token TOK_CREATETABLE (CTAS):

  • Parsing the SQL manually is probably not the best practice; for example, it will mis-match if there are extra spaces between "cache" and "table".
  • EXPLAIN CACHE TABLE xxx may not work (it raises an exception), since CACHE TABLE will be treated as a NATIVE COMMAND.

Contributor Author: @liancheng Added the space after `,`. Thank you.

@chenghao-intel I can see a difference between CREATE TABLE and CACHE TABLE: the former is supported by the Hive parser, but CACHE TABLE is not.

Contributor: @chenghao-intel I agree that our HiveQL syntax extension scheme is currently quite hacky and brittle in Spark SQL... Other commands like SET, ADD JAR, and DFS suffer from the same problem. However, I'd like to fix them all together in a future PR :)

Contributor: Thank you @ravipesala @liancheng, let's improve that in the future. :)

Comments on the `createPlan(...)` call:

Contributor: I think this branch can be simplified as:

  case Seq(tableName, _, select @ _*) =>
    CacheTableAsSelectCommand(tableName, createPlan(select.mkString(" ").trim))

Contributor Author: Thank you for your suggestion. It simplifies the code well. I have modified it.
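To make the brittleness point from the thread above concrete, a quick check with hypothetical inputs (plain Scala string operations, no Spark required):

  val sql = "CACHE TABLE t AS SELECT * FROM src"
  sql.trim.drop(12).trim                          // "t AS SELECT * FROM src"
  sql.drop(12 + "t".length + "AS".length + 2)     // "SELECT * FROM src"

  // The prefix test assumes exactly one space between the keywords:
  "cache  table t".startsWith("cache table")      // false: the extra space defeats the match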
@@ -243,15 +249,7 @@
    } else if (sql.trim.startsWith("!")) {
      ShellCommand(sql.drop(1))
    } else {
-     val tree = getAst(sql)
-     if (nativeCommands contains tree.getText) {
-       NativeCommand(sql)
-     } else {
-       nodeToPlan(tree) match {
-         case NativePlaceholder => NativeCommand(sql)
-         case other => other
-       }
-     }
+     createPlan(sql)
    }
  } catch {
    case e: Exception => throw new ParseException(sql, e)
@@ -262,6 +260,19 @@
        """.stripMargin)
    }
  }
+
+  /** Creates a LogicalPlan for a given HiveQL string. */
+  def createPlan(sql: String) = {
+    val tree = getAst(sql)
+    if (nativeCommands contains tree.getText) {
+      NativeCommand(sql)
+    } else {
+      nodeToPlan(tree) match {
+        case NativePlaceholder => NativeCommand(sql)
+        case other => other
+      }
+    }
+  }

  def parseDdl(ddl: String): Seq[Attribute] = {
    val tree =