Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,27 +206,15 @@ object ApproximatePercentile {
* with limited memory. PercentileDigest is backed by [[QuantileSummaries]].
*
* @param summaries underlying probabilistic data structure [[QuantileSummaries]].
* @param isCompressed An internal flag from class [[QuantileSummaries]] to indicate whether the
* underlying quantileSummaries is compressed.
*/
class PercentileDigest(
private var summaries: QuantileSummaries,
private var isCompressed: Boolean) {

// Trigger compression if the QuantileSummaries's buffer length exceeds
// compressThresHoldBufferLength. The buffer length can be get by
// quantileSummaries.sampled.length
private[this] final val compressThresHoldBufferLength: Int = {
// Max buffer length after compression.
val maxBufferLengthAfterCompression: Int = (1 / summaries.relativeError).toInt * 2
// A safe upper bound for buffer length before compression
maxBufferLengthAfterCompression * 2
}
class PercentileDigest(private var summaries: QuantileSummaries) {

def this(relativeError: Double) = {
this(new QuantileSummaries(defaultCompressThreshold, relativeError), isCompressed = true)
this(new QuantileSummaries(defaultCompressThreshold, relativeError, compressed = true))
}

private[sql] def isCompressed: Boolean = summaries.compressed

/** Returns compressed object of [[QuantileSummaries]] */
def quantileSummaries: QuantileSummaries = {
if (!isCompressed) compress()
Expand All @@ -236,14 +224,6 @@ object ApproximatePercentile {
/** Insert an observation value into the PercentileDigest data structure. */
def add(value: Double): Unit = {
summaries = summaries.insert(value)
// The result of QuantileSummaries.insert is un-compressed
isCompressed = false

// Currently, QuantileSummaries ignores the construction parameter compressThresHold,
// which may cause QuantileSummaries to occupy unbounded memory. We have to hack around here
// to make sure QuantileSummaries doesn't occupy infinite memory.
// TODO: Figure out why QuantileSummaries ignores construction parameter compressThresHold
if (summaries.sampled.length >= compressThresHoldBufferLength) compress()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested if this change doesn't cause compress() to not be called at all, and memory consumption to go ubounded, but it appears to be working good - the mem usage through jmap -histo:live when running sql("select approx_percentile(id, array(0.1)) from range(10000000000L)").collect() remains stable.
The compress() is being called from QuantileSummaries.insert(), so it seems that the above TODO got resolved at some point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the TODO was resolved in SPARK-17439. I thought I clearly stated it in the description, but if this is not the case or you have any suggestion about how to improve the description, I am happy to improve it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, it's my fault of not reading the description attentively :-).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no problem at all, thanks for checking this :) I addressed you comment on the test. Any more comments?

}

/** In-place merges in another PercentileDigest. */
Expand Down Expand Up @@ -280,7 +260,6 @@ object ApproximatePercentile {

private final def compress(): Unit = {
summaries = summaries.compress()
isCompressed = true
}
}

Expand Down Expand Up @@ -335,8 +314,8 @@ object ApproximatePercentile {
sampled(i) = Stats(value, g, delta)
i += 1
}
val summary = new QuantileSummaries(compressThreshold, relativeError, sampled, count)
new PercentileDigest(summary, isCompressed = true)
val summary = new QuantileSummaries(compressThreshold, relativeError, sampled, count, true)
new PercentileDigest(summary)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ import org.apache.spark.sql.catalyst.util.QuantileSummaries.Stats
* See the G-K article for more details.
* @param count the count of all the elements *inserted in the sampled buffer*
* (excluding the head buffer)
* @param compressed whether the statistics have been compressed
*/
class QuantileSummaries(
val compressThreshold: Int,
val relativeError: Double,
val sampled: Array[Stats] = Array.empty,
val count: Long = 0L) extends Serializable {
val count: Long = 0L,
var compressed: Boolean = false) extends Serializable {

// a buffer of latest samples seen so far
private val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty
Expand All @@ -60,6 +62,7 @@ class QuantileSummaries(
*/
def insert(x: Double): QuantileSummaries = {
headSampled += x
compressed = false
if (headSampled.size >= defaultHeadSize) {
val result = this.withHeadBufferInserted
if (result.sampled.length >= compressThreshold) {
Expand Down Expand Up @@ -135,11 +138,11 @@ class QuantileSummaries(
assert(inserted.count == count + headSampled.size)
val compressed =
compressImmut(inserted.sampled, mergeThreshold = 2 * relativeError * inserted.count)
new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count)
new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count, true)
}

private def shallowCopy: QuantileSummaries = {
new QuantileSummaries(compressThreshold, relativeError, sampled, count)
new QuantileSummaries(compressThreshold, relativeError, sampled, count, compressed)
}

/**
Expand All @@ -163,7 +166,7 @@ class QuantileSummaries(
val res = (sampled ++ other.sampled).sortBy(_.value)
val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count)
new QuantileSummaries(
other.compressThreshold, other.relativeError, comp, other.count + count)
other.compressThreshold, other.relativeError, comp, other.count + count, true)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql

import java.sql.{Date, Timestamp}

import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.PercentileDigest
import org.apache.spark.sql.catalyst.util.DateTimeUtils
Expand Down Expand Up @@ -279,4 +280,16 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSQLContext {
checkAnswer(query, expected)
}
}

test("SPARK-24013: unneeded compress can cause performance issues with sorted input") {
val buffer = new PercentileDigest(1.0D / ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY)
var compressCounts = 0
(1 to 10000000).foreach { i =>
buffer.add(i)
if (buffer.isCompressed) compressCounts += 1
}
assert(compressCounts > 0)
buffer.quantileSummaries
assert(buffer.isCompressed)
}
}