Skip to content

Commit 86d310b

Browse files
Chris Boumalhabcboumalh
authored andcommitted
[SPARK-53559][SQL][CATALYST] Fix HLL sketch updates to use raw collation key bytes
### What changes were proposed in this pull request? - Extract the input UTF8String. - Ignore strings that are collation equal to the empty string when updating the sketch. Before: ``` val cKey = CollationFactory.getCollationKey(v.asInstanceOf[UTF8String], st.collationId) sketch.update(cKey.toString) ``` After: ``` val collation = CollationFactory.fetchCollation(st.collationId) val str = v.asInstanceOf[UTF8String] if (!collation.equalsFunction(str, UTF8String.EMPTY_UTF8)) { sketch.update(collation.sortKeyFunction.apply(str)) } ```` ### Why are the changes needed? As discussed in #51298 (comment). Collation keys are arbitrary byte sequences, not guaranteed to be valid UTF-8. Converting them to a Java String replaces invalid UTF-8 bytes with U+FFFD (the replacement character). This can collapse distinct strings into identical values, causing the sketch to treat different strings as the same. Also, string collations must be considered when updating a sketch. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Build repo and test suites ### Was this patch authored or co-authored using generative AI tooling? No ### Jira https://issues.apache.org/jira/browse/SPARK-53559 Closes #52316 from cboumalh/SPARK-53559_refactor_hll_sketch_update. Lead-authored-by: Chris Boumalhab <[email protected]> Co-authored-by: Chris Boumalhab <[email protected]> Signed-off-by: Daniel Tenedorio <[email protected]>
1 parent f03c644 commit 86d310b

File tree

4 files changed

+305
-2
lines changed

4 files changed

+305
-2
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,10 @@ case class HllSketchAgg(
130130
* Evaluate the input row and update the HllSketch instance with the row's value. The update
131131
* function only supports a subset of Spark SQL types, and an exception will be thrown for
132132
* unsupported types.
133+
* Notes:
134+
* - Null values are ignored.
135+
* - Empty byte arrays are ignored.
136+
* - Strings that are collation-equal to the empty string are ignored.
133137
*
134138
* @param sketch The HllSketch instance.
135139
* @param input an input row
@@ -146,8 +150,11 @@ case class HllSketchAgg(
146150
case IntegerType => sketch.update(v.asInstanceOf[Int])
147151
case LongType => sketch.update(v.asInstanceOf[Long])
148152
case st: StringType =>
149-
val cKey = CollationFactory.getCollationKey(v.asInstanceOf[UTF8String], st.collationId)
150-
sketch.update(cKey.toString)
153+
val collation = CollationFactory.fetchCollation(st.collationId)
154+
val str = v.asInstanceOf[UTF8String]
155+
if (!collation.equalsFunction(str, UTF8String.EMPTY_UTF8)) {
156+
sketch.update(collation.sortKeyFunction.apply(str))
157+
}
151158
case BinaryType => sketch.update(v.asInstanceOf[Array[Byte]])
152159
case dataType => throw new SparkUnsupportedOperationException(
153160
errorClass = "_LEGACY_ERROR_TEMP_3121",

sql/core/src/test/resources/sql-tests/analyzer-results/hll.sql.out

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,20 @@ DropTable true, false
66
+- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t1
77

88

9+
-- !query
10+
DROP TABLE IF EXISTS hll_binary_test
11+
-- !query analysis
12+
DropTable true, false
13+
+- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.hll_binary_test
14+
15+
16+
-- !query
17+
DROP TABLE IF EXISTS hll_string_test
18+
-- !query analysis
19+
DropTable true, false
20+
+- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.hll_string_test
21+
22+
923
-- !query
1024
CREATE TABLE t1 USING JSON AS VALUES (0), (1), (2), (2), (2), (3), (4) as tab(col)
1125
-- !query analysis
@@ -14,6 +28,34 @@ CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`t1`, ErrorIfExis
1428
+- LocalRelation [col#x]
1529

1630

31+
-- !query
32+
CREATE TABLE hll_binary_test (bytes BINARY) USING PARQUET
33+
-- !query analysis
34+
CreateDataSourceTableCommand `spark_catalog`.`default`.`hll_binary_test`, false
35+
36+
37+
-- !query
38+
CREATE TABLE hll_string_test (s STRING) USING PARQUET
39+
-- !query analysis
40+
CreateDataSourceTableCommand `spark_catalog`.`default`.`hll_string_test`, false
41+
42+
43+
-- !query
44+
INSERT INTO hll_binary_test VALUES (X''), (CAST(' ' AS BINARY)), (X'e280'), (X'c1'), (X'c120')
45+
-- !query analysis
46+
InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/hll_binary_test, false, Parquet, [path=file:[not included in comparison]/{warehouse_dir}/hll_binary_test], Append, `spark_catalog`.`default`.`hll_binary_test`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/hll_binary_test), [bytes]
47+
+- Project [col1#x AS bytes#x]
48+
+- LocalRelation [col1#x]
49+
50+
51+
-- !query
52+
INSERT INTO hll_string_test VALUES (''), (' '), (CAST(X'C1' AS STRING)), (CAST(X'80' AS STRING)), ('\uFFFD'), ('Å'), ('å'), ('a\u030A'), ('Å '), ('å '), ('a\u030A ')
53+
-- !query analysis
54+
InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/hll_string_test, false, Parquet, [path=file:[not included in comparison]/{warehouse_dir}/hll_string_test], Append, `spark_catalog`.`default`.`hll_string_test`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/hll_string_test), [s]
55+
+- Project [col1#x AS s#x]
56+
+- LocalRelation [col1#x]
57+
58+
1759
-- !query
1860
SELECT hll_sketch_estimate(hll_sketch_agg(col)) AS result FROM t1
1961
-- !query analysis
@@ -22,6 +64,78 @@ Aggregate [hll_sketch_estimate(hll_sketch_agg(col#x, 12, 0, 0)) AS result#xL]
2264
+- Relation spark_catalog.default.t1[col#x] json
2365

2466

67+
-- !query
68+
SELECT hll_sketch_estimate(hll_sketch_agg(bytes)) FROM hll_binary_test
69+
-- !query analysis
70+
Aggregate [hll_sketch_estimate(hll_sketch_agg(bytes#x, 12, 0, 0)) AS hll_sketch_estimate(hll_sketch_agg(bytes, 12))#xL]
71+
+- SubqueryAlias spark_catalog.default.hll_binary_test
72+
+- Relation spark_catalog.default.hll_binary_test[bytes#x] parquet
73+
74+
75+
-- !query
76+
SELECT hll_sketch_estimate(hll_sketch_agg(s)) utf8_b FROM hll_string_test
77+
-- !query analysis
78+
Aggregate [hll_sketch_estimate(hll_sketch_agg(s#x, 12, 0, 0)) AS utf8_b#xL]
79+
+- SubqueryAlias spark_catalog.default.hll_string_test
80+
+- Relation spark_catalog.default.hll_string_test[s#x] parquet
81+
82+
83+
-- !query
84+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UTF8_LCASE)) utf8_lc FROM hll_string_test
85+
-- !query analysis
86+
Aggregate [hll_sketch_estimate(hll_sketch_agg(collate(s#x, UTF8_LCASE), 12, 0, 0)) AS utf8_lc#xL]
87+
+- SubqueryAlias spark_catalog.default.hll_string_test
88+
+- Relation spark_catalog.default.hll_string_test[s#x] parquet
89+
90+
91+
-- !query
92+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UNICODE)) unicode FROM hll_string_test
93+
-- !query analysis
94+
Aggregate [hll_sketch_estimate(hll_sketch_agg(collate(s#x, UNICODE), 12, 0, 0)) AS unicode#xL]
95+
+- SubqueryAlias spark_catalog.default.hll_string_test
96+
+- Relation spark_catalog.default.hll_string_test[s#x] parquet
97+
98+
99+
-- !query
100+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UNICODE_CI)) unicode_ci FROM hll_string_test
101+
-- !query analysis
102+
Aggregate [hll_sketch_estimate(hll_sketch_agg(collate(s#x, UNICODE_CI), 12, 0, 0)) AS unicode_ci#xL]
103+
+- SubqueryAlias spark_catalog.default.hll_string_test
104+
+- Relation spark_catalog.default.hll_string_test[s#x] parquet
105+
106+
107+
-- !query
108+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UTF8_BINARY_RTRIM)) utf8_b_rt FROM hll_string_test
109+
-- !query analysis
110+
Aggregate [hll_sketch_estimate(hll_sketch_agg(collate(s#x, UTF8_BINARY_RTRIM), 12, 0, 0)) AS utf8_b_rt#xL]
111+
+- SubqueryAlias spark_catalog.default.hll_string_test
112+
+- Relation spark_catalog.default.hll_string_test[s#x] parquet
113+
114+
115+
-- !query
116+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UTF8_LCASE_RTRIM)) utf8_lc_rt FROM hll_string_test
117+
-- !query analysis
118+
Aggregate [hll_sketch_estimate(hll_sketch_agg(collate(s#x, UTF8_LCASE_RTRIM), 12, 0, 0)) AS utf8_lc_rt#xL]
119+
+- SubqueryAlias spark_catalog.default.hll_string_test
120+
+- Relation spark_catalog.default.hll_string_test[s#x] parquet
121+
122+
123+
-- !query
124+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UNICODE_RTRIM)) unicode_rt FROM hll_string_test
125+
-- !query analysis
126+
Aggregate [hll_sketch_estimate(hll_sketch_agg(collate(s#x, UNICODE_RTRIM), 12, 0, 0)) AS unicode_rt#xL]
127+
+- SubqueryAlias spark_catalog.default.hll_string_test
128+
+- Relation spark_catalog.default.hll_string_test[s#x] parquet
129+
130+
131+
-- !query
132+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UNICODE_CI_RTRIM)) unicode_ci_rt FROM hll_string_test
133+
-- !query analysis
134+
Aggregate [hll_sketch_estimate(hll_sketch_agg(collate(s#x, UNICODE_CI_RTRIM), 12, 0, 0)) AS unicode_ci_rt#xL]
135+
+- SubqueryAlias spark_catalog.default.hll_string_test
136+
+- Relation spark_catalog.default.hll_string_test[s#x] parquet
137+
138+
25139
-- !query
26140
SELECT hll_sketch_estimate(hll_sketch_agg(col, 12))
27141
FROM VALUES (50), (60), (60), (60), (75), (100) tab(col)
@@ -213,3 +327,17 @@ DROP TABLE IF EXISTS t1
213327
-- !query analysis
214328
DropTable true, false
215329
+- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t1
330+
331+
332+
-- !query
333+
DROP TABLE IF EXISTS hll_binary_test
334+
-- !query analysis
335+
DropTable true, false
336+
+- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.hll_binary_test
337+
338+
339+
-- !query
340+
DROP TABLE IF EXISTS hll_string_test
341+
-- !query analysis
342+
DropTable true, false
343+
+- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.hll_string_test

sql/core/src/test/resources/sql-tests/inputs/hll.sql

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,40 @@
11
-- Positive test cases
22
-- Create a table with some testing data.
33
DROP TABLE IF EXISTS t1;
4+
DROP TABLE IF EXISTS hll_binary_test;
5+
DROP TABLE IF EXISTS hll_string_test;
6+
47
CREATE TABLE t1 USING JSON AS VALUES (0), (1), (2), (2), (2), (3), (4) as tab(col);
8+
CREATE TABLE hll_binary_test (bytes BINARY) USING PARQUET;
9+
CREATE TABLE hll_string_test (s STRING) USING PARQUET;
10+
11+
INSERT INTO hll_binary_test VALUES (X''), (CAST(' ' AS BINARY)), (X'e280'), (X'c1'), (X'c120');
12+
13+
-- `\u030A` is the "combining ring above" Unicode character: https://www.compart.com/en/unicode/U+030A
14+
-- `\uFFFD is the Unicode replacement character
15+
-- `\xC1` is an invalid Unicode byte.
16+
-- `\x80` is a Unicode continuation byte, that is it cannot be the first byte of a multi-byte UTF8 character.
17+
-- All strings are different based on the UTF8_BINARY collation.
18+
-- The first and second strings are equal for any collation with the RTRIM modifier, and equal to the empty string.
19+
-- The last three strings are respectively equal to the next last three strings for any collation with the RTRIM modifier.
20+
-- The strings "\xC1", "\x80" and "\uFFFD" are equal for all collations except UTF8_BINARY.
21+
-- The (sub)strings `å` and `a\u030A` are equal for the UNICODE family of collations.
22+
-- `å` is the lowercase version of `Å`.
23+
INSERT INTO hll_string_test VALUES (''), (' '), (CAST(X'C1' AS STRING)), (CAST(X'80' AS STRING)), ('\uFFFD'), ('Å'), ('å'), ('a\u030A'), ('Å '), ('å '), ('a\u030A ');
524

625
SELECT hll_sketch_estimate(hll_sketch_agg(col)) AS result FROM t1;
726

27+
SELECT hll_sketch_estimate(hll_sketch_agg(bytes)) FROM hll_binary_test;
28+
29+
SELECT hll_sketch_estimate(hll_sketch_agg(s)) utf8_b FROM hll_string_test;
30+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UTF8_LCASE)) utf8_lc FROM hll_string_test;
31+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UNICODE)) unicode FROM hll_string_test;
32+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UNICODE_CI)) unicode_ci FROM hll_string_test;
33+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UTF8_BINARY_RTRIM)) utf8_b_rt FROM hll_string_test;
34+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UTF8_LCASE_RTRIM)) utf8_lc_rt FROM hll_string_test;
35+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UNICODE_RTRIM)) unicode_rt FROM hll_string_test;
36+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UNICODE_CI_RTRIM)) unicode_ci_rt FROM hll_string_test;
37+
838
SELECT hll_sketch_estimate(hll_sketch_agg(col, 12))
939
FROM VALUES (50), (60), (60), (60), (75), (100) tab(col);
1040

@@ -74,3 +104,5 @@ FROM (SELECT CAST('abc' AS BINARY) AS buffer);
74104

75105
-- Clean up
76106
DROP TABLE IF EXISTS t1;
107+
DROP TABLE IF EXISTS hll_binary_test;
108+
DROP TABLE IF EXISTS hll_string_test;

sql/core/src/test/resources/sql-tests/results/hll.sql.out

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,22 @@ struct<>
77

88

99

10+
-- !query
11+
DROP TABLE IF EXISTS hll_binary_test
12+
-- !query schema
13+
struct<>
14+
-- !query output
15+
16+
17+
18+
-- !query
19+
DROP TABLE IF EXISTS hll_string_test
20+
-- !query schema
21+
struct<>
22+
-- !query output
23+
24+
25+
1026
-- !query
1127
CREATE TABLE t1 USING JSON AS VALUES (0), (1), (2), (2), (2), (3), (4) as tab(col)
1228
-- !query schema
@@ -15,6 +31,38 @@ struct<>
1531

1632

1733

34+
-- !query
35+
CREATE TABLE hll_binary_test (bytes BINARY) USING PARQUET
36+
-- !query schema
37+
struct<>
38+
-- !query output
39+
40+
41+
42+
-- !query
43+
CREATE TABLE hll_string_test (s STRING) USING PARQUET
44+
-- !query schema
45+
struct<>
46+
-- !query output
47+
48+
49+
50+
-- !query
51+
INSERT INTO hll_binary_test VALUES (X''), (CAST(' ' AS BINARY)), (X'e280'), (X'c1'), (X'c120')
52+
-- !query schema
53+
struct<>
54+
-- !query output
55+
56+
57+
58+
-- !query
59+
INSERT INTO hll_string_test VALUES (''), (' '), (CAST(X'C1' AS STRING)), (CAST(X'80' AS STRING)), ('\uFFFD'), ('Å'), ('å'), ('a\u030A'), ('Å '), ('å '), ('a\u030A ')
60+
-- !query schema
61+
struct<>
62+
-- !query output
63+
64+
65+
1866
-- !query
1967
SELECT hll_sketch_estimate(hll_sketch_agg(col)) AS result FROM t1
2068
-- !query schema
@@ -23,6 +71,78 @@ struct<result:bigint>
2371
5
2472

2573

74+
-- !query
75+
SELECT hll_sketch_estimate(hll_sketch_agg(bytes)) FROM hll_binary_test
76+
-- !query schema
77+
struct<hll_sketch_estimate(hll_sketch_agg(bytes, 12)):bigint>
78+
-- !query output
79+
4
80+
81+
82+
-- !query
83+
SELECT hll_sketch_estimate(hll_sketch_agg(s)) utf8_b FROM hll_string_test
84+
-- !query schema
85+
struct<utf8_b:bigint>
86+
-- !query output
87+
10
88+
89+
90+
-- !query
91+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UTF8_LCASE)) utf8_lc FROM hll_string_test
92+
-- !query schema
93+
struct<utf8_lc:bigint>
94+
-- !query output
95+
7
96+
97+
98+
-- !query
99+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UNICODE)) unicode FROM hll_string_test
100+
-- !query schema
101+
struct<unicode:bigint>
102+
-- !query output
103+
7
104+
105+
106+
-- !query
107+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UNICODE_CI)) unicode_ci FROM hll_string_test
108+
-- !query schema
109+
struct<unicode_ci:bigint>
110+
-- !query output
111+
6
112+
113+
114+
-- !query
115+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UTF8_BINARY_RTRIM)) utf8_b_rt FROM hll_string_test
116+
-- !query schema
117+
struct<utf8_b_rt:bigint>
118+
-- !query output
119+
6
120+
121+
122+
-- !query
123+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UTF8_LCASE_RTRIM)) utf8_lc_rt FROM hll_string_test
124+
-- !query schema
125+
struct<utf8_lc_rt:bigint>
126+
-- !query output
127+
3
128+
129+
130+
-- !query
131+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UNICODE_RTRIM)) unicode_rt FROM hll_string_test
132+
-- !query schema
133+
struct<unicode_rt:bigint>
134+
-- !query output
135+
3
136+
137+
138+
-- !query
139+
SELECT hll_sketch_estimate(hll_sketch_agg(s COLLATE UNICODE_CI_RTRIM)) unicode_ci_rt FROM hll_string_test
140+
-- !query schema
141+
struct<unicode_ci_rt:bigint>
142+
-- !query output
143+
2
144+
145+
26146
-- !query
27147
SELECT hll_sketch_estimate(hll_sketch_agg(col, 12))
28148
FROM VALUES (50), (60), (60), (60), (75), (100) tab(col)
@@ -267,3 +387,19 @@ DROP TABLE IF EXISTS t1
267387
struct<>
268388
-- !query output
269389

390+
391+
392+
-- !query
393+
DROP TABLE IF EXISTS hll_binary_test
394+
-- !query schema
395+
struct<>
396+
-- !query output
397+
398+
399+
400+
-- !query
401+
DROP TABLE IF EXISTS hll_string_test
402+
-- !query schema
403+
struct<>
404+
-- !query output
405+

0 commit comments

Comments
 (0)