Skip to content

Commit 10b6714

Browse files
liancheng authored and yhuai committed
[SPARK-16033][SQL] insertInto() can't be used together with partitionBy()
## What changes were proposed in this pull request?

When inserting into an existing partitioned table, partitioning columns should always be determined by catalog metadata of the existing table to be inserted. Extra `partitionBy()` calls don't make sense, and mess up existing data because newly inserted data may have wrong partitioning directory layout.

## How was this patch tested?

New test case added in `InsertIntoHiveTableSuite`.

Author: Cheng Lian <lian@databricks.com>

Closes apache#13747 from liancheng/spark-16033-insert-into-without-partition-by.
1 parent ebb9a3b commit 10b6714

File tree

2 files changed

+46
-3
lines changed

2 files changed

+46
-3
lines changed

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ import java.util.Properties
2121

2222
import scala.collection.JavaConverters._
2323

24-
import org.apache.hadoop.fs.Path
25-
2624
import org.apache.spark.sql.catalyst.TableIdentifier
2725
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
2826
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
@@ -243,7 +241,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
243241

244242
private def insertInto(tableIdent: TableIdentifier): Unit = {
245243
assertNotBucketed("insertInto")
246-
val partitions = normalizedParCols.map(_.map(col => col -> (Option.empty[String])).toMap)
244+
245+
if (partitioningColumns.isDefined) {
246+
throw new AnalysisException(
247+
"insertInto() can't be used together with partitionBy(). " +
248+
"Partition columns are defined by the table into which is being inserted."
249+
)
250+
}
251+
252+
val partitions = normalizedParCols.map(_.map(col => col -> Option.empty[String]).toMap)
247253
val overwrite = mode == SaveMode.Overwrite
248254

249255
// A partitioned relation's schema can be different from the input logicalPlan, since

sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,43 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
346346
}
347347
}
348348

349+
private def testPartitionedHiveSerDeTable(testName: String)(f: String => Unit): Unit = {
350+
test(s"Hive SerDe table - $testName") {
351+
val hiveTable = "hive_table"
352+
353+
withTable(hiveTable) {
354+
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
355+
sql(s"CREATE TABLE $hiveTable (a INT) PARTITIONED BY (b INT, c INT) STORED AS TEXTFILE")
356+
f(hiveTable)
357+
}
358+
}
359+
}
360+
}
361+
362+
private def testPartitionedDataSourceTable(testName: String)(f: String => Unit): Unit = {
363+
test(s"Data source table - $testName") {
364+
val dsTable = "ds_table"
365+
366+
withTable(dsTable) {
367+
sql(s"CREATE TABLE $dsTable (a INT, b INT, c INT) USING PARQUET PARTITIONED BY (b, c)")
368+
f(dsTable)
369+
}
370+
}
371+
}
372+
373+
private def testPartitionedTable(testName: String)(f: String => Unit): Unit = {
374+
testPartitionedHiveSerDeTable(testName)(f)
375+
testPartitionedDataSourceTable(testName)(f)
376+
}
377+
378+
testPartitionedTable("partitionBy() can't be used together with insertInto()") { tableName =>
379+
val cause = intercept[AnalysisException] {
380+
Seq((1, 2, 3)).toDF("a", "b", "c").write.partitionBy("b", "c").insertInto(tableName)
381+
}
382+
383+
assert(cause.getMessage.contains("insertInto() can't be used together with partitionBy()."))
384+
}
385+
349386
test("InsertIntoTable#resolved should include dynamic partitions") {
350387
withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
351388
sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")

0 commit comments

Comments (0)