[WIP][SPARK-47353][SQL] Enable collation support for the Mode expression using a scala TreeMap (RB Tree) #46404
Conversation
@uros-db This is ready for the initial round of review. Let me know what you think!
Can you fill in the PR description, please?
@HyukjinKwon done.
@cloud-fan @MaxGekk @dbatomic just letting y'all know this is ready for first round of review. Thanks!
```scala
val buffer = if (child.dataType.isInstanceOf[StringType] && collationEnabled) {
  val modeMap = buff.foldLeft(
    new TreeMap[org.apache.spark.unsafe.types.UTF8String, Long]()(Ordering.comparatorToOrdering(
      CollationFactory.fetchCollation(collationId).comparator
```
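To make the quoted fragment concrete, here is a minimal self-contained sketch of the same idea in plain Scala. A case-insensitive `Ordering` stands in for `CollationFactory.fetchCollation(collationId).comparator` (so the Spark-specific names and types are assumptions; only the TreeMap fold mirrors the snippet):

```scala
import scala.collection.immutable.TreeMap

object TreeMapModeSketch {
  // Stand-in for a collation-aware comparator (e.g. UTF8_BINARY_LCASE):
  // under this ordering "Def", "def", and "DEF" compare as equal.
  val caseInsensitive: Ordering[String] =
    Ordering.comparatorToOrdering(String.CASE_INSENSITIVE_ORDER)

  // Counts as they would come out of the OpenHashMap buffer,
  // keyed by physical (byte) equality.
  val buff = Map("Def" -> 1L, "def" -> 1L, "DEF" -> 1L, "abc" -> 2L)

  // Fold into a TreeMap: keys equal under the comparator merge their counts.
  val modeMap: TreeMap[String, Long] =
    buff.foldLeft(new TreeMap[String, Long]()(caseInsensitive)) {
      case (map, (key, count)) => map.updated(key, map.getOrElse(key, 0L) + count)
    }

  // The mode is the key group with the highest merged count.
  val (mode, count) = modeMap.maxBy(_._2)
}
```

Here the three case variants of "def" collapse into one entry with count 3, which is exactly the merging the red-black tree buys over the default physical-equality hash map.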
I like the general approach of using TreeMap with a custom comparator; however, I do worry about the performance.
It's good that you added the benchmark, so can we take a look at the overall results?
I will be running it soon in GHA and will then add the ...results.txt report to the code change.
Here is a sneak peek.
Note that once I make your other suggested changes, the UTF8_BINARY case will become the "Collation Not Enabled" case, since I will remove the collationEnabled flag from the Mode class signature...
[info] OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Mac OS X 14.4.1
[info] Apple M3 Max
[info] collation unit benchmarks - mode: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] UTF8_BINARY_LCASE 1027 1198 241 0.1 10272.2 1.0X
[info] UNICODE 3249 3262 19 0.0 32485.8 0.3X
[info] UTF8_BINARY 805 806 1 0.1 8050.6 1.3X
[info] UNICODE_CI 3623 3632 13 0.0 36233.6 0.3X
[info] Collation Not Enabled 28 30 1 3.5 284.6 36.1X
[info] Numerical Type 28 29 1 3.6 277.5 37.0X
As you can see, the performance is impacted!
Here is my plan:
- Get your thoughts on these performance results.
- Analyze the computational complexity in theory. Being standard collection classes, the TreeMap and OpenHashMap documentation will surely state their known complexity classes (big-O). I will then provide this analysis in the PR description.
- Analyze the computational complexity empirically: I can either add to the benchmark or do an ad-hoc benchmark to determine how the running time grows as the input grows.

Do you have any thoughts on what else I might want to look into?
@uros-db see above comments -- I forgot to tag you
so I think we can agree that a 130x slowdown isn't exactly great... perhaps we should look into other approaches - I would suggest that you open another PR for that, though
@uros-db org.apache.spark.util.collection.OpenHashMap is actually a Spark data structure, so I could try to modify it in a second candidate PR and then see about having Mode rely on an OpenHashMap configured to be collation-aware.
The way I could do it would be to modify org.apache.spark.util.collection.OpenHashSet.Hasher.hash(): specifically, the override of hash within a class we would create, Hasher[String with Collation...], would branch to an alternative hashing method that is collation-sensitive.
(I would not be modifying org.apache.spark.util.collection.OpenHashSet#hashcode, which distributes the hashes more evenly and is called on the values returned by Hasher.hash.)
This should work because OpenHashMap relies internally on OpenHashSet, which uses org.apache.spark.util.collection.OpenHashSet.Hasher.
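A hedged sketch of that idea, in plain Scala: the trait and class names only mirror the spirit of `OpenHashSet.Hasher` and are illustrative rather than Spark's actual API, and lowercasing stands in for a real collation key. The essential invariant is that strings equal under the collation must produce equal hashes.

```scala
// Illustrative stand-in for org.apache.spark.util.collection.OpenHashSet.Hasher;
// not Spark's actual class.
trait Hasher[K] { def hash(k: K): Int }

// Default behavior: hash follows physical (byte) equality.
class StringHasher extends Hasher[String] {
  override def hash(s: String): Int = s.hashCode
}

// Collation-aware variant: hash a normalized "collation key" form instead,
// so keys equal under the collation land in the same bucket.
// (Lowercasing stands in for a collation-sensitive key function.)
class CollationAwareStringHasher extends Hasher[String] {
  override def hash(s: String): Int = s.toLowerCase.hashCode
}
```

With something like this in place, a hash map built over the collation-aware hasher (plus matching key equality) would already merge counts for "Def"/"def"/"DEF" during aggregation, so eval would not need the extra TreeMap pass.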
@uros-db I basically implemented a proof of concept here.
I believe the benchmarking I did there (see the PR description) shows maybe a 28x slowdown rather than an 89x slowdown. (I know we had a 130x slowdown before, but I modified the benchmarks slightly, and the results are now as follows.)
Here are the results for the originally proposed implementation (this PR), which uses the red-black tree / TreeMap:
[info] collation unit benchmarks - mode - 10000 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ---------------------------------------------------------------------------------------------------------------------------------
[info] UTF8_BINARY_LCASE - mode - 10000 elements 13 15 5 7.6 130.9 1.0X
[info] UNICODE - mode - 10000 elements 39 41 2 2.6 392.0 0.3X
[info] UTF8_BINARY - mode - 10000 elements 1 1 0 129.0 7.7 16.9X
[info] UNICODE_CI - mode - 10000 elements 39 41 2 2.6 391.4 0.3X
[info] Numerical Type - mode - 10000 elements 0 1 0 204.5 4.9 26.8X
[info] OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Mac OS X 14.4.1
[info] Apple M3 Max
[info] collation unit benchmarks - mode - 5000 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] --------------------------------------------------------------------------------------------------------------------------------
[info] UTF8_BINARY_LCASE - mode - 5000 elements 4 5 0 11.1 89.7 1.0X
[info] UNICODE - mode - 5000 elements 13 14 1 3.8 266.2 0.3X
[info] UTF8_BINARY - mode - 5000 elements 0 0 0 140.5 7.1 12.6X
[info] UNICODE_CI - mode - 5000 elements 13 14 1 3.8 265.1 0.3X
[info] Numerical Type - mode - 5000 elements 0 0 0 251.4 4.0 22.5X
And here are the results for the OpenHashMap implementation seen here:
[info] collation unit benchmarks - mode - 10000 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ---------------------------------------------------------------------------------------------------------------------------------
[info] UTF8_BINARY_LCASE - mode - 10000 elements 8 9 1 12.6 79.5 1.0X
[info] UNICODE - mode - 10000 elements 13 14 1 7.4 135.0 0.6X
[info] UTF8_BINARY - mode - 10000 elements 1 1 0 120.0 8.3 9.5X
[info] UNICODE_CI - mode - 10000 elements 14 14 1 7.3 137.9 0.6X
[info] Numerical Type - mode - 10000 elements 0 1 0 218.0 4.6 17.3X
[info] OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Mac OS X 14.4.1
[info] Apple M3 Max
[info] collation unit benchmarks - mode - 5000 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] --------------------------------------------------------------------------------------------------------------------------------
[info] UTF8_BINARY_LCASE - mode - 5000 elements 3 3 0 18.6 53.6 1.0X
[info] UNICODE - mode - 5000 elements 5 6 0 9.5 104.9 0.5X
[info] UTF8_BINARY - mode - 5000 elements 0 0 0 148.4 6.7 8.0X
[info] UNICODE_CI - mode - 5000 elements 5 5 0 11.1 90.3 0.6X
[info] Numerical Type - mode - 5000 elements 0 0 0 267.6 3.7 14.3X
The thing I notice, though, is that the slowdown is happening most acutely for the Unicode collations. Is there some underlying reason they are so much less performant than the UTF8_BINARY collation? Is it because they can't compare byte by byte? I get why that would affect the TreeMap implementation, but if it is also affecting the prototype that uses OpenHashMap with a different hashing function, maybe it comes from the conversion to UTF8String done by both implementations, which is not related to the choice of data structure.
Please let me know what you think.
Actually, the Numerical benchmark case should be ignored for this analysis. The case to compare the various collations against is the UTF8_BINARY case. Thus, for this implementation, the slowdown ranges from 16.9x to 56x; for the prototype, it ranges from 9.5x to 15x.
I am happy to try additional alternative approaches, BTW.
I also added a third approach to the experimental PR #46488.
What cardinalities are expected? E.g., even if the data frame has 1 trillion rows, wouldn't it be a highly rare use case to have 1 trillion unique values? That might be worth thinking about when deciding the range of what to benchmark.
And what percentage of values in the benchmarked data frames should be duplicates within the same collation? We are really benchmarking just one very specific case right now, and at least in the experimental branch it could be useful to play around with that while we evaluate different algorithms.
```scala
case (_, _) =>
  throw new IllegalArgumentException("Mode expects non-null string type.")
```
just checking - is this how Mode handles null values currently in Spark?
(without the collation changes in this PR)
@uros-db Yep.
The buffer should never contain a null key, because nulls are not added to the map during Mode.update(). This supports the documented behavior: "NULL values are ignored. If all the values are NULL, or there are 0 rows, returns NULL."
The subtleties of all this are fresh in my mind from looking into it recently, so just let me know if there is an aspect you need more details about and I am happy to explain.
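That contract can be sketched as follows. This is a simplified stand-in for Mode's actual update/eval pair (a plain mutable Map replaces OpenHashMap, and the method shapes are illustrative, not Spark's signatures):

```scala
import scala.collection.mutable

object NullModeSketch {
  // NULL inputs are ignored: they are simply never added to the buffer.
  def update(buffer: mutable.Map[String, Long], value: String): Unit =
    if (value != null) buffer.update(value, buffer.getOrElse(value, 0L) + 1L)

  // If every value was NULL (or there were no rows), the buffer is empty
  // and eval returns null, matching the documented behavior. Otherwise the
  // most frequent key wins, so a null key can never be the result.
  def eval(buffer: mutable.Map[String, Long]): Any =
    if (buffer.isEmpty) null else buffer.maxBy(_._2)._1
}
```

Because null never enters the buffer, the `IllegalArgumentException` branch quoted above is a defensive guard rather than a path ordinary data can reach.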
uros-db
left a comment
kudos for your effort on this ticket! I left some comments
also, please try to ensure that your code doesn't introduce any breaking changes (for example, I see that SQLQueryTestSuite has a failing test for Mode) before requesting a thorough review
however, it's also good to verify the approach as soon as possible - so I would advise making these fixes and committing the benchmark results
just another quick note based on your previous PRs - no need to force push and disrupt the commit history; in general I think it's easier to follow changes in this PR as we comment and resolve conversations, and look at commits as they come
@uros-db Thanks for the review! I hear you; I probably should have waited before bringing in the other reviewers. Thanks for the initial feedback!
```diff
-  override def eval(buffer: OpenHashMap[AnyRef, Long]): Any = {
+  override def eval(buff: OpenHashMap[AnyRef, Long]): Any = {
     if (buffer.isEmpty) {
```
Could we avoid this change?
Either I am renaming here or on line 83. If I change line 83 instead, I will have to change all of the references to buffer on lines 85 and 86. Do you have a particular preference, and if so, why?
Some changes may be unavoidable, but since we can choose - let's opt for keeping the method signature intact
```scala
    return null
  }
  val buffer = if (isCollatedString(child)) {
    val modeMap = buff.foldLeft(
```
Place `val collationId: Int = child.dataType.asInstanceOf[StringType].collationId` here, so that we avoid the lazy val.
I am copying a pattern I have seen in other collation-aware Expression implementations, but I am fine with making the suggested change.
I think this is fine either way. We should generally aim to include collationId as part of the expression field (as we've done for other expressions), and there should be no harm since it's a lazy val.
On the other hand, collationId is used only in one place here, but I'd still keep it as it is.
### What changes were proposed in this pull request?

[SPARK-47353](https://issues.apache.org/jira/browse/SPARK-47353)

#### Pull requests

- [Scala TreeMap (RB Tree)](#46404)
- [GroupMapReduce](#46526) <- most performant
- [GroupMapReduce (Cleaned up) (This PR)](#46597) <- most performant
- [Comparing Experimental Approaches](#46488)

https://github.com/apache/spark/pull/46597/files#r1626058908 -> #46917 (comment)

#### Central change to Mode's `eval` algorithm

The `eval` method now checks whether the column being aggregated is a string with non-default collation and, if so, groups the buffer counts by collation key:

```scala
buff.toSeq.groupMapReduce {
  case (key: String, _) =>
    CollationFactory.getCollationKey(UTF8String.fromString(key), collationId)
  case (key: UTF8String, _) =>
    CollationFactory.getCollationKey(key, collationId)
  case (key, _) => key
}(x => x)((x, y) => (x._1, x._2 + y._2)).values
```

#### Minor change to Mode

- Introduction of `collationId`: a new lazy value computed from the `dataType` of the `child` expression, used to fetch the appropriate collation comparator when `collationEnabled` is true.
- This PR will fail for complex types containing collated strings; a follow-up PR will implement that.

#### Unit test enhancements

Significant additions to `CollationStringExpressionsSuite` to test the new functionality, including tests for the `Mode` function when handling strings with different collation settings.

#### Benchmark updates

Enhanced the `CollationBenchmark` classes to include benchmarks for the new mode functionality with and without collation settings, as well as numerical types.

### Why are the changes needed?

1. Ensures consistency in handling string comparisons under various collation settings.
2. Improves global usability by enabling compatibility with different collation standards.

### Does this PR introduce _any_ user-facing change?

Yes, this PR introduces the following user-facing changes:

1. Adds a new `collationEnabled` property to the `Mode` expression.
2. Users can now specify collation settings for the `Mode` expression to customize its behavior.

### How was this patch tested?

This patch was tested through a combination of new and existing unit and end-to-end SQL tests.

1. **Unit tests** (`CollationStringExpressionsSuite`):
   - Brought the newly added tests in line with the design pattern of the existing tests.
   - Added multiple test cases to verify that the `Mode` function correctly handles strings with different collation settings.
   - Out of scope: special Unicode cases (higher planes); tests do not need to include null handling.
2. **Benchmark tests.**
3. **Manual testing:**

```
./build/mvn -DskipTests clean package
export SPARK_HOME=/Users/gideon/repos/spark
$SPARK_HOME/bin/spark-shell

spark.sqlContext.setConf("spark.sql.collation.enabled", "true")
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.functions
import spark.implicits._
val data = Seq(("Def"), ("def"), ("DEF"), ("abc"), ("abc"))
val df = data.toDF("word")
val dfLC = df.withColumn("word", col("word").cast(StringType("UTF8_BINARY_LCASE")))
val dfLCA = dfLC.agg(org.apache.spark.sql.functions.mode(functions.col("word")).as("count"))
dfLCA.show()
/*
BEFORE:
+-----+
|count|
+-----+
|  abc|
+-----+

AFTER:
+-----+
|count|
+-----+
|  Def|
+-----+
*/
```

4. **Continuous integration (CI):** the patch passed all relevant CI checks, including the unit test suite and the benchmark suite.

Follow-up: consider moving the new benchmark to the catalyst module.

### Was this patch authored or co-authored using generative AI tooling?

Nope!

Closes #46597 from GideonPotok/spark_47353_3_clean.

Lead-authored-by: GideonPotok <[email protected]>
Co-authored-by: Gideon Potok <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
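The groupMapReduce step quoted in the description can be illustrated with a self-contained version, where lowercasing stands in for `CollationFactory.getCollationKey` and plain `String` replaces `UTF8String` (both are assumptions made to keep the sketch runnable without Spark):

```scala
object GroupMapReduceSketch {
  // Buffer counts keyed by physical equality, as OpenHashMap would produce them.
  val buff = Map("Def" -> 1L, "def" -> 1L, "DEF" -> 1L, "abc" -> 2L)

  // Group entries by their collation key (here: lowercase), keep the entries
  // themselves, and reduce each group by summing the counts.
  val merged: Iterable[(String, Long)] =
    buff.toSeq
      .groupMapReduce { case (key, _) => key.toLowerCase }(identity) {
        (x, y) => (x._1, x._2 + y._2)
      }
      .values

  // The mode is the representative key of the most frequent group.
  val (mode, count) = merged.maxBy(_._2)
}
```

Unlike the TreeMap approach, this does a single linear pass with hashing on collation keys, which is where the performance win over the red-black tree comes from.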
What changes were proposed in this pull request?
SPARK-47353
Pull requests
Scala TreeMap (RB Tree)
GroupMapReduce <- Most performant
Comparing Experimental Approaches
Central Change to Mode `eval` Algorithm:

- Update to `eval` Method: The `eval` method now checks if the column being looked at is a string with non-default collation and, if so, uses a `TreeMap` to manage keys and counts. `TreeMap` takes a comparator to determine equality, so it supports an efficient combination of counts for keys that are equal under that comparator but were not combined by the `OpenHashMap`, which uses the default physical equality.

Minor Change to Mode:

- Introduction of `collationId`: A new lazy value `collationId` is computed from the `dataType` of the `child` expression, used to fetch the appropriate collation comparator when `collationEnabled` is true.

Unit Test Enhancements: Significant additions to `CollationStringExpressionsSuite` to test the new functionality, including:

- Tests for the `Mode` function when handling strings with different collation settings.

Benchmark Updates:

- Enhanced the `CollationBenchmark` classes to include benchmarks for the new mode functionality with and without collation settings, as well as numerical types.

Why are the changes needed?
Does this PR introduce any user-facing change?
Yes, this PR introduces the following user-facing changes:
- Adds a new `collationEnabled` property to the `Mode` expression.
- Users can now specify collation settings for the `Mode` expression to customize its behavior.

How was this patch tested?
This patch was tested through a combination of new and existing unit and end-to-end SQL tests.
Unit Tests:
- Tests verifying that the `Mode` function correctly handles strings with different collation settings.

Benchmark Tests:
Manual Testing:
Was this patch authored or co-authored using generative AI tooling?
No, but the PR description was co-authored with ChatGPT.
What is left to do before taking off the WIP tag?
Need to check, but I think it is arbitrary in Mode.