Skip to content
Prev Previous commit
Next Next commit
on viirya's comment
  • Loading branch information
clockfly committed Aug 24, 2016
commit 5904bcd2eb523b6f3e744925a0e9d9da52f6ae0b
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ abstract class TypedImperativeAggregate[T] extends ImperativeAggregate {
}

final override def update(buffer: MutableRow, input: InternalRow): Unit = {
val bufferObject = getField(buffer, mutableAggBufferOffset).asInstanceOf[T]
val bufferObject = getField[T](buffer, mutableAggBufferOffset)
update(bufferObject, input)
}

Expand Down Expand Up @@ -535,7 +535,7 @@ abstract class TypedImperativeAggregate[T] extends ImperativeAggregate {
* The framework calls this method every time after updating/merging one group (group by key).
*/
final def serializeAggregateBufferInPlace(buffer: MutableRow): Unit = {
val bufferObject = getField(buffer, mutableAggBufferOffset).asInstanceOf[T]
val bufferObject = getField[T](buffer, mutableAggBufferOffset)
buffer(mutableAggBufferOffset) = serialize(bufferObject)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class SortBasedAggregationIterator(
private def serializeTypedAggregateBuffer(aggregationBuffer: MutableRow): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused parameter aggregationBuffer. Or replace the following sortBasedAggregationBuffer to aggregationBuffer?

typedImperativeAggregates.foreach { agg =>
// In-place serialization
agg.serializeAggregateBufferInPlace(sortBasedAggregationBuffer)
agg.serializeAggregateBufferInPlace(aggregationBuffer)
}
}

Expand Down