Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
address comments
  • Loading branch information
gatorsmile committed Nov 4, 2016
commit 4dbd3b6f328ec4d686331c31bba340192ddf91df
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class Analyzer(
case u : UnresolvedRelation =>
val substituted = cteRelations.find(x => resolver(x._1, u.tableIdentifier.table))
.map(_._2).map { relation =>
val withAlias = u.alias.map(SubqueryAlias(_, relation, None)())
val withAlias = u.alias.map(SubqueryAlias(_, relation, None))
withAlias.getOrElse(relation)
}
substituted.getOrElse(u)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,16 +553,16 @@ class SessionCatalog(
val relationAlias = alias.getOrElse(table)
if (db == globalTempViewManager.database) {
globalTempViewManager.get(table).map { viewDef =>
SubqueryAlias(relationAlias, viewDef, Some(name))(isGeneratedByTempTable = true)
SubqueryAlias(relationAlias, viewDef, Some(name))
}.getOrElse(throw new NoSuchTableException(db, table))
} else if (name.database.isDefined || !tempTables.contains(table)) {
val metadata = externalCatalog.getTable(db, table)
val view = Option(metadata.tableType).collect {
case CatalogTableType.VIEW => name
}
SubqueryAlias(relationAlias, SimpleCatalogRelation(db, metadata), view)()
SubqueryAlias(relationAlias, SimpleCatalogRelation(db, metadata), view)
} else {
SubqueryAlias(relationAlias, tempTables(table), Option(name))(isGeneratedByTempTable = true)
SubqueryAlias(relationAlias, tempTables(table), Option(name))
}
}
}
Expand Down Expand Up @@ -926,7 +926,7 @@ class SessionCatalog(
/**
* Returns whether it is a temporary function.
*/
def isTempFunction(name: FunctionIdentifier): Boolean = {
def isTemporaryFunction(name: FunctionIdentifier): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a unit test for this function?

Copy link
Contributor

@rxin rxin Nov 4, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also what's the behavior if the function doesn't exist? make sure you test it in the unit test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will resolve this tomorrow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like isTemporaryTable, we return false when the function/table does not exist

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea please docuemnt it.

// copied from HiveSessionCatalog
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd update HiveSessionCatalog to say don't forget to update this place. Otherwise it will be inconsistent.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do. Thanks!

val hiveFunctions = Seq(
"hash",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ package object dsl {
orderSpec: Seq[SortOrder]): LogicalPlan =
Window(windowExpressions, partitionSpec, orderSpec, logicalPlan)

def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan, None)()
def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan, None)

def except(otherPlan: LogicalPlan): LogicalPlan = Except(logicalPlan, otherPlan)

Expand All @@ -371,7 +371,7 @@ package object dsl {

def as(alias: String): LogicalPlan = logicalPlan match {
case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))
case plan => SubqueryAlias(alias, plan, None)()
case plan => SubqueryAlias(alias, plan, None)
}

def repartition(num: Integer): LogicalPlan =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
case Project(projList, _) =>
subqueryRoot = Project(projList ++ havingInputs, subqueryRoot)
case s @ SubqueryAlias(alias, _, None) =>
subqueryRoot = SubqueryAlias(alias, subqueryRoot, None)()
subqueryRoot = SubqueryAlias(alias, subqueryRoot, None)
case op => sys.error(s"Unexpected operator $op in corelated subquery")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
* This is only used for Common Table Expressions.
*/
override def visitNamedQuery(ctx: NamedQueryContext): SubqueryAlias = withOrigin(ctx) {
SubqueryAlias(ctx.name.getText, plan(ctx.query), None)()
SubqueryAlias(ctx.name.getText, plan(ctx.query), None)
}

/**
Expand Down Expand Up @@ -729,7 +729,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
* Create an alias (SubqueryAlias) for a LogicalPlan.
*/
private def aliasPlan(alias: ParserRuleContext, plan: LogicalPlan): LogicalPlan = {
SubqueryAlias(alias.getText, plan, None)()
SubqueryAlias(alias.getText, plan, None)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,13 +709,10 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
case class SubqueryAlias(
alias: String,
child: LogicalPlan,
view: Option[TableIdentifier])(
val isGeneratedByTempTable: java.lang.Boolean = false)
view: Option[TableIdentifier])
extends UnaryNode {

override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))

override protected def otherCopyArgs: Seq[AnyRef] = isGeneratedByTempTable :: Nil
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ class AnalysisSuite extends AnalysisTest with ShouldMatchers {
val query =
Project(Seq($"x.key", $"y.key"),
Join(
Project(Seq($"x.key"), SubqueryAlias("x", input, None)()),
Project(Seq($"y.key"), SubqueryAlias("y", input, None)()),
Project(Seq($"x.key"), SubqueryAlias("x", input, None)),
Project(Seq($"y.key"), SubqueryAlias("y", input, None)),
Cross, None))

assertAnalysisSuccess(query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,24 +406,24 @@ class SessionCatalogSuite extends SparkFunSuite {
sessionCatalog.setCurrentDatabase("db2")
// If we explicitly specify the database, we'll look up the relation in that database
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2")))
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None)())
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None))
// Otherwise, we'll first look up a temporary table with the same name
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1")))())
== SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1"))))
// Then, if that does not exist, look up the relation in the current database
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None)())
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None))
}

test("lookup table relation with alias") {
val catalog = new SessionCatalog(newBasicCatalog())
val alias = "monster"
val tableMetadata = catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata), None)()
val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata), None)
val relationWithAlias =
SubqueryAlias(alias,
SimpleCatalogRelation("db2", tableMetadata), None)()
SimpleCatalogRelation("db2", tableMetadata), None)
assert(catalog.lookupRelation(
TableIdentifier("tbl1", Some("db2")), alias = None) == relation)
assert(catalog.lookupRelation(
Expand All @@ -435,7 +435,7 @@ class SessionCatalogSuite extends SparkFunSuite {
val tmpView = Range(1, 10, 2, 10)
catalog.createTempView("vw1", tmpView, overrideIfExists = false)
val plan = catalog.lookupRelation(TableIdentifier("vw1"), Option("range"))
assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1")))())
assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1"))))
}

test("table exists") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,16 +320,16 @@ class ColumnPruningSuite extends PlanTest {
val query =
Project(Seq($"x.key", $"y.key"),
Join(
SubqueryAlias("x", input, None)(),
BroadcastHint(SubqueryAlias("y", input, None)()), Inner, None)).analyze
SubqueryAlias("x", input, None),
BroadcastHint(SubqueryAlias("y", input, None)), Inner, None)).analyze

val optimized = Optimize.execute(query)

val expected =
Join(
Project(Seq($"x.key"), SubqueryAlias("x", input, None)()),
Project(Seq($"x.key"), SubqueryAlias("x", input, None)),
BroadcastHint(
Project(Seq($"y.key"), SubqueryAlias("y", input, None)())),
Project(Seq($"y.key"), SubqueryAlias("y", input, None))),
Inner, None).analyze

comparePlans(optimized, expected)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ class EliminateSubqueryAliasesSuite extends PlanTest with PredicateHelper {

test("eliminate top level subquery") {
val input = LocalRelation('a.int, 'b.int)
val query = SubqueryAlias("a", input, None)()
val query = SubqueryAlias("a", input, None)
comparePlans(afterOptimization(query), input)
}

test("eliminate mid-tree subquery") {
val input = LocalRelation('a.int, 'b.int)
val query = Filter(TrueLiteral, SubqueryAlias("a", input, None)())
val query = Filter(TrueLiteral, SubqueryAlias("a", input, None))
comparePlans(
afterOptimization(query),
Filter(TrueLiteral, LocalRelation('a.int, 'b.int)))
Expand All @@ -61,7 +61,7 @@ class EliminateSubqueryAliasesSuite extends PlanTest with PredicateHelper {
test("eliminate multiple subqueries") {
val input = LocalRelation('a.int, 'b.int)
val query = Filter(TrueLiteral,
SubqueryAlias("c", SubqueryAlias("b", SubqueryAlias("a", input, None)(), None)(), None)())
SubqueryAlias("c", SubqueryAlias("b", SubqueryAlias("a", input, None), None), None))
comparePlans(
afterOptimization(query),
Filter(TrueLiteral, LocalRelation('a.int, 'b.int)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,15 @@ class JoinOptimizationSuite extends PlanTest {
val query =
Project(Seq($"x.key", $"y.key"),
Join(
SubqueryAlias("x", input, None)(),
BroadcastHint(SubqueryAlias("y", input, None)()), Cross, None)).analyze
SubqueryAlias("x", input, None),
BroadcastHint(SubqueryAlias("y", input, None)), Cross, None)).analyze

val optimized = Optimize.execute(query)

val expected =
Join(
Project(Seq($"x.key"), SubqueryAlias("x", input, None)()),
BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input, None)())),
Project(Seq($"x.key"), SubqueryAlias("x", input, None)),
BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input, None))),
Cross, None).analyze

comparePlans(optimized, expected)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class PlanParserSuite extends PlanTest {
def cte(plan: LogicalPlan, namedPlans: (String, LogicalPlan)*): With = {
val ctes = namedPlans.map {
case (name, cte) =>
name -> SubqueryAlias(name, cte, None)()
name -> SubqueryAlias(name, cte, None)
}
With(plan, ctes)
}
Expand Down
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def as(alias: String): Dataset[T] = withTypedPlan {
SubqueryAlias(alias, logicalPlan, None)()
SubqueryAlias(alias, logicalPlan, None)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class SQLBuilder private (
val aliasedOutput = canonicalizedPlan.output.zip(outputNames).map {
case (attr, name) => Alias(attr.withQualifier(None), name)()
}
val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName, canonicalizedPlan, None)())
val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName, canonicalizedPlan, None))

try {
val replaced = finalPlan.transformAllExpressions {
Expand Down Expand Up @@ -563,7 +563,7 @@ class SQLBuilder private (
}

private def addSubquery(plan: LogicalPlan): SubqueryAlias = {
SubqueryAlias(newSubqueryName(), plan, None)()
SubqueryAlias(newSubqueryName(), plan, None)
}

private def addSubqueryIfNeeded(plan: LogicalPlan): LogicalPlan = plan match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
package org.apache.spark.sql.execution.command

import scala.util.control.NonFatal

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction
import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.QueryPlan
Expand Down Expand Up @@ -133,18 +132,17 @@ case class CreateViewCommand(

// When creating a permanent view, not allowed to reference temporary objects.
if (!isTemporary) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also i'd move the check into a function, so it is more obvious what's going on with the main workflow.

// Disallow creating permanent views based on temporary views.
analyzedPlan.collectFirst {
case s: SubqueryAlias if s.isGeneratedByTempTable =>
child.collect {
// Disallow creating permanent views based on temporary views.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd also copy paste what you put in the pr description in here, on why you are traversing the unresolved plan.

case s: UnresolvedRelation
if sparkSession.sessionState.catalog.isTemporaryTable(s.tableIdentifier) =>
throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
s"referencing a temp view `${s.alias}`. " +
s"referencing a temp view ${s.tableIdentifier}. " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

temp -> temporary

originalText.map(sql => s"""SQL: "$sql".""").getOrElse(""))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need to show the entire sql? it's basically what the user just typed in

}

// Disallow creating permanent views based on temporary UDFs.
child.collect {
case other if !other.resolved => other.expressions.flatMap(_.collect {
case e: UnresolvedFunction if sparkSession.sessionState.catalog.isTempFunction(e.name) =>
// Disallow creating permanent views based on temporary UDFs.
case e: UnresolvedFunction
if sparkSession.sessionState.catalog.isTemporaryFunction(e.name) =>
throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
s"referencing a temp function `${e.name}`. " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

temp -> temporary

originalText.map(sql => s"""SQL: "$sql".""").getOrElse(""))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here do we really need to show the entire sql? it's basically what the user just typed in

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
s"${u.tableIdentifier.database.get}")
}
val plan = LogicalRelation(dataSource.resolveRelation())
u.alias.map(a => SubqueryAlias(u.alias.get, plan, None)()).getOrElse(plan)
u.alias.map(a => SubqueryAlias(u.alias.get, plan, None)).getOrElse(plan)
} catch {
case e: ClassNotFoundException => u
case e: Exception =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,21 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log

if (DDLUtils.isDatasourceTable(table)) {
val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable, None)()
val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable, None)
// Then, if alias is specified, wrap the table with a Subquery using the alias.
// Otherwise, wrap the table with a Subquery using the table name.
alias.map(a => SubqueryAlias(a, qualifiedTable, None)()).getOrElse(qualifiedTable)
alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable)
} else if (table.tableType == CatalogTableType.VIEW) {
val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
SubqueryAlias(
alias.getOrElse(table.identifier.table),
sparkSession.sessionState.sqlParser.parsePlan(viewText),
Option(table.identifier))()
Option(table.identifier))
} else {
val qualifiedTable =
MetastoreRelation(
qualifiedTableName.database, qualifiedTableName.name)(table, sparkSession)
alias.map(a => SubqueryAlias(a, qualifiedTable, None)()).getOrElse(qualifiedTable)
alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable)
}
}

Expand Down Expand Up @@ -312,7 +312,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// Read path
case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
val parquetRelation = convertToParquetRelation(relation)
SubqueryAlias(relation.tableName, parquetRelation, None)()
SubqueryAlias(relation.tableName, parquetRelation, None)
}
}
}
Expand Down Expand Up @@ -350,7 +350,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// Read path
case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
val orcRelation = convertToOrcRelation(relation)
SubqueryAlias(relation.tableName, orcRelation, None)()
SubqueryAlias(relation.tableName, orcRelation, None)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,18 @@ private[sql] class HiveSessionCatalog(
if (db == globalTempViewManager.database) {
val relationAlias = alias.getOrElse(table)
globalTempViewManager.get(table).map { viewDef =>
SubqueryAlias(relationAlias, viewDef, Some(name))(isGeneratedByTempTable = true)
SubqueryAlias(relationAlias, viewDef, Some(name))
}.getOrElse(throw new NoSuchTableException(db, table))
} else if (name.database.isDefined || !tempTables.contains(table)) {
val database = name.database.map(formatDatabaseName)
val newName = name.copy(database = database, table = table)
metastoreCatalog.lookupRelation(newName, alias)
} else {
val relation = tempTables(table)
val tableWithQualifiers = SubqueryAlias(table, relation, None)(isGeneratedByTempTable = true)
val tableWithQualifiers = SubqueryAlias(table, relation, None)
// If an alias was specified by the lookup, wrap the plan in a subquery so that
// attributes are properly qualified with this alias.
alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)()).getOrElse(tableWithQualifiers)
alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers)
}
}

Expand Down Expand Up @@ -232,6 +232,7 @@ private[sql] class HiveSessionCatalog(
// current_user, ewah_bitmap, ewah_bitmap_and, ewah_bitmap_empty, ewah_bitmap_or, field,
// in_file, index, matchpath, ngrams, noop, noopstreaming, noopwithmap,
// noopwithmapstreaming, parse_url_tuple, reflect2, windowingtablefunction.
// Note: don't forget to update SessionCatalog.isTemporaryFunction
private val hiveFunctions = Seq(
"hash",
"histogram_numeric",
Expand Down
Loading