Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1696,8 +1696,8 @@ class Analyzer(
// Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.
case q: UnaryNode if q.childrenResolved =>
resolveSubQueries(q, q.children)
case d: DeleteFromTable if d.childrenResolved =>
resolveSubQueries(d, d.children)
case s: SupportsSubquery if s.childrenResolved =>
resolveSubQueries(s, s.children)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,19 +593,19 @@ trait CheckAnalysis extends PredicateHelper {
// Only certain operators are allowed to host subquery expression containing
// outer references.
plan match {
case _: Filter | _: Aggregate | _: Project | _: DeleteFromTable => // Ok
case _: Filter | _: Aggregate | _: Project | _: SupportsSubquery => // Ok
case other => failAnalysis(
"Correlated scalar sub-queries can only be used in a " +
s"Filter/Aggregate/Project: $plan")
s"Filter/Aggregate/Project and a few commands: $plan")
}
}

case inSubqueryOrExistsSubquery =>
plan match {
case _: Filter | _: DeleteFromTable => // Ok
case _: Filter | _: SupportsSubquery => // Ok
case _ =>
failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in" +
s" Filter/DeleteFromTable: $plan")
s" Filter and a few commands: $plan")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,12 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
val aliased = tableAlias.map(SubqueryAlias(_, r)).getOrElse(r)
DeleteFromTable(aliased, condition)

case update: UpdateTableStatement =>
throw new AnalysisException(s"UPDATE TABLE is not supported temporarily.")
case u @ UpdateTableStatement(
nameParts @ CatalogAndIdentifierParts(catalog, tableName), _, _, _, _) =>
val r = UnresolvedV2Relation(nameParts, catalog.asTableCatalog, tableName.asIdentifier)
val aliased = u.tableAlias.map(SubqueryAlias(_, r)).getOrElse(r)
val columns = u.columns.map(UnresolvedAttribute(_))
UpdateTable(aliased, columns, u.values, u.condition)

case DescribeTableStatement(
nameParts @ NonSessionCatalog(catalog, tableName), partitionSpec, isExtended) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,16 @@ object IntegerLiteral {
}
}

/**
 * Extractor that yields the Scala `String` held by a string-typed literal.
 *
 * It matches only a `Literal` whose value is a `UTF8String` and whose data type is
 * `StringType`; every other input produces `None`.
 */
object StringLiteral {
  def unapply(a: Any): Option[String] =
    PartialFunction.condOpt(a) {
      // Convert the internal UTF8String representation to a plain String.
      case Literal(utf8: UTF8String, StringType) => utf8.toString
    }
}

/**
* Extractor and other utility methods for decimal literals.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,10 +599,17 @@ case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Comm
}

case class DeleteFromTable(
child: LogicalPlan,
condition: Option[Expression]) extends Command {
table: LogicalPlan,
condition: Option[Expression]) extends Command with SupportsSubquery {
override def children: Seq[LogicalPlan] = table :: Nil
}

override def children: Seq[LogicalPlan] = child :: Nil
/**
 * Logical command node describing an UPDATE of `table`.
 *
 * @param table     the plan for the relation being updated; it is this node's only child
 * @param columns   expressions naming the columns to assign
 * @param values    expressions providing the new values
 *                  (presumably paired positionally with `columns` -- confirm against the parser)
 * @param condition optional predicate restricting the update
 */
case class UpdateTable(
    table: LogicalPlan,
    columns: Seq[Expression],
    values: Seq[Expression],
    condition: Option[Expression]) extends Command with SupportsSubquery {
  // The target table is exposed as the single child of this node.
  override def children: Seq[LogicalPlan] = Seq(table)
}

/**
Expand Down Expand Up @@ -1241,6 +1248,12 @@ case class Deduplicate(
override def output: Seq[Attribute] = child.output
}

/**
 * A marker trait for the commands that support subqueries.
 * The subquery-related analysis checks match on this trait to allow such commands
 * to host subquery expressions.
 */
trait SupportsSubquery extends LogicalPlan

/** A trait used for logical plan nodes that create or replace V2 table definitions. */
trait V2CreateTablePlan extends LogicalPlan {
def tableName: Identifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ import org.apache.spark.sql.catalyst.expressions.Expression
case class UpdateTableStatement(
tableName: Seq[String],
tableAlias: Option[String],
attrs: Seq[Seq[String]],
columns: Seq[Seq[String]],
values: Seq[Expression],
condition: Option[Expression]) extends ParsedStatement
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ class AnalysisErrorSuite extends AnalysisTest {
Seq(a, Alias(InSubquery(Seq(a), ListQuery(LocalRelation(b))), "c")()),
LocalRelation(a))
assertAnalysisError(plan, "Predicate sub-queries can only be used" +
" in Filter/DeleteFromTable" :: Nil)
" in Filter" :: Nil)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't the new SupportsSubquery mentioned here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SupportsSubquery is an internal trait and end users don't know what it is, so I just put "a few commands"; see https://github.com/apache/spark/pull/26025/files#diff-1d14ac233eac6f233c027dba0bdf871dR608

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, got it.

}

test("PredicateSubQuery is used is a nested condition") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
case r: LogicalRDD =>
RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
case _: UpdateTable =>
throw new UnsupportedOperationException(s"UPDATE TABLE is not supported temporarily.")
case _ => Nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -938,19 +938,19 @@ class DataSourceV2SQLSuite
val errorMsg = "Found duplicate column(s) in the table definition of `t`"
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
testCreateAnalysisError(
assertAnalysisError(
s"CREATE TABLE t ($c0 INT, $c1 INT) USING $v2Source",
errorMsg
)
testCreateAnalysisError(
assertAnalysisError(
s"CREATE TABLE testcat.t ($c0 INT, $c1 INT) USING $v2Source",
errorMsg
)
testCreateAnalysisError(
assertAnalysisError(
s"CREATE OR REPLACE TABLE t ($c0 INT, $c1 INT) USING $v2Source",
errorMsg
)
testCreateAnalysisError(
assertAnalysisError(
s"CREATE OR REPLACE TABLE testcat.t ($c0 INT, $c1 INT) USING $v2Source",
errorMsg
)
Expand All @@ -962,19 +962,19 @@ class DataSourceV2SQLSuite
val errorMsg = "Found duplicate column(s) in the table definition of `t`"
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
testCreateAnalysisError(
assertAnalysisError(
s"CREATE TABLE t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
errorMsg
)
testCreateAnalysisError(
assertAnalysisError(
s"CREATE TABLE testcat.t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
errorMsg
)
testCreateAnalysisError(
assertAnalysisError(
s"CREATE OR REPLACE TABLE t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
errorMsg
)
testCreateAnalysisError(
assertAnalysisError(
s"CREATE OR REPLACE TABLE testcat.t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
errorMsg
)
Expand All @@ -984,20 +984,20 @@ class DataSourceV2SQLSuite

test("tableCreation: bucket column names not in table definition") {
val errorMsg = "Couldn't find column c in"
testCreateAnalysisError(
assertAnalysisError(
s"CREATE TABLE tbl (a int, b string) USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS",
errorMsg
)
testCreateAnalysisError(
assertAnalysisError(
s"CREATE TABLE testcat.tbl (a int, b string) USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS",
errorMsg
)
testCreateAnalysisError(
assertAnalysisError(
s"CREATE OR REPLACE TABLE tbl (a int, b string) USING $v2Source " +
"CLUSTERED BY (c) INTO 4 BUCKETS",
errorMsg
)
testCreateAnalysisError(
assertAnalysisError(
s"CREATE OR REPLACE TABLE testcat.tbl (a int, b string) USING $v2Source " +
"CLUSTERED BY (c) INTO 4 BUCKETS",
errorMsg
Expand All @@ -1008,19 +1008,19 @@ class DataSourceV2SQLSuite
val errorMsg = "Found duplicate column(s) in the partitioning"
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
testCreateAnalysisError(
assertAnalysisError(
s"CREATE TABLE t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
errorMsg
)
testCreateAnalysisError(
assertAnalysisError(
s"CREATE TABLE testcat.t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
errorMsg
)
testCreateAnalysisError(
assertAnalysisError(
s"CREATE OR REPLACE TABLE t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
errorMsg
)
testCreateAnalysisError(
assertAnalysisError(
s"CREATE OR REPLACE TABLE testcat.t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
errorMsg
)
Expand All @@ -1032,22 +1032,22 @@ class DataSourceV2SQLSuite
val errorMsg = "Found duplicate column(s) in the bucket definition"
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
testCreateAnalysisError(
assertAnalysisError(
s"CREATE TABLE t ($c0 INT) USING $v2Source " +
s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
errorMsg
)
testCreateAnalysisError(
assertAnalysisError(
s"CREATE TABLE testcat.t ($c0 INT) USING $v2Source " +
s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
errorMsg
)
testCreateAnalysisError(
assertAnalysisError(
s"CREATE OR REPLACE TABLE t ($c0 INT) USING $v2Source " +
s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
errorMsg
)
testCreateAnalysisError(
assertAnalysisError(
s"CREATE OR REPLACE TABLE testcat.t ($c0 INT) USING $v2Source " +
s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
errorMsg
Expand Down Expand Up @@ -1120,7 +1120,7 @@ class DataSourceV2SQLSuite
}
}

test("Update: basic - update all") {
test("UPDATE TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(
Expand All @@ -1129,23 +1129,29 @@ class DataSourceV2SQLSuite
|USING foo
|PARTITIONED BY (id, p)
""".stripMargin)
sql(
s"""
|INSERT INTO $t
|VALUES (1L, 'Herry', 26, 1),
|(2L, 'Jack', 31, 2),
|(3L, 'Lisa', 28, 3),
|(4L, 'Frank', 33, 3)
""".stripMargin)

// UPDATE non-existing table
assertAnalysisError(
"UPDATE dummy SET name='abc'",
"Table not found")

// UPDATE non-existing column
assertAnalysisError(
s"UPDATE $t SET dummy='abc'",
"cannot resolve")
assertAnalysisError(
s"UPDATE $t SET name='abc' WHERE dummy=1",
"cannot resolve")

// UPDATE is not implemented yet.
val e = intercept[UnsupportedOperationException] {
sql(s"UPDATE $t SET name='Robert', age=32 WHERE p=1")
}
assert(e.getMessage.contains("UPDATE TABLE is not supported temporarily"))
}
val errMsg = "UPDATE TABLE is not supported temporarily"
testCreateAnalysisError(
s"UPDATE $t SET name='Robert', age=32",
errMsg
)
}

private def testCreateAnalysisError(sqlStatement: String, expectedError: String): Unit = {
private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {
val errMsg = intercept[AnalysisException] {
sql(sqlStatement)
}.getMessage
Expand Down
Loading