Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Static partition should also follow StoreAssignmentPolicy when insert…
… into v2 tables
  • Loading branch information
cloud-fan committed Mar 3, 2021
commit 6741c3e1b8fd99739f48ee4cda919f1e3eca51d0
Original file line number Diff line number Diff line change
Expand Up @@ -1371,7 +1371,11 @@ class Analyzer(override val catalogManager: CatalogManager)
relation.output.flatMap { col =>
outputNameToStaticName.get(col.name).flatMap(staticPartitions.get) match {
case Some(staticValue) =>
Some(Alias(Cast(Literal(staticValue), col.dataType), col.name)())
// SPARK-30844: try our best to follow StoreAssignmentPolicy for static partition
// values but not completely follow because we can't do static type checking due to
// the reason that the parser has erased the type info of static partition values
// and converted them to string.
Some(Alias(AnsiCast(Literal(staticValue), col.dataType), col.name)())
case _ if queryColumns.hasNext =>
Some(queryColumns.next)
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.unsafe.types.UTF8String

/**
* The base trait for DML - insert syntax
* The base trait for SQL INSERT.
*/
trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {

Expand Down Expand Up @@ -278,6 +278,33 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
}
}
}

test("SPARK-30844: static partition should also follow StoreAssignmentPolicy") {
val testingPolicies = if (format == "foo") {
// DS v2 doesn't support the legacy policy
Seq(SQLConf.StoreAssignmentPolicy.ANSI, SQLConf.StoreAssignmentPolicy.STRICT)
} else {
SQLConf.StoreAssignmentPolicy.values
}
testingPolicies.foreach { policy =>
withSQLConf(
SQLConf.STORE_ASSIGNMENT_POLICY.key -> policy.toString) {
withTable("t") {
sql("create table t(a int, b string) using parquet partitioned by (a)")
policy match {
case SQLConf.StoreAssignmentPolicy.ANSI | SQLConf.StoreAssignmentPolicy.STRICT =>
val errorMsg = intercept[NumberFormatException] {
sql("insert into t partition(a='ansi') values('ansi')")
}.getMessage
assert(errorMsg.contains("invalid input syntax for type numeric: ansi"))
case SQLConf.StoreAssignmentPolicy.LEGACY =>
sql("insert into t partition(a='ansi') values('ansi')")
checkAnswer(sql("select * from t"), Row("ansi", null) :: Nil)
}
}
}
}
}
}

class FileSourceSQLInsertTestSuite extends SQLInsertTestSuite with SharedSparkSession {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,27 +797,6 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
}
}

test("SPARK-30844: static partition should also follow StoreAssignmentPolicy") {
SQLConf.StoreAssignmentPolicy.values.foreach { policy =>
withSQLConf(
SQLConf.STORE_ASSIGNMENT_POLICY.key -> policy.toString) {
withTable("t") {
sql("create table t(a int, b string) using parquet partitioned by (a)")
policy match {
case SQLConf.StoreAssignmentPolicy.ANSI | SQLConf.StoreAssignmentPolicy.STRICT =>
val errorMsg = intercept[NumberFormatException] {
sql("insert into t partition(a='ansi') values('ansi')")
}.getMessage
assert(errorMsg.contains("invalid input syntax for type numeric: ansi"))
case SQLConf.StoreAssignmentPolicy.LEGACY =>
sql("insert into t partition(a='ansi') values('ansi')")
checkAnswer(sql("select * from t"), Row("ansi", null) :: Nil)
}
}
}
}
}

test("SPARK-24860: dynamic partition overwrite specified per source without catalog table") {
withTempPath { path =>
Seq((1, 1), (2, 2)).toDF("i", "part")
Expand Down