[SPARK-18544] [SQL] Append with df.saveAsTable writes data to wrong location #15983
Conversation
)
df.sparkSession.sessionState.executePlan(
  CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
if (tableDesc.partitionColumnNames.nonEmpty &&
I think this is unnecessary since the create / insert commands already recover partitions themselves. We'll see if any tests fail.
This is needed when we create the data source table.
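For reference, a minimal illustration of the partition-recovery step being discussed, expressed with the public SQL commands rather than the internal code path; the table name is hypothetical:

// Either statement registers partition directories found on the filesystem
// with the catalog for an existing partitioned table ("test" is made up).
spark.sql("MSCK REPAIR TABLE test")
// spark.sql("ALTER TABLE test RECOVER PARTITIONS")   // equivalent form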
cc @yhuai @cloud-fan

Test build #69039 has finished for PR 15983 at commit
case _ =>
  val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
  val tableType = if (storage.locationUri.isDefined) {
  val existingTable = if (tableExists) {
shall we move this logic to CreateDataSourceTableAsSelectCommand?
nvm, I think it's fine here
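As context for the snippet above, a user-level sketch (not the PR's internal code) of what reusing the existing table's metadata is meant to guarantee; the table name is hypothetical and the DESCRIBE output is only inspected by eye:

// Inspect the catalog metadata before appending; for an external table the
// reported location points at the user-supplied path.
spark.sql("DESCRIBE FORMATTED test").show(50, false)

// Append through saveAsTable; with the existing table's metadata propagated,
// the table type and location reported afterwards should be unchanged.
spark.range(2).selectExpr("id as fieldOne", "id as partCol")
  .write.partitionBy("partCol").mode("append").saveAsTable("test")
spark.sql("DESCRIBE FORMATTED test").show(50, false)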
Test build #69045 has finished for PR 15983 at commit

retest this please

Test build #69223 has started for PR 15983 at commit

retest this please

Test build #69281 has finished for PR 15983 at commit
Merging in master/branch-2.1.
[SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location. Author: Eric Liang <[email protected]>. Closes #15983 from ericl/spark-18544. (cherry picked from commit e2318ed) Signed-off-by: Reynold Xin <[email protected]>
withTempDir { dir =>
  setupPartitionedDatasourceTable("test", dir)
  if (enabled) {
    spark.sql("msck repair table test")
If HIVE_MANAGE_FILESOURCE_PARTITIONS is on, we have to repair the table; otherwise the table appears empty to external users. This looks weird to me, and it is also inconsistent with the behavior when HIVE_MANAGE_FILESOURCE_PARTITIONS is off. I think we should repair the table right after we create it. Let me submit a PR and cc you then.
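A hedged illustration of the behavior described above, with a made-up table name and path: under partition management, a table created over pre-existing partition directories reports no rows until those partitions are registered in the metastore.

// Hypothetical path and table name. With HIVE_MANAGE_FILESOURCE_PARTITIONS on,
// the table is empty until its partitions are recovered into the metastore.
spark.sql("CREATE TABLE test (fieldOne BIGINT, partCol INT) USING parquet " +
  "OPTIONS (path '/tmp/existing_partitioned_data') PARTITIONED BY (partCol)")
spark.sql("SELECT * FROM test").count()   // 0 before recovery
spark.sql("MSCK REPAIR TABLE test")       // discover partition directories under the path
spark.sql("SELECT * FROM test").count()   // now reflects the existing files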
It's the Hive behavior not to repair the table. Otherwise, create table can have an unbounded cost if there are many partitions.
Yeah, table repair is expensive, but this causes an external behavior change. I tried it in 2.0: it can show the whole data source table without repairing the table. In 2.1, it returns empty unless we repair the table.

scala> spark.range(5).selectExpr("id as fieldOne", "id as partCol").write.partitionBy("partCol").mode("overwrite").saveAsTable("test")
16/12/17 17:41:20 WARN CreateDataSourceTableUtils: Persisting partitioned data source relation `test` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. Input path(s):
file:/Users/xiaoli/sparkBin/spark-2.0.2-bin-hadoop2.7/bin/spark-warehouse/test
scala> spark.sql("select * from test").show()
+--------+-------+
|fieldOne|partCol|
+--------+-------+
| 2| 2|
| 1| 1|
| 3| 3|
| 0| 0|
| 4| 4|
+--------+-------+
scala> spark.sql("desc formatted test").show(50, false)
+----------------------------+------------------------------------------------------------------------------+-------+
|col_name |data_type |comment|
+----------------------------+------------------------------------------------------------------------------+-------+
...
| path |file:/Users/xiaoli/sparkBin/spark-2.0.2-bin-hadoop2.7/bin/spark-warehouse/test| |
+----------------------------+------------------------------------------------------------------------------+-------+
scala> spark.sql(s"create table newTab (fieldOne long, partCol int) using parquet options (path '/Users/xiaoli/sparkBin/spark-2.0.2-bin-hadoop2.7/bin/spark-warehouse/test') partitioned by (partCol)")
16/12/17 17:43:24 WARN CreateDataSourceTableUtils: Persisting partitioned data source relation `newTab` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. Input path(s):
file:/Users/xiaoli/sparkBin/spark-2.0.2-bin-hadoop2.7/bin/spark-warehouse/test
res3: org.apache.spark.sql.DataFrame = []
scala> spark.table("newTab").show()
+--------+-------+
|fieldOne|partCol|
+--------+-------+
| 2| 2|
| 1| 1|
| 3| 3|
| 0| 0|
| 4| 4|
+--------+-------+
Regarding the concern about the repair cost, I think we still face the same issue: each time we append even a single extra row, we also repair the table, right? That is still expensive. Update: when we insert into the table, we do not repair it; we repair the table only in CTAS of partitioned data source tables.
@gatorsmile I think this behavior change is necessary for the new "Scalable Partition Handling" feature. BTW, a table created by Spark 2.0 can still be read by Spark 2.1 without repairing it.
First, if this behavior change is required, we need to document it. I think it is not clear to external users who do not realize the underlying change.

Second, let us discuss the existing user interface when creating data source tables. It is pretty complex to document/remember the above behaviors.

BTW, I will try the behaviors for CTAS and update the above bullets.
If we do not repair the table when creating such a table, we face another behavior inconsistency between appending with saveAsTable and insertInto, as the two tests below show:

test("append with saveAsTable + partition management") {
  withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
    withTable("test") {
      withTempDir { dir =>
        setupPartitionedDatasourceTable("test", dir)
        spark.range(2).selectExpr("id as fieldOne", "id as partCol")
          .write.partitionBy("partCol").mode("append").saveAsTable("test")
        // all the partitions are visible
        assert(spark.sql("select * from test").count() == 7)
      }
    }
  }
}

test("insertInto + partition management") {
  withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
    withTable("test") {
      withTempDir { dir =>
        setupPartitionedDatasourceTable("test", dir)
        spark.range(2).selectExpr("id as fieldOne", "id as partCol")
          .write.insertInto("test")
        // only the involved partitions are visible
        assert(spark.sql("select * from test").count() == 4)
      }
    }
  }
}
I'd like to hide the
As Hive can't append to a table with CTAS, we can define our own semantics here. InsertInto only updates the involved partitions, so I think it makes more sense to follow that for CTAS, which will be done in #15996.
I see the plan, but the behavior difference will still be affected by the value of HIVE_MANAGE_FILESOURCE_PARTITIONS. I might need more time to chew over it and find out the potential impacts.
What changes were proposed in this pull request?
We failed to properly propagate table metadata for existing tables for the saveAsTable command. This caused a downstream component to think the table was MANAGED, writing data to the wrong location.
How was this patch tested?
Unit test that fails before the patch.
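For completeness, a minimal end-to-end, spark-shell style sketch of the scenario this patch addresses; the path and table name are made up, and the comments describe the intended post-patch behavior rather than verified output.

// Create an external, partitioned data source table backed by an explicit path.
val dir = "/tmp/spark18544_external"   // hypothetical location
spark.range(3).selectExpr("id as fieldOne", "id as partCol")
  .write.partitionBy("partCol").option("path", dir)
  .mode("overwrite").saveAsTable("test")

// Append to the existing table. With this patch, the existing table's metadata
// is propagated, so the table stays EXTERNAL and new files land under `dir`
// rather than under the default warehouse location.
spark.range(3, 5).selectExpr("id as fieldOne", "id as partCol")
  .write.partitionBy("partCol").mode("append").saveAsTable("test")

// Check the reported table type and location.
spark.sql("DESCRIBE FORMATTED test").show(50, false)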