Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ case class TungstenAggregate(
} else {
// This is a grouped aggregate and the input iterator is empty,
// so return an empty iterator.
Iterator[UnsafeRow]()
Iterator.empty
}
} else {
aggregationIterator.start(parentIterator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,18 +357,30 @@ class TungstenAggregationIterator(
// sort-based aggregation (by calling switchToSortBasedAggregation).
private def processInputs(): Unit = {
assert(inputIter != null, "attempted to process input when iterator was null")
while (!sortBased && inputIter.hasNext) {
val newInput = inputIter.next()
numInputRows += 1
val groupingKey = groupProjection.apply(newInput)
if (groupingExpressions.isEmpty) {
// If there is no grouping expressions, we can just reuse the same buffer over and over again.
// Note that it would be better to eliminate the hash map entirely in the future.
val groupingKey = groupProjection.apply(null)
val buffer: UnsafeRow = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
if (buffer == null) {
// buffer == null means that we could not allocate more memory.
// Now, we need to spill the map and switch to sort-based aggregation.
switchToSortBasedAggregation(groupingKey, newInput)
} else {
while (inputIter.hasNext) {
val newInput = inputIter.next()
numInputRows += 1
processRow(buffer, newInput)
}
} else {
while (!sortBased && inputIter.hasNext) {
val newInput = inputIter.next()
numInputRows += 1
val groupingKey = groupProjection.apply(newInput)
val buffer: UnsafeRow = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
if (buffer == null) {
// buffer == null means that we could not allocate more memory.
// Now, we need to spill the map and switch to sort-based aggregation.
switchToSortBasedAggregation(groupingKey, newInput)
} else {
processRow(buffer, newInput)
}
}
}
}

Expand Down