Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
move compress to QuantileSummaries
  • Loading branch information
mgaido91 committed Apr 30, 2018
commit aab21a77bf031b131c8920d1ed2eebd63d4775e5
Original file line number Diff line number Diff line change
Expand Up @@ -206,28 +206,15 @@ object ApproximatePercentile {
* with limited memory. PercentileDigest is backed by [[QuantileSummaries]].
*
* @param summaries underlying probabilistic data structure [[QuantileSummaries]].
* @param isCompressed An internal flag from class [[QuantileSummaries]] to indicate whether the
* underlying quantileSummaries is compressed.
*/
class PercentileDigest(
private var summaries: QuantileSummaries,
// visible for testing
private[sql] var isCompressed: Boolean) {

// Trigger compression if the QuantileSummaries' buffer length exceeds
// compressThresHoldBufferLength. The buffer length can be obtained from
// quantileSummaries.sampled.length.
private[this] final val compressThresHoldBufferLength: Int = {
// Max buffer length after compression.
val maxBufferLengthAfterCompression: Int = (1 / summaries.relativeError).toInt * 2
// A safe upper bound for buffer length before compression
maxBufferLengthAfterCompression * 2
}
class PercentileDigest(private var summaries: QuantileSummaries) {

def this(relativeError: Double) = {
this(new QuantileSummaries(defaultCompressThreshold, relativeError), isCompressed = true)
this(new QuantileSummaries(defaultCompressThreshold, relativeError, compressed = true))
}

private[sql] def isCompressed: Boolean = summaries.compressed

/** Returns the underlying [[QuantileSummaries]], compressing it first if it is not compressed. */
def quantileSummaries: QuantileSummaries = {
if (!isCompressed) compress()
Expand All @@ -237,8 +224,6 @@ object ApproximatePercentile {
/** Insert an observation value into the PercentileDigest data structure. */
def add(value: Double): Unit = {
summaries = summaries.insert(value)
// The result of QuantileSummaries.insert is un-compressed
isCompressed = false
}

/** Merges another PercentileDigest into this one in place. */
Expand Down Expand Up @@ -275,7 +260,6 @@ object ApproximatePercentile {

private final def compress(): Unit = {
summaries = summaries.compress()
isCompressed = true
}
}

Expand Down Expand Up @@ -330,8 +314,8 @@ object ApproximatePercentile {
sampled(i) = Stats(value, g, delta)
i += 1
}
val summary = new QuantileSummaries(compressThreshold, relativeError, sampled, count)
new PercentileDigest(summary, isCompressed = true)
val summary = new QuantileSummaries(compressThreshold, relativeError, sampled, count, true)
new PercentileDigest(summary)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ import org.apache.spark.sql.catalyst.util.QuantileSummaries.Stats
* See the G-K article for more details.
* @param count the count of all the elements *inserted in the sampled buffer*
* (excluding the head buffer)
* @param compressed whether the statistics have been compressed
*/
class QuantileSummaries(
val compressThreshold: Int,
val relativeError: Double,
val sampled: Array[Stats] = Array.empty,
val count: Long = 0L) extends Serializable {
val count: Long = 0L,
var compressed: Boolean = false) extends Serializable {

// a buffer of latest samples seen so far
private val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty
Expand All @@ -60,6 +62,7 @@ class QuantileSummaries(
*/
def insert(x: Double): QuantileSummaries = {
headSampled += x
compressed = false
if (headSampled.size >= defaultHeadSize) {
val result = this.withHeadBufferInserted
if (result.sampled.length >= compressThreshold) {
Expand Down Expand Up @@ -135,11 +138,11 @@ class QuantileSummaries(
assert(inserted.count == count + headSampled.size)
val compressed =
compressImmut(inserted.sampled, mergeThreshold = 2 * relativeError * inserted.count)
new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count)
new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count, true)
}

private def shallowCopy: QuantileSummaries = {
new QuantileSummaries(compressThreshold, relativeError, sampled, count)
new QuantileSummaries(compressThreshold, relativeError, sampled, count, compressed)
}

/**
Expand All @@ -163,7 +166,7 @@ class QuantileSummaries(
val res = (sampled ++ other.sampled).sortBy(_.value)
val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count)
new QuantileSummaries(
other.compressThreshold, other.relativeError, comp, other.count + count)
other.compressThreshold, other.relativeError, comp, other.count + count, true)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,12 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSQLContext {

test("SPARK-24013: unneeded compress can cause performance issues with sorted input") {
val buffer = new PercentileDigest(1.0D / ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY)
var compressCounts = 0
(1 to 10000000).foreach { i =>
buffer.add(i)
assert(!buffer.isCompressed)
if (buffer.isCompressed) compressCounts += 1
}
assert(compressCounts > 0)
buffer.quantileSummaries
assert(buffer.isCompressed)
}
Expand Down