Skip to content

Commit 5edd29d

Browse files
Spark: Fix Alignment of Merge Commands with Mixed Case (#4848) (#4874)
* Spark: Fix Alignment of Merge Commands with Mixed Case. Prior to this, a mixed-case insert statement would fail to be marked as aligned after our alignment rule was applied. This would then fail the entire MERGE INTO command. The commands were correctly aligned, but our alignment check was always case sensitive.
1 parent 0168ec3 commit 5edd29d

File tree

4 files changed

+74
-1
lines changed

4 files changed

+74
-1
lines changed

spark/v3.2/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iceberg.spark.extensions
2121

2222
import org.apache.spark.sql.SparkSessionExtensions
23+
import org.apache.spark.sql.catalyst.analysis.AlignedRowLevelIcebergCommandCheck
2324
import org.apache.spark.sql.catalyst.analysis.AlignRowLevelCommandAssignments
2425
import org.apache.spark.sql.catalyst.analysis.CheckMergeIntoTableConditions
2526
import org.apache.spark.sql.catalyst.analysis.MergeIntoIcebergTableResolutionCheck
@@ -55,6 +56,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
5556
extensions.injectResolutionRule { _ => RewriteUpdateTable }
5657
extensions.injectResolutionRule { _ => RewriteMergeIntoTable }
5758
extensions.injectCheckRule { _ => MergeIntoIcebergTableResolutionCheck }
59+
extensions.injectCheckRule { _ => AlignedRowLevelIcebergCommandCheck }
5860

5961
// optimizer extensions
6062
extensions.injectOptimizerRule { _ => ExtendedSimplifyConditionalsInPredicate }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.catalyst.analysis
21+
22+
import org.apache.spark.sql.AnalysisException
23+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
24+
import org.apache.spark.sql.catalyst.plans.logical.MergeIntoIcebergTable
25+
import org.apache.spark.sql.catalyst.plans.logical.UpdateIcebergTable
26+
27+
/**
 * Analysis check that rejects row-level Iceberg commands whose assignments
 * were not successfully aligned with the target table schema.
 *
 * Traverses the analyzed plan and throws an [[AnalysisException]] for any
 * MERGE INTO or UPDATE node still marked as unaligned.
 */
object AlignedRowLevelIcebergCommandCheck extends (LogicalPlan => Unit) {

  override def apply(plan: LogicalPlan): Unit = {
    plan.foreach { node =>
      node match {
        case merge: MergeIntoIcebergTable if !merge.aligned =>
          throw new AnalysisException(s"Could not align Iceberg MERGE INTO: $merge")
        case update: UpdateIcebergTable if !update.aligned =>
          throw new AnalysisException(s"Could not align Iceberg UPDATE: $update")
        case _ =>
          // aligned, or not a row-level Iceberg command — nothing to check
      }
    }
  }
}

spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/expressions/AssignmentUtils.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ object AssignmentUtils extends SQLConfHelper {
4040
sameSize && table.output.zip(assignments).forall { case (attr, assignment) =>
4141
val key = assignment.key
4242
val value = assignment.value
43-
toAssignmentRef(attr) == toAssignmentRef(key) &&
43+
val refsEqual = toAssignmentRef(attr).zip(toAssignmentRef(key))
44+
.forall{ case (attrRef, keyRef) => conf.resolver(attrRef, keyRef)}
45+
46+
refsEqual &&
4447
DataType.equalsIgnoreCompatibleNullability(value.dataType, attr.dataType) &&
4548
(attr.nullable || !value.nullable)
4649
}

spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,6 +1189,36 @@ public void testMergeAlignsUpdateAndInsertActions() {
11891189
sql("SELECT * FROM %s ORDER BY id", tableName));
11901190
}
11911191

1192+
@Test
1193+
public void testMergeMixedCaseAlignsUpdateAndInsertActions() {
1194+
createAndInitTable("id INT, a INT, b STRING", "{ \"id\": 1, \"a\": 2, \"b\": \"str\" }");
1195+
createOrReplaceView(
1196+
"source",
1197+
"{ \"id\": 1, \"c1\": -2, \"c2\": \"new_str_1\" }\n" +
1198+
"{ \"id\": 2, \"c1\": -20, \"c2\": \"new_str_2\" }");
1199+
1200+
sql("MERGE INTO %s t USING source " +
1201+
"ON t.iD == source.Id " +
1202+
"WHEN MATCHED THEN " +
1203+
" UPDATE SET B = c2, A = c1, t.Id = source.ID " +
1204+
"WHEN NOT MATCHED THEN " +
1205+
" INSERT (b, A, iD) VALUES (c2, c1, id)", tableName);
1206+
1207+
assertEquals(
1208+
"Output should match",
1209+
ImmutableList.of(row(1, -2, "new_str_1"), row(2, -20, "new_str_2")),
1210+
sql("SELECT * FROM %s ORDER BY id", tableName));
1211+
1212+
assertEquals(
1213+
"Output should match",
1214+
ImmutableList.of(row(1, -2, "new_str_1")),
1215+
sql("SELECT * FROM %s WHERE id = 1 ORDER BY id", tableName));
1216+
assertEquals(
1217+
"Output should match",
1218+
ImmutableList.of(row(2, -20, "new_str_2")),
1219+
sql("SELECT * FROM %s WHERE b = 'new_str_2'ORDER BY id", tableName));
1220+
}
1221+
11921222
@Test
11931223
public void testMergeUpdatesNestedStructFields() {
11941224
createAndInitTable("id INT, s STRUCT<c1:INT,c2:STRUCT<a:ARRAY<INT>,m:MAP<STRING, STRING>>>",

0 commit comments

Comments (0)