Skip to content

Commit e0dcb3c

Browse files
mgaido91 authored and markhamstra committed
[SPARK-24013][SQL] Remove unneeded compress in ApproximatePercentile
## What changes were proposed in this pull request? `ApproximatePercentile` contains a workaround logic to compress the samples since at the beginning `QuantileSummaries` was ignoring the compression threshold. This problem was fixed in SPARK-17439, but the workaround logic was not removed. So we are compressing the samples many more times than needed: this could lead to critical performance degradation. This can create serious performance issues in queries like: ``` select approx_percentile(id, array(0.1)) from range(10000000) ``` ## How was this patch tested? added UT Author: Marco Gaido <[email protected]> Closes apache#21133 from mgaido91/SPARK-24013.
1 parent cc96c94 commit e0dcb3c

File tree

3 files changed

+26
-31
lines changed

3 files changed

+26
-31
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -206,27 +206,15 @@ object ApproximatePercentile {
206206
* with limited memory. PercentileDigest is backed by [[QuantileSummaries]].
207207
*
208208
* @param summaries underlying probabilistic data structure [[QuantileSummaries]].
209-
* @param isCompressed An internal flag from class [[QuantileSummaries]] to indicate whether the
210-
* underlying quantileSummaries is compressed.
211209
*/
212-
class PercentileDigest(
213-
private var summaries: QuantileSummaries,
214-
private var isCompressed: Boolean) {
215-
216-
// Trigger compression if the QuantileSummaries's buffer length exceeds
217-
// compressThresHoldBufferLength. The buffer length can be get by
218-
// quantileSummaries.sampled.length
219-
private[this] final val compressThresHoldBufferLength: Int = {
220-
// Max buffer length after compression.
221-
val maxBufferLengthAfterCompression: Int = (1 / summaries.relativeError).toInt * 2
222-
// A safe upper bound for buffer length before compression
223-
maxBufferLengthAfterCompression * 2
224-
}
210+
class PercentileDigest(private var summaries: QuantileSummaries) {
225211

226212
def this(relativeError: Double) = {
227-
this(new QuantileSummaries(defaultCompressThreshold, relativeError), isCompressed = true)
213+
this(new QuantileSummaries(defaultCompressThreshold, relativeError, compressed = true))
228214
}
229215

216+
private[sql] def isCompressed: Boolean = summaries.compressed
217+
230218
/** Returns compressed object of [[QuantileSummaries]] */
231219
def quantileSummaries: QuantileSummaries = {
232220
if (!isCompressed) compress()
@@ -236,14 +224,6 @@ object ApproximatePercentile {
236224
/** Insert an observation value into the PercentileDigest data structure. */
237225
def add(value: Double): Unit = {
238226
summaries = summaries.insert(value)
239-
// The result of QuantileSummaries.insert is un-compressed
240-
isCompressed = false
241-
242-
// Currently, QuantileSummaries ignores the construction parameter compressThresHold,
243-
// which may cause QuantileSummaries to occupy unbounded memory. We have to hack around here
244-
// to make sure QuantileSummaries doesn't occupy infinite memory.
245-
// TODO: Figure out why QuantileSummaries ignores construction parameter compressThresHold
246-
if (summaries.sampled.length >= compressThresHoldBufferLength) compress()
247227
}
248228

249229
/** In-place merges in another PercentileDigest. */
@@ -280,7 +260,6 @@ object ApproximatePercentile {
280260

281261
private final def compress(): Unit = {
282262
summaries = summaries.compress()
283-
isCompressed = true
284263
}
285264
}
286265

@@ -335,8 +314,8 @@ object ApproximatePercentile {
335314
sampled(i) = Stats(value, g, delta)
336315
i += 1
337316
}
338-
val summary = new QuantileSummaries(compressThreshold, relativeError, sampled, count)
339-
new PercentileDigest(summary, isCompressed = true)
317+
val summary = new QuantileSummaries(compressThreshold, relativeError, sampled, count, true)
318+
new PercentileDigest(summary)
340319
}
341320
}
342321

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,14 @@ import org.apache.spark.sql.catalyst.util.QuantileSummaries.Stats
4040
* See the G-K article for more details.
4141
* @param count the count of all the elements *inserted in the sampled buffer*
4242
* (excluding the head buffer)
43+
* @param compressed whether the statistics have been compressed
4344
*/
4445
class QuantileSummaries(
4546
val compressThreshold: Int,
4647
val relativeError: Double,
4748
val sampled: Array[Stats] = Array.empty,
48-
val count: Long = 0L) extends Serializable {
49+
val count: Long = 0L,
50+
var compressed: Boolean = false) extends Serializable {
4951

5052
// a buffer of latest samples seen so far
5153
private val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty
@@ -60,6 +62,7 @@ class QuantileSummaries(
6062
*/
6163
def insert(x: Double): QuantileSummaries = {
6264
headSampled += x
65+
compressed = false
6366
if (headSampled.size >= defaultHeadSize) {
6467
val result = this.withHeadBufferInserted
6568
if (result.sampled.length >= compressThreshold) {
@@ -135,11 +138,11 @@ class QuantileSummaries(
135138
assert(inserted.count == count + headSampled.size)
136139
val compressed =
137140
compressImmut(inserted.sampled, mergeThreshold = 2 * relativeError * inserted.count)
138-
new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count)
141+
new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count, true)
139142
}
140143

141144
private def shallowCopy: QuantileSummaries = {
142-
new QuantileSummaries(compressThreshold, relativeError, sampled, count)
145+
new QuantileSummaries(compressThreshold, relativeError, sampled, count, compressed)
143146
}
144147

145148
/**
@@ -163,7 +166,7 @@ class QuantileSummaries(
163166
val res = (sampled ++ other.sampled).sortBy(_.value)
164167
val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count)
165168
new QuantileSummaries(
166-
other.compressThreshold, other.relativeError, comp, other.count + count)
169+
other.compressThreshold, other.relativeError, comp, other.count + count, true)
167170
}
168171
}
169172

sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql
1919

2020
import java.sql.{Date, Timestamp}
2121

22+
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
2223
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY
2324
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.PercentileDigest
2425
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -279,4 +280,16 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSQLContext {
279280
checkAnswer(query, expected)
280281
}
281282
}
283+
284+
test("SPARK-24013: unneeded compress can cause performance issues with sorted input") {
285+
val buffer = new PercentileDigest(1.0D / ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY)
286+
var compressCounts = 0
287+
(1 to 10000000).foreach { i =>
288+
buffer.add(i)
289+
if (buffer.isCompressed) compressCounts += 1
290+
}
291+
assert(compressCounts > 0)
292+
buffer.quantileSummaries
293+
assert(buffer.isCompressed)
294+
}
282295
}

0 commit comments

Comments (0)