Closed
44 commits
d88555c
Factored in null count.:
GideonPotok May 4, 2024
0f0eedf
fixed up tests according to expectations surrounding nulls.
GideonPotok May 7, 2024
e1a533f
mode benchmark
GideonPotok May 7, 2024
a98eebe
mode benchmark
GideonPotok May 7, 2024
e4bf907
remove class member for collatrion enabled
GideonPotok May 7, 2024
c89af54
remove class member for collatrion enabled
GideonPotok May 7, 2024
794b20a
remove class member for collatrion enabled
GideonPotok May 7, 2024
3e891df
dataType check can be incorporated into the previous test, so this te…
GideonPotok May 7, 2024
0849a21
dataType check can be incorporated into the previous test, so this te…
GideonPotok May 7, 2024
e79e14e
scalastyle
GideonPotok May 7, 2024
9a32243
scalastyle
GideonPotok May 7, 2024
506f7fc
fix add back ._1
GideonPotok May 8, 2024
16ed98f
fix up
GideonPotok May 8, 2024
6891c9b
tests pass
GideonPotok May 8, 2024
3a0edac
tests pass
GideonPotok May 8, 2024
abea836
test
GideonPotok May 8, 2024
7fc7561
import _
GideonPotok May 8, 2024
05fe1a9
added bm results
GideonPotok May 9, 2024
a5710d1
tests pass
GideonPotok May 10, 2024
76f089f
tests pass
GideonPotok May 10, 2024
d8ea771
tests pass
GideonPotok May 10, 2024
cc63899
Merge branch 'master' into spark_47353_3
GideonPotok May 10, 2024
af187ac
Update sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expr…
GideonPotok May 13, 2024
184317d
removed withCollatedString
GideonPotok May 13, 2024
cd74bc8
buff->buffer
GideonPotok May 13, 2024
8f2525a
added the jdk 17 benchmarks.
GideonPotok May 13, 2024
7aca2e3
better benchmark
GideonPotok May 13, 2024
81e2b44
better benchmark
GideonPotok May 13, 2024
dec21a5
better benchmark
GideonPotok May 13, 2024
045c007
Update sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expr…
GideonPotok May 14, 2024
d85c052
up to date benchmarks
GideonPotok May 14, 2024
af016f4
scalastyle
GideonPotok May 14, 2024
3fbe2b2
scalastyle
GideonPotok May 14, 2024
ab7fa8e
tests with higher unicode planes - corner cases
GideonPotok May 14, 2024
c86e01a
removed those unicode tests for now at least
GideonPotok May 14, 2024
08a3e0a
removed extra benchmarks
GideonPotok May 14, 2024
6ff346d
Merge branch 'master' into spark_47353_3
GideonPotok May 14, 2024
d3911cb
move tests to CollationSQLExpressionsSuite
GideonPotok May 14, 2024
904b9c5
undo inadvertant change
GideonPotok May 14, 2024
4ae4534
imports
GideonPotok May 14, 2024
0fac136
Add back old benchmark logic, too.
GideonPotok May 14, 2024
9b60a31
Update sql/core/src/test/scala/org/apache/spark/sql/CollationStringEx…
GideonPotok May 14, 2024
36f38e3
wip
GideonPotok May 15, 2024
0f2a456
wip
GideonPotok May 15, 2024
@@ -22,11 +22,13 @@ import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, UnresolvedWith
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder}
 import org.apache.spark.sql.catalyst.trees.UnaryLike
 import org.apache.spark.sql.catalyst.types.PhysicalDataType
-import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.catalyst.util.{CollationFactory, GenericArrayData}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, BooleanType, DataType}
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, BooleanType, DataType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.collection.OpenHashMap

+
 case class Mode(
     child: Expression,
     mutableAggBufferOffset: Int = 0,
@@ -35,6 +37,8 @@ case class Mode(
   extends TypedAggregateWithHashMapAsBuffer with ImplicitCastInputTypes
   with SupportsOrderingWithinGroup with UnaryLike[Expression] {

+  final lazy val collationId: Int = child.dataType.asInstanceOf[StringType].collationId
+
   def this(child: Expression) = this(child, 0, 0)

   def this(child: Expression, reverse: Boolean) = {
@@ -74,6 +78,19 @@ case class Mode(
     if (buffer.isEmpty) {
       return null
     }
+    val collationAwareBuffer =
+      if (!CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
+        val modeMap = buffer.toSeq.groupMapReduce {
+          case (key: String, _) =>
+            CollationFactory.getCollationKey(UTF8String.fromString(key), collationId)
+          case (key: UTF8String, _) =>
+            CollationFactory.getCollationKey(key, collationId)
+          case (key, _) => key
+        }(x => x)((x, y) => (x._1, x._2 + y._2)).values
+        modeMap
+      } else {
+        buffer
+      }
Contributor

Going back to the original issue (why Mode doesn't work already, while otherwise Aggregation generally works with collated strings in Spark), here's what I'm interested in: why does PhysicalDataType.ordering(child.dataType).asInstanceOf[Ordering[AnyRef]] not work here automatically?

As far as I know, the ordering for PhysicalStringType is defined correctly:

private[sql] val ordering = CollationFactory.fetchCollation(collationId).comparator.compare(_, _)

So one would naturally expect Mode to work as is.

Did you investigate this?

Contributor Author

GideonPotok May 13, 2024

@uros-db The ordering for PhysicalStringType is indeed defined correctly, via private[sql] val ordering = CollationFactory.fetchCollation(collationId).comparator.compare(_, _), but that alone does not fix Mode. The ordering is applied only when comparing (count, key) tuples; it does not merge buffer entries whose keys are equal under the collation. For example, with UTF8_BINARY_LCASE, a buffer such as Map("a" -> 3L, "b" -> 2L, "B" -> 2L) means the maximum is taken over the tuples (2L, "B"), (2L, "b"), (3L, "a") instead of the expected (3L, "a"), (4L, "b"). The counts for "b" and "B" have to be combined before the maximum is taken, which is what this change does. Unit tests confirm that Mode returns the wrong result for such cases otherwise.
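
The difference described in this reply can be reproduced outside Spark. The following is a minimal sketch in plain Scala 2.13+, assuming `toLowerCase` as a stand-in for the key that `CollationFactory.getCollationKey` would produce for a lower-casing collation; `ModeSketch`, `collationKey`, `naiveMode`, and `collationAwareMode` are hypothetical names used only for illustration and are not part of the PR.

```scala
// Hypothetical illustration only: plain Scala, no Spark dependency.
object ModeSketch {
  // Assumption: a lower-cased string stands in for the real collation key,
  // so "b" and "B" share one key.
  def collationKey(s: String): String = s.toLowerCase

  // Picks the entry with the highest count from the buffer as-is,
  // so "b" and "B" compete as separate entries.
  def naiveMode(buffer: Map[String, Long]): String =
    buffer.maxBy(_._2)._1

  // Merges the counts of entries that share a collation key, keeping one
  // representative original key per group, then picks the highest count.
  def collationAwareMode(buffer: Map[String, Long]): String =
    buffer.toSeq
      .groupMapReduce { case (key, _) => collationKey(key) }(x => x) {
        (x, y) => (x._1, x._2 + y._2)
      }
      .values
      .maxBy(_._2)
      ._1

  def main(args: Array[String]): Unit = {
    val buffer = Map("a" -> 3L, "b" -> 2L, "B" -> 2L)
    println(naiveMode(buffer))                        // "a" wins with count 3
    println(collationKey(collationAwareMode(buffer))) // the merged "b" group wins with count 4
  }
}
```

Which of "b" or "B" is returned as the representative depends on the iteration order of the buffer, so the sketch normalizes the result with `collationKey` before printing it.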


     reverseOpt.map { reverse =>
       val defaultKeyOrdering = if (reverse) {
@@ -82,8 +99,8 @@ case class Mode(
         PhysicalDataType.ordering(child.dataType).asInstanceOf[Ordering[AnyRef]]
       }
       val ordering = Ordering.Tuple2(Ordering.Long, defaultKeyOrdering)
-      buffer.maxBy { case (key, count) => (count, key) }(ordering)
-    }.getOrElse(buffer.maxBy(_._2))._1
+      collationAwareBuffer.maxBy { case (key, count) => (count, key) }(ordering)
+    }.getOrElse(collationAwareBuffer.maxBy(_._2))._1
   }

   override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): Mode =
@@ -249,7 +266,7 @@ case class PandasMode(
       val (key, count) = iter.next()
       if (maxCount < count) {
         modes.clear()
-        modes.append(key)
+        modes. append(key)
         maxCount = count
       } else if (maxCount == count) {
         modes.append(key)
56 changes: 32 additions & 24 deletions sql/core/benchmarks/CollationBenchmark-jdk21-results.txt
@@ -2,53 +2,61 @@ OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - equalsFunction:    Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------------------------
-UTF8_BINARY_LCASE                                      2948          2958         13        0.0      29483.6      1.0X
-UNICODE                                                2040          2042          3        0.0      20396.6      1.4X
-UTF8_BINARY                                            2043          2043          0        0.0      20426.3      1.4X
-UNICODE_CI                                            16318         16338         28        0.0     163178.4      0.2X
+UTF8_BINARY_LCASE                                      2889          2923         48        0.0      28892.1      1.0X
+UNICODE                                                2748          2748          1        0.0      27476.5      1.1X
+UTF8_BINARY                                            2744          2745          1        0.0      27439.5      1.1X
+UNICODE_CI                                            16815         16817          2        0.0     168154.3      0.2X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - compareFunction:   Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------------------------
-UTF8_BINARY_LCASE                                      3227          3228          1        0.0      32272.1      1.0X
-UNICODE                                               16637         16643          9        0.0     166367.7      0.2X
-UTF8_BINARY                                            3132          3137          7        0.0      31319.2      1.0X
-UNICODE_CI                                            17816         17829         18        0.0     178162.4      0.2X
+UTF8_BINARY_LCASE                                      4782          4784          3        0.0      47819.3      1.0X
+UNICODE                                               18986         18995         13        0.0     189855.8      0.3X
+UTF8_BINARY                                            5026          5048         31        0.0      50258.2      1.0X
+UNICODE_CI                                            19735         19771         50        0.0     197351.1      0.2X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - hashFunction:      Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------------------------
-UTF8_BINARY_LCASE                                      4824          4824          0        0.0      48243.7      1.0X
-UNICODE                                               69416         69475         84        0.0     694158.3      0.1X
-UTF8_BINARY                                            3806          3808          2        0.0      38062.8      1.3X
-UNICODE_CI                                            60943         60975         45        0.0     609426.2      0.1X
+UTF8_BINARY_LCASE                                      4933          4933          1        0.0      49330.9      1.0X
+UNICODE                                               68091         68119         40        0.0     680908.8      0.1X
+UTF8_BINARY                                            3878          3879          2        0.0      38782.4      1.3X
+UNICODE_CI                                            55501         55526         35        0.0     555014.2      0.1X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - contains:          Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------------------------
-UTF8_BINARY_LCASE                                     11979         11980          1        0.0     119790.4      1.0X
-UNICODE                                                6469          6474          7        0.0      64694.8      1.9X
-UTF8_BINARY                                            7253          7253          1        0.0      72528.3      1.7X
-UNICODE_CI                                           319124        319881       1070        0.0    3191244.0      0.0X
+UTF8_BINARY_LCASE                                     10441         10444          4        0.0     104412.3      1.0X
+UNICODE                                                5811          5812          1        0.0      58106.6      1.8X
+UTF8_BINARY                                            6397          6411         19        0.0      63971.7      1.6X
+UNICODE_CI                                           323853        324618       1082        0.0    3238530.0      0.0X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - startsWith:        Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------------------------
-UTF8_BINARY_LCASE                                     11584         11595         15        0.0     115841.4      1.0X
-UNICODE                                                6155          6156          2        0.0      61548.7      1.9X
-UTF8_BINARY                                            6979          6982          5        0.0      69785.6      1.7X
-UNICODE_CI                                           318228        318726        705        0.0    3182275.2      0.0X
+UTF8_BINARY_LCASE                                     10123         10154         44        0.0     101227.4      1.0X
+UNICODE                                                5682          5686          7        0.0      56815.0      1.8X
+UTF8_BINARY                                            6296          6300          5        0.0      62961.9      1.6X
+UNICODE_CI                                           318720        318957        336        0.0    3187199.4      0.0X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - endsWith:          Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------------------------
-UTF8_BINARY_LCASE                                     11655         11664         12        0.0     116552.8      1.0X
-UNICODE                                                6235          6239          5        0.0      62350.8      1.9X
-UTF8_BINARY                                            7066          7069          5        0.0      70658.1      1.6X
-UNICODE_CI                                           313515        313999        685        0.0    3135149.1      0.0X
+UTF8_BINARY_LCASE                                     10195         10198          5        0.0     101948.5      1.0X
+UNICODE                                                5731          5732          1        0.0      57314.8      1.8X
+UTF8_BINARY                                            6344          6366         31        0.0      63443.6      1.6X
+UNICODE_CI                                           324196        325450       1772        0.0    3241964.4      0.0X

+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+AMD EPYC 7763 64-Core Processor
+collation unit benchmarks - mode - 30105 elements:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
+----------------------------------------------------------------------------------------------------------------------------
+UTF8_BINARY_LCASE - mode - 30105 elements                       6             6          0        5.1        195.6      1.0X
+UNICODE - mode - 30105 elements                                 3             3          0       11.6         86.0      2.3X
+UTF8_BINARY - mode - 30105 elements                             3             3          0       11.6         85.9      2.3X
+UNICODE_CI - mode - 30105 elements                             12            12          1        2.6        382.9      0.5X
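
The derived columns of the mode benchmark can be cross-checked from Per Row(ns) alone. The sketch below assumes that Rate(M/s) is 1000 divided by Per Row(ns) and that Relative is the first row's per-row time divided by each row's per-row time; both assumptions agree with the figures above after rounding to one decimal place. `BenchmarkColumns` and its helpers are hypothetical names, not part of Spark's benchmark utilities.

```scala
// Hypothetical cross-check of the derived benchmark columns; plain Scala.
object BenchmarkColumns {
  // Per Row(ns) values copied from the mode benchmark rows.
  val perRowNs: Seq[(String, Double)] = Seq(
    "UTF8_BINARY_LCASE" -> 195.6,
    "UNICODE" -> 86.0,
    "UTF8_BINARY" -> 85.9,
    "UNICODE_CI" -> 382.9
  )

  // Rounds to one decimal place, matching the precision shown in the table.
  def round1(x: Double): Double = math.round(x * 10.0) / 10.0

  // Assumption: rows per microsecond, i.e. millions of rows per second.
  def rateMPerSec(ns: Double): Double = round1(1000.0 / ns)

  // Assumption: speed relative to the first (baseline) row.
  def relative(ns: Double): Double = round1(perRowNs.head._2 / ns)

  def main(args: Array[String]): Unit =
    perRowNs.foreach { case (name, ns) =>
      println(f"$name%-18s rate=${rateMPerSec(ns)}%.1f M/s relative=${relative(ns)}%.1fX")
    }
}
```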