Implement percentile_approx
lw-lin committed Jul 21, 2016
commit f21d74613e414821adab2ed61c19cb3297e30ee6
@@ -253,6 +253,7 @@ object FunctionRegistry {
expression[Max]("max"),
expression[Average]("mean"),
expression[Min]("min"),
expression[PercentileApprox]("percentile_approx"),
expression[Skewness]("skewness"),
expression[StddevSamp]("std"),
expression[StddevSamp]("stddev"),
@@ -19,7 +19,200 @@ package org.apache.spark.sql.catalyst.expressions.aggregate

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.QuantileSummaries.Stats
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.types._

/**
* Computes an approximate percentile (quantile) using the G-K algorithm (see below), for very
* large numbers of rows where the regular percentile() UDAF might run out of memory.
*
* The input is a single double value or an array of double values representing the percentiles
* requested. The output, corresponding to the input, is either an single double value or an
Review comment (Member):
minor: an single -> a single

* array of doubles that are the percentile values.
*/
@ExpressionDescription(
usage = """_FUNC_(col, p [, B]) - Returns an approximate pth percentile of a numeric column in the
group. The B parameter, which defaults to 1000, controls approximation accuracy at the cost of
memory; higher values yield better approximations.
_FUNC_(col, array(p1 [, p2]...) [, B]) - Same as above, but accepts and returns an array of
percentile values instead of a single one.
""")
case class PercentileApprox(
child: Expression,
percentilesExpr: Expression,
bExpr: Option[Expression],
percentiles: Seq[Double], // the extracted percentiles
B: Int, // the extracted B
Review comment (Member):
Hi, @lw-lin. I know the reason why you define this as a capital 'B', but I'm wondering whether it's consistent with the Spark naming rules.

Reply (lw-lin, Contributor Author, Aug 2, 2016):
Thanks. I don't have a strong preference here -- let's see what the reviewers say.

resultAsArray: Boolean, // whether to return the result as an array
mutableAggBufferOffset: Int = 0,
inputAggBufferOffset: Int = 0) extends ImperativeAggregate {

private def this(child: Expression, percentilesExpr: Expression, bExpr: Option[Expression]) = {
this(
child = child,
percentilesExpr = percentilesExpr,
bExpr = bExpr,
// validate and extract percentiles
percentiles = PercentileApprox.validatePercentilesLiteral(percentilesExpr)._1,
// validate and extract B
B = bExpr.map(PercentileApprox.validateBLiteral(_)).getOrElse(PercentileApprox.B_DEFAULT),
// validate and mark whether we should return results as array of double or not
resultAsArray = PercentileApprox.validatePercentilesLiteral(percentilesExpr)._2)
}

// Constructor for the "_FUNC_(col, p) / _FUNC_(col, array(p1, ...))" form
def this(child: Expression, percentilesExpr: Expression) = {
this(child, percentilesExpr, None)
}

// Constructor for the "_FUNC_(col, p, B) / _FUNC_(col, array(p1, ...), B)" form
def this(child: Expression, percentilesExpr: Expression, bExpr: Expression) = {
this(child, percentilesExpr, Some(bExpr))
}

override def prettyName: String = "percentile_approx"

override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate =
copy(mutableAggBufferOffset = newMutableAggBufferOffset)

override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
copy(inputAggBufferOffset = newInputAggBufferOffset)

override def children: Seq[Expression] =
bExpr.map(child :: percentilesExpr :: _ :: Nil).getOrElse(child :: percentilesExpr :: Nil)

// Nullable because we return null for empty inputs
override def nullable: Boolean = true

override def dataType: DataType = if (resultAsArray) ArrayType(DoubleType) else DoubleType

override def inputTypes: Seq[AbstractDataType] = Seq(NumericType, AnyDataType, IntegralType)

override def checkInputDataTypes(): TypeCheckResult =
TypeUtils.checkForNumericExpr(child.dataType, "function percentile_approx")

// The number of intermediate outputs depends heavily on the actual data set (an upper bound is
// (11 / (2 * e)) * log(2 * e * n), where e is the relativeError parameter and n is the number of
// items in the dataset) -- thus it's hard to allocate the agg buffer in advance without knowing
// the size of the inputs. For this reason, we currently don't support partial mode.
Review comment (Contributor):
Can you explain a bit more about this? AFAIK, Hive supports partial aggregation for percentile_approx, and it looks to me that your implementation keeps the buffer data (QuantileSummaries) in this aggregate function object instead of letting the aggregate operator manage it; I think that's the main reason why we can't support partial aggregation for percentile_approx.

Reply (lw-lin, Contributor Author, Aug 9, 2016):
> hive supports partial aggregate for percentile_approx

Hive's implementation computes approximate percentile values from a histogram, thus Hive supports partial aggregation (but makes no approximation guarantees).

> ... QuantileSummaries in this aggregate function object, instead of letting aggregate operator manage it, that's the main reason why we can't support partial aggregate

Yes, that's quite right. QuantileSummaries was implemented and well tested prior to this patch, so it would be great if we could reuse it and put a QuantileSummaries instance directly into the aggregation buffer (in order to support partial aggregation). @cloud-fan, any pointers on how to do that, please? Thanks!

Review comment (Contributor):
To be clear, are you saying that our current algorithm to compute percentile_approx can't support partial aggregation fundamentally?

Reply (lw-lin, Contributor Author, Aug 9, 2016):
Fundamentally, the QuantileSummaries implementation does support RDD-style partial aggregation -- a QuantileSummaries is itself the agg buffer for a partition's data at the mappers, and multiple QuantileSummaries instances are merged at the reducers (please refer to StatFunctions.multipleApproxQuantiles).

Fundamentally, it should also support SparkSQL-style partial aggregation. I'm trying to reuse QuantileSummaries here; if there's no easy way to reuse it, I'm afraid we'll have to rewrite the GK algorithm entirely to support SparkSQL-style partial aggregation.

So is there any way we can just put a QuantileSummaries instance directly into SparkSQL's agg buffer? Or do we have to break a QuantileSummaries instance up, say into an int + a double + a double[] + an int[] + an int[], which SparkSQL's agg buffer can manage?

Review comment (Contributor):
I think putting the buffer object in the agg buffer row is better, but that needs to be well designed. Can you hold this PR for a while? We are discussing it internally.

Reply (lw-lin, Contributor Author):
Sure, this can wait. Thanks for the information!

Review comment (Contributor):
Some updates: #14753 was created to support putting generic objects in the aggregation buffer.

Reply (lw-lin, Contributor Author):
@clockfly cool! Now that #14754 has been merged, it looks like I should update this patch once #14753 gets merged.
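
To give the upper bound in the code comment above some scale, an illustrative calculation (my numbers, not from the patch, assuming a base-2 logarithm):

// Illustrative only: evaluates the bound (11 / (2 * eps)) * log2(2 * eps * n)
// from the code comment, for a sample relative error and input size.
def gkBound(eps: Double, n: Long): Double =
  11.0 / (2 * eps) * (math.log(2 * eps * n) / math.log(2))

gkBound(0.001, 1000000000L)  // ~115,000 summary tuples for a billion rows

The bound grows with n, so the buffer size cannot be fixed in advance -- which is exactly why partial mode is unsupported here.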

override def supportsPartial: Boolean = false

override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes)

override val aggBufferAttributes: Seq[AttributeReference] = Seq()

override val inputAggBufferAttributes: Seq[AttributeReference] = Seq()

private var summary: QuantileSummaries = null

override def initialize(buffer: MutableRow): Unit = {
// Our `PercentileApprox` function takes a `B` parameter, but the underlying GK algorithm takes
// a `relativeError` parameter, so we need to convert `B` to `relativeError`.
// Please refer to SPARK-16283 for details.
val relativeError = PercentileApprox.bToRelativeError(B)
summary = new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, relativeError)
}

override def update(buffer: MutableRow, input: InternalRow): Unit = {
// Guard against nulls so a null input value does not throw a MatchError;
// null inputs simply do not contribute to the summary.
val evaluated = child.eval(input)
if (evaluated != null) {
val value = evaluated match {
case o: Byte => o.toDouble
case o: Short => o.toDouble
case o: Int => o.toDouble
case o: Long => o.toDouble
case o: Float => o.toDouble
case o: Decimal => o.toDouble
case o: Double => o
}

summary.insert(value)
}
}

override def merge(buffer: MutableRow, inputBuffer: InternalRow): Unit = {
sys.error("PercentileApprox does not support partial aggregation.")
}

override def eval(buffer: InternalRow): Any = {
// summary must be compressed before being queried
summary = summary.compress()

if (resultAsArray) {
// return the result as an array of doubles, or return null for empty inputs
if (summary.count > 0) new GenericArrayData(percentiles.map { summary.query(_) }) else null
} else {
// return the result as a double, or return null for empty inputs
if (summary.count > 0) summary.query(percentiles.head) else null
}
}

private def childrenSQL: String =
if (bExpr.isDefined) {
s"${child.sql}, ${percentilesExpr.toString}, ${bExpr.get.toString}"
} else {
s"${child.sql}, ${percentilesExpr.toString}"
}

override def sql: String = s"$prettyName($childrenSQL)"

override def sql(isDistinct: Boolean): String = {
val distinct = if (isDistinct) "DISTINCT " else ""
s"$prettyName($distinct$childrenSQL)"
}
}

object PercentileApprox {

// Using 1000 as a default achieves a relativeError of 0.001
private[sql] val B_DEFAULT = 1000

// Our `PercentileApprox` function takes a `B` parameter, but the underlying GK algorithm takes
// a `relativeError` parameter, so we need to convert `B` to `relativeError`.
// Please refer to SPARK-16283 for details.
private[sql] def bToRelativeError(B: Int): Double = Math.max(1.0d / B, 0.001)
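
A few sample values, following directly from the definition above; note the floor, which means raising B beyond 1,000 does not tighten the error further:

bToRelativeError(100)    // 0.01
bToRelativeError(1000)   // 0.001
bToRelativeError(10000)  // 0.001 -- max(1.0 / 10000, 0.001) floors at 0.001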

/**
* Validates the percentile(s) expression and extracts the percentile(s).
* Returns the extracted percentile(s) and an indicator of whether it's an array.
*/
private def validatePercentilesLiteral(exp: Expression): (Seq[Double], Boolean) = {
def withinRange(v: Double): Boolean = 0.0 <= v && v <= 1.0
exp match {
Review comment (Contributor):
You could also pattern match on the result of exp.eval(). That would be way easier.

case Literal(f: Float, FloatType) if withinRange(f) => (Seq(f.toDouble), false)
case Literal(d: Double, DoubleType) if withinRange(d) => (Seq(d), false)
case Literal(dec: Decimal, _) if withinRange(dec.toDouble) => (Seq(dec.toDouble), false)

case CreateArray(children: Seq[Expression]) if children.nonEmpty =>
(children.map {
case Literal(f: Float, FloatType) if withinRange(f) => f.toDouble
case Literal(d: Double, DoubleType) if withinRange(d) => d
case Literal(dec: Decimal, _) if withinRange(dec.toDouble) => dec.toDouble
case _ =>
throw new AnalysisException(
"The second argument should be a double literal or an array of doubles, and should " +
"be within range [0.0, 1.0]")
}, true)

case _ =>
throw new AnalysisException(
"The second argument should be a double literal or an array of doubles, and should " +
"be within range [0.0, 1.0]")
}
}
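
For comparison, a sketch of the reviewer's eval()-based suggestion (illustrative only, not the patch's code; it assumes the second argument is foldable, so eval() needs no input row, and `validatePercentilesViaEval` is a hypothetical name):

private def validatePercentilesViaEval(exp: Expression): (Seq[Double], Boolean) = {
  def withinRange(v: Double): Boolean = 0.0 <= v && v <= 1.0
  // Converts an evaluated literal value to a double, enforcing the range check.
  def toDouble(v: Any): Double = v match {
    case f: Float if withinRange(f) => f.toDouble
    case d: Double if withinRange(d) => d
    case dec: Decimal if withinRange(dec.toDouble) => dec.toDouble
    case _ => throw new AnalysisException(
      "The second argument should be a double literal or an array of doubles, and should " +
      "be within range [0.0, 1.0]")
  }
  exp.eval() match {
    case arr: GenericArrayData => (arr.array.map(toDouble).toSeq, true)
    case v => (Seq(toDouble(v)), false)
  }
}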

/** Validates the B expression and extracts its value. */
private def validateBLiteral(exp: Expression): Int = exp match {
case Literal(i: Int, IntegerType) if i > 0 => i

case _ =>
throw new AnalysisException("The third argument should be a positive integer literal")
}
}

/**
* Helper class to compute an approximate quantile summary.
sql/core/src/main/scala/org/apache/spark/sql/functions.scala (161 additions, 0 deletions)
@@ -535,6 +535,167 @@ object functions {
*/
def min(columnName: String): Column = min(Column(columnName))

/**
* :: Experimental ::
*
* Aggregate function: returns an array of approximate percentile values of a numeric column in
* the group.
*
* @param e numeric column to compute approximate percentile values on
* @param pc an array of double values representing the percentiles requested
* @param B controls approximation accuracy at the cost of memory. Higher values yield better
* approximations; overloads without this parameter default to 1,000
*
* @return the array of approximate percentile values
*
* @group agg_funcs
* @since 2.1.0
*/
@Experimental
def approxPercentile(e: Column, pc: Seq[Double], B: Int): Column = withAggregateFunction {
new PercentileApprox(e.expr, CreateArray(pc.map(v => Literal(v))), Literal(B))
}
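
A usage sketch for this overload (hedged: the DataFrame `df` and its "latency" column are hypothetical, as is the surrounding SparkSession setup):

import org.apache.spark.sql.functions._

// Hypothetical: df is a DataFrame with a numeric "latency" column.
val quartiles = df.agg(approxPercentile(col("latency"), Seq(0.25, 0.5, 0.75), 5000))
quartiles.show()  // a single row holding an array of three approximate percentiles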

/**
* :: Experimental ::
*
* Aggregate function: returns an array of approximate percentile values of a numeric column in
* the group.
*
* @param columnName numeric column to compute approximate percentile values on
* @param pc an array of double values representing the percentiles requested
* @param B controls approximation accuracy at the cost of memory. Higher values yield better
* approximations; overloads without this parameter default to 1,000
*
* @return the array of approximate percentile values
*
* @group agg_funcs
* @since 2.1.0
*/
@Experimental
def approxPercentile(columnName: String, pc: Seq[Double], B: Int): Column = {
approxPercentile(Column(columnName), pc, B)
}

/**
* :: Experimental ::
*
* Aggregate function: returns an array of approximate percentile values of a numeric column in
* the group.
*
* @param e numeric column to compute approximate percentile values on
* @param pc an array of double values representing the percentiles requested
*
* @return the array of approximate percentile values
*
* @group agg_funcs
* @since 2.1.0
*/
@Experimental
def approxPercentile(e: Column, pc: Seq[Double]): Column = withAggregateFunction {
new PercentileApprox(e.expr, CreateArray(pc.map(v => Literal(v))))
}

/**
* :: Experimental ::
*
* Aggregate function: returns an array of approximate percentile values of a numeric column in
* the group.
*
* @param columnName numeric column to compute approximate percentile values on
* @param pc an array of double values representing the percentiles requested
*
* @return the array of approximate percentile values
*
* @group agg_funcs
* @since 2.1.0
*/
@Experimental
def approxPercentile(columnName: String, pc: Seq[Double]): Column = {
approxPercentile(Column(columnName), pc)
}

/**
* :: Experimental ::
*
* Aggregate function: returns an approximate pth percentile value of a numeric column in the
* group.
*
* @param e numeric column to compute approximate percentile values on
* @param pc the percentile requested
* @param B controls approximation accuracy at the cost of memory. Higher values yield better
* approximations; overloads without this parameter default to 1,000
*
* @return the approximate pth percentile value
*
* @group agg_funcs
* @since 2.1.0
*/
@Experimental
def approxPercentile(e: Column, pc: Double, B: Int): Column = withAggregateFunction {
new PercentileApprox(e.expr, Literal(pc), Literal(B))
}

/**
* :: Experimental ::
*
* Aggregate function: returns an approximate pth percentile value of a numeric column in the
* group.
*
* @param columnName numeric column to compute approximate percentile values on
* @param pc the percentile requested
* @param B controls approximation accuracy at the cost of memory. Higher values yield better
* approximations; overloads without this parameter default to 1,000
*
* @return the approximate pth percentile value
*
* @group agg_funcs
* @since 2.1.0
*/
@Experimental
def approxPercentile(columnName: String, pc: Double, B: Int): Column = {
approxPercentile(Column(columnName), pc, B)
}

/**
* :: Experimental ::
*
* Aggregate function: returns an approximate pth percentile value of a numeric column in the
* group.
*
* @param e numeric column to compute approximate percentile values on
* @param pc the percentile requested
*
* @return the approximate pth percentile value
*
* @group agg_funcs
* @since 2.1.0
*/
@Experimental
def approxPercentile(e: Column, pc: Double): Column = withAggregateFunction {
new PercentileApprox(e.expr, Literal(pc))
}


/**
* :: Experimental ::
*
* Aggregate function: returns an approximate pth percentile value of a numeric column in the
* group.
*
* @param columnName numeric column to compute approximate percentile values on
* @param pc the percentile requested
*
* @return the approximate pth percentile value
*
* @group agg_funcs
* @since 2.1.0
*/
@Experimental
def approxPercentile(columnName: String, pc: Double): Column = {
approxPercentile(Column(columnName), pc)
}
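
And the single-percentile form, against the same hypothetical df; this returns one double rather than an array:

// Hypothetical df as above; yields a single approximate 99th-percentile value.
df.agg(approxPercentile("latency", 0.99))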

/**
* Aggregate function: returns the skewness of the values in a group.
*