Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Rebase...
  • Loading branch information
hvanhovell committed Aug 10, 2015
commit e792325b636dbcfec0725636a68030db0f95b2fa
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ case class ReduceSetAlgebraic(left: Expression, right: AlgebraicAggregate)
case class ReduceSetAggregate(left: Expression, right: AggregateFunction2)
extends BinaryExpression with CodegenFallback {

right.mutableBufferOffset = 0
right.withNewMutableBufferOffset(0)

override def dataType: DataType = right.dataType

Expand Down
34 changes: 20 additions & 14 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ private[execution] object AggregateProcessor {
val ref = distinctExpressionSchemaMap(agg.children)
evaluateExpressions += ReduceSetAlgebraic(ref, agg)
case (agg: AlgebraicAggregate, false, _) =>
agg.mutableBufferOffset = bufferSchema.size
agg.withNewMutableBufferOffset(bufferSchema.size)
bufferSchema ++= agg.bufferAttributes
initialValues ++= agg.initialValues
updateExpressions ++= agg.updateExpressions
Expand All @@ -801,7 +801,7 @@ private[execution] object AggregateProcessor {
val boundAgg = BindReferences.bindReference(agg, inputSchema)
aggregates2 += boundAgg
aggregates2OutputOffsets += i
agg.mutableBufferOffset = bufferSchema.size
agg.withNewMutableBufferOffset(bufferSchema.size)
bufferSchema ++= boundAgg.bufferAttributes
val nops = Seq.fill(boundAgg.bufferAttributes.size)(NoOp)
initialValues ++= nops
Expand Down Expand Up @@ -888,7 +888,7 @@ private[execution] final class AggregateProcessor(
}
i = 0
while (i < aggregates1Size) {
buffer.getAs[AggregateFunction1](aggregates1BufferOffsets(i), null).update(input)
buffer.get(aggregates1BufferOffsets(i), null).asInstanceOf[AggregateFunction1].update(input)
i += 1
}
}
Expand All @@ -913,7 +913,7 @@ private[execution] final class AggregateProcessor(
}
i = 0
while (i < aggregates1Size) {
val function = buffer.getAs[AggregateFunction1](aggregates1BufferOffsets(i), null)
val function = buffer.get(aggregates1BufferOffsets(i), null).asInstanceOf[AggregateFunction1]
val value = function.eval(EmptyRow)
target.update(aggregates1OutputOffsets(i), value)
i += 1
Expand All @@ -925,8 +925,18 @@ private[execution] final class OffsetMutableRow(offset: Int, delegate: MutableRo
extends MutableRow {
def setNullAt(i: Int): Unit = delegate.setNullAt(i + offset)
def update(i: Int, value: Any): Unit = delegate.update(i + offset, value)
def get(i: Int, dataType: DataType): Any = delegate.get(i + offset, dataType)
def genericGet(i: Int): Any = delegate.genericGet(i + offset)
def numFields: Int = delegate.numFields - offset
def copy(): InternalRow = {
val numFields = delegate.numFields
val values = new Array[Any](numFields)
var i = 0
while (i < numFields) {
values(i) = delegate.genericGet(i)
i += 1
}
new OffsetMutableRow(offset, new GenericMutableRow(values))
}
}


Expand Down Expand Up @@ -982,9 +992,9 @@ final class JRow(private[this] val numLeftFields: Int, val numFields: Int) exten
rows(r).getBinary(ordinal(i, r))
}

override def get(i: Int, dataType: DataType): Any = {
override def genericGet(i: Int): Any = {
val r = row(i)
rows(r).get(ordinal(i, r), dataType)
rows(r).genericGet(ordinal(i, r))
}

override def isNullAt(i: Int): Boolean = {
Expand Down Expand Up @@ -1038,13 +1048,9 @@ final class JRow(private[this] val numLeftFields: Int, val numFields: Int) exten
}

override def copy(): InternalRow = {
val copiedValues = new Array[Any](numFields)
var i = 0
while (i < numFields) {
copiedValues(i) = get(i)
i += 1
}
new GenericInternalRow(copiedValues)
val row = new JRow(numLeftFields, numFields)
row(rows(0).copy(), rows(1).copy())
row
}

override def toString: String = {
Expand Down