-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-2594][SQL] Add CACHE TABLE <name> AS SELECT ... (Updated as per review comments) #2390
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b803fc8
4e858d8
13c8e27
7459ce3
6758f80
eebc0c1
b5276b2
dc33895
aaf5b59
724b9db
e3265d0
bc0bffc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -127,6 +127,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { | |
| protected val SUBSTRING = Keyword("SUBSTRING") | ||
| protected val SQRT = Keyword("SQRT") | ||
| protected val ABS = Keyword("ABS") | ||
| protected val ADD = Keyword("ADD") | ||
|
|
||
| // Use reflection to find the reserved words defined in this class. | ||
| protected val reservedWords = | ||
|
|
@@ -151,7 +152,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { | |
| EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | | ||
| UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } | ||
| ) | ||
| | insert | cache | ||
| | insert | cache | unCache | ||
| ) | ||
|
|
||
| protected lazy val select: Parser[LogicalPlan] = | ||
|
|
@@ -181,11 +182,25 @@ class SqlParser extends StandardTokenParsers with PackratParsers { | |
| val overwrite: Boolean = o.getOrElse("") == "OVERWRITE" | ||
| InsertIntoTable(r, Map[String, Option[String]](), s, overwrite) | ||
| } | ||
|
|
||
| protected lazy val addCache: Parser[LogicalPlan] = | ||
| ADD ~ CACHE ~ TABLE ~> ident ~ AS ~ select <~ opt(";") ^^ { | ||
| case tableName ~ as ~ s => | ||
| CacheTableAsSelectCommand(tableName,s) | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This should be removed, as mentioned by @marmbrus in this comment.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I will remove it |
||
|
|
||
| protected lazy val cache: Parser[LogicalPlan] = | ||
| (CACHE ^^^ true | UNCACHE ^^^ false) ~ TABLE ~ ident ^^ { | ||
| case doCache ~ _ ~ tableName => CacheCommand(tableName, doCache) | ||
| CACHE ~ TABLE ~> ident ~ opt(AS) ~ opt(select) <~ opt(";") ^^ { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actually
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thank you for your comments. Yes, it is better to add as |
||
| case tableName ~ None ~ None => | ||
| CacheCommand(tableName, true) | ||
| case tableName ~ as ~ Some(plan) => | ||
| CacheTableAsSelectCommand(tableName,plan) | ||
| } | ||
|
|
||
| protected lazy val unCache: Parser[LogicalPlan] = | ||
| UNCACHE ~ TABLE ~> ident <~ opt(";") ^^ { | ||
| case tableName => CacheCommand(tableName, false) | ||
| } | ||
|
|
||
| protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",") | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -75,3 +75,8 @@ case class DescribeCommand( | |
| AttributeReference("data_type", StringType, nullable = false)(), | ||
| AttributeReference("comment", StringType, nullable = false)()) | ||
| } | ||
|
|
||
| /** | ||
| * Returned for the "ADD CACHE TABLE tableName AS SELECT .." command. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Remove "ADD".
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I will remove it |
||
| */ | ||
| case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) extends Command | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -305,6 +305,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { | |
| Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context)) | ||
| case logical.CacheCommand(tableName, cache) => | ||
| Seq(execution.CacheCommand(tableName, cache)(context)) | ||
| case logical.CacheTableAsSelectCommand(tableName,plan) => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Space after comma please.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I will update |
||
| Seq(execution.CacheTableAsSelectCommand(tableName, plan)) | ||
| case _ => Nil | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -166,3 +166,22 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])( | |
| child.output.map(field => Row(field.name, field.dataType.toString, null)) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * :: DeveloperApi :: | ||
| */ | ||
| @DeveloperApi | ||
| case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm thinking about merging |
||
| extends LeafNode with Command { | ||
|
|
||
| override protected[sql] lazy val sideEffectResult = { | ||
| sqlContext.catalog.registerTable(None, tableName, sqlContext.executePlan(plan).analyzed) | ||
| sqlContext.cacheTable(tableName) | ||
| // It does the caching eager. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @marmbrus I've seen your comments in SPARK-2594. I'd be OK with it being either lazy or eager, but I think this statement should be consistent with
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Either way is OK. But eager caching exists only for this feature, so it may look inconsistent compared to the other caching commands. |
||
| sqlContext.table(tableName).count | ||
| Seq.empty[Row] | ||
| } | ||
|
|
||
| override def output: Seq[Attribute] = Seq.empty | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -119,4 +119,16 @@ class CachedTableSuite extends QueryTest { | |
| } | ||
| assert(!TestSQLContext.isCached("testData"), "Table 'testData' should not be cached") | ||
| } | ||
|
|
||
| test("CACHE TABLE tableName AS SELECT Star Table") { | ||
| TestSQLContext.sql("CACHE TABLE testCacheTable AS SELECT * FROM testData") | ||
| TestSQLContext.sql("SELECT * FROM testCacheTable WHERE key = 1").collect() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Would be better to assert
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I will add |
||
| TestSQLContext.uncacheTable("testCacheTable") | ||
| } | ||
|
|
||
| test("'CACHE TABLE tableName AS SELECT ..'") { | ||
| TestSQLContext.sql("CACHE TABLE testCacheTable AS SELECT * FROM testData") | ||
| assert(TestSQLContext.isCached("testCacheTable"), "Table 'testCacheTable' should be cached") | ||
| TestSQLContext.uncacheTable("testCacheTable") | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -214,6 +214,7 @@ private[hive] object HiveQl { | |
| */ | ||
| def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql)) | ||
|
|
||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please remove this redundant new line.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sorry for the trouble. I am new to git, so I added it accidentally |
||
| /** Returns a LogicalPlan for a given HiveQL string. */ | ||
| def parseSql(sql: String): LogicalPlan = { | ||
| try { | ||
|
|
@@ -229,11 +230,17 @@ private[hive] object HiveQl { | |
| SetCommand(Some(key), Some(value)) | ||
| } | ||
| } else if (sql.trim.toLowerCase.startsWith("cache table")) { | ||
| CacheCommand(sql.trim.drop(12).trim, true) | ||
| sql.trim.drop(12).trim.split(" ").toSeq match { | ||
| case Seq(tableName) => | ||
| CacheCommand(tableName, true) | ||
| case Seq(tableName,as, select@_*) => | ||
| CacheTableAsSelectCommand(tableName, | ||
| createPlan(sql.trim.drop(12 + tableName.length() + as.length() + 2))) | ||
| } | ||
| } else if (sql.trim.toLowerCase.startsWith("uncache table")) { | ||
| CacheCommand(sql.trim.drop(14).trim, false) | ||
| } else if (sql.trim.toLowerCase.startsWith("add jar")) { | ||
| AddJar(sql.trim.drop(8).trim) | ||
| NativeCommand(sql) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hmm, I guess you probably rebased to an older version of master? We definitely shouldn't revert
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sorry for the trouble. I am new to git, so I added it accidentally |
||
| } else if (sql.trim.toLowerCase.startsWith("add file")) { | ||
| AddFile(sql.trim.drop(9)) | ||
| } else if (sql.trim.toLowerCase.startsWith("dfs")) { | ||
|
|
@@ -243,15 +250,7 @@ private[hive] object HiveQl { | |
| } else if (sql.trim.startsWith("!")) { | ||
| ShellCommand(sql.drop(1)) | ||
| } else { | ||
| val tree = getAst(sql) | ||
| if (nativeCommands contains tree.getText) { | ||
| NativeCommand(sql) | ||
| } else { | ||
| nodeToPlan(tree) match { | ||
| case NativePlaceholder => NativeCommand(sql) | ||
| case other => other | ||
| } | ||
| } | ||
| createPlan(sql) | ||
| } | ||
| } catch { | ||
| case e: Exception => throw new ParseException(sql, e) | ||
|
|
@@ -262,6 +261,19 @@ private[hive] object HiveQl { | |
| """.stripMargin) | ||
| } | ||
| } | ||
|
|
||
| /** Creates LogicalPlan for a given HiveQL string. */ | ||
| def createPlan(sql: String) = { | ||
| val tree = getAst(sql) | ||
| if (nativeCommands contains tree.getText) { | ||
| NativeCommand(sql) | ||
| } else { | ||
| nodeToPlan(tree) match { | ||
| case NativePlaceholder => NativeCommand(sql) | ||
| case other => other | ||
| } | ||
| } | ||
| } | ||
|
|
||
| def parseDdl(ddl: String): Seq[Attribute] = { | ||
| val tree = | ||
|
|
@@ -1097,7 +1109,7 @@ private[hive] object HiveQl { | |
|
|
||
| case Token("TOK_FUNCTION", Token(functionName, Nil) :: children) => | ||
| HiveGenericUdtf(functionName, attributes, children.map(nodeToExpr)) | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please remove these spaces
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ok |
||
| case a: ASTNode => | ||
| throw new NotImplementedError( | ||
| s"""No parse rules for ASTNode type: ${a.getType}, text: ${a.getText}, tree: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will remove it