-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-36753][SQL] ArrayExcept handle duplicated Double.NaN and Float.NaN #33994
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from 2 commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
e9cb989
[SPARK-36753][SQL] ArrayExcept handle duplicated Double.NaN and Float…
AngersZhuuuu d73557c
update
AngersZhuuuu 2b607fa
Merge branch 'master' into SPARK-36753
AngersZhuuuu bde7927
Update collectionOperations.scala
AngersZhuuuu 061132f
Merge branch 'master' into SPARK-36753
AngersZhuuuu 662bf05
trigger
AngersZhuuuu 08f28c0
update
AngersZhuuuu 394df1d
Update collectionOperations.scala
AngersZhuuuu 5f4d1bb
Update collectionOperations.scala
AngersZhuuuu 3ef22d7
Update collectionOperations.scala
AngersZhuuuu 82640f5
Merge branch 'master' into SPARK-36753
AngersZhuuuu 77b1661
Update SQLOpenHashSet.scala
AngersZhuuuu cd3641d
Update collectionOperations.scala
AngersZhuuuu d319d65
Merge branch 'master' into SPARK-36753
AngersZhuuuu 797089d
Update collectionOperations.scala
AngersZhuuuu 6eaa96a
Update collectionOperations.scala
AngersZhuuuu File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4073,14 +4073,20 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL | |
| if (TypeUtils.typeWithProperEquals(elementType)) { | ||
| (array1, array2) => | ||
| val hs = new OpenHashSet[Any] | ||
| val isNaN = SQLOpenHashSet.isNaN(elementType) | ||
| var notFoundNullElement = true | ||
| var notFoundNaNElement = true | ||
| var i = 0 | ||
| while (i < array2.numElements()) { | ||
| if (array2.isNullAt(i)) { | ||
| notFoundNullElement = false | ||
| } else { | ||
| val elem = array2.get(i, elementType) | ||
| hs.add(elem) | ||
| if (isNaN(elem)) { | ||
| notFoundNaNElement = false | ||
| } else { | ||
| hs.add(elem) | ||
| } | ||
| } | ||
| i += 1 | ||
| } | ||
|
|
@@ -4094,9 +4100,16 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL | |
| } | ||
| } else { | ||
| val elem = array1.get(i, elementType) | ||
| if (!hs.contains(elem)) { | ||
| arrayBuffer += elem | ||
| hs.add(elem) | ||
| if (isNaN(elem)) { | ||
| if (notFoundNaNElement) { | ||
| arrayBuffer += elem | ||
| notFoundNaNElement = false | ||
| } | ||
| } else { | ||
| if (!hs.contains(elem)) { | ||
| arrayBuffer += elem | ||
| hs.add(elem) | ||
| } | ||
| } | ||
| } | ||
| i += 1 | ||
|
|
@@ -4168,13 +4181,20 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL | |
| nullSafeCodeGen(ctx, ev, (array1, array2) => { | ||
| val notFoundNullElement = ctx.freshName("notFoundNullElement") | ||
| val nullElementIndex = ctx.freshName("nullElementIndex") | ||
| val notFoundNaNElement = ctx.freshName("notFoundNaNElement") | ||
| val builder = ctx.freshName("builder") | ||
| val openHashSet = classOf[OpenHashSet[_]].getName | ||
| val classTag = s"scala.reflect.ClassTag$$.MODULE$$.$hsTypeName()" | ||
| val hashSet = ctx.freshName("hashSet") | ||
| val arrayBuilder = classOf[mutable.ArrayBuilder[_]].getName | ||
| val arrayBuilderClass = s"$arrayBuilder$$of$ptName" | ||
|
|
||
| val isNaNMethod = elementType match { | ||
| case DoubleType => Some(s"java.lang.Double.isNaN((double)$value)") | ||
| case FloatType => Some(s"java.lang.Float.isNaN((float)$value)") | ||
| case _ => None | ||
| } | ||
|
|
||
| def withArray2NullCheck(body: String): String = | ||
| if (right.dataType.asInstanceOf[ArrayType].containsNull) { | ||
| if (left.dataType.asInstanceOf[ArrayType].containsNull) { | ||
|
|
@@ -4197,11 +4217,21 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL | |
| body | ||
| } | ||
|
|
||
| def withArray2NaNCheck(body: String): String = { | ||
| isNaNMethod.map { isNaN => | ||
| s""" | ||
| |if ($isNaN) { | ||
| | $notFoundNaNElement = false; | ||
| |} else { | ||
| | $body | ||
| |} | ||
| """.stripMargin | ||
| } | ||
| }.getOrElse(body) | ||
|
|
||
| val writeArray2ToHashSet = withArray2NullCheck( | ||
| s""" | ||
| |$jt $value = ${genGetValue(array2, i)}; | ||
| |$hashSet.add$hsPostFix($hsValueCast$value); | ||
| """.stripMargin) | ||
| s"$jt $value = ${genGetValue(array2, i)};" + | ||
| withArray2NaNCheck(s"$hashSet.add$hsPostFix($hsValueCast$value);")) | ||
|
|
||
| def withArray1NullAssignment(body: String) = | ||
| if (left.dataType.asInstanceOf[ArrayType].containsNull) { | ||
|
|
@@ -4221,17 +4251,35 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL | |
| body | ||
| } | ||
|
|
||
| val processArray1 = withArray1NullAssignment( | ||
| def withArray1NaNCheck(body: String): String = { | ||
| isNaNMethod.map { isNaN => | ||
| s""" | ||
| |if ($isNaN) { | ||
| | if ($notFoundNaNElement) { | ||
| | $notFoundNaNElement = false; | ||
| | $size++; | ||
| | $builder.$$plus$$eq($value); | ||
| | } | ||
| |} else { | ||
| | $body | ||
| |} | ||
| """.stripMargin | ||
| } | ||
| }.getOrElse(body) | ||
|
|
||
| val body = | ||
| s""" | ||
| |$jt $value = ${genGetValue(array1, i)}; | ||
| |if (!$hashSet.contains($hsValueCast$value)) { | ||
| | if (++$size > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) { | ||
| | break; | ||
| | } | ||
| | $hashSet.add$hsPostFix($hsValueCast$value); | ||
| | $builder.$$plus$$eq($value); | ||
| |} | ||
| """.stripMargin) | ||
| """.stripMargin | ||
|
|
||
| val processArray1 = withArray1NullAssignment( | ||
| s"$jt $value = ${genGetValue(array1, i)};" + withArray1NaNCheck(body)) | ||
|
|
||
| // Only need to track null element index when array1's element is nullable. | ||
| val declareNullTrackVariables = if (left.dataType.asInstanceOf[ArrayType].containsNull) { | ||
|
|
@@ -4243,9 +4291,16 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL | |
| "" | ||
| } | ||
|
|
||
| // Only need to track NaN element index when array1's element is DoubleType or FloatType. | ||
| val declareNaNTrackVariables = elementType match { | ||
| case DoubleType | FloatType => s"boolean $notFoundNaNElement = true;" | ||
| case _ => "" | ||
| } | ||
|
|
||
|
||
| s""" | ||
| |$openHashSet $hashSet = new $openHashSet$hsPostFix($classTag); | ||
| |$declareNullTrackVariables | ||
| |$declareNaNTrackVariables | ||
| |for (int $i = 0; $i < $array2.numElements(); $i++) { | ||
| | $writeArray2ToHashSet | ||
| |} | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For this, let's wait a little bit for the decision at the first PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done