[SPARK-48712][SQL] Perf Improvement for encode with empty value or UTF-8 charset
yaooqinn committed Jun 25, 2024
commit 8d36c1e82bc11357974f00bd48ff67dcd29dc92b
@@ -2835,7 +2835,7 @@ case class Encode(
 object Encode {
   def apply(value: Expression, charset: Expression): Encode = new Encode(value, charset)

-  private[expressions] final lazy val VALID_CHARSETS =
+  private[sql] final lazy val VALID_CHARSETS =
     Set("US-ASCII", "ISO-8859-1", "UTF-8", "UTF-16BE", "UTF-16LE", "UTF-16", "UTF-32")

   def encode(
@@ -2844,6 +2844,9 @@ object Encode {
       legacyCharsets: Boolean,
       legacyErrorAction: Boolean): Array[Byte] = {
     val toCharset = charset.toString
+    if (input.numBytes == 0 || "UTF-8".equalsIgnoreCase(toCharset)) {
Contributor:
This is actually a behavior change. If the input bytes are not valid UTF-8, previously the result was not the same as the input bytes, but now it is.

We should either remove this UTF-8 shortcut, or first check whether the input bytes are valid UTF-8.

cc @yaooqinn

Member Author (@yaooqinn):
Do you mean that before this PR we would encode the unmappable characters to mojibake, but now we return the input as-is?

Do you think we can call input.isValid to check here?

Contributor:
Yea, I think so. For the happy path it's still faster than doing the actual encoding, and invalid UTF-8 bytes should be rare, so it's OK to have an extra isValid call.

+      return input.getBytes
+    }
     if (legacyCharsets || VALID_CHARSETS.contains(toCharset.toUpperCase(Locale.ROOT))) {
       val encoder = try {
         val codingErrorAction = if (legacyErrorAction) {
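A minimal sketch of the guarded fast path discussed in the review thread above, assuming the input.isValid check proposed there (an illustration, not the committed code):

// Sketch: gate the UTF-8 shortcut on input validity, per the review thread.
// UTF8String stores its payload as UTF-8 bytes, so when the input is empty or
// already valid UTF-8, encoding to UTF-8 is the identity and the bytes can be
// returned as-is.
if (input.numBytes == 0 ||
    ("UTF-8".equalsIgnoreCase(toCharset) && input.isValid)) {
  return input.getBytes
}
// Invalid byte sequences fall through to the CharsetEncoder below, which
// applies the configured error action to unmappable input, as before this PR.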
6 changes: 6 additions & 0 deletions sql/core/benchmarks/EncodeBenchmark-results.txt
@@ -0,0 +1,6 @@
OpenJDK 64-Bit Server VM 17.0.10+0 on Mac OS X 14.5
Apple M2 Max
encode: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
UTF-8 79270 79698 448 0.3 3963.5 1.0X

68 changes: 68 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/EncodeBenchmark.scala
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.benchmark

import org.apache.spark.benchmark.Benchmark

/**
 * Benchmark for measuring the performance of the encode expression.
 * To run this benchmark:
 * {{{
 *   1. without sbt:
 *      bin/spark-submit --class <this class> --jars <spark core test jar> <sql core test jar>
 *   2. build/sbt "sql/Test/runMain <this class>"
 *   3. generate result:
 *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain <this class>"
 *      Results will be written to "benchmarks/EncodeBenchmark-results.txt".
 * }}}
 */
object EncodeBenchmark extends SqlBasedBenchmark {
  import spark.implicits._

  private val N = 20L * 1000 * 1000

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    withTempPath { path =>
      // scalastyle:off nonascii
      val exprs = Seq(
        "",
        "Spark",
        "白日依山尽,黄河入海流。欲穷千里目,更上一层楼。",
        "το Spark είναι το πιο δημοφιλές πλαίσιο επεξεργασίας μεγάλων δεδομένων παγκοσμίως",
        "세계에서 가장 인기 있는 빅데이터 처리 프레임워크인 Spark",
        "Sparkは世界で最も人気のあるビッグデータ処理フレームワークである。")
      // scalastyle:on nonascii

      spark.range(N).map { i =>
        val idx = (i % 6).toInt
        val str = exprs(idx)
        (str, str * 3, str * 5, str * 9, "")
      }.write.parquet(path.getCanonicalPath)

      val utf8 = new Benchmark("encode", N, output = output)
      utf8.addCase("UTF-8", 3) { _ =>
        spark.read.parquet(path.getCanonicalPath).selectExpr(
          "encode(_1, 'UTF-8')",
          "encode(_2, 'UTF-8')",
          "encode(_3, 'UTF-8')",
          "encode(_4, 'UTF-8')",
          "encode(_5, 'UTF-8')").noop()
      }
      utf8.run()
    }
  }
}
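As a quick sanity check of the shortcut outside the benchmark, a hypothetical spark-shell snippet (not part of this PR); both calls should take the new fast path:

// Hypothetical spark-shell check: empty input and valid UTF-8 input.
spark.sql("SELECT encode('', 'UTF-8'), encode('Spark', 'UTF-8')").show()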