@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -104,7 +105,17 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         None
       } else if (potentialSpecs.size == 1) {
         val partValue = potentialSpecs.head._2
-        Some(Alias(cast(Literal(partValue), field.dataType), field.name)())
+        conf.storeAssignmentPolicy match {
+          // SPARK-30844: try our best to follow StoreAssignmentPolicy for static partition
+          // values, but we can't follow it completely: static type checking isn't possible
+          // here because the parser has already erased the type info of static partition
+          // values and converted them to strings.
+          case StoreAssignmentPolicy.ANSI | StoreAssignmentPolicy.STRICT =>
+            Some(Alias(AnsiCast(Literal(partValue), field.dataType,
+              Option(conf.sessionLocalTimeZone)), field.name)())
+          case _ =>
+            Some(Alias(cast(Literal(partValue), field.dataType), field.name)())
+        }
       } else {
         throw new AnalysisException(
           s"Partition column ${field.name} have multiple values specified, " +
@@ -22,9 +22,10 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AnsiCast, Attribute, Cast, Expression, Literal}
 import org.apache.spark.sql.execution.datasources.DataSourceAnalysis
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.types.{DataType, IntegerType, StructType}

 class DataSourceAnalysisSuite extends SparkFunSuite with BeforeAndAfterAll {
@@ -52,7 +53,12 @@ class DataSourceAnalysisSuite extends SparkFunSuite with BeforeAndAfterAll {
   Seq(true, false).foreach { caseSensitive =>
     val conf = new SQLConf().copy(SQLConf.CASE_SENSITIVE -> caseSensitive)
     def cast(e: Expression, dt: DataType): Expression = {
-      Cast(e, dt, Option(conf.sessionLocalTimeZone))
+      conf.storeAssignmentPolicy match {
+        case StoreAssignmentPolicy.ANSI | StoreAssignmentPolicy.STRICT =>
+          AnsiCast(e, dt, Option(conf.sessionLocalTimeZone))
+        case _ =>
+          Cast(e, dt, Option(conf.sessionLocalTimeZone))
+      }
     }
     val rule = DataSourceAnalysis(conf)
     test(
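Note that spark.sql.storeAssignmentPolicy defaults to ANSI on this branch (as in Spark 3.0), so the helper above builds AnsiCast for the expected plans unless the policy is overridden. A rough sketch of that, assuming the default conf and using IntegerType purely as an example:

import org.apache.spark.sql.catalyst.expressions.{AnsiCast, Literal}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType

val conf = new SQLConf()
// Default policy is ANSI, so the cast(...) helper above picks the AnsiCast branch.
assert(conf.storeAssignmentPolicy == SQLConf.StoreAssignmentPolicy.ANSI)
val expected = AnsiCast(Literal("1"), IntegerType, Option(conf.sessionLocalTimeZone))

// A hypothetical variation that pins the policy explicitly, exercising the Cast branch
// (the config entry is string-valued, hence toString):
val legacyConf = new SQLConf().copy(
  SQLConf.STORE_ASSIGNMENT_POLICY -> SQLConf.StoreAssignmentPolicy.LEGACY.toString)
assert(legacyConf.storeAssignmentPolicy == SQLConf.StoreAssignmentPolicy.LEGACY)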
@@ -753,6 +753,27 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }

+  test("SPARK-30844: static partition should also follow StoreAssignmentPolicy") {
+    SQLConf.StoreAssignmentPolicy.values.foreach { policy =>
+      withSQLConf(
+        SQLConf.STORE_ASSIGNMENT_POLICY.key -> policy.toString) {
+        withTable("t") {
+          sql("create table t(a int, b string) using parquet partitioned by (a)")
+          policy match {
+            case SQLConf.StoreAssignmentPolicy.ANSI | SQLConf.StoreAssignmentPolicy.STRICT =>
+              val errorMsg = intercept[NumberFormatException] {
+                sql("insert into t partition(a='ansi') values('ansi')")
+              }.getMessage
+              assert(errorMsg.contains("invalid input syntax for type numeric: ansi"))
+            case SQLConf.StoreAssignmentPolicy.LEGACY =>
+              sql("insert into t partition(a='ansi') values('ansi')")
+              checkAnswer(sql("select * from t"), Row("ansi", null) :: Nil)
+          }
+        }
+      }
+    }
+  }
+
   test("SPARK-24860: dynamic partition overwrite specified per source without catalog table") {
     withTempPath { path =>
       Seq((1, 1), (2, 2)).toDF("i", "part")
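At the user level, the SPARK-30844 test above pins down the following behavior. A rough spark-shell style sketch, assuming an existing SparkSession named spark; the table name t and the bogus value 'ansi' simply mirror the test:

// LEGACY: a bad static partition value is silently cast to null (the old behavior).
spark.conf.set("spark.sql.storeAssignmentPolicy", "LEGACY")
spark.sql("create table t(a int, b string) using parquet partitioned by (a)")
spark.sql("insert into t partition(a='ansi') values('ansi')")
spark.sql("select * from t").show()  // one row: b = 'ansi', partition column a = null

// ANSI (the default) or STRICT: the same insert now fails at runtime with a
// NumberFormatException instead of writing a null partition value.
spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")
spark.sql("insert into t partition(a='ansi') values('ansi')")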