Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix the bug of revert the null issue in Sum and also the Average UDAF
  • Loading branch information
chenghao-intel committed Mar 27, 2015
commit b539baf87993d1eab09561281027c179b98da7a5
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ case class Sum(child: Expression, distinct: Boolean = false)
@transient var arg: MutableLiteral = _
@transient var sum: Add = _

lazy val DEFAULT_VALUE = Cast(Literal(0, IntegerType), dataType).eval()

override def initialBoundReference(buffers: Seq[BoundReference]) = {
aggr = buffers(0)
arg = MutableLiteral(null, dataType)
Expand All @@ -431,6 +433,10 @@ case class Sum(child: Expression, distinct: Boolean = false)
arg.value = argument
buf(aggr) = sum.eval(buf)
}
} else {
if (buf.isNullAt(aggr)) {
buf(aggr) = DEFAULT_VALUE
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ private[hive] case class HiveGenericUdaf(
// Initialize (reinitialize) the aggregation buffer
override def reset(buf: MutableRow): Unit = {
val buffer = evaluator.getNewAggregationBuffer
.asInstanceOf[GenericUDAFEvaluator.AbstractAggregationBuffer]
evaluator.reset(buffer)
// This is a hack, we never use the mutable row as buffer, but define our own buffer,
// which is set as the first element of the buffer
Expand All @@ -276,27 +275,27 @@ private[hive] case class HiveGenericUdaf(
}.toArray

evaluator.iterate(
buf.getAs[GenericUDAFEvaluator.AbstractAggregationBuffer](bound.ordinal),
buf.getAs[GenericUDAFEvaluator.AggregationBuffer](bound.ordinal),
args)
}

// Merge 2 aggregation buffer, and write back to the later one
override def merge(value: Row, buf: MutableRow): Unit = {
val buffer = buf.getAs[GenericUDAFEvaluator.AbstractAggregationBuffer](bound.ordinal)
val buffer = buf.getAs[GenericUDAFEvaluator.AggregationBuffer](bound.ordinal)
evaluator.merge(buffer, wrap(value.get(bound.ordinal), bufferObjectInspector))
}

@deprecated
override def terminatePartial(buf: MutableRow): Unit = {
val buffer = buf.getAs[GenericUDAFEvaluator.AbstractAggregationBuffer](bound.ordinal)
val buffer = buf.getAs[GenericUDAFEvaluator.AggregationBuffer](bound.ordinal)
// this is for serialization
buf(bound) = unwrap(evaluator.terminatePartial(buffer), bufferObjectInspector)
}

// Output the final result by feeding the aggregation buffer
override def terminate(input: Row): Any = {
unwrap(evaluator.terminate(
input.getAs[GenericUDAFEvaluator.AbstractAggregationBuffer](bound.ordinal)),
input.getAs[GenericUDAFEvaluator.AggregationBuffer](bound.ordinal)),
objectInspector)
}
}
Expand Down