
Conversation

@NVnavkumar
Collaborator

@NVnavkumar commented Jul 6, 2022

Fixes #4932, #5228, #5188, and #5222. Requires rapidsai/cudf#11043 and rapidsai/cudf#11143 to be merged.

This implements GPU support for the set-based SQL array functions array_intersect, array_union, array_except, and arrays_overlap. A few caveats:

  1. There is a minor bug that came up when testing array_union (#5957: "[BUG] Exception calling collect() when partitioning using with arrays with null values using array_union(...)"); it looks like a bug in the partitioning code when using collect().

  2. The three operations array_intersect, array_union, and array_except cannot guarantee the same element order on the GPU as on the CPU (because they are set-based operations), so we wrap these operations in a sort_array(...) call for testing purposes.

  3. arrays_overlap returns a boolean result, so there is no need to sort, but the implementation is complicated by the function's null-handling requirements (see https://spark.apache.org/docs/3.2.1/api/sql/index.html#arrays_overlap, quoted below, with a spark-shell sketch after the quote):

arrays_overlap(a1, a2) - Returns true if a1 contains at least a non-null element present also in a2. If the arrays have no common element and they are both non-empty and either of them contains a null element null is returned, false otherwise.
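
To make the quoted semantics and the testing approach concrete, here is a small spark-shell sketch. The expected results in the comments follow the documentation quoted above; they are not output captured for this PR.

// true: 2 is a non-null element present in both arrays
spark.sql("SELECT arrays_overlap(array(1, 2), array(2, null))").show()

// null: no common element, both arrays non-empty, and one of them contains a null
spark.sql("SELECT arrays_overlap(array(1, 2), array(3, null))").show()

// false: no common element and no nulls involved
spark.sql("SELECT arrays_overlap(array(1, 2), array(3, 4))").show()

// For the order-sensitive operations, GPU and CPU results are only compared
// after wrapping in sort_array(...), e.g.:
spark.sql("SELECT sort_array(array_intersect(array(3, 1, 2), array(2, 3)))").show()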

@sameerz added the feature request (New feature or request) label Jul 6, 2022
@NVnavkumar self-assigned this Jul 7, 2022
Review comment from a collaborator on the end of the new test file (code context shown first):

'arrays_overlap(array(), b)',
'arrays_overlap(a, a)',
)
)

There should be an empty line at the end of the file as an implicit convention.

Suggested change:
)
)

@ttnghia added the SQL (part of the SQL/Dataframe plugin) and cudf_dependency (depends on a new feature in cudf) labels Jul 11, 2022
@NVnavkumar marked this pull request as ready for review July 15, 2022 15:59
@NVnavkumar marked this pull request as draft July 19, 2022 01:15
@NVnavkumar
Collaborator Author

Another example:

scala> val df = List((Array(Double.NaN), Array(-Double.NaN))).toDF("a","b")
df: org.apache.spark.sql.DataFrame = [a: array<double>, b: array<double>]

scala> df.selectExpr("array_union(a, b)").collect
res1: Array[org.apache.spark.sql.Row] = Array([WrappedArray(NaN)])

@ttnghia
Collaborator

ttnghia commented Jul 23, 2022

What's wrong with my spark-shell (Spark 3.1.2)?

scala> spark.sql("SELECT array_union(array(cast('nan' as double)), array(-cast('-nan' as double)))").show
+------------------------------------------------------------------------+
|array_union(array(CAST(nan AS DOUBLE)), array((- CAST(-nan AS DOUBLE))))|
+------------------------------------------------------------------------+
|                                                             [NaN, null]|
+------------------------------------------------------------------------+


scala> spark.sql("SELECT array_union(array(cast('nan' as double)), array(-cast('nan' as double)))").show
+-----------------------------------------------------------------------+
|array_union(array(CAST(nan AS DOUBLE)), array((- CAST(nan AS DOUBLE))))|
+-----------------------------------------------------------------------+
|                                                             [NaN, NaN]|
+-----------------------------------------------------------------------+


scala> val df = List((Array(Double.NaN), Array(-Double.NaN))).toDF("a","b")
df: org.apache.spark.sql.DataFrame = [a: array<double>, b: array<double>]

scala> df.selectExpr("array_union(a, b)").collect
res17: Array[org.apache.spark.sql.Row] = Array([WrappedArray(NaN, NaN)])

@ttnghia
Collaborator

ttnghia commented Jul 23, 2022

@NVnavkumar Did you generate your last example by running on the GPU? I did mine on the CPU. If the correct behavior for comparing NaNs is that they are always considered unequal, then I need to fix the JNI layer to pass in the correct NaN comparison parameter.

@NVnavkumar
Collaborator Author

@NVnavkumar Did you generate your last example by running on the GPU? I did mine on the CPU. If the correct behavior for comparing NaNs is that they are always considered unequal then I need to fix the JNI layer to pass in the correct NaN comparison parameter.

Both examples were on the CPU, Spark 3.2.1

@NVnavkumar
Collaborator Author

Tried with Spark 3.1.3:

scala> val df = List((Array(Double.NaN), Array(-Double.NaN))).toDF("a","b")
df: org.apache.spark.sql.DataFrame = [a: array<double>, b: array<double>]

scala> df.selectExpr("array_union(a, b)").collect
res0: Array[org.apache.spark.sql.Row] = Array([WrappedArray(NaN)])

But in Spark 3.1.1, there is this behavior:

scala> val df = List((Array(Double.NaN), Array(-Double.NaN))).toDF("a","b")
df: org.apache.spark.sql.DataFrame = [a: array<double>, b: array<double>]

scala> df.selectExpr("array_union(a, b)").collect
res0: Array[org.apache.spark.sql.Row] = Array([WrappedArray(NaN, NaN)])

@ttnghia
Collaborator

ttnghia commented Jul 23, 2022

Do we have any config for this? If we can retrieve the config in Spark, then we can select the right parameter to call the cudf API.

Similar to this:

val legacyStatisticalAggregate = SQLConf.get.legacyStatisticalAggregate
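
For reference, a hypothetical sketch of that pattern; the conf key "spark.sql.legacy.arrayUnionNaN" does not exist in Spark, and GpuArrayUnion/nansAreEqual are placeholder names, not the plugin's actual API:

import org.apache.spark.sql.internal.SQLConf

// Illustrative only: read a (made-up) legacy flag on the CPU side...
val legacyNaNBehavior: Boolean =
  SQLConf.get.getConfString("spark.sql.legacy.arrayUnionNaN", "false").toBoolean

// ...and forward it when building the GPU expression, so the cudf call can use
// the matching NaN-equality parameter (placeholder names):
// val gpuExpr = GpuArrayUnion(lhs, rhs, nansAreEqual = !legacyNaNBehavior)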

@NVnavkumar
Collaborator Author

This is interesting, because in the JDK itself, Double.doubleToLongBits actually shortcuts the NaN condition:

https://github.com/openjdk/jdk/blob/a0a0539b0d3f9b6809c9759e697bfafd7b138ec1/src/java.base/share/classes/java/lang/Double.java#L860-L865

So I guess Spark wasn't using it before 3.1.3.
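
For context, the linked method is essentially the following (a Scala paraphrase of the Java source, not verbatim):

// Every NaN bit pattern, including the one produced by negating NaN, collapses
// to the canonical NaN bits, so bit-pattern based hashing/equality treats all
// NaNs as the same value.
def doubleToLongBits(value: Double): Long =
  if (!value.isNaN) java.lang.Double.doubleToRawLongBits(value)
  else 0x7ff8000000000000L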

@NVnavkumar
Collaborator Author

NVnavkumar commented Jul 23, 2022

Do we have any config for this? If we can retrieve the config in Spark then we can select the right parameter to call cudf API.

Similar to this:

val legacyStatisticalAggregate = SQLConf.get.legacyStatisticalAggregate

It looks like they switched to SQLOpenHashSet in 3.1.3 and added the NaN check:

https://github.com/apache/spark/blob/d1f8a503a26bcfb4e466d9accc5fa241a7933667/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L3215

Here's the PR: apache/spark#33955 and the original Spark issue with NaN: https://issues.apache.org/jira/browse/SPARK-36702
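
A minimal sketch of the idea behind that change (not the actual SQLOpenHashSet code): deduplicate on the canonical bit pattern so that NaN and -NaN collapse to a single element, matching the Spark 3.1.3+ results above.

import scala.collection.mutable

// doubleToLongBits maps every NaN, including -NaN, to the same key,
// so the union keeps only one NaN element.
def unionDoubles(a: Array[Double], b: Array[Double]): Array[Double] = {
  val seen = mutable.LinkedHashSet.empty[Long]
  val out  = mutable.ArrayBuffer.empty[Double]
  for (x <- a.iterator ++ b.iterator) {
    if (seen.add(java.lang.Double.doubleToLongBits(x))) out += x
  }
  out.toArray
}

// unionDoubles(Array(Double.NaN), Array(-Double.NaN)) yields a single-element Array(NaN)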

@NVnavkumar
Collaborator Author

I think I will try to xfail the test for 3.1.1 and 3.1.2 and file a follow-up issue for the NaN case.

@NVnavkumar requested a review from jlowe July 26, 2022 20:33
jlowe previously approved these changes Jul 26, 2022
jlowe previously approved these changes Jul 27, 2022
@NVnavkumar marked this pull request as ready for review July 28, 2022 14:04
@NVnavkumar
Collaborator Author

build

@ttnghia requested a review from revans2 July 28, 2022 14:17
@ttnghia
Collaborator

ttnghia commented Jul 28, 2022

build

@revans2 left a comment


My main concern is documentation. We have some new functionality that is not 100% compatible with Spark. We should, at a minimum, document that it is not compatible and exactly in what ways. Ideally, if there are issues with how Spark does something, like with -0.0, and we have filed an issue for it with Spark, we should include all of that in the documentation.

@NVnavkumar
Collaborator Author

build
