[WIP][SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce #46526
Conversation
I would say that these results are somewhat better, but still not too good.
However, what's imperative right now is that we preserve the performance for UTF8_BINARY (by doing that if/else branch on supportsBinaryEquality). If we don't have a better approach at this moment, and we've already tried a couple of things, then I would say that's fine and we can proceed with the best of what we've got (which is this PR).
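To make the suggestion concrete, here is a minimal sketch of such an if/else on supportsBinaryEquality. The helper name `modeOf` and the buffer shape are assumptions for illustration, not the PR's actual code (which appears in the description further below):

```scala
import scala.collection.mutable
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical helper, not the PR's implementation: pick the mode from a value -> count buffer.
def modeOf(buff: mutable.Map[UTF8String, Long], collationId: Int): UTF8String = {
  if (CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
    // UTF8_BINARY (and UNICODE): keep the existing fast path, counts keyed by the raw value.
    buff.maxBy(_._2)._1
  } else {
    // Other collations: merge counts that share a collation key, then take the max.
    buff.toSeq
      .groupMapReduce { case (k, _) => CollationFactory.getCollationKey(k, collationId) }(identity)(
        (x, y) => (x._1, x._2 + y._2))
      .values.maxBy(_._2)._1
  }
}
```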
- Note, by the way, that because we are relying on supportsBinaryEquality, this is about preserving not only the performance of UTF8_BINARY but also that of UNICODE.
- @uros-db check again. I believe the benchmarks are slightly more realistic now, in that for each string there are 3-6 others that are equal by collation. E.g.:
collation unit benchmarks - mode - 30105 elements:

| Collation | Best Time (ms) | Avg Time (ms) | Stdev (ms) | Rate (M/s) | Per Row (ns) | Relative |
|-------------------|---:|---:|---:|-----:|------:|-----:|
| UTF8_BINARY_LCASE | 6 | 6 | 0 | 5.1 | 195.6 | 1.0X |
| UNICODE | 3 | 3 | 0 | 11.6 | 86.0 | 2.3X |
| UTF8_BINARY | 3 | 3 | 0 | 11.6 | 85.9 | 2.3X |
| UNICODE_CI | 12 | 12 | 1 | 2.6 | 382.9 | 0.5X |
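For reference, one way such input could be generated is to emit a handful of random-case variants per base string. This is only an assumed sketch of the data shape described above (3-6 collation-equal strings per value), not the benchmark's actual code:

```scala
import scala.util.Random

// Each base string appears as 3-6 case variants, which are equal under a
// case-insensitive collation but distinct under UTF8_BINARY.
def caseVariants(base: String, n: Int): Seq[String] =
  Seq.fill(n)(base.map(c => if (Random.nextBoolean()) c.toUpper else c.toLower))

val inputs: Seq[String] =
  (1 to 5000).flatMap(i => caseVariants(s"word$i", 3 + Random.nextInt(4)))
```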
- Still a slowdown (though there is more work being done, so how would we expect anything different?). I will run these new benchmarks on the other approaches, and assuming this one is best, we can get it ready for the final stages of cleanup and review.
- I will leave one benchmark for mode rather than having three for different input sizes; that was just a temporary setup.
- An idea would be, for the case of UTF8_BINARY and UNICODE, to go through the lower operation first. This would be a better way to check that, as the design doc instructs: "Performance regression for case-insensitive collation must be no worse than using upper() or ilike() explicitly". Let me know whether to change the benchmark accordingly. There will probably still be a performance degradation, but it would at least be a fairer comparison (see the sketch below).
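As a sketch of that fairer comparison (assuming a SparkSession `spark`, the built-in `mode` function, and the collation-aware `StringType` used elsewhere in this PR; not the benchmark's actual code):

```scala
import org.apache.spark.sql.functions.{col, lower, mode}
import org.apache.spark.sql.types.StringType
import spark.implicits._

val df = Seq("Def", "def", "DEF", "abc", "abc").toDF("word")

// Baseline the design doc allows: do the case folding explicitly on a binary-collation column.
val explicitLower = df.agg(mode(lower(col("word"))))

// Collation-aware Mode from this PR: the column itself carries UTF8_BINARY_LCASE.
val collated = df
  .withColumn("word", col("word").cast(StringType("UTF8_BINARY_LCASE")))
  .agg(mode(col("word")))
```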
Apropos going through lower first, we need to be careful not to destroy the original data.
Consider the example of finding the mode in ['a', 'B', 'B', 'A']. Here, the correct answers would be:
- 'a'
- 'A'
- 'B'
but NOT:
- 'b'
because that value is not found in the original data.
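A minimal sketch of that constraint, with plain lowercasing standing in for a real collation key: group by the folded key, but always report a value taken from the original data.

```scala
val data = Seq("a", "B", "B", "A")

// Group by a case-folded key, keeping the original elements in each group.
val grouped = data.groupBy(_.toLowerCase)      // Map("a" -> Seq("a", "A"), "b" -> Seq("B", "B"))

// Pick the largest group (ties broken arbitrarily here) and return one of its members.
val (_, occurrences) = grouped.maxBy(_._2.size)
val result = occurrences.head                  // always an element of `data`, so never "b"
```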
Apropos altering the benchmark to yield better results for this particular expression, I'm not sure that's something we should encourage: the benchmark is not perfect and should only be used for rough estimates. It's fine to consider the worst-case scenario (all different elements), and I think we should look for the best approach anyway.
All things considered, I would say proceed with this approach: clean everything up and get it running with your tests and all CI checks in order. Then we can call in other reviewers and see where it goes.
Going back to the original issue (why Mode doesn't already work, while aggregation otherwise generally works with collated strings in Spark), here's what I'm interested in: why does `PhysicalDataType.ordering(child.dataType).asInstanceOf[Ordering[AnyRef]]` not work here automatically?
AFAIK, the ordering for `PhysicalStringType` is defined correctly:
`private[sql] val ordering = CollationFactory.fetchCollation(collationId).comparator.compare(_, _)`
so one would naturally expect Mode to work "as is".
Did you investigate this, maybe?
@uros-db While the ordering for `PhysicalStringType` is correctly established via `private[sql] val ordering = CollationFactory.fetchCollation(collationId).comparator.compare(_, _)`, that alone does not resolve the issue with Mode. The ordering only decides which of two buffer entries is larger; it never merges entries whose keys are equal under the collation. To illustrate, under UTF8_BINARY_LCASE a buffer like `Map("a" -> 3L, "b" -> 2L, "B" -> 2L)` ends up taking the maximum over the tuples (2L, "B"), (2L, "b"), (3L, "a") rather than over the merged counts (3L, "a"), (4L, "b"), so "a" wins even though "b"/"B" together occur 4 times. Unit tests confirm that Mode does not work for such cases without this change.
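A small sketch of that failure mode in plain Scala, with lowercasing standing in for `CollationFactory.getCollationKey` (the real code uses the collation key, as shown in the PR description below):

```scala
val buff = Map("a" -> 3L, "b" -> 2L, "B" -> 2L)

// An ordering over individual (value, count) entries never merges "b" and "B",
// so the naive maximum picks "a" with count 3.
val naiveMode = buff.maxBy(_._2)._1            // "a"

// Grouping the counts by a collation key first gives the intended result.
val merged = buff.groupMapReduce { case (k, _) => k.toLowerCase }(_._2)(_ + _)
val collationAwareMode = merged.maxBy(_._2)._1 // "b", with a combined count of 4
```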
### What changes were proposed in this pull request?

[SPARK-47353](https://issues.apache.org/jira/browse/SPARK-47353)

#### Pull requests

- [Scala TreeMap (RB Tree)](#46404)
- [GroupMapReduce](#46526) <- most performant
- [GroupMapReduce (cleaned up) (this PR)](#46597) <- most performant
- [Comparing Experimental Approaches](#46488)
- https://github.com/apache/spark/pull/46597/files#r1626058908 -> #46917 (comment)

#### Central change to the Mode `eval` algorithm

- The `eval` method now checks whether the column being aggregated is a string with non-default collation and, if so, groups the buffer counts by collation key:

```
buff.toSeq.groupMapReduce {
  case (key: String, _) => CollationFactory.getCollationKey(UTF8String.fromString(key), collationId)
  case (key: UTF8String, _) => CollationFactory.getCollationKey(key, collationId)
  case (key, _) => key
}(x => x)((x, y) => (x._1, x._2 + y._2)).values
```

#### Minor change to Mode

- Introduction of `collationId`: a new lazy value `collationId` is computed from the `dataType` of the `child` expression and is used to fetch the appropriate collation comparator when `collationEnabled` is true.
- This PR will fail for complex types containing collated strings; a follow-up PR will implement that.

#### Unit test enhancements

Significant additions to `CollationStringExpressionsSuite` to test the new functionality, including:

- Tests for the `Mode` function when handling strings with different collation settings.

#### Benchmark updates

- Enhanced the `CollationBenchmark` classes to include benchmarks for the new mode functionality with and without collation settings, as well as for numerical types.

### Why are the changes needed?

1. Ensures consistency in handling string comparisons under various collation settings.
2. Improves global usability by enabling compatibility with different collation standards.

### Does this PR introduce _any_ user-facing change?

Yes, this PR introduces the following user-facing changes:

1. Adds a new `collationEnabled` property to the `Mode` expression.
2. Users can now specify collation settings for the `Mode` expression to customize its behavior.

### How was this patch tested?

This patch was tested through a combination of new and existing unit and end-to-end SQL tests.

1. **Unit tests:**
   - **CollationStringExpressionsSuite:**
     - Reworked the newly added tests to follow the same design pattern as the existing tests.
     - Added multiple test cases to verify that the `Mode` function correctly handles strings with different collation settings.
     - Out of scope: special Unicode cases (higher planes); tests do not need to include null handling.
2. **Benchmark tests.**
3. **Manual testing:**

```
./build/mvn -DskipTests clean package
export SPARK_HOME=/Users/gideon/repos/spark
$SPARK_HOME/bin/spark-shell

spark.sqlContext.setConf("spark.sql.collation.enabled", "true")
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.col
import spark.implicits._
val data = Seq(("Def"), ("def"), ("DEF"), ("abc"), ("abc"))
val df = data.toDF("word")
val dfLC = df.withColumn("word", col("word").cast(StringType("UTF8_BINARY_LCASE")))
val dfLCA = dfLC.agg(org.apache.spark.sql.functions.mode(functions.col("word")).as("count"))
dfLCA.show()
/*
BEFORE:
+-----+
|count|
+-----+
|  abc|
+-----+

AFTER:
+-----+
|count|
+-----+
|  Def|
+-----+
*/
```

4. **Continuous Integration (CI):** the patch passed all relevant CI checks, including:
   - Unit test suite
   - Benchmark suite
   - To consider: moving the new benchmark to the catalyst module

### Was this patch authored or co-authored using generative AI tooling?

Nope!

Closes #46597 from GideonPotok/spark_47353_3_clean.

Lead-authored-by: GideonPotok <[email protected]>
Co-authored-by: Gideon Potok <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
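For context on the `collationId` value introduced in the description above, a hedged sketch of how it might be derived; this assumes the collation-aware `StringType` exposes its collation id and is not the PR's exact code:

```scala
import org.apache.spark.sql.types.{DataType, StringType}

// Hypothetical helper: non-default collation ids would trigger the collation-aware path,
// while numeric and other types keep the existing behavior.
def collationIdOf(childType: DataType): Option[Int] = childType match {
  case st: StringType => Some(st.collationId)
  case _              => None
}
```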
### Was this patch authored or co-authored using generative AI tooling?

No, but the PR description was co-authored with ChatGPT.

### What is left to do before taking off the WIP tag?

Need to check, but I think it is arbitrary in Mode.