diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 5d41c07b4784..ba7c5ff6d66a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3661,20 +3661,23 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
   object ResolveOutputRelation extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
       _.containsPattern(COMMAND), ruleId) {
-      case v2Write: V2WriteCommand
-          if v2Write.table.resolved && v2Write.query.resolved && !v2Write.outputResolved =>
+      case v2Write: V2WriteCommand =>
         validateStoreAssignmentPolicy()
-        TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
-          expected = v2Write.table.output, queryOutput = v2Write.query.output)
-        val projection = TableOutputResolver.resolveOutputColumns(
-          v2Write.table.name, v2Write.table.output, v2Write.query, v2Write.isByName, conf)
-        if (projection != v2Write.query) {
-          val cleanedTable = v2Write.table match {
-            case r: DataSourceV2Relation =>
-              r.copy(output = r.output.map(CharVarcharUtils.cleanAttrMetadata))
-            case other => other
+        if (v2Write.table.resolved && v2Write.query.resolved && !v2Write.outputResolved) {
+          TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
+            expected = v2Write.table.output, queryOutput = v2Write.query.output)
+          val projection = TableOutputResolver.resolveOutputColumns(
+            v2Write.table.name, v2Write.table.output, v2Write.query, v2Write.isByName, conf)
+          if (projection != v2Write.query) {
+            val cleanedTable = v2Write.table match {
+              case r: DataSourceV2Relation =>
+                r.copy(output = r.output.map(CharVarcharUtils.cleanAttrMetadata))
+              case other => other
+            }
+            v2Write.withNewQuery(projection).withNewTable(cleanedTable)
+          } else {
+            v2Write
           }
-          v2Write.withNewQuery(projection).withNewTable(cleanedTable)
         } else {
           v2Write
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index b7ac6af22a20..4598b3619e8c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -839,4 +839,18 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
       condition = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
       parameters = Map("methodName" -> "`writeTo`"))
   }
+
+  test("SPARK-49975: write should fail with legacy store assignment policy") {
+    withSQLConf((SQLConf.STORE_ASSIGNMENT_POLICY.key, "LEGACY")) {
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.range(10)
+            .writeTo("table_name")
+            .append()
+        },
+        condition = "_LEGACY_ERROR_TEMP_1000",
+        parameters = Map("configKey" -> "spark.sql.storeAssignmentPolicy")
+      )
+    }
+  }
 }