From d88555c19298ebbf987e200a7ba40facb70bad32 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Sat, 4 May 2024 16:52:10 -0400 Subject: [PATCH 01/42] Factored in null count.: collagtion enabled or not Factored in null count.: benchmark benchmark ready for review ready for review ready for review ready for review ready for review ready for review use collation id tests pass tidy implementation idea: tree map tests tests support mode eval test passes --- .../catalyst/expressions/aggregate/Mode.scala | 64 ++++++++++--- .../catalyst/util/V2ExpressionBuilder.scala | 2 +- .../sql/CollationStringExpressionsSuite.scala | 90 ++++++++++++++++++- .../benchmark/CollationBenchmark.scala | 55 ++++++++++++ 4 files changed, 196 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index d1a9cafdf61fa..d1b98adaacdfb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -17,28 +17,36 @@ package org.apache.spark.sql.catalyst.expressions.aggregate +import scala.collection.mutable.TreeMap + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, UnresolvedWithinGroup} import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder} import org.apache.spark.sql.catalyst.trees.UnaryLike import org.apache.spark.sql.catalyst.types.PhysicalDataType -import org.apache.spark.sql.catalyst.util.GenericArrayData +import org.apache.spark.sql.catalyst.util.{CollationFactory, GenericArrayData} import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, BooleanType, DataType} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, BooleanType, DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.collection.OpenHashMap + case class Mode( child: Expression, mutableAggBufferOffset: Int = 0, inputAggBufferOffset: Int = 0, - reverseOpt: Option[Boolean] = None) + reverseOpt: Option[Boolean] = None, + collationEnabled: Boolean = false) extends TypedAggregateWithHashMapAsBuffer with ImplicitCastInputTypes with SupportsOrderingWithinGroup with UnaryLike[Expression] { + final lazy val collationId: Int = child.dataType.asInstanceOf[StringType].collationId + def this(child: Expression) = this(child, 0, 0) - def this(child: Expression, reverse: Boolean) = { - this(child, 0, 0, Some(reverse)) + def this(child: Expression, reverse: Boolean, collationEnabled: Boolean) = { + this(child, 0, 0, Some(reverse), collationEnabled = collationEnabled) } // Returns null for empty inputs @@ -70,12 +78,36 @@ case class Mode( buffer } - override def eval(buffer: OpenHashMap[AnyRef, Long]): Any = { - if (buffer.isEmpty) { + override def eval(buff: OpenHashMap[AnyRef, Long]): Any = { + if (buff.isEmpty) { return null } + var nullCount = 0L + val buffer = if (child.dataType.isInstanceOf[StringType] && collationEnabled) { + val modeMap = buff.foldLeft( + new TreeMap[org.apache.spark.unsafe.types.UTF8String, Long]()(Ordering.comparatorToOrdering( + CollationFactory.fetchCollation(collationId).comparator + ))) + { + case (map, (key: String, count)) => + map(org.apache.spark.unsafe.types.UTF8String.fromString(key)) = + map.getOrElse(org.apache.spark.unsafe.types.UTF8String.fromString(key), 0L) + count + map + case (map, (key: UTF8String, count)) => + map(key) = map.getOrElse(key, 0L) + count + map + case (map, (null, count)) => + nullCount = count + map + case (_, _) => + throw new IllegalArgumentException("Mode expects string type") + } + modeMap + } else { + buff + } - reverseOpt.map { reverse => + val t2 = reverseOpt.map { reverse => val defaultKeyOrdering = if (reverse) { PhysicalDataType.ordering(child.dataType).asInstanceOf[Ordering[AnyRef]].reverse } else { @@ -83,7 +115,13 @@ case class Mode( } val ordering = Ordering.Tuple2(Ordering.Long, defaultKeyOrdering) buffer.maxBy { case (key, count) => (count, key) }(ordering) - }.getOrElse(buffer.maxBy(_._2))._1 + }.getOrElse(buffer.maxBy(_._2)) + + if (nullCount > t2._2) { + null + } else { + t2._1 + } } override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): Mode = @@ -162,10 +200,10 @@ object ModeBuilder extends ExpressionBuilder { override def build(funcName: String, expressions: Seq[Expression]): Expression = { val numArgs = expressions.length if (numArgs == 0) { - Mode(UnresolvedWithinGroup) + Mode(UnresolvedWithinGroup, collationEnabled = SQLConf.get.collationEnabled) } else if (numArgs == 1) { // For compatibility with function calls without WITHIN GROUP. - Mode(expressions(0)) + Mode(expressions(0), collationEnabled = SQLConf.get.collationEnabled) } else if (numArgs == 2) { // For compatibility with function calls without WITHIN GROUP. if (!expressions(1).foldable) { @@ -181,9 +219,9 @@ object ModeBuilder extends ExpressionBuilder { funcName, 2, BooleanType, expressions(1)) } if (deterministicResult.asInstanceOf[Boolean]) { - new Mode(expressions(0), true) + new Mode(expressions(0), true, collationEnabled = SQLConf.get.collationEnabled) } else { - Mode(expressions(0)) + Mode(expressions(0), collationEnabled = SQLConf.get.collationEnabled) } } else { throw QueryCompilationErrors.wrongNumArgsError(funcName, Seq(0), numArgs) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala index 398f21e01b806..63fc3b5bdf609 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala @@ -345,7 +345,7 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) { case aggregate.RegrSXY(PushableExpression(left), PushableExpression(right)) => Some(new GeneralAggregateFunc("REGR_SXY", isDistinct, Array(left, right))) // Translate Mode if it is deterministic or reverse is defined. - case aggregate.Mode(PushableExpression(expr), _, _, Some(reverse)) => + case aggregate.Mode(PushableExpression(expr), _, _, Some(reverse), _) => Some(new GeneralAggregateFunc( "MODE", isDistinct, Array.empty, Array(generateSortValue(expr, !reverse)))) case aggregate.Percentile( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala index b9a4fecd0465b..17214bb15a590 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala @@ -18,10 +18,14 @@ package org.apache.spark.sql import org.apache.spark.SparkConf +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.expressions.aggregate.Mode import org.apache.spark.sql.catalyst.util.CollationFactory import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataType, IntegerType, StringType} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap // scalastyle:off nonascii class CollationStringExpressionsSuite @@ -800,6 +804,90 @@ class CollationStringExpressionsSuite assert(collationMismatch.getErrorClass === "COLLATION_MISMATCH.EXPLICIT") } + test("Support mode for string expression with collation") { + val query = "SELECT mode(collate('abc', 'utf8_binary'))" + checkAnswer(sql(query), Row("abc")) + assert(sql(query).schema.fields.head.dataType.sameType(StringType("utf8_binary"))) + } + + test("Support mode for string expression with collation ID on table") { + withTable("t") { + sql("CREATE TABLE t(i STRING) USING parquet") + sql("INSERT INTO t VALUES " + + "('a'), ('a'), ('a'), ('a'), ('a'), " + + "('b'), ('b'), ('b'), " + + "('B'), ('B'), ('B'), ('B')") + val query = "SELECT mode(collate(i, 'UTF8_BINARY_LCASE')) FROM t" + checkAnswer(sql(query), Row("b")) + } + } + + test("Support mode for string expression with collation ID") { + val query = "SELECT mode(collate('lorem epsum', 'UTF8_BINARY_LCASE'))" + checkAnswer(sql(query), Row("lorem epsum")) + assert(sql(query).schema.fields.head.dataType.sameType(StringType("UTF8_BINARY_LCASE"))) + } + + test("Support Mode.eval(buffer)") { + val myMode = Mode(child = Literal("some_column_name"), collationEnabled = true) + val buffer = new OpenHashMap[AnyRef, Long](5) + buffer.update("b", 1L) + buffer.update("B", 1L) + buffer.update("c", 1L) + buffer.update("d", 1L) + buffer.update("a", 2L) + assert(myMode.eval(buffer).toString == "a") + val buffer2 = new OpenHashMap[AnyRef, Long](5) + buffer2.update(UTF8String.fromString("b"), 1L) + buffer2.update(UTF8String.fromString("B"), 1L) + buffer2.update(UTF8String.fromString("c"), 1L) + buffer2.update(UTF8String.fromString("d"), 1L) + buffer2.update(UTF8String.fromString("a"), 2L) + assert(myMode.eval(buffer2).toString == "a") + } + + test("Support Mode.eval(buffer) with non-default collation") { + val myMode = Mode( + child = Literal.create("some_column_name", StringType("utf8_binary_lcase")), + collationEnabled = true) + val buffer = new OpenHashMap[AnyRef, Long](11) + buffer.update("b", 2L) + buffer.update("B", 2L) + buffer.update("c", 2L) + buffer.update("d", 2L) + buffer.update("a", 3L) + assert(myMode.eval(buffer).toString == "B") + } + + + test("Support Mode.eval(buffer) with some null values") { + val modeDefaultCollation = Mode(child = Literal("some_column_name")) + val modeLowercaseCollation = Mode(child = + Literal.create("some_column_name", StringType("unicode_ci")), + collationEnabled = true) + val buffer = new OpenHashMap[AnyRef, Long](7) + buffer.update("bb", 1L) + buffer.update("Bb", 1L) + buffer.update("BB", 1L) + buffer.update("c", 1L) + buffer.update(null, 1L) + buffer.update("a", 2L) + assert(modeDefaultCollation.eval(buffer).toString == "a") + assert(modeLowercaseCollation.eval(buffer).toString.toLowerCase() == "bb") + } + + test("Support Mode.eval(buffer) with mode should = null (not yet implemented)") { + val modeDefaultCollation = Mode(child = Literal("some_column_name")) + val modeLowercaseCollation = Mode( + child = Literal.create("some_column_name", StringType("unicode_ci")), + collationEnabled = true) + val buffer = new OpenHashMap[AnyRef, Long](7) + buffer.update(null, 2L) + buffer.update("c", 1L) + assert(modeDefaultCollation.eval(buffer) == null) + assert(modeLowercaseCollation.eval(buffer) == null) + } + // TODO: Add more tests for other string expressions } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala index 03b638b786bfd..057b548a5141e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala @@ -19,8 +19,12 @@ package org.apache.spark.sql.execution.benchmark import scala.concurrent.duration._ import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.expressions.aggregate.Mode import org.apache.spark.sql.catalyst.util.{CollationFactory, CollationSupport} +import org.apache.spark.sql.types.{IntegerType, StringType} import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.OpenHashMap abstract class CollationBenchmarkBase extends BenchmarkBase { protected val collationTypes: Seq[String] = @@ -185,6 +189,55 @@ abstract class CollationBenchmarkBase extends BenchmarkBase { } benchmark.run() } + + def benchmarkMode( + collationTypes: Seq[String], + value: Seq[UTF8String]): Unit = { + val benchmark = new Benchmark( + "collation unit benchmarks - mode", + value.size * 10, + warmupTime = 10.seconds, + output = output) + collationTypes.foreach { collationType => { + val collation = CollationFactory.fetchCollation(collationType) + benchmark.addCase(s"$collationType") { _ => + val modeDefaultCollation = Mode(child = + Literal.create("some_column_name", StringType(collationType)), collationEnabled = true) + val buffer = new OpenHashMap[AnyRef, Long](value.size) + value.zipWithIndex.sliding(20, 20).foreach(slide => { + slide.foreach { case (v: UTF8String, i: Int) => + buffer.update(v, (i % 1000).toLong) + } + modeDefaultCollation.eval(buffer) + + }) + } + } + } + benchmark.addCase(s"Collation Not Enabled") { _ => + val modeNoCollation = Mode(child = + Literal("some_column_name"), collationEnabled = false) + val buffer = new OpenHashMap[AnyRef, Long](value.size) + value.zipWithIndex.sliding(20, 20).foreach(slide => { + slide.foreach { case (v: UTF8String, i: Int) => + buffer.update(v, (i % 1000).toLong) + } + modeNoCollation.eval(buffer) + }) + } + benchmark.addCase(s"Numerical Type") { _ => + val modeIntType = Mode(child = Literal.create(1, IntegerType)) + val buffer = new OpenHashMap[AnyRef, Long](value.size) + value.zipWithIndex.sliding(20, 20).foreach(slide => { + slide.foreach { + case (_, i: Int) => + buffer.update(i.asInstanceOf[AnyRef], (i % 1000).toLong) + } + modeIntType.eval(buffer) + }) + } + benchmark.run() + } } /** @@ -213,6 +266,7 @@ object CollationBenchmark extends CollationBenchmarkBase { override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { val inputs = generateSeqInput(10000L) + benchmarkMode(collationTypes, inputs) benchmarkUTFStringEquals(collationTypes, inputs) benchmarkUTFStringCompare(collationTypes, inputs) benchmarkUTFStringHashFunction(collationTypes, inputs) @@ -248,5 +302,6 @@ object CollationNonASCIIBenchmark extends CollationBenchmarkBase { benchmarkContains(collationTypes, inputs) benchmarkStartsWith(collationTypes, inputs) benchmarkEndsWith(collationTypes, inputs) + benchmarkMode(collationTypes, inputs) } } From 0f0eedfe7a0be053641183d602d18ff2f0b9861f Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 7 May 2024 10:00:39 -0400 Subject: [PATCH 02/42] fixed up tests according to expectations surrounding nulls. --- .../catalyst/expressions/aggregate/Mode.scala | 8 ++--- .../sql/CollationStringExpressionsSuite.scala | 29 ------------------- 2 files changed, 2 insertions(+), 35 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index d1b98adaacdfb..292e26f9ffb84 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -82,7 +82,6 @@ case class Mode( if (buff.isEmpty) { return null } - var nullCount = 0L val buffer = if (child.dataType.isInstanceOf[StringType] && collationEnabled) { val modeMap = buff.foldLeft( new TreeMap[org.apache.spark.unsafe.types.UTF8String, Long]()(Ordering.comparatorToOrdering( @@ -96,11 +95,8 @@ case class Mode( case (map, (key: UTF8String, count)) => map(key) = map.getOrElse(key, 0L) + count map - case (map, (null, count)) => - nullCount = count - map case (_, _) => - throw new IllegalArgumentException("Mode expects string type") + throw new IllegalArgumentException("Mode expects non-null string type.") } modeMap } else { @@ -117,7 +113,7 @@ case class Mode( buffer.maxBy { case (key, count) => (count, key) }(ordering) }.getOrElse(buffer.maxBy(_._2)) - if (nullCount > t2._2) { + if (t2._2 == 0L) { null } else { t2._1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala index 17214bb15a590..d7e295a12d33f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala @@ -859,35 +859,6 @@ class CollationStringExpressionsSuite assert(myMode.eval(buffer).toString == "B") } - - test("Support Mode.eval(buffer) with some null values") { - val modeDefaultCollation = Mode(child = Literal("some_column_name")) - val modeLowercaseCollation = Mode(child = - Literal.create("some_column_name", StringType("unicode_ci")), - collationEnabled = true) - val buffer = new OpenHashMap[AnyRef, Long](7) - buffer.update("bb", 1L) - buffer.update("Bb", 1L) - buffer.update("BB", 1L) - buffer.update("c", 1L) - buffer.update(null, 1L) - buffer.update("a", 2L) - assert(modeDefaultCollation.eval(buffer).toString == "a") - assert(modeLowercaseCollation.eval(buffer).toString.toLowerCase() == "bb") - } - - test("Support Mode.eval(buffer) with mode should = null (not yet implemented)") { - val modeDefaultCollation = Mode(child = Literal("some_column_name")) - val modeLowercaseCollation = Mode( - child = Literal.create("some_column_name", StringType("unicode_ci")), - collationEnabled = true) - val buffer = new OpenHashMap[AnyRef, Long](7) - buffer.update(null, 2L) - buffer.update("c", 1L) - assert(modeDefaultCollation.eval(buffer) == null) - assert(modeLowercaseCollation.eval(buffer) == null) - } - // TODO: Add more tests for other string expressions } From e1a533f2710b43f5b7ddd02e9bfdad187f81e9d8 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 7 May 2024 14:21:34 -0400 Subject: [PATCH 03/42] mode benchmark --- .../spark/sql/execution/benchmark/CollationBenchmark.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala index 057b548a5141e..c0657acd8101a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala @@ -199,14 +199,13 @@ abstract class CollationBenchmarkBase extends BenchmarkBase { warmupTime = 10.seconds, output = output) collationTypes.foreach { collationType => { - val collation = CollationFactory.fetchCollation(collationType) benchmark.addCase(s"$collationType") { _ => val modeDefaultCollation = Mode(child = Literal.create("some_column_name", StringType(collationType)), collationEnabled = true) val buffer = new OpenHashMap[AnyRef, Long](value.size) value.zipWithIndex.sliding(20, 20).foreach(slide => { slide.foreach { case (v: UTF8String, i: Int) => - buffer.update(v, (i % 1000).toLong) + buffer.update(v.toString + s"_${i.toString}", (i % 1000).toLong) } modeDefaultCollation.eval(buffer) From a98eebece0938bf1b33dce186b9a3e9798ddc532 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 7 May 2024 14:27:38 -0400 Subject: [PATCH 04/42] mode benchmark --- .../spark/sql/execution/benchmark/CollationBenchmark.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala index c0657acd8101a..07417bf464486 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala @@ -219,7 +219,7 @@ abstract class CollationBenchmarkBase extends BenchmarkBase { val buffer = new OpenHashMap[AnyRef, Long](value.size) value.zipWithIndex.sliding(20, 20).foreach(slide => { slide.foreach { case (v: UTF8String, i: Int) => - buffer.update(v, (i % 1000).toLong) + buffer.update(v.toString + s"_${i.toString}", (i % 1000).toLong) } modeNoCollation.eval(buffer) }) From e4bf90773192de616526e9300b12df620bc9eaf7 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 7 May 2024 19:25:20 -0400 Subject: [PATCH 05/42] remove class member for collatrion enabled --- .../catalyst/expressions/aggregate/Mode.scala | 26 ++++++++++++------- .../catalyst/util/V2ExpressionBuilder.scala | 2 +- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index 292e26f9ffb84..b175952df3652 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.catalyst.expressions.aggregate import scala.collection.mutable.TreeMap - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, UnresolvedWithinGroup} import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder} @@ -36,8 +35,7 @@ case class Mode( child: Expression, mutableAggBufferOffset: Int = 0, inputAggBufferOffset: Int = 0, - reverseOpt: Option[Boolean] = None, - collationEnabled: Boolean = false) + reverseOpt: Option[Boolean] = None) extends TypedAggregateWithHashMapAsBuffer with ImplicitCastInputTypes with SupportsOrderingWithinGroup with UnaryLike[Expression] { @@ -45,8 +43,8 @@ case class Mode( def this(child: Expression) = this(child, 0, 0) - def this(child: Expression, reverse: Boolean, collationEnabled: Boolean) = { - this(child, 0, 0, Some(reverse), collationEnabled = collationEnabled) + def this(child: Expression, reverse: Boolean) = { + this(child, 0, 0, Some(reverse)) } // Returns null for empty inputs @@ -82,7 +80,7 @@ case class Mode( if (buff.isEmpty) { return null } - val buffer = if (child.dataType.isInstanceOf[StringType] && collationEnabled) { + val buffer = if (isCollatedString(child)) { val modeMap = buff.foldLeft( new TreeMap[org.apache.spark.unsafe.types.UTF8String, Long]()(Ordering.comparatorToOrdering( CollationFactory.fetchCollation(collationId).comparator @@ -120,6 +118,14 @@ case class Mode( } } + private def isCollatedString(child: Expression): Boolean = { + child match { + case s: StringType if s.collationId != CollationFactory.UTF8_BINARY_COLLATION_ID => true + // maybe use supportsBinaryEquality or something else + case _ => false + } + } + override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): Mode = copy(mutableAggBufferOffset = newMutableAggBufferOffset) @@ -196,10 +202,10 @@ object ModeBuilder extends ExpressionBuilder { override def build(funcName: String, expressions: Seq[Expression]): Expression = { val numArgs = expressions.length if (numArgs == 0) { - Mode(UnresolvedWithinGroup, collationEnabled = SQLConf.get.collationEnabled) + Mode(UnresolvedWithinGroup) } else if (numArgs == 1) { // For compatibility with function calls without WITHIN GROUP. - Mode(expressions(0), collationEnabled = SQLConf.get.collationEnabled) + Mode(expressions(0)) } else if (numArgs == 2) { // For compatibility with function calls without WITHIN GROUP. if (!expressions(1).foldable) { @@ -215,9 +221,9 @@ object ModeBuilder extends ExpressionBuilder { funcName, 2, BooleanType, expressions(1)) } if (deterministicResult.asInstanceOf[Boolean]) { - new Mode(expressions(0), true, collationEnabled = SQLConf.get.collationEnabled) + new Mode(expressions(0), true) } else { - Mode(expressions(0), collationEnabled = SQLConf.get.collationEnabled) + Mode(expressions(0)) } } else { throw QueryCompilationErrors.wrongNumArgsError(funcName, Seq(0), numArgs) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala index 63fc3b5bdf609..398f21e01b806 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala @@ -345,7 +345,7 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) { case aggregate.RegrSXY(PushableExpression(left), PushableExpression(right)) => Some(new GeneralAggregateFunc("REGR_SXY", isDistinct, Array(left, right))) // Translate Mode if it is deterministic or reverse is defined. - case aggregate.Mode(PushableExpression(expr), _, _, Some(reverse), _) => + case aggregate.Mode(PushableExpression(expr), _, _, Some(reverse)) => Some(new GeneralAggregateFunc( "MODE", isDistinct, Array.empty, Array(generateSortValue(expr, !reverse)))) case aggregate.Percentile( From c89af544e0cac754f2acac082eb9e1c018e3028a Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 7 May 2024 19:26:53 -0400 Subject: [PATCH 06/42] remove class member for collatrion enabled --- .../spark/sql/catalyst/expressions/aggregate/Mode.scala | 2 +- .../spark/sql/execution/benchmark/CollationBenchmark.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index b175952df3652..96e4aec136ccf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.expressions.aggregate import scala.collection.mutable.TreeMap + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, UnresolvedWithinGroup} import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder} @@ -25,7 +26,6 @@ import org.apache.spark.sql.catalyst.trees.UnaryLike import org.apache.spark.sql.catalyst.types.PhysicalDataType import org.apache.spark.sql.catalyst.util.{CollationFactory, GenericArrayData} import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, BooleanType, DataType, StringType} import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.collection.OpenHashMap diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala index 07417bf464486..e9c790dd84b2b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala @@ -201,7 +201,7 @@ abstract class CollationBenchmarkBase extends BenchmarkBase { collationTypes.foreach { collationType => { benchmark.addCase(s"$collationType") { _ => val modeDefaultCollation = Mode(child = - Literal.create("some_column_name", StringType(collationType)), collationEnabled = true) + Literal.create("some_column_name", StringType(collationType))) val buffer = new OpenHashMap[AnyRef, Long](value.size) value.zipWithIndex.sliding(20, 20).foreach(slide => { slide.foreach { case (v: UTF8String, i: Int) => @@ -215,7 +215,7 @@ abstract class CollationBenchmarkBase extends BenchmarkBase { } benchmark.addCase(s"Collation Not Enabled") { _ => val modeNoCollation = Mode(child = - Literal("some_column_name"), collationEnabled = false) + Literal("some_column_name")) val buffer = new OpenHashMap[AnyRef, Long](value.size) value.zipWithIndex.sliding(20, 20).foreach(slide => { slide.foreach { case (v: UTF8String, i: Int) => From 794b20a63bf205508e39e0c3d7c6143690846886 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 7 May 2024 19:27:38 -0400 Subject: [PATCH 07/42] remove class member for collatrion enabled --- .../apache/spark/sql/catalyst/expressions/aggregate/Mode.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index 96e4aec136ccf..9d83c64c581cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -119,7 +119,7 @@ case class Mode( } private def isCollatedString(child: Expression): Boolean = { - child match { + child.dataType match { case s: StringType if s.collationId != CollationFactory.UTF8_BINARY_COLLATION_ID => true // maybe use supportsBinaryEquality or something else case _ => false From 3e891dfc46c619bbd5e6d5369bd40fd5411e583b Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 7 May 2024 19:30:47 -0400 Subject: [PATCH 08/42] dataType check can be incorporated into the previous test, so this test can then be removed --- .../spark/sql/CollationStringExpressionsSuite.scala | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala index d7e295a12d33f..abad69e0d0bbd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala @@ -819,17 +819,13 @@ class CollationStringExpressionsSuite "('B'), ('B'), ('B'), ('B')") val query = "SELECT mode(collate(i, 'UTF8_BINARY_LCASE')) FROM t" checkAnswer(sql(query), Row("b")) - } - } + assert(sql(query).schema.fields.head.dataType.sameType(StringType("UTF8_BINARY_LCASE"))) - test("Support mode for string expression with collation ID") { - val query = "SELECT mode(collate('lorem epsum', 'UTF8_BINARY_LCASE'))" - checkAnswer(sql(query), Row("lorem epsum")) - assert(sql(query).schema.fields.head.dataType.sameType(StringType("UTF8_BINARY_LCASE"))) + } } test("Support Mode.eval(buffer)") { - val myMode = Mode(child = Literal("some_column_name"), collationEnabled = true) + val myMode = Mode(child = Literal("some_column_name")) val buffer = new OpenHashMap[AnyRef, Long](5) buffer.update("b", 1L) buffer.update("B", 1L) @@ -848,8 +844,7 @@ class CollationStringExpressionsSuite test("Support Mode.eval(buffer) with non-default collation") { val myMode = Mode( - child = Literal.create("some_column_name", StringType("utf8_binary_lcase")), - collationEnabled = true) + child = Literal.create("some_column_name", StringType("utf8_binary_lcase"))) val buffer = new OpenHashMap[AnyRef, Long](11) buffer.update("b", 2L) buffer.update("B", 2L) From 0849a21baf1c0974a3994b545c14b19427743d2c Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 7 May 2024 19:54:58 -0400 Subject: [PATCH 09/42] dataType check can be incorporated into the previous test, so this test can then be removed --- .../catalyst/expressions/aggregate/Mode.scala | 8 +- .../sql/CollationStringExpressionsSuite.scala | 100 ++++++++++++------ 2 files changed, 69 insertions(+), 39 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index 9d83c64c581cf..a67b357b2eb8c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -101,7 +101,7 @@ case class Mode( buff } - val t2 = reverseOpt.map { reverse => + reverseOpt.map { reverse => val defaultKeyOrdering = if (reverse) { PhysicalDataType.ordering(child.dataType).asInstanceOf[Ordering[AnyRef]].reverse } else { @@ -110,12 +110,6 @@ case class Mode( val ordering = Ordering.Tuple2(Ordering.Long, defaultKeyOrdering) buffer.maxBy { case (key, count) => (count, key) }(ordering) }.getOrElse(buffer.maxBy(_._2)) - - if (t2._2 == 0L) { - null - } else { - t2._1 - } } private def isCollatedString(child: Expression): Boolean = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala index abad69e0d0bbd..16d0e93303f97 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala @@ -804,10 +804,39 @@ class CollationStringExpressionsSuite assert(collationMismatch.getErrorClass === "COLLATION_MISMATCH.EXPLICIT") } - test("Support mode for string expression with collation") { - val query = "SELECT mode(collate('abc', 'utf8_binary'))" - checkAnswer(sql(query), Row("abc")) - assert(sql(query).schema.fields.head.dataType.sameType(StringType("utf8_binary"))) + test("Support mode for string expression with collation - Basic Test") { + Seq("utf8_binary", "utf8_binary_lcase", "unicode_ci", "unicode").foreach { collationId => + val query = s"SELECT mode(collate('abc', '${collationId}'))" + checkAnswer(sql(query), Row("abc")) + assert(sql(query).schema.fields.head.dataType.sameType(StringType(collationId))) + } + } + + test("Support mode for string expression with collation2 ") { + case class ModeTestCase[R](collationId: String, bufferValues: Map[String, Long], result: R) + val testCases = Seq( + ModeTestCase("utf8_binary", Map("a" -> 5L, "b" -> 4L, "B" -> 3L, "d" -> 2L, "e" -> 1L), + "a"), + ModeTestCase("utf8_binary_lcase", Map("a" -> 5L, "b" -> 4L, "B" -> 3L, "d" -> 2L, "e" -> 1L), + "b"), + ModeTestCase("unicode_ci", Map("a" -> 5L, "b" -> 4L, "B" -> 3L, "d" -> 2L, "e" -> 1L), + "b"), + ModeTestCase("unicode", Map("a" -> 5L, "b" -> 4L, "B" -> 3L, "d" -> 2L, "e" -> 1L), + "a") + ) + testCases.foreach(t => { + withTable("t") { + sql("CREATE TABLE t(i STRING) USING parquet") + sql("INSERT INTO t VALUES " + + t.bufferValues.map { case (elt, numRepeats) => + (0L to numRepeats).map(_ => s"('$elt')").mkString(",") + }.mkString(",")) + val query = s"SELECT mode(collate(i, '${t.collationId}')) FROM t" + checkAnswer(sql(query), Row(t.result)) + assert(sql(query).schema.fields.head.dataType.sameType(StringType(t.collationId))) + + } + }) } test("Support mode for string expression with collation ID on table") { @@ -820,38 +849,45 @@ class CollationStringExpressionsSuite val query = "SELECT mode(collate(i, 'UTF8_BINARY_LCASE')) FROM t" checkAnswer(sql(query), Row("b")) assert(sql(query).schema.fields.head.dataType.sameType(StringType("UTF8_BINARY_LCASE"))) - } } test("Support Mode.eval(buffer)") { - val myMode = Mode(child = Literal("some_column_name")) - val buffer = new OpenHashMap[AnyRef, Long](5) - buffer.update("b", 1L) - buffer.update("B", 1L) - buffer.update("c", 1L) - buffer.update("d", 1L) - buffer.update("a", 2L) - assert(myMode.eval(buffer).toString == "a") - val buffer2 = new OpenHashMap[AnyRef, Long](5) - buffer2.update(UTF8String.fromString("b"), 1L) - buffer2.update(UTF8String.fromString("B"), 1L) - buffer2.update(UTF8String.fromString("c"), 1L) - buffer2.update(UTF8String.fromString("d"), 1L) - buffer2.update(UTF8String.fromString("a"), 2L) - assert(myMode.eval(buffer2).toString == "a") - } - - test("Support Mode.eval(buffer) with non-default collation") { - val myMode = Mode( - child = Literal.create("some_column_name", StringType("utf8_binary_lcase"))) - val buffer = new OpenHashMap[AnyRef, Long](11) - buffer.update("b", 2L) - buffer.update("B", 2L) - buffer.update("c", 2L) - buffer.update("d", 2L) - buffer.update("a", 3L) - assert(myMode.eval(buffer).toString == "B") + case class ModeTestCase[R]( + collationId: String, + bufferValues: Map[String, Long], + result: R) + case class UTF8StringModeTestCase[R]( + collationId: String, + bufferValues: Map[UTF8String, Long], + result: R) + + val bufferValues = Map("a" -> 5L, "b" -> 4L, "B" -> 3L, "d" -> 2L, "e" -> 1L) + val testCasesStrings = Seq(ModeTestCase("utf8_binary", bufferValues, "a"), + ModeTestCase("utf8_binary_lcase", bufferValues, "b"), + ModeTestCase("unicode_ci", bufferValues, "b"), + ModeTestCase("unicode", bufferValues, "a")) + + val bufferValuesUTF8String = Map(UTF8String.fromString("a") -> 5L, UTF8String.fromString("b") -> 4L, + UTF8String.fromString("B") -> 3L, UTF8String.fromString("d") -> 2L, UTF8String.fromString("e") -> 1L) + val testCasesUTF8String = Seq(UTF8StringModeTestCase("utf8_binary", bufferValuesUTF8String, "a"), + UTF8StringModeTestCase("utf8_binary_lcase", bufferValuesUTF8String, "b"), + UTF8StringModeTestCase("unicode_ci", bufferValuesUTF8String, "b"), + UTF8StringModeTestCase("unicode", bufferValuesUTF8String, "a")) + + testCasesStrings.foreach(t => { + val buffer = new OpenHashMap[AnyRef, Long](5) + val myMode = Mode(child = Literal.create("some_column_name", StringType(t.collationId))) + t.bufferValues.foreach { case (k, v) => buffer.update(k, v) } + assert(myMode.eval(buffer).toString == t.result) + }) + + testCasesUTF8String.foreach(t => { + val buffer = new OpenHashMap[AnyRef, Long](5) + val myMode = Mode(child = Literal.create("some_column_name", StringType(t.collationId))) + t.bufferValues.foreach { case (k, v) => buffer.update(k, v) } + assert(myMode.eval(buffer).toString == t.result) + }) } // TODO: Add more tests for other string expressions From e79e14e93c9b9ebc12deb5b5081087ac0dde1d4a Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 7 May 2024 19:55:32 -0400 Subject: [PATCH 10/42] scalastyle --- .../spark/sql/CollationStringExpressionsSuite.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala index 16d0e93303f97..50591ad099160 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala @@ -868,8 +868,12 @@ class CollationStringExpressionsSuite ModeTestCase("unicode_ci", bufferValues, "b"), ModeTestCase("unicode", bufferValues, "a")) - val bufferValuesUTF8String = Map(UTF8String.fromString("a") -> 5L, UTF8String.fromString("b") -> 4L, - UTF8String.fromString("B") -> 3L, UTF8String.fromString("d") -> 2L, UTF8String.fromString("e") -> 1L) + val bufferValuesUTF8String = Map( + UTF8String.fromString("a") -> 5L, + UTF8String.fromString("b") -> 4L, + UTF8String.fromString("B") -> 3L, + UTF8String.fromString("d") -> 2L, + UTF8String.fromString("e") -> 1L) val testCasesUTF8String = Seq(UTF8StringModeTestCase("utf8_binary", bufferValuesUTF8String, "a"), UTF8StringModeTestCase("utf8_binary_lcase", bufferValuesUTF8String, "b"), UTF8StringModeTestCase("unicode_ci", bufferValuesUTF8String, "b"), From 9a32243bf13e94cd1c0a62e4bb16705236f88f01 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 7 May 2024 19:56:04 -0400 Subject: [PATCH 11/42] scalastyle --- .../apache/spark/sql/CollationStringExpressionsSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala index 50591ad099160..2110df3042183 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala @@ -874,7 +874,9 @@ class CollationStringExpressionsSuite UTF8String.fromString("B") -> 3L, UTF8String.fromString("d") -> 2L, UTF8String.fromString("e") -> 1L) - val testCasesUTF8String = Seq(UTF8StringModeTestCase("utf8_binary", bufferValuesUTF8String, "a"), + + val testCasesUTF8String = Seq( + UTF8StringModeTestCase("utf8_binary", bufferValuesUTF8String, "a"), UTF8StringModeTestCase("utf8_binary_lcase", bufferValuesUTF8String, "b"), UTF8StringModeTestCase("unicode_ci", bufferValuesUTF8String, "b"), UTF8StringModeTestCase("unicode", bufferValuesUTF8String, "a")) From 506f7fc900dba4dc7a6bff0fd2ed6f27b9a4d1df Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Wed, 8 May 2024 10:52:25 -0400 Subject: [PATCH 12/42] fix add back ._1 --- .../catalyst/expressions/aggregate/Mode.scala | 3 +- .../sql/CollationStringExpressionsSuite.scala | 59 +++++++++---------- 2 files changed, 30 insertions(+), 32 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index a67b357b2eb8c..da197a66cb229 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -109,10 +109,11 @@ case class Mode( } val ordering = Ordering.Tuple2(Ordering.Long, defaultKeyOrdering) buffer.maxBy { case (key, count) => (count, key) }(ordering) - }.getOrElse(buffer.maxBy(_._2)) + }.getOrElse(buffer.maxBy(_._2))._1 } private def isCollatedString(child: Expression): Boolean = { + println("child.dataType: " + child.dataType) child.dataType match { case s: StringType if s.collationId != CollationFactory.UTF8_BINARY_COLLATION_ID => true // maybe use supportsBinaryEquality or something else diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala index 2110df3042183..14f362cef8f5a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala @@ -815,22 +815,19 @@ class CollationStringExpressionsSuite test("Support mode for string expression with collation2 ") { case class ModeTestCase[R](collationId: String, bufferValues: Map[String, Long], result: R) val testCases = Seq( - ModeTestCase("utf8_binary", Map("a" -> 5L, "b" -> 4L, "B" -> 3L, "d" -> 2L, "e" -> 1L), - "a"), - ModeTestCase("utf8_binary_lcase", Map("a" -> 5L, "b" -> 4L, "B" -> 3L, "d" -> 2L, "e" -> 1L), - "b"), - ModeTestCase("unicode_ci", Map("a" -> 5L, "b" -> 4L, "B" -> 3L, "d" -> 2L, "e" -> 1L), - "b"), - ModeTestCase("unicode", Map("a" -> 5L, "b" -> 4L, "B" -> 3L, "d" -> 2L, "e" -> 1L), - "a") + ModeTestCase("utf8_binary", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "a"), + ModeTestCase("utf8_binary_lcase", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "b"), + ModeTestCase("unicode_ci", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "b"), + ModeTestCase("unicode", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "a") ) testCases.foreach(t => { + val valuesToAdd = t.bufferValues.map { case (elt, numRepeats) => + (0L to numRepeats).map(_ => s"('$elt')").mkString(",") + }.mkString(",") + withTable("t") { sql("CREATE TABLE t(i STRING) USING parquet") - sql("INSERT INTO t VALUES " + - t.bufferValues.map { case (elt, numRepeats) => - (0L to numRepeats).map(_ => s"('$elt')").mkString(",") - }.mkString(",")) + sql("INSERT INTO t VALUES " + valuesToAdd) val query = s"SELECT mode(collate(i, '${t.collationId}')) FROM t" checkAnswer(sql(query), Row(t.result)) assert(sql(query).schema.fields.head.dataType.sameType(StringType(t.collationId))) @@ -868,18 +865,18 @@ class CollationStringExpressionsSuite ModeTestCase("unicode_ci", bufferValues, "b"), ModeTestCase("unicode", bufferValues, "a")) - val bufferValuesUTF8String = Map( - UTF8String.fromString("a") -> 5L, - UTF8String.fromString("b") -> 4L, - UTF8String.fromString("B") -> 3L, - UTF8String.fromString("d") -> 2L, - UTF8String.fromString("e") -> 1L) - - val testCasesUTF8String = Seq( - UTF8StringModeTestCase("utf8_binary", bufferValuesUTF8String, "a"), - UTF8StringModeTestCase("utf8_binary_lcase", bufferValuesUTF8String, "b"), - UTF8StringModeTestCase("unicode_ci", bufferValuesUTF8String, "b"), - UTF8StringModeTestCase("unicode", bufferValuesUTF8String, "a")) +// val bufferValuesUTF8String = Map( +// UTF8String.fromString("a") -> 5L, +// UTF8String.fromString("b") -> 4L, +// UTF8String.fromString("B") -> 3L, +// UTF8String.fromString("d") -> 2L, +// UTF8String.fromString("e") -> 1L) +// +// val testCasesUTF8String = Seq( +// UTF8StringModeTestCase("utf8_binary", bufferValuesUTF8String, "a"), +// UTF8StringModeTestCase("utf8_binary_lcase", bufferValuesUTF8String, "b"), +// UTF8StringModeTestCase("unicode_ci", bufferValuesUTF8String, "b"), +// UTF8StringModeTestCase("unicode", bufferValuesUTF8String, "a")) testCasesStrings.foreach(t => { val buffer = new OpenHashMap[AnyRef, Long](5) @@ -887,13 +884,13 @@ class CollationStringExpressionsSuite t.bufferValues.foreach { case (k, v) => buffer.update(k, v) } assert(myMode.eval(buffer).toString == t.result) }) - - testCasesUTF8String.foreach(t => { - val buffer = new OpenHashMap[AnyRef, Long](5) - val myMode = Mode(child = Literal.create("some_column_name", StringType(t.collationId))) - t.bufferValues.foreach { case (k, v) => buffer.update(k, v) } - assert(myMode.eval(buffer).toString == t.result) - }) +// +// testCasesUTF8String.foreach(t => { +// val buffer = new OpenHashMap[AnyRef, Long](5) +// val myMode = Mode(child = Literal.create("some_column_name", StringType(t.collationId))) +// t.bufferValues.foreach { case (k, v) => buffer.update(k, v) } +// assert(myMode.eval(buffer).toString == t.result) +// }) } // TODO: Add more tests for other string expressions From 16ed98f115063b58d79553bfbfdf2dc093ea7b07 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Wed, 8 May 2024 11:59:55 -0400 Subject: [PATCH 13/42] fix up --- .../apache/spark/sql/catalyst/expressions/aggregate/Mode.scala | 1 - .../org/apache/spark/sql/CollationStringExpressionsSuite.scala | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index da197a66cb229..29fca31b98233 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -113,7 +113,6 @@ case class Mode( } private def isCollatedString(child: Expression): Boolean = { - println("child.dataType: " + child.dataType) child.dataType match { case s: StringType if s.collationId != CollationFactory.UTF8_BINARY_COLLATION_ID => true // maybe use supportsBinaryEquality or something else diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala index 14f362cef8f5a..3d3b91a1959e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala @@ -882,7 +882,7 @@ class CollationStringExpressionsSuite val buffer = new OpenHashMap[AnyRef, Long](5) val myMode = Mode(child = Literal.create("some_column_name", StringType(t.collationId))) t.bufferValues.foreach { case (k, v) => buffer.update(k, v) } - assert(myMode.eval(buffer).toString == t.result) + assert(myMode.eval(buffer).toString.toLowerCase() == t.result.toLowerCase()) }) // // testCasesUTF8String.foreach(t => { From 6891c9bd9f4c2944d7fc31ca31094f168a53e60a Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Wed, 8 May 2024 12:04:46 -0400 Subject: [PATCH 14/42] tests pass --- .../sql/CollationStringExpressionsSuite.scala | 38 +++++++++---------- .../benchmark/CollationBenchmark.scala | 17 +++++---- 2 files changed, 29 insertions(+), 26 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala index 3d3b91a1959e0..66aa7a98c664b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala @@ -865,18 +865,18 @@ class CollationStringExpressionsSuite ModeTestCase("unicode_ci", bufferValues, "b"), ModeTestCase("unicode", bufferValues, "a")) -// val bufferValuesUTF8String = Map( -// UTF8String.fromString("a") -> 5L, -// UTF8String.fromString("b") -> 4L, -// UTF8String.fromString("B") -> 3L, -// UTF8String.fromString("d") -> 2L, -// UTF8String.fromString("e") -> 1L) -// -// val testCasesUTF8String = Seq( -// UTF8StringModeTestCase("utf8_binary", bufferValuesUTF8String, "a"), -// UTF8StringModeTestCase("utf8_binary_lcase", bufferValuesUTF8String, "b"), -// UTF8StringModeTestCase("unicode_ci", bufferValuesUTF8String, "b"), -// UTF8StringModeTestCase("unicode", bufferValuesUTF8String, "a")) + val bufferValuesUTF8String = Map( + UTF8String.fromString("a") -> 5L, + UTF8String.fromString("b") -> 4L, + UTF8String.fromString("B") -> 3L, + UTF8String.fromString("d") -> 2L, + UTF8String.fromString("e") -> 1L) + + val testCasesUTF8String = Seq( + UTF8StringModeTestCase("utf8_binary", bufferValuesUTF8String, "a"), + UTF8StringModeTestCase("utf8_binary_lcase", bufferValuesUTF8String, "b"), + UTF8StringModeTestCase("unicode_ci", bufferValuesUTF8String, "b"), + UTF8StringModeTestCase("unicode", bufferValuesUTF8String, "a")) testCasesStrings.foreach(t => { val buffer = new OpenHashMap[AnyRef, Long](5) @@ -884,13 +884,13 @@ class CollationStringExpressionsSuite t.bufferValues.foreach { case (k, v) => buffer.update(k, v) } assert(myMode.eval(buffer).toString.toLowerCase() == t.result.toLowerCase()) }) -// -// testCasesUTF8String.foreach(t => { -// val buffer = new OpenHashMap[AnyRef, Long](5) -// val myMode = Mode(child = Literal.create("some_column_name", StringType(t.collationId))) -// t.bufferValues.foreach { case (k, v) => buffer.update(k, v) } -// assert(myMode.eval(buffer).toString == t.result) -// }) + + testCasesUTF8String.foreach(t => { + val buffer = new OpenHashMap[AnyRef, Long](5) + val myMode = Mode(child = Literal.create("some_column_name", StringType(t.collationId))) + t.bufferValues.foreach { case (k, v) => buffer.update(k, v) } + assert(myMode.eval(buffer).toString.toLowerCase() == t.result.toLowerCase()) + }) } // TODO: Add more tests for other string expressions diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala index e9c790dd84b2b..2cfd9d8e3a646 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala @@ -194,7 +194,7 @@ abstract class CollationBenchmarkBase extends BenchmarkBase { collationTypes: Seq[String], value: Seq[UTF8String]): Unit = { val benchmark = new Benchmark( - "collation unit benchmarks - mode", + s"collation unit benchmarks - mode - ${value.size} elements", value.size * 10, warmupTime = 10.seconds, output = output) @@ -203,31 +203,32 @@ abstract class CollationBenchmarkBase extends BenchmarkBase { val modeDefaultCollation = Mode(child = Literal.create("some_column_name", StringType(collationType))) val buffer = new OpenHashMap[AnyRef, Long](value.size) - value.zipWithIndex.sliding(20, 20).foreach(slide => { + value.zipWithIndex.sliding(2000, 2000).foreach(slide => { slide.foreach { case (v: UTF8String, i: Int) => buffer.update(v.toString + s"_${i.toString}", (i % 1000).toLong) } modeDefaultCollation.eval(buffer) - }) } } } - benchmark.addCase(s"Collation Not Enabled") { _ => + + benchmark.addCase(s"Collation Not Enabled - mode - ${value.size} elements") { _ => val modeNoCollation = Mode(child = Literal("some_column_name")) val buffer = new OpenHashMap[AnyRef, Long](value.size) - value.zipWithIndex.sliding(20, 20).foreach(slide => { + value.zipWithIndex.sliding(2000, 2000).foreach(slide => { slide.foreach { case (v: UTF8String, i: Int) => buffer.update(v.toString + s"_${i.toString}", (i % 1000).toLong) } modeNoCollation.eval(buffer) }) } - benchmark.addCase(s"Numerical Type") { _ => + + benchmark.addCase(s"Numerical Type - mode - ${value.size} elements") { _ => val modeIntType = Mode(child = Literal.create(1, IntegerType)) val buffer = new OpenHashMap[AnyRef, Long](value.size) - value.zipWithIndex.sliding(20, 20).foreach(slide => { + value.zipWithIndex.sliding(2000, 2000).foreach(slide => { slide.foreach { case (_, i: Int) => buffer.update(i.asInstanceOf[AnyRef], (i % 1000).toLong) @@ -266,6 +267,8 @@ object CollationBenchmark extends CollationBenchmarkBase { override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { val inputs = generateSeqInput(10000L) benchmarkMode(collationTypes, inputs) + benchmarkMode(collationTypes, generateSeqInput(5000L)) + benchmarkMode(collationTypes, generateSeqInput(2500L)) benchmarkUTFStringEquals(collationTypes, inputs) benchmarkUTFStringCompare(collationTypes, inputs) benchmarkUTFStringHashFunction(collationTypes, inputs) From 3a0edac9c51cd64703d0c54d8a6639e2e9da0cfa Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Wed, 8 May 2024 12:10:40 -0400 Subject: [PATCH 15/42] tests pass --- .../execution/benchmark/CollationBenchmark.scala | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala index 2cfd9d8e3a646..1d761c7da5a9f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala @@ -199,7 +199,7 @@ abstract class CollationBenchmarkBase extends BenchmarkBase { warmupTime = 10.seconds, output = output) collationTypes.foreach { collationType => { - benchmark.addCase(s"$collationType") { _ => + benchmark.addCase(s"$collationType - mode - ${value.size} elements") { _ => val modeDefaultCollation = Mode(child = Literal.create("some_column_name", StringType(collationType))) val buffer = new OpenHashMap[AnyRef, Long](value.size) @@ -213,18 +213,6 @@ abstract class CollationBenchmarkBase extends BenchmarkBase { } } - benchmark.addCase(s"Collation Not Enabled - mode - ${value.size} elements") { _ => - val modeNoCollation = Mode(child = - Literal("some_column_name")) - val buffer = new OpenHashMap[AnyRef, Long](value.size) - value.zipWithIndex.sliding(2000, 2000).foreach(slide => { - slide.foreach { case (v: UTF8String, i: Int) => - buffer.update(v.toString + s"_${i.toString}", (i % 1000).toLong) - } - modeNoCollation.eval(buffer) - }) - } - benchmark.addCase(s"Numerical Type - mode - ${value.size} elements") { _ => val modeIntType = Mode(child = Literal.create(1, IntegerType)) val buffer = new OpenHashMap[AnyRef, Long](value.size) From abea8366c632e77942a9ee7a283af571a9b70864 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Wed, 8 May 2024 17:28:33 -0400 Subject: [PATCH 16/42] test --- .../spark/sql/CollationStringExpressionsSuite.scala | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala index 66aa7a98c664b..b65c42cd412dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala @@ -836,19 +836,6 @@ class CollationStringExpressionsSuite }) } - test("Support mode for string expression with collation ID on table") { - withTable("t") { - sql("CREATE TABLE t(i STRING) USING parquet") - sql("INSERT INTO t VALUES " + - "('a'), ('a'), ('a'), ('a'), ('a'), " + - "('b'), ('b'), ('b'), " + - "('B'), ('B'), ('B'), ('B')") - val query = "SELECT mode(collate(i, 'UTF8_BINARY_LCASE')) FROM t" - checkAnswer(sql(query), Row("b")) - assert(sql(query).schema.fields.head.dataType.sameType(StringType("UTF8_BINARY_LCASE"))) - } - } - test("Support Mode.eval(buffer)") { case class ModeTestCase[R]( collationId: String, From 7fc7561b303356367ca11a7e2f0e00f8f977ae8a Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Wed, 8 May 2024 17:29:48 -0400 Subject: [PATCH 17/42] import _ --- .../apache/spark/sql/catalyst/expressions/aggregate/Mode.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index 29fca31b98233..3c0a677e9f9c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.trees.UnaryLike import org.apache.spark.sql.catalyst.types.PhysicalDataType import org.apache.spark.sql.catalyst.util.{CollationFactory, GenericArrayData} import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, BooleanType, DataType, StringType} +import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.collection.OpenHashMap From 05fe1a932035973ce017161b5da926014e068e3d Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Thu, 9 May 2024 10:57:01 -0400 Subject: [PATCH 18/42] added bm results --- .../CollationBenchmark-jdk21-results.txt | 30 +++++++ .../benchmarks/CollationBenchmark-results.txt | 78 +++++++++++++------ .../benchmark/CollationBenchmark.scala | 6 +- 3 files changed, 87 insertions(+), 27 deletions(-) diff --git a/sql/core/benchmarks/CollationBenchmark-jdk21-results.txt b/sql/core/benchmarks/CollationBenchmark-jdk21-results.txt index 500e39a965f37..956d25efdf30a 100644 --- a/sql/core/benchmarks/CollationBenchmark-jdk21-results.txt +++ b/sql/core/benchmarks/CollationBenchmark-jdk21-results.txt @@ -52,3 +52,33 @@ UNICODE 6235 6239 UTF8_BINARY 7066 7069 5 0.0 70658.1 1.6X UNICODE_CI 313515 313999 685 0.0 3135149.1 0.0X +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +AMD EPYC 7763 64-Core Processor +collation unit benchmarks - mode - 10000 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------------- +UTF8_BINARY_LCASE - mode - 10000 elements 19 19 0 5.2 192.2 1.0X +UNICODE - mode - 10000 elements 52 53 1 1.9 522.1 0.4X +UTF8_BINARY - mode - 10000 elements 1 1 0 81.3 12.3 15.6X +UNICODE_CI - mode - 10000 elements 52 53 1 1.9 518.9 0.4X +Numerical Type - mode - 10000 elements 1 1 0 116.5 8.6 22.4X + +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +AMD EPYC 7763 64-Core Processor +collation unit benchmarks - mode - 5000 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +UTF8_BINARY_LCASE - mode - 5000 elements 7 7 0 7.4 135.8 1.0X +UNICODE - mode - 5000 elements 18 18 1 2.8 354.5 0.4X +UTF8_BINARY - mode - 5000 elements 1 1 0 89.3 11.2 12.1X +UNICODE_CI - mode - 5000 elements 18 18 1 2.8 353.6 0.4X +Numerical Type - mode - 5000 elements 0 0 0 147.4 6.8 20.0X + +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +AMD EPYC 7763 64-Core Processor +collation unit benchmarks - mode - 2500 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +UTF8_BINARY_LCASE - mode - 2500 elements 3 3 0 9.8 102.1 1.0X +UNICODE - mode - 2500 elements 7 7 0 3.8 265.1 0.4X +UTF8_BINARY - mode - 2500 elements 0 0 0 97.3 10.3 9.9X +UNICODE_CI - mode - 2500 elements 7 7 1 3.8 264.5 0.4X +Numerical Type - mode - 2500 elements 0 0 0 164.3 6.1 16.8X + diff --git a/sql/core/benchmarks/CollationBenchmark-results.txt b/sql/core/benchmarks/CollationBenchmark-results.txt index 1e0515b182862..99b2f3eebcb11 100644 --- a/sql/core/benchmarks/CollationBenchmark-results.txt +++ b/sql/core/benchmarks/CollationBenchmark-results.txt @@ -2,53 +2,83 @@ OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - equalsFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE 3571 3576 7 0.0 35708.8 1.0X -UNICODE 2235 2240 7 0.0 22349.2 1.6X -UTF8_BINARY 2237 2242 6 0.0 22371.7 1.6X -UNICODE_CI 18733 18817 118 0.0 187333.8 0.2X +UTF8_BINARY_LCASE 2739 2741 4 0.0 27385.4 1.0X +UNICODE 1692 1697 7 0.1 16917.9 1.6X +UTF8_BINARY 1693 1697 5 0.1 16933.2 1.6X +UNICODE_CI 16444 16491 66 0.0 164444.0 0.2X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - compareFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE 4260 4290 41 0.0 42602.6 1.0X -UNICODE 19536 19624 124 0.0 195360.2 0.2X -UTF8_BINARY 3582 3612 43 0.0 35818.5 1.2X -UNICODE_CI 20381 20454 103 0.0 203814.1 0.2X +UTF8_BINARY_LCASE 4441 4442 0 0.0 44413.4 1.0X +UNICODE 17975 17979 6 0.0 179749.5 0.2X +UTF8_BINARY 3393 3398 7 0.0 33927.9 1.3X +UNICODE_CI 17856 17856 1 0.0 178555.0 0.2X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - hashFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -UTF8_BINARY_LCASE 7347 7349 3 0.0 73467.1 1.0X -UNICODE 73462 73608 206 0.0 734623.2 0.1X -UTF8_BINARY 5775 5815 57 0.0 57746.0 1.3X -UNICODE_CI 57543 57619 108 0.0 575425.2 0.1X +UTF8_BINARY_LCASE 7026 7027 0 0.0 70264.3 1.0X +UNICODE 66759 66804 63 0.0 667592.9 0.1X +UTF8_BINARY 5381 5406 36 0.0 53808.5 1.3X +UNICODE_CI 57825 57893 97 0.0 578249.9 0.1X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - contains: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -UTF8_BINARY_LCASE 15415 15424 13 0.0 154147.1 1.0X -UNICODE 8091 8108 25 0.0 80907.9 1.9X -UTF8_BINARY 8964 8979 21 0.0 89643.5 1.7X -UNICODE_CI 469123 474822 8060 0.0 4691227.7 0.0X +UTF8_BINARY_LCASE 14644 14645 1 0.0 146442.9 1.0X +UNICODE 8091 8096 7 0.0 80911.3 1.8X +UTF8_BINARY 9109 9116 10 0.0 91088.6 1.6X +UNICODE_CI 313713 313875 229 0.0 3137134.2 0.0X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - startsWith: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -UTF8_BINARY_LCASE 13064 13080 23 0.0 130635.2 1.0X -UNICODE 6836 6851 22 0.0 68360.1 1.9X -UTF8_BINARY 7693 7719 36 0.0 76933.9 1.7X -UNICODE_CI 488919 495530 9349 0.0 4889190.5 0.0X +UTF8_BINARY_LCASE 13075 13106 44 0.0 130750.4 1.0X +UNICODE 6776 6777 1 0.0 67755.2 1.9X +UTF8_BINARY 7781 7818 52 0.0 77812.6 1.7X +UNICODE_CI 317735 318369 896 0.0 3177354.7 0.0X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - endsWith: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -UTF8_BINARY_LCASE 13097 13112 21 0.0 130970.4 1.0X -UNICODE 6960 6985 34 0.0 69603.9 1.9X -UTF8_BINARY 7766 7768 3 0.0 77663.5 1.7X -UNICODE_CI 456956 470733 19485 0.0 4569556.7 0.0X +UTF8_BINARY_LCASE 13103 13108 8 0.0 131030.4 1.0X +UNICODE 6715 6757 58 0.0 67154.0 2.0X +UTF8_BINARY 7709 7713 5 0.0 77090.4 1.7X +UNICODE_CI 319593 320810 1722 0.0 3195929.8 0.0X +OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure +AMD EPYC 7763 64-Core Processor + +collation unit benchmarks - mode - 10000 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------------- +UTF8_BINARY_LCASE - mode - 10000 elements 25 25 0 4.1 245.1 1.0X +UNICODE - mode - 10000 elements 53 54 0 1.9 529.7 0.5X +UTF8_BINARY - mode - 10000 elements 1 1 0 76.3 13.1 18.7X +UNICODE_CI - mode - 10000 elements 53 54 1 1.9 527.4 0.5X +Numerical Type - mode - 10000 elements 1 1 0 117.7 8.5 28.8X + +OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure +AMD EPYC 7763 64-Core Processor +collation unit benchmarks - mode - 5000 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +UTF8_BINARY_LCASE - mode - 5000 elements 9 9 0 5.5 181.6 1.0X +UNICODE - mode - 5000 elements 18 18 1 2.8 360.9 0.5X +UTF8_BINARY - mode - 5000 elements 1 1 0 85.0 11.8 15.4X +UNICODE_CI - mode - 5000 elements 18 19 1 2.8 359.9 0.5X +Numerical Type - mode - 5000 elements 0 0 0 147.6 6.8 26.8X + +OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure +AMD EPYC 7763 64-Core Processor +collation unit benchmarks - mode - 2500 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +UTF8_BINARY_LCASE - mode - 2500 elements 3 3 0 7.4 136.0 1.0X +UNICODE - mode - 2500 elements 7 7 0 3.7 269.4 0.5X +UTF8_BINARY - mode - 2500 elements 0 0 0 91.8 10.9 12.5X +UNICODE_CI - mode - 2500 elements 7 7 0 3.7 268.7 0.5X +Numerical Type - mode - 2500 elements 0 0 0 166.9 6.0 22.7X diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala index 1d761c7da5a9f..dbbf6cc540e24 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala @@ -254,15 +254,15 @@ object CollationBenchmark extends CollationBenchmarkBase { override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { val inputs = generateSeqInput(10000L) - benchmarkMode(collationTypes, inputs) - benchmarkMode(collationTypes, generateSeqInput(5000L)) - benchmarkMode(collationTypes, generateSeqInput(2500L)) benchmarkUTFStringEquals(collationTypes, inputs) benchmarkUTFStringCompare(collationTypes, inputs) benchmarkUTFStringHashFunction(collationTypes, inputs) benchmarkContains(collationTypes, inputs) benchmarkStartsWith(collationTypes, inputs) benchmarkEndsWith(collationTypes, inputs) + benchmarkMode(collationTypes, inputs) + benchmarkMode(collationTypes, generateSeqInput(5000L)) + benchmarkMode(collationTypes, generateSeqInput(2500L)) } } From a5710d18d807dc46d6d14825252b5386ba3c126c Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Fri, 10 May 2024 09:27:09 -0400 Subject: [PATCH 19/42] tests pass --- .../catalyst/expressions/aggregate/Mode.scala | 29 +++++++------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index 3c0a677e9f9c8..d1efa25b23c4e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.catalyst.expressions.aggregate -import scala.collection.mutable.TreeMap - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, UnresolvedWithinGroup} import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder} @@ -75,28 +73,23 @@ case class Mode( } buffer } + private def impl3(buff: OpenHashMap[AnyRef, Long]) = { + val modeMap = buff.toSeq.groupMapReduce { + case (key: String, _) => + CollationFactory.getCollationKey(UTF8String.fromString(key), collationId) + case (key: UTF8String, _) => + CollationFactory.getCollationKey(key, collationId) + case (key, _) => key + }(x => x)((x, y) => (x._1, x._2 + y._2)).values + modeMap + } override def eval(buff: OpenHashMap[AnyRef, Long]): Any = { if (buff.isEmpty) { return null } val buffer = if (isCollatedString(child)) { - val modeMap = buff.foldLeft( - new TreeMap[org.apache.spark.unsafe.types.UTF8String, Long]()(Ordering.comparatorToOrdering( - CollationFactory.fetchCollation(collationId).comparator - ))) - { - case (map, (key: String, count)) => - map(org.apache.spark.unsafe.types.UTF8String.fromString(key)) = - map.getOrElse(org.apache.spark.unsafe.types.UTF8String.fromString(key), 0L) + count - map - case (map, (key: UTF8String, count)) => - map(key) = map.getOrElse(key, 0L) + count - map - case (_, _) => - throw new IllegalArgumentException("Mode expects non-null string type.") - } - modeMap + impl3(buff) } else { buff } From 76f089f53281795a0def9386ca2bffbe1298e755 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Fri, 10 May 2024 09:27:46 -0400 Subject: [PATCH 20/42] tests pass --- .../catalyst/expressions/aggregate/Mode.scala | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index d1efa25b23c4e..ec3ee0e6fa5ef 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -73,23 +73,20 @@ case class Mode( } buffer } - private def impl3(buff: OpenHashMap[AnyRef, Long]) = { - val modeMap = buff.toSeq.groupMapReduce { - case (key: String, _) => - CollationFactory.getCollationKey(UTF8String.fromString(key), collationId) - case (key: UTF8String, _) => - CollationFactory.getCollationKey(key, collationId) - case (key, _) => key - }(x => x)((x, y) => (x._1, x._2 + y._2)).values - modeMap - } override def eval(buff: OpenHashMap[AnyRef, Long]): Any = { if (buff.isEmpty) { return null } val buffer = if (isCollatedString(child)) { - impl3(buff) + val modeMap = buff.toSeq.groupMapReduce { + case (key: String, _) => + CollationFactory.getCollationKey(UTF8String.fromString(key), collationId) + case (key: UTF8String, _) => + CollationFactory.getCollationKey(key, collationId) + case (key, _) => key + }(x => x)((x, y) => (x._1, x._2 + y._2)).values + modeMap } else { buff } From d8ea7717691177c07e3ac4e1d0ed025d29f186fa Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Fri, 10 May 2024 09:36:30 -0400 Subject: [PATCH 21/42] tests pass --- .../org/apache/spark/sql/CollationStringExpressionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala index b65c42cd412dd..4193e3f0bedcc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala @@ -812,7 +812,7 @@ class CollationStringExpressionsSuite } } - test("Support mode for string expression with collation2 ") { + test("Support mode for string expression with collation - Advanced Test") { case class ModeTestCase[R](collationId: String, bufferValues: Map[String, Long], result: R) val testCases = Seq( ModeTestCase("utf8_binary", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "a"), From af187ac2de98f1442eae619111fbd5a71c8fb684 Mon Sep 17 00:00:00 2001 From: Gideon Potok <31429832+GideonPotok@users.noreply.github.com> Date: Mon, 13 May 2024 12:16:56 -0400 Subject: [PATCH 22/42] Update sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala Co-authored-by: Uros Bojanic <157381213+uros-db@users.noreply.github.com> --- .../apache/spark/sql/catalyst/expressions/aggregate/Mode.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index ec3ee0e6fa5ef..a7183b4b0458c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -78,7 +78,7 @@ case class Mode( if (buff.isEmpty) { return null } - val buffer = if (isCollatedString(child)) { + val buffer = if (!CollationFactory.fetchCollation(collationId).supportsBinaryEquality) { val modeMap = buff.toSeq.groupMapReduce { case (key: String, _) => CollationFactory.getCollationKey(UTF8String.fromString(key), collationId) From 184317d0fb70c8cbfa9535d2e603fa55b3b160eb Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Mon, 13 May 2024 13:13:33 -0400 Subject: [PATCH 23/42] removed withCollatedString --- .../spark/sql/catalyst/expressions/aggregate/Mode.scala | 8 -------- 1 file changed, 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index a7183b4b0458c..da6d65afb7736 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -102,14 +102,6 @@ case class Mode( }.getOrElse(buffer.maxBy(_._2))._1 } - private def isCollatedString(child: Expression): Boolean = { - child.dataType match { - case s: StringType if s.collationId != CollationFactory.UTF8_BINARY_COLLATION_ID => true - // maybe use supportsBinaryEquality or something else - case _ => false - } - } - override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): Mode = copy(mutableAggBufferOffset = newMutableAggBufferOffset) From cd74bc846ef17ed1749d513ebab7db7a04aa3cef Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Mon, 13 May 2024 13:15:55 -0400 Subject: [PATCH 24/42] buff->buffer --- .../sql/catalyst/expressions/aggregate/Mode.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index da6d65afb7736..67bc124dacc34 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -74,12 +74,12 @@ case class Mode( buffer } - override def eval(buff: OpenHashMap[AnyRef, Long]): Any = { - if (buff.isEmpty) { + override def eval(buffer: OpenHashMap[AnyRef, Long]): Any = { + if (buffer.isEmpty) { return null } - val buffer = if (!CollationFactory.fetchCollation(collationId).supportsBinaryEquality) { - val modeMap = buff.toSeq.groupMapReduce { + val buffer2 = if (!CollationFactory.fetchCollation(collationId).supportsBinaryEquality) { + val modeMap = buffer.toSeq.groupMapReduce { case (key: String, _) => CollationFactory.getCollationKey(UTF8String.fromString(key), collationId) case (key: UTF8String, _) => @@ -88,7 +88,7 @@ case class Mode( }(x => x)((x, y) => (x._1, x._2 + y._2)).values modeMap } else { - buff + buffer } reverseOpt.map { reverse => @@ -98,8 +98,8 @@ case class Mode( PhysicalDataType.ordering(child.dataType).asInstanceOf[Ordering[AnyRef]] } val ordering = Ordering.Tuple2(Ordering.Long, defaultKeyOrdering) - buffer.maxBy { case (key, count) => (count, key) }(ordering) - }.getOrElse(buffer.maxBy(_._2))._1 + buffer2.maxBy { case (key, count) => (count, key) }(ordering) + }.getOrElse(buffer2.maxBy(_._2))._1 } override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): Mode = From 8f2525ab4e4ed17c0a0c9456a558b19e16a2d955 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Mon, 13 May 2024 13:23:24 -0400 Subject: [PATCH 25/42] added the jdk 17 benchmarks. --- .../benchmarks/CollationBenchmark-results.txt | 80 +++++++++---------- 1 file changed, 40 insertions(+), 40 deletions(-) diff --git a/sql/core/benchmarks/CollationBenchmark-results.txt b/sql/core/benchmarks/CollationBenchmark-results.txt index 99b2f3eebcb11..392499a814f06 100644 --- a/sql/core/benchmarks/CollationBenchmark-results.txt +++ b/sql/core/benchmarks/CollationBenchmark-results.txt @@ -2,83 +2,83 @@ OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - equalsFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE 2739 2741 4 0.0 27385.4 1.0X -UNICODE 1692 1697 7 0.1 16917.9 1.6X -UTF8_BINARY 1693 1697 5 0.1 16933.2 1.6X -UNICODE_CI 16444 16491 66 0.0 164444.0 0.2X +UTF8_BINARY_LCASE 3236 3237 1 0.0 32360.9 1.0X +UNICODE 2088 2090 2 0.0 20884.9 1.5X +UTF8_BINARY 2087 2088 2 0.0 20868.7 1.6X +UNICODE_CI 17378 17391 18 0.0 173784.8 0.2X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - compareFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE 4441 4442 0 0.0 44413.4 1.0X -UNICODE 17975 17979 6 0.0 179749.5 0.2X -UTF8_BINARY 3393 3398 7 0.0 33927.9 1.3X -UNICODE_CI 17856 17856 1 0.0 178555.0 0.2X +UTF8_BINARY_LCASE 3538 3540 3 0.0 35381.3 1.0X +UNICODE 19902 19903 2 0.0 199015.6 0.2X +UTF8_BINARY 3310 3311 1 0.0 33100.8 1.1X +UNICODE_CI 20913 20929 21 0.0 209134.2 0.2X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - hashFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -UTF8_BINARY_LCASE 7026 7027 0 0.0 70264.3 1.0X -UNICODE 66759 66804 63 0.0 667592.9 0.1X -UTF8_BINARY 5381 5406 36 0.0 53808.5 1.3X -UNICODE_CI 57825 57893 97 0.0 578249.9 0.1X +UTF8_BINARY_LCASE 7000 7003 3 0.0 70004.2 1.0X +UNICODE 65023 65024 2 0.0 650231.5 0.1X +UTF8_BINARY 5307 5343 52 0.0 53069.0 1.3X +UNICODE_CI 60437 60466 41 0.0 604374.8 0.1X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - contains: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -UTF8_BINARY_LCASE 14644 14645 1 0.0 146442.9 1.0X -UNICODE 8091 8096 7 0.0 80911.3 1.8X -UTF8_BINARY 9109 9116 10 0.0 91088.6 1.6X -UNICODE_CI 313713 313875 229 0.0 3137134.2 0.0X +UTF8_BINARY_LCASE 14255 14257 2 0.0 142554.8 1.0X +UNICODE 6842 6847 7 0.0 68416.2 2.1X +UTF8_BINARY 7616 7619 4 0.0 76163.5 1.9X +UNICODE_CI 317873 317979 150 0.0 3178729.5 0.0X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - startsWith: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -UTF8_BINARY_LCASE 13075 13106 44 0.0 130750.4 1.0X -UNICODE 6776 6777 1 0.0 67755.2 1.9X -UTF8_BINARY 7781 7818 52 0.0 77812.6 1.7X -UNICODE_CI 317735 318369 896 0.0 3177354.7 0.0X +UTF8_BINARY_LCASE 11522 11526 4 0.0 115224.8 1.0X +UNICODE 5631 5634 4 0.0 56311.7 2.0X +UTF8_BINARY 6396 6429 46 0.0 63958.4 1.8X +UNICODE_CI 314087 314121 48 0.0 3140869.4 0.0X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - endsWith: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -UTF8_BINARY_LCASE 13103 13108 8 0.0 131030.4 1.0X -UNICODE 6715 6757 58 0.0 67154.0 2.0X -UTF8_BINARY 7709 7713 5 0.0 77090.4 1.7X -UNICODE_CI 319593 320810 1722 0.0 3195929.8 0.0X +UTF8_BINARY_LCASE 11561 11567 9 0.0 115606.6 1.0X +UNICODE 5784 5792 10 0.0 57843.7 2.0X +UTF8_BINARY 6536 6543 10 0.0 65359.5 1.8X +UNICODE_CI 322878 323177 423 0.0 3228780.0 0.0X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor - collation unit benchmarks - mode - 10000 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE - mode - 10000 elements 25 25 0 4.1 245.1 1.0X -UNICODE - mode - 10000 elements 53 54 0 1.9 529.7 0.5X -UTF8_BINARY - mode - 10000 elements 1 1 0 76.3 13.1 18.7X -UNICODE_CI - mode - 10000 elements 53 54 1 1.9 527.4 0.5X -Numerical Type - mode - 10000 elements 1 1 0 117.7 8.5 28.8X +UTF8_BINARY_LCASE - mode - 10000 elements 10 10 1 9.8 101.6 1.0X +UNICODE - mode - 10000 elements 9 9 0 10.9 91.7 1.1X +UTF8_BINARY - mode - 10000 elements 1 1 0 75.8 13.2 7.7X +UNICODE_CI - mode - 10000 elements 14 15 0 6.9 144.4 0.7X +Numerical Type - mode - 10000 elements 1 1 0 116.6 8.6 11.8X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - mode - 5000 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE - mode - 5000 elements 9 9 0 5.5 181.6 1.0X -UNICODE - mode - 5000 elements 18 18 1 2.8 360.9 0.5X -UTF8_BINARY - mode - 5000 elements 1 1 0 85.0 11.8 15.4X -UNICODE_CI - mode - 5000 elements 18 19 1 2.8 359.9 0.5X -Numerical Type - mode - 5000 elements 0 0 0 147.6 6.8 26.8X +UTF8_BINARY_LCASE - mode - 5000 elements 4 4 0 12.9 77.3 1.0X +UNICODE - mode - 5000 elements 3 4 0 14.6 68.4 1.1X +UTF8_BINARY - mode - 5000 elements 1 1 0 83.9 11.9 6.5X +UNICODE_CI - mode - 5000 elements 5 5 0 9.4 106.7 0.7X +Numerical Type - mode - 5000 elements 0 0 0 142.9 7.0 11.1X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - mode - 2500 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE - mode - 2500 elements 3 3 0 7.4 136.0 1.0X -UNICODE - mode - 2500 elements 7 7 0 3.7 269.4 0.5X -UTF8_BINARY - mode - 2500 elements 0 0 0 91.8 10.9 12.5X -UNICODE_CI - mode - 2500 elements 7 7 0 3.7 268.7 0.5X -Numerical Type - mode - 2500 elements 0 0 0 166.9 6.0 22.7X +UTF8_BINARY_LCASE - mode - 2500 elements 2 2 0 14.7 68.1 1.0X +UNICODE - mode - 2500 elements 2 2 0 16.6 60.3 1.1X +UTF8_BINARY - mode - 2500 elements 0 0 0 81.2 12.3 5.5X +UNICODE_CI - mode - 2500 elements 2 2 0 11.0 91.0 0.7X +Numerical Type - mode - 2500 elements 0 0 0 141.0 7.1 9.6X + From 7aca2e34a598ec69f74bcf8d970ea9badb4176ba Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Mon, 13 May 2024 14:01:41 -0400 Subject: [PATCH 26/42] better benchmark --- .../benchmark/CollationBenchmark.scala | 44 ++++++++++++++----- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala index dbbf6cc540e24..a3e04999e32c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala @@ -195,7 +195,7 @@ abstract class CollationBenchmarkBase extends BenchmarkBase { value: Seq[UTF8String]): Unit = { val benchmark = new Benchmark( s"collation unit benchmarks - mode - ${value.size} elements", - value.size * 10, + value.size, warmupTime = 10.seconds, output = output) collationTypes.foreach { collationType => { @@ -242,16 +242,36 @@ abstract class CollationBenchmarkBase extends BenchmarkBase { */ object CollationBenchmark extends CollationBenchmarkBase { - override def generateSeqInput(n: Long): Seq[UTF8String] = { - val input = Seq("ABC", "ABC", "aBC", "aBC", "abc", "abc", "DEF", "DEF", "def", "def", - "GHI", "ghi", "JKL", "jkl", "MNO", "mno", "PQR", "pqr", "STU", "stu", "VWX", "vwx", - "ABC", "ABC", "aBC", "aBC", "abc", "abc", "DEF", "DEF", "def", "def", "GHI", "ghi", - "JKL", "jkl", "MNO", "mno", "PQR", "pqr", "STU", "stu", "VWX", "vwx", "YZ") - .map(UTF8String.fromString) - val inputLong: Seq[UTF8String] = (0L until n).map(i => input(i.toInt % input.size)) - inputLong + private val baseInputStrings = Seq("ABC", "ABC", "aBC", "aBC", "abc", + "abc", "DEF", "DEF", "def", "def", + "GHI", "ghi", "JKL", "jkl", "MNO", "mno", "PQR", "pqr", "STU", "stu", "VWX", "vwx", + "ABC", "ABC", "aBC", "aBC", "abc", "abc", "DEF", "DEF", "def", "def", "GHI", "ghi", + "JKL", "jkl", "MNO", "mno", "PQR", "pqr", "STU", "stu", "VWX", "vwx", "YZ") + + + /* + * Generate input strings for the benchmark. The input strings are a sequence of base strings + * repeated n / input.size times. + */ + private def generateBaseInputStrings(n: Long): Seq[UTF8String] = { + val input = baseInputStrings.map(UTF8String.fromString) + (0L until n).map(i => input(i.toInt % input.size)) } + /* + Lowercase and some repeated strings to test the performance of the collation functions. + */ + def generateBaseInputStringswithUniqueGroupNumber(n: Long): Seq[UTF8String] = { + (0 to n / baseInputStrings.size).flatMap(k => baseInputStrings.map( + x => UTF8String.fromString(x + "_" + k))) + .flatMap( + x => Seq(x, x.repeat(4), x.repeat(8))) // Variable Lengths... + .sortBy(f => f.reverse().hashCode()) // Shuffle the input + } + + override def generateSeqInput(n: Long): Seq[UTF8String] = + generateBaseInputStrings(n) + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { val inputs = generateSeqInput(10000L) benchmarkUTFStringEquals(collationTypes, inputs) @@ -260,9 +280,9 @@ object CollationBenchmark extends CollationBenchmarkBase { benchmarkContains(collationTypes, inputs) benchmarkStartsWith(collationTypes, inputs) benchmarkEndsWith(collationTypes, inputs) - benchmarkMode(collationTypes, inputs) - benchmarkMode(collationTypes, generateSeqInput(5000L)) - benchmarkMode(collationTypes, generateSeqInput(2500L)) + benchmarkMode(collationTypes, generateBaseInputStringswithUniqueGroupNumber(10000L)) + benchmarkMode(collationTypes, generateBaseInputStringswithUniqueGroupNumber(5000L)) + benchmarkMode(collationTypes, generateBaseInputStringswithUniqueGroupNumber(2500L)) } } From 81e2b4427a91e56fc2a1a0f47121252e0c2e37f1 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Mon, 13 May 2024 14:28:42 -0400 Subject: [PATCH 27/42] better benchmark --- .../benchmark/CollationBenchmark.scala | 21 ++++--------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala index a3e04999e32c1..e3d351430705f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala @@ -203,27 +203,14 @@ abstract class CollationBenchmarkBase extends BenchmarkBase { val modeDefaultCollation = Mode(child = Literal.create("some_column_name", StringType(collationType))) val buffer = new OpenHashMap[AnyRef, Long](value.size) - value.zipWithIndex.sliding(2000, 2000).foreach(slide => { - slide.foreach { case (v: UTF8String, i: Int) => - buffer.update(v.toString + s"_${i.toString}", (i % 1000).toLong) - } + value.foreach(v => { + buffer.update(v.toString, (v.hashCode() % 1000).toLong) + }) modeDefaultCollation.eval(buffer) - }) + } } } - } - benchmark.addCase(s"Numerical Type - mode - ${value.size} elements") { _ => - val modeIntType = Mode(child = Literal.create(1, IntegerType)) - val buffer = new OpenHashMap[AnyRef, Long](value.size) - value.zipWithIndex.sliding(2000, 2000).foreach(slide => { - slide.foreach { - case (_, i: Int) => - buffer.update(i.asInstanceOf[AnyRef], (i % 1000).toLong) - } - modeIntType.eval(buffer) - }) - } benchmark.run() } } From dec21a503050ebef59e2ca252cca4671be5e4d56 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Mon, 13 May 2024 14:49:39 -0400 Subject: [PATCH 28/42] better benchmark --- .../apache/spark/sql/CollationStringExpressionsSuite.scala | 4 ++-- .../spark/sql/execution/benchmark/CollationBenchmark.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala index 5bab6b334c6f9..2b6383184b92e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql import org.apache.spark.SparkConf -import org.apache.spark.sql.catalyst.expressions.aggregate.Mode import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, Literal, StringTrim, StringTrimLeft, StringTrimRight} +import org.apache.spark.sql.catalyst.expressions.aggregate.Mode import org.apache.spark.sql.catalyst.util.CollationFactory import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -880,7 +880,7 @@ class CollationStringExpressionsSuite assert(myMode.eval(buffer).toString.toLowerCase() == t.result.toLowerCase()) }) } - + test("StringTrim* functions - unit tests for both paths (codegen and eval)") { // Without trimString param. checkEvaluation(StringTrim(Literal.create( " asd ", StringType("UTF8_BINARY"))), "asd") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala index e3d351430705f..8040d61024507 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala @@ -22,7 +22,7 @@ import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.expressions.aggregate.Mode import org.apache.spark.sql.catalyst.util.{CollationFactory, CollationSupport} -import org.apache.spark.sql.types.{IntegerType, StringType} +import org.apache.spark.sql.types.StringType import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.collection.OpenHashMap @@ -249,7 +249,7 @@ object CollationBenchmark extends CollationBenchmarkBase { Lowercase and some repeated strings to test the performance of the collation functions. */ def generateBaseInputStringswithUniqueGroupNumber(n: Long): Seq[UTF8String] = { - (0 to n / baseInputStrings.size).flatMap(k => baseInputStrings.map( + (0 to n.toInt / baseInputStrings.size).flatMap(k => baseInputStrings.map( x => UTF8String.fromString(x + "_" + k))) .flatMap( x => Seq(x, x.repeat(4), x.repeat(8))) // Variable Lengths... From 045c007b31f5a22369e815b82f34eb1f84f8200c Mon Sep 17 00:00:00 2001 From: Gideon Potok <31429832+GideonPotok@users.noreply.github.com> Date: Tue, 14 May 2024 08:31:29 -0400 Subject: [PATCH 29/42] Update sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala rename to collationAwareBuffer Co-authored-by: Uros Bojanic <157381213+uros-db@users.noreply.github.com> --- .../apache/spark/sql/catalyst/expressions/aggregate/Mode.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index 67bc124dacc34..d21049a0169fc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -78,7 +78,7 @@ case class Mode( if (buffer.isEmpty) { return null } - val buffer2 = if (!CollationFactory.fetchCollation(collationId).supportsBinaryEquality) { + val collationAwareBuffer = if (!CollationFactory.fetchCollation(collationId).supportsBinaryEquality) { val modeMap = buffer.toSeq.groupMapReduce { case (key: String, _) => CollationFactory.getCollationKey(UTF8String.fromString(key), collationId) From d85c052183c987d5da568b1a4fae076b3fd012e2 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 14 May 2024 08:36:09 -0400 Subject: [PATCH 30/42] up to date benchmarks --- .../CollationBenchmark-jdk21-results.txt | 83 +++++++++---------- .../benchmarks/CollationBenchmark-results.txt | 83 +++++++++---------- 2 files changed, 80 insertions(+), 86 deletions(-) diff --git a/sql/core/benchmarks/CollationBenchmark-jdk21-results.txt b/sql/core/benchmarks/CollationBenchmark-jdk21-results.txt index 956d25efdf30a..ab71b57390c5c 100644 --- a/sql/core/benchmarks/CollationBenchmark-jdk21-results.txt +++ b/sql/core/benchmarks/CollationBenchmark-jdk21-results.txt @@ -2,83 +2,80 @@ OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - equalsFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE 2948 2958 13 0.0 29483.6 1.0X -UNICODE 2040 2042 3 0.0 20396.6 1.4X -UTF8_BINARY 2043 2043 0 0.0 20426.3 1.4X -UNICODE_CI 16318 16338 28 0.0 163178.4 0.2X +UTF8_BINARY_LCASE 2889 2923 48 0.0 28892.1 1.0X +UNICODE 2748 2748 1 0.0 27476.5 1.1X +UTF8_BINARY 2744 2745 1 0.0 27439.5 1.1X +UNICODE_CI 16815 16817 2 0.0 168154.3 0.2X OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - compareFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE 3227 3228 1 0.0 32272.1 1.0X -UNICODE 16637 16643 9 0.0 166367.7 0.2X -UTF8_BINARY 3132 3137 7 0.0 31319.2 1.0X -UNICODE_CI 17816 17829 18 0.0 178162.4 0.2X +UTF8_BINARY_LCASE 4782 4784 3 0.0 47819.3 1.0X +UNICODE 18986 18995 13 0.0 189855.8 0.3X +UTF8_BINARY 5026 5048 31 0.0 50258.2 1.0X +UNICODE_CI 19735 19771 50 0.0 197351.1 0.2X OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - hashFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -UTF8_BINARY_LCASE 4824 4824 0 0.0 48243.7 1.0X -UNICODE 69416 69475 84 0.0 694158.3 0.1X -UTF8_BINARY 3806 3808 2 0.0 38062.8 1.3X -UNICODE_CI 60943 60975 45 0.0 609426.2 0.1X +UTF8_BINARY_LCASE 4933 4933 1 0.0 49330.9 1.0X +UNICODE 68091 68119 40 0.0 680908.8 0.1X +UTF8_BINARY 3878 3879 2 0.0 38782.4 1.3X +UNICODE_CI 55501 55526 35 0.0 555014.2 0.1X OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - contains: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -UTF8_BINARY_LCASE 11979 11980 1 0.0 119790.4 1.0X -UNICODE 6469 6474 7 0.0 64694.8 1.9X -UTF8_BINARY 7253 7253 1 0.0 72528.3 1.7X -UNICODE_CI 319124 319881 1070 0.0 3191244.0 0.0X +UTF8_BINARY_LCASE 10441 10444 4 0.0 104412.3 1.0X +UNICODE 5811 5812 1 0.0 58106.6 1.8X +UTF8_BINARY 6397 6411 19 0.0 63971.7 1.6X +UNICODE_CI 323853 324618 1082 0.0 3238530.0 0.0X OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - startsWith: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -UTF8_BINARY_LCASE 11584 11595 15 0.0 115841.4 1.0X -UNICODE 6155 6156 2 0.0 61548.7 1.9X -UTF8_BINARY 6979 6982 5 0.0 69785.6 1.7X -UNICODE_CI 318228 318726 705 0.0 3182275.2 0.0X +UTF8_BINARY_LCASE 10123 10154 44 0.0 101227.4 1.0X +UNICODE 5682 5686 7 0.0 56815.0 1.8X +UTF8_BINARY 6296 6300 5 0.0 62961.9 1.6X +UNICODE_CI 318720 318957 336 0.0 3187199.4 0.0X OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - endsWith: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -UTF8_BINARY_LCASE 11655 11664 12 0.0 116552.8 1.0X -UNICODE 6235 6239 5 0.0 62350.8 1.9X -UTF8_BINARY 7066 7069 5 0.0 70658.1 1.6X -UNICODE_CI 313515 313999 685 0.0 3135149.1 0.0X +UTF8_BINARY_LCASE 10195 10198 5 0.0 101948.5 1.0X +UNICODE 5731 5732 1 0.0 57314.8 1.8X +UTF8_BINARY 6344 6366 31 0.0 63443.6 1.6X +UNICODE_CI 324196 325450 1772 0.0 3241964.4 0.0X OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor -collation unit benchmarks - mode - 10000 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +collation unit benchmarks - mode - 30105 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE - mode - 10000 elements 19 19 0 5.2 192.2 1.0X -UNICODE - mode - 10000 elements 52 53 1 1.9 522.1 0.4X -UTF8_BINARY - mode - 10000 elements 1 1 0 81.3 12.3 15.6X -UNICODE_CI - mode - 10000 elements 52 53 1 1.9 518.9 0.4X -Numerical Type - mode - 10000 elements 1 1 0 116.5 8.6 22.4X +UTF8_BINARY_LCASE - mode - 30105 elements 6 6 0 5.1 195.6 1.0X +UNICODE - mode - 30105 elements 3 3 0 11.6 86.0 2.3X +UTF8_BINARY - mode - 30105 elements 3 3 0 11.6 85.9 2.3X +UNICODE_CI - mode - 30105 elements 12 12 1 2.6 382.9 0.5X OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor -collation unit benchmarks - mode - 5000 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE - mode - 5000 elements 7 7 0 7.4 135.8 1.0X -UNICODE - mode - 5000 elements 18 18 1 2.8 354.5 0.4X -UTF8_BINARY - mode - 5000 elements 1 1 0 89.3 11.2 12.1X -UNICODE_CI - mode - 5000 elements 18 18 1 2.8 353.6 0.4X -Numerical Type - mode - 5000 elements 0 0 0 147.4 6.8 20.0X +collation unit benchmarks - mode - 15120 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------------- +UTF8_BINARY_LCASE - mode - 15120 elements 3 3 0 5.3 187.8 1.0X +UNICODE - mode - 15120 elements 1 1 0 12.4 80.6 2.3X +UTF8_BINARY - mode - 15120 elements 1 1 0 12.4 80.9 2.3X +UNICODE_CI - mode - 15120 elements 5 6 0 2.8 357.8 0.5X OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor -collation unit benchmarks - mode - 2500 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +collation unit benchmarks - mode - 7560 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE - mode - 2500 elements 3 3 0 9.8 102.1 1.0X -UNICODE - mode - 2500 elements 7 7 0 3.8 265.1 0.4X -UTF8_BINARY - mode - 2500 elements 0 0 0 97.3 10.3 9.9X -UNICODE_CI - mode - 2500 elements 7 7 1 3.8 264.5 0.4X -Numerical Type - mode - 2500 elements 0 0 0 164.3 6.1 16.8X +UTF8_BINARY_LCASE - mode - 7560 elements 1 1 0 5.6 179.2 1.0X +UNICODE - mode - 7560 elements 1 1 0 13.1 76.3 2.3X +UTF8_BINARY - mode - 7560 elements 1 1 0 13.1 76.4 2.3X +UNICODE_CI - mode - 7560 elements 3 3 0 2.9 345.1 0.5X diff --git a/sql/core/benchmarks/CollationBenchmark-results.txt b/sql/core/benchmarks/CollationBenchmark-results.txt index 392499a814f06..ed0ab08c80c79 100644 --- a/sql/core/benchmarks/CollationBenchmark-results.txt +++ b/sql/core/benchmarks/CollationBenchmark-results.txt @@ -2,83 +2,80 @@ OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - equalsFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE 3236 3237 1 0.0 32360.9 1.0X -UNICODE 2088 2090 2 0.0 20884.9 1.5X -UTF8_BINARY 2087 2088 2 0.0 20868.7 1.6X -UNICODE_CI 17378 17391 18 0.0 173784.8 0.2X +UTF8_BINARY_LCASE 3260 3264 7 0.0 32595.0 1.0X +UNICODE 2783 2784 1 0.0 27834.5 1.2X +UTF8_BINARY 2789 2789 0 0.0 27889.1 1.2X +UNICODE_CI 17545 17548 5 0.0 175445.8 0.2X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - compareFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE 3538 3540 3 0.0 35381.3 1.0X -UNICODE 19902 19903 2 0.0 199015.6 0.2X -UTF8_BINARY 3310 3311 1 0.0 33100.8 1.1X -UNICODE_CI 20913 20929 21 0.0 209134.2 0.2X +UTF8_BINARY_LCASE 3716 3736 27 0.0 37164.3 1.0X +UNICODE 18425 18429 6 0.0 184247.4 0.2X +UTF8_BINARY 3192 3198 9 0.0 31922.3 1.2X +UNICODE_CI 19072 19079 10 0.0 190718.3 0.2X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - hashFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -UTF8_BINARY_LCASE 7000 7003 3 0.0 70004.2 1.0X -UNICODE 65023 65024 2 0.0 650231.5 0.1X -UTF8_BINARY 5307 5343 52 0.0 53069.0 1.3X -UNICODE_CI 60437 60466 41 0.0 604374.8 0.1X +UTF8_BINARY_LCASE 7051 7053 4 0.0 70505.6 1.0X +UNICODE 64901 64941 58 0.0 649006.0 0.1X +UTF8_BINARY 5461 5501 57 0.0 54612.8 1.3X +UNICODE_CI 59907 59972 91 0.0 599073.6 0.1X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - contains: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -UTF8_BINARY_LCASE 14255 14257 2 0.0 142554.8 1.0X -UNICODE 6842 6847 7 0.0 68416.2 2.1X -UTF8_BINARY 7616 7619 4 0.0 76163.5 1.9X -UNICODE_CI 317873 317979 150 0.0 3178729.5 0.0X +UTF8_BINARY_LCASE 13702 13706 5 0.0 137020.5 1.0X +UNICODE 7306 7309 5 0.0 73056.6 1.9X +UTF8_BINARY 8077 8079 4 0.0 80765.4 1.7X +UNICODE_CI 311083 311372 409 0.0 3110831.3 0.0X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - startsWith: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -UTF8_BINARY_LCASE 11522 11526 4 0.0 115224.8 1.0X -UNICODE 5631 5634 4 0.0 56311.7 2.0X -UTF8_BINARY 6396 6429 46 0.0 63958.4 1.8X -UNICODE_CI 314087 314121 48 0.0 3140869.4 0.0X +UTF8_BINARY_LCASE 11665 11676 16 0.0 116650.6 1.0X +UNICODE 5983 5987 5 0.0 59832.3 1.9X +UTF8_BINARY 6680 6701 29 0.0 66803.2 1.7X +UNICODE_CI 307047 307098 72 0.0 3070474.8 0.0X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - endsWith: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -UTF8_BINARY_LCASE 11561 11567 9 0.0 115606.6 1.0X -UNICODE 5784 5792 10 0.0 57843.7 2.0X -UTF8_BINARY 6536 6543 10 0.0 65359.5 1.8X -UNICODE_CI 322878 323177 423 0.0 3228780.0 0.0X +UTF8_BINARY_LCASE 11707 11712 7 0.0 117068.1 1.0X +UNICODE 6062 6064 2 0.0 60618.8 1.9X +UTF8_BINARY 6730 6730 1 0.0 67300.3 1.7X +UNICODE_CI 314370 314565 276 0.0 3143696.2 0.0X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor -collation unit benchmarks - mode - 10000 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +collation unit benchmarks - mode - 30105 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE - mode - 10000 elements 10 10 1 9.8 101.6 1.0X -UNICODE - mode - 10000 elements 9 9 0 10.9 91.7 1.1X -UTF8_BINARY - mode - 10000 elements 1 1 0 75.8 13.2 7.7X -UNICODE_CI - mode - 10000 elements 14 15 0 6.9 144.4 0.7X -Numerical Type - mode - 10000 elements 1 1 0 116.6 8.6 11.8X +UTF8_BINARY_LCASE - mode - 30105 elements 7 7 0 4.4 224.8 1.0X +UNICODE - mode - 30105 elements 3 3 0 9.4 105.8 2.1X +UTF8_BINARY - mode - 30105 elements 3 3 0 9.7 103.0 2.2X +UNICODE_CI - mode - 30105 elements 11 12 1 2.7 371.3 0.6X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor -collation unit benchmarks - mode - 5000 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE - mode - 5000 elements 4 4 0 12.9 77.3 1.0X -UNICODE - mode - 5000 elements 3 4 0 14.6 68.4 1.1X -UTF8_BINARY - mode - 5000 elements 1 1 0 83.9 11.9 6.5X -UNICODE_CI - mode - 5000 elements 5 5 0 9.4 106.7 0.7X -Numerical Type - mode - 5000 elements 0 0 0 142.9 7.0 11.1X +collation unit benchmarks - mode - 15120 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------------- +UTF8_BINARY_LCASE - mode - 15120 elements 3 3 0 4.6 216.0 1.0X +UNICODE - mode - 15120 elements 1 2 0 10.3 97.5 2.2X +UTF8_BINARY - mode - 15120 elements 1 2 0 10.3 97.5 2.2X +UNICODE_CI - mode - 15120 elements 5 5 0 2.9 347.6 0.6X OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure AMD EPYC 7763 64-Core Processor -collation unit benchmarks - mode - 2500 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +collation unit benchmarks - mode - 7560 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE - mode - 2500 elements 2 2 0 14.7 68.1 1.0X -UNICODE - mode - 2500 elements 2 2 0 16.6 60.3 1.1X -UTF8_BINARY - mode - 2500 elements 0 0 0 81.2 12.3 5.5X -UNICODE_CI - mode - 2500 elements 2 2 0 11.0 91.0 0.7X -Numerical Type - mode - 2500 elements 0 0 0 141.0 7.1 9.6X +UTF8_BINARY_LCASE - mode - 7560 elements 2 2 0 4.8 206.4 1.0X +UNICODE - mode - 7560 elements 1 1 0 10.9 92.1 2.2X +UTF8_BINARY - mode - 7560 elements 1 1 0 10.7 93.2 2.2X +UNICODE_CI - mode - 7560 elements 2 3 0 3.1 327.7 0.6X From af016f4b75d52a35f29dec90337fe6179d40357f Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 14 May 2024 08:38:44 -0400 Subject: [PATCH 31/42] scalastyle --- .../spark/sql/catalyst/expressions/aggregate/Mode.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index d21049a0169fc..174eef0a2b0de 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -78,7 +78,8 @@ case class Mode( if (buffer.isEmpty) { return null } - val collationAwareBuffer = if (!CollationFactory.fetchCollation(collationId).supportsBinaryEquality) { + val collationAwareBuffer = + if (!CollationFactory.fetchCollation(collationId).supportsBinaryEquality) { val modeMap = buffer.toSeq.groupMapReduce { case (key: String, _) => CollationFactory.getCollationKey(UTF8String.fromString(key), collationId) @@ -98,8 +99,8 @@ case class Mode( PhysicalDataType.ordering(child.dataType).asInstanceOf[Ordering[AnyRef]] } val ordering = Ordering.Tuple2(Ordering.Long, defaultKeyOrdering) - buffer2.maxBy { case (key, count) => (count, key) }(ordering) - }.getOrElse(buffer2.maxBy(_._2))._1 + collationAwareBuffer.maxBy { case (key, count) => (count, key) }(ordering) + }.getOrElse(collationAwareBuffer.maxBy(_._2))._1 } override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): Mode = From 3fbe2b2bced047900fa1eb2d861eb191d2cc0d40 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 14 May 2024 10:50:12 -0400 Subject: [PATCH 32/42] scalastyle --- .../sql/CollationStringExpressionsSuite.scala | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala index 2b6383184b92e..45b82298ed519 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala @@ -837,6 +837,57 @@ class CollationStringExpressionsSuite }) } + test("Support mode for string expression w/ collation - Unicode planes") { + val unicodeChars = Seq( + "\uD834\uDD1E", // 𝄞 (U+1D11E) + "\uD83D\uDE00", // 😀 (U+1F600) + "\uD83D\uDE02", // 😂 (U+1F602) + "\u0061", // a (U+0061) + "\u0062", // b (U+0062) + "\u005A", // Z (U+005A) + "\u0FB2\u0F71", // ྲ (U+0FB2 U+0F71) + "\u0FB3\u0F71", // ླ (U+0FB3 U+0F71) + "\u0FB2\u0F71\u0F72", // ྲྲ (U+0FB2 U+0F71 U+0F72) + "\u0FB2\u0F73", // ྲླ (U+0FB2 U+0F73) + "\u0FB2\u0F71\u0F74", // ྲྴ (U+0FB2 U+0F71 U+0F74) + "\u0FB2\u0F75", // ྲྵ (U+0FB2 U+0F75) + "\u0FB3\u0F71\u0F72", // ླྲ (U+0FB3 U+0F71 U+0F72) + "\u0FB3\u0F73", // ླླ (U+0FB3 U+0F73) + "\u0FB3\u0F71\u0F74", // ླྴ (U+0FB3 U+0F71 U+0F74) + "\u0FB3\u0F75", // ླྵ (U+0FB3 U+0F75) + "\uDC00", // U+DC00 is a low surrogate + "\u2F9AF", // 2F9AF ; [.FB80.0020.0002][.C561.0000.0000] # CJK COMPATIBILITY IDEOGRAPH-2F9AF + "\u2F9B2", // 2F9B2 ; [.FB80.0020.0002][.C56B.0000.0000] # CJK COMPATIBILITY IDEOGRAPH-2F9B2 + "\u2F9BF", // 2F9BF ; [.FB80.0020.0002][.C5D7.0000.0000] # CJK COMPATIBILITY IDEOGRAPH-2F9BF + "\u2F9C2", // 2F9C2 ; [.FB80.0020.0002][.C5F9.0000.0000] # CJK COMPATIBILITY IDEOGRAPH-2F9C2 + "\uD800\uDC00", // 𐀀 (U+10000) + "\uD87E\uDC00" // 𣸀 (U+2F800) + ) + + case class ModeTestCase[R](collationId: String, bufferValues: Map[String, Long], result: R) + unicodeChars.sliding(2, 1).foreach(x => { + val testCases = Seq( + ModeTestCase("utf8_binary", x.map(y => (y, 1L)).toMap, x.head), + ModeTestCase("unicode", x.map(y => (y, 1L)).toMap, x.head) + ) + testCases.foreach(t => { + val valuesToAdd = t.bufferValues.map { case (elt, numRepeats) => + (0L to numRepeats).map(_ => s"('$elt')").mkString(",") + }.mkString(",") + + withTable("t") { + sql("CREATE TABLE t(i STRING) USING parquet") + sql("INSERT INTO t VALUES " + valuesToAdd) + val query = s"SELECT mode(collate(i, '${t.collationId}')) FROM t" + checkAnswer(sql(query), Row(t.result)) + assert(sql(query).schema.fields.head.dataType.sameType(StringType(t.collationId))) + + } + }) + }) + + } + test("Support Mode.eval(buffer)") { case class ModeTestCase[R]( collationId: String, From ab7fa8e8694ef79e14db4a5906049075c5e51515 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 14 May 2024 12:18:39 -0400 Subject: [PATCH 33/42] tests with higher unicode planes - corner cases --- .../sql/CollationStringExpressionsSuite.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala index 45b82298ed519..32d996c7540a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import org.apache.spark.SparkConf +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, Literal, StringTrim, StringTrimLeft, StringTrimRight} import org.apache.spark.sql.catalyst.expressions.aggregate.Mode import org.apache.spark.sql.catalyst.util.CollationFactory @@ -865,24 +866,23 @@ class CollationStringExpressionsSuite ) case class ModeTestCase[R](collationId: String, bufferValues: Map[String, Long], result: R) - unicodeChars.sliding(2, 1).foreach(x => { + unicodeChars.sliding(3, 1).foreach(x => { val testCases = Seq( ModeTestCase("utf8_binary", x.map(y => (y, 1L)).toMap, x.head), ModeTestCase("unicode", x.map(y => (y, 1L)).toMap, x.head) ) testCases.foreach(t => { - val valuesToAdd = t.bufferValues.map { case (elt, numRepeats) => - (0L to numRepeats).map(_ => s"('$elt')").mkString(",") - }.mkString(",") - - withTable("t") { - sql("CREATE TABLE t(i STRING) USING parquet") - sql("INSERT INTO t VALUES " + valuesToAdd) - val query = s"SELECT mode(collate(i, '${t.collationId}')) FROM t" - checkAnswer(sql(query), Row(t.result)) - assert(sql(query).schema.fields.head.dataType.sameType(StringType(t.collationId))) + val mode = Mode(child = Literal.create("some_column_name", StringType(t.collationId))) + val buffer = t.bufferValues.foldLeft(new OpenHashMap[AnyRef, Long](3))((b, each) => { + b.changeValue(UTF8String.fromString(each._1).asInstanceOf[AnyRef], each._2, _ + each._2) + b } + ) + // scalastyle:off println + println(s"${t.bufferValues.keys.mkString(",")}: " + mode.eval(buffer)) + // scalastyle:on println + assert(mode.eval(buffer).toString == t.result) }) }) From c86e01a9ddefb4b63db9563754503a0bb0558640 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 14 May 2024 12:19:10 -0400 Subject: [PATCH 34/42] removed those unicode tests for now at least --- .../sql/CollationStringExpressionsSuite.scala | 50 ------------------- 1 file changed, 50 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala index 32d996c7540a1..6622eb5c534c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala @@ -838,56 +838,6 @@ class CollationStringExpressionsSuite }) } - test("Support mode for string expression w/ collation - Unicode planes") { - val unicodeChars = Seq( - "\uD834\uDD1E", // 𝄞 (U+1D11E) - "\uD83D\uDE00", // 😀 (U+1F600) - "\uD83D\uDE02", // 😂 (U+1F602) - "\u0061", // a (U+0061) - "\u0062", // b (U+0062) - "\u005A", // Z (U+005A) - "\u0FB2\u0F71", // ྲ (U+0FB2 U+0F71) - "\u0FB3\u0F71", // ླ (U+0FB3 U+0F71) - "\u0FB2\u0F71\u0F72", // ྲྲ (U+0FB2 U+0F71 U+0F72) - "\u0FB2\u0F73", // ྲླ (U+0FB2 U+0F73) - "\u0FB2\u0F71\u0F74", // ྲྴ (U+0FB2 U+0F71 U+0F74) - "\u0FB2\u0F75", // ྲྵ (U+0FB2 U+0F75) - "\u0FB3\u0F71\u0F72", // ླྲ (U+0FB3 U+0F71 U+0F72) - "\u0FB3\u0F73", // ླླ (U+0FB3 U+0F73) - "\u0FB3\u0F71\u0F74", // ླྴ (U+0FB3 U+0F71 U+0F74) - "\u0FB3\u0F75", // ླྵ (U+0FB3 U+0F75) - "\uDC00", // U+DC00 is a low surrogate - "\u2F9AF", // 2F9AF ; [.FB80.0020.0002][.C561.0000.0000] # CJK COMPATIBILITY IDEOGRAPH-2F9AF - "\u2F9B2", // 2F9B2 ; [.FB80.0020.0002][.C56B.0000.0000] # CJK COMPATIBILITY IDEOGRAPH-2F9B2 - "\u2F9BF", // 2F9BF ; [.FB80.0020.0002][.C5D7.0000.0000] # CJK COMPATIBILITY IDEOGRAPH-2F9BF - "\u2F9C2", // 2F9C2 ; [.FB80.0020.0002][.C5F9.0000.0000] # CJK COMPATIBILITY IDEOGRAPH-2F9C2 - "\uD800\uDC00", // 𐀀 (U+10000) - "\uD87E\uDC00" // 𣸀 (U+2F800) - ) - - case class ModeTestCase[R](collationId: String, bufferValues: Map[String, Long], result: R) - unicodeChars.sliding(3, 1).foreach(x => { - val testCases = Seq( - ModeTestCase("utf8_binary", x.map(y => (y, 1L)).toMap, x.head), - ModeTestCase("unicode", x.map(y => (y, 1L)).toMap, x.head) - ) - testCases.foreach(t => { - - val mode = Mode(child = Literal.create("some_column_name", StringType(t.collationId))) - val buffer = t.bufferValues.foldLeft(new OpenHashMap[AnyRef, Long](3))((b, each) => { - b.changeValue(UTF8String.fromString(each._1).asInstanceOf[AnyRef], each._2, _ + each._2) - b - } - ) - // scalastyle:off println - println(s"${t.bufferValues.keys.mkString(",")}: " + mode.eval(buffer)) - // scalastyle:on println - assert(mode.eval(buffer).toString == t.result) - }) - }) - - } - test("Support Mode.eval(buffer)") { case class ModeTestCase[R]( collationId: String, From 08a3e0a614c4bb1b77ab8b04e9b5ab2118191175 Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 14 May 2024 17:03:46 -0400 Subject: [PATCH 35/42] removed extra benchmarks --- .../CollationBenchmark-jdk21-results.txt | 19 ------------------- .../benchmarks/CollationBenchmark-results.txt | 19 ------------------- .../benchmark/CollationBenchmark.scala | 2 -- 3 files changed, 40 deletions(-) diff --git a/sql/core/benchmarks/CollationBenchmark-jdk21-results.txt b/sql/core/benchmarks/CollationBenchmark-jdk21-results.txt index ab71b57390c5c..5b71f71d6d859 100644 --- a/sql/core/benchmarks/CollationBenchmark-jdk21-results.txt +++ b/sql/core/benchmarks/CollationBenchmark-jdk21-results.txt @@ -60,22 +60,3 @@ UTF8_BINARY_LCASE - mode - 30105 elements 6 6 UNICODE - mode - 30105 elements 3 3 0 11.6 86.0 2.3X UTF8_BINARY - mode - 30105 elements 3 3 0 11.6 85.9 2.3X UNICODE_CI - mode - 30105 elements 12 12 1 2.6 382.9 0.5X - -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure -AMD EPYC 7763 64-Core Processor -collation unit benchmarks - mode - 15120 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ---------------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE - mode - 15120 elements 3 3 0 5.3 187.8 1.0X -UNICODE - mode - 15120 elements 1 1 0 12.4 80.6 2.3X -UTF8_BINARY - mode - 15120 elements 1 1 0 12.4 80.9 2.3X -UNICODE_CI - mode - 15120 elements 5 6 0 2.8 357.8 0.5X - -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure -AMD EPYC 7763 64-Core Processor -collation unit benchmarks - mode - 7560 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE - mode - 7560 elements 1 1 0 5.6 179.2 1.0X -UNICODE - mode - 7560 elements 1 1 0 13.1 76.3 2.3X -UTF8_BINARY - mode - 7560 elements 1 1 0 13.1 76.4 2.3X -UNICODE_CI - mode - 7560 elements 3 3 0 2.9 345.1 0.5X - diff --git a/sql/core/benchmarks/CollationBenchmark-results.txt b/sql/core/benchmarks/CollationBenchmark-results.txt index ed0ab08c80c79..d889fff8f2b3d 100644 --- a/sql/core/benchmarks/CollationBenchmark-results.txt +++ b/sql/core/benchmarks/CollationBenchmark-results.txt @@ -60,22 +60,3 @@ UTF8_BINARY_LCASE - mode - 30105 elements 7 7 UNICODE - mode - 30105 elements 3 3 0 9.4 105.8 2.1X UTF8_BINARY - mode - 30105 elements 3 3 0 9.7 103.0 2.2X UNICODE_CI - mode - 30105 elements 11 12 1 2.7 371.3 0.6X - -OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure -AMD EPYC 7763 64-Core Processor -collation unit benchmarks - mode - 15120 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ---------------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE - mode - 15120 elements 3 3 0 4.6 216.0 1.0X -UNICODE - mode - 15120 elements 1 2 0 10.3 97.5 2.2X -UTF8_BINARY - mode - 15120 elements 1 2 0 10.3 97.5 2.2X -UNICODE_CI - mode - 15120 elements 5 5 0 2.9 347.6 0.6X - -OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure -AMD EPYC 7763 64-Core Processor -collation unit benchmarks - mode - 7560 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------- -UTF8_BINARY_LCASE - mode - 7560 elements 2 2 0 4.8 206.4 1.0X -UNICODE - mode - 7560 elements 1 1 0 10.9 92.1 2.2X -UTF8_BINARY - mode - 7560 elements 1 1 0 10.7 93.2 2.2X -UNICODE_CI - mode - 7560 elements 2 3 0 3.1 327.7 0.6X - diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala index 8040d61024507..949d9b30b3c19 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala @@ -268,8 +268,6 @@ object CollationBenchmark extends CollationBenchmarkBase { benchmarkStartsWith(collationTypes, inputs) benchmarkEndsWith(collationTypes, inputs) benchmarkMode(collationTypes, generateBaseInputStringswithUniqueGroupNumber(10000L)) - benchmarkMode(collationTypes, generateBaseInputStringswithUniqueGroupNumber(5000L)) - benchmarkMode(collationTypes, generateBaseInputStringswithUniqueGroupNumber(2500L)) } } From d3911cb836b45bb1ef67bb606f36138399c7c1fe Mon Sep 17 00:00:00 2001 From: GideonPotok Date: Tue, 14 May 2024 17:19:05 -0400 Subject: [PATCH 36/42] move tests to CollationSQLExpressionsSuite --- connector/kafka-0-10-token-provider/pom.xml | 2 +- .../sql/CollationSQLExpressionsSuite.scala | 83 ++++++++++++++++++- .../sql/CollationStringExpressionsSuite.scala | 76 ----------------- 3 files changed, 82 insertions(+), 79 deletions(-) diff --git a/connector/kafka-0-10-token-provider/pom.xml b/connector/kafka-0-10-token-provider/pom.xml index 2b2707b9da320..9eda75b368f01 100644 --- a/connector/kafka-0-10-token-provider/pom.xml +++ b/connector/kafka-0-10-token-provider/pom.xml @@ -1,4 +1,4 @@ - +: