Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Rebase. Performance improvements...
  • Loading branch information
hvanhovell committed Aug 10, 2015
commit 25c6f42e405b696a25d6691d07a54da976ac83bc
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.Aggregate(partial = false, group, agg, planLater(child)) :: Nil
}
}

case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) =>
val convertedWindowExpressions = windowExprs.map { e =>
val converted = e.transformDown(Utils.convertAggregateExpressions)
Expand Down
150 changes: 143 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.types.{DataType, NullType, IntegerType}
import org.apache.spark.sql.types.{Decimal, DataType, NullType, IntegerType}
import org.apache.spark.rdd.RDD
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.collection.CompactBuffer
import scala.collection.mutable

Expand Down Expand Up @@ -824,14 +825,16 @@ private[execution] object AggregateProcessor {
val evaluateProjection = newMutableProjection(evaluateExpressions, bufferSchema)()

// (EXPERI)-MENTAL
val boundUpdateExpressions = BindReferences.bindJoinReferences(
updateExpressions, bufferSchema, inputSchema)
val updateProjection = newMutableProjection(boundUpdateExpressions, Nil)()
//val boundUpdateExpressions = BindReferences.bindJoinReferences(
// updateExpressions, bufferSchema, inputSchema)
val updateProjection = newMutableProjection(updateExpressions, bufferSchema ++ inputSchema)()
val join = new JRow(bufferSchema.size, bufferSchema.size + inputSchema.size)

// Create the processor
new AggregateProcessor(bufferSchema.toArray, initialProjection, updateProjection,
evaluateProjection, aggregates2.toArray, aggregates2OutputOffsets.toArray,
aggregates1.toArray, aggregates1BufferOffsets.toArray, aggregates1OutputOffsets.toArray)
aggregates1.toArray, aggregates1BufferOffsets.toArray, aggregates1OutputOffsets.toArray,
join)
}
}

Expand All @@ -848,9 +851,10 @@ private[execution] final class AggregateProcessor(
private[this] val aggregates2OutputOffsets: Array[Int],
private[this] val aggregates1: Array[AggregateExpression1],
private[this] val aggregates1BufferOffsets: Array[Int],
private[this] val aggregates1OutputOffsets: Array[Int]) {
private[this] val aggregates1OutputOffsets: Array[Int],
private[this] val join: JRow) {

private[this] val join = new JoinedRow
//private[this] val join = new JoinedRow
private[this] val bufferDataTypes = bufferSchema.toSeq.map(_.dataType)
private[this] val aggregates2Size = aggregates2.length
private[this] val aggregates1Size = aggregates1.length
Expand Down Expand Up @@ -922,3 +926,135 @@ private[execution] final class OffsetMutableRow(offset: Int, delegate: MutableRo
def get(i: Int, dataType: DataType): Any = delegate.get(i + offset, dataType)
def numFields: Int = delegate.numFields - offset
}


final class JRow(leftNumFields: Int, totalNumFields: Int) extends InternalRow {
/*
private[this] val mapping = Array.tabulate(totalNumFields) { n =>
if (n < leftNumFields) 0
else 1
}
private[this] val ordinals = Array.tabulate(totalNumFields) { n =>
if (n < leftNumFields) n
else n - leftNumFields
}*/
// Get the sign and flip the bit.
private[this] def row(i:Int) = (((i - leftNumFields) & -0x80000000) >>> 31) ^ 1

//
private[this] def ordinal(i:Int, row: Int) = i - row * leftNumFields

private[this] val rows = new Array[InternalRow](2)

/** Updates this JoinedRow to used point at two new base rows. Returns itself. */
def apply(r1: InternalRow, r2: InternalRow): InternalRow = {
rows(0) = r1
rows(1) = r2
this
}

/** Updates this JoinedRow by updating its left base row. Returns itself. */
def withLeft(newLeft: InternalRow): InternalRow = {
rows(0) = newLeft
this
}

/** Updates this JoinedRow by updating its right base row. Returns itself. */
def withRight(newRight: InternalRow): InternalRow = {
rows(1) = newRight
this
}

override def toSeq: Seq[Any] = rows.flatMap(_.toSeq)

override def numFields: Int = totalNumFields

override def getUTF8String(i: Int): UTF8String = {
val r = row(i)
rows(r).getUTF8String(ordinal(i, r))
}

override def getBinary(i: Int): Array[Byte] = {
val r = row(i)
rows(r).getBinary(ordinal(i, r))
}

override def get(i: Int, dataType: DataType): Any = {
val r = row(i)
rows(r).get(ordinal(i, r), dataType)
}

override def isNullAt(i: Int): Boolean = {
val r = row(i)
rows(r).isNullAt(ordinal(i, r))
}

override def getInt(i: Int): Int = {
val r = row(i)
rows(r).getInt(ordinal(i, r))
}

override def getLong(i: Int): Long = {
val r = row(i)
rows(r).getLong(ordinal(i, r))
}

override def getDouble(i: Int): Double = {
val r = row(i)
rows(r).getDouble(ordinal(i, r))
}

override def getBoolean(i: Int): Boolean = {
val r = row(i)
rows(r).getBoolean(ordinal(i, r))
}

override def getShort(i: Int): Short = {
val r = row(i)
rows(r).getShort(ordinal(i, r))
}

override def getByte(i: Int): Byte = {
val r = row(i)
rows(r).getByte(ordinal(i, r))
}

override def getFloat(i: Int): Float = {
val r = row(i)
rows(r).getFloat(ordinal(i, r))
}

override def getDecimal(i: Int, precision: Int, scale: Int): Decimal = {
val r = row(i)
rows(r).getDecimal(ordinal(i, r), precision, scale)
}

override def getStruct(i: Int, numFields: Int): InternalRow = {
val r = row(i)
rows(r).getStruct(ordinal(i, r), numFields)
}

override def copy(): InternalRow = {
val copiedValues = new Array[Any](totalNumFields)
var i = 0
while (i < totalNumFields) {
copiedValues(i) = get(i)
i += 1
}
new GenericInternalRow(copiedValues)
}

override def toString: String = {
// Make sure toString never throws NullPointerException.
val Array(row1, row2) = rows
if ((row1 eq null) && (row2 eq null)) {
"[ empty row ]"
} else if (row1 eq null) {
row2.mkString("[", ",", "]")
} else if (row2 eq null) {
row1.mkString("[", ",", "]")
} else {
mkString("[", ",", "]")
}
}
}