when code-generation is not possible then change the plan itself to fallback to normal Spark HashAggregateExec

Sumedh Wale committed Dec 1, 2016
commit 0fe9cae8fcc5bd67c7f7ac1ee9f84fe29b7fe57b
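Summary of the change: when any aggregate function in the plan is an ImperativeAggregate, code generation is not possible, so both aggregate planning entry points now delegate to vanilla Spark's AggUtils instead of building a SnappyHashAggregateExec. Below is a minimal sketch of that guard-and-delegate shape, assuming Spark 2.0-era APIs; the wrapper name `planAggregateWithFallback` is illustrative and not part of this commit, and the code must sit under `org.apache.spark.sql` since `AggUtils` is `private[sql]`:

```scala
package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ImperativeAggregate}
import org.apache.spark.sql.execution.aggregate.AggUtils

object AggFallbackSketch {
  // Mirrors the new supportCodegen check: any ImperativeAggregate in the
  // plan disables the code-generated SnappyHashAggregateExec path.
  def supportCodegen(aggregateExpressions: Seq[AggregateExpression]): Boolean =
    !aggregateExpressions.exists(_.aggregateFunction.isInstanceOf[ImperativeAggregate])

  // Hypothetical wrapper showing the guard-and-delegate shape of the change:
  // try the Snappy path, otherwise fall back to Spark's own aggregate planner.
  def planAggregateWithFallback(
      groupingExpressions: Seq[NamedExpression],
      aggregateExpressions: Seq[AggregateExpression],
      resultExpressions: Seq[NamedExpression],
      child: SparkPlan)(planViaSnappy: => Seq[SparkPlan]): Seq[SparkPlan] = {
    if (supportCodegen(aggregateExpressions)) planViaSnappy
    else AggUtils.planAggregateWithoutDistinct(
      groupingExpressions, aggregateExpressions, resultExpressions, child)
  }
}
```

Changing the plan up front, rather than failing later inside code generation, keeps the fallback decision in one place, before any Snappy-specific operators are constructed.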
27 changes: 23 additions & 4 deletions core/src/main/scala/org/apache/spark/sql/SnappyStrategies.scala
@@ -16,13 +16,13 @@
*/
package org.apache.spark.sql

-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Final, Partial, PartialMerge}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Final, ImperativeAggregate, Partial, PartialMerge}
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, PhysicalAggregation, PhysicalOperation}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, Inner, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.aggregate.SnappyHashAggregateExec
+import org.apache.spark.sql.execution.aggregate.{AggUtils, SnappyHashAggregateExec}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.DefaultPlanner
import org.apache.spark.sql.streaming._
@@ -44,7 +44,7 @@ private[sql] trait SnappyStrategies {
}

/** Stream related strategies to map stream specific logical plan to physical plan */
-object StreamQueryStrategy extends Strategy {
+  object StreamQueryStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case LogicalDStreamPlan(output, rowStream) =>
PhysicalDStreamPlan(output, rowStream) :: Nil
@@ -163,6 +163,7 @@ object SnappyAggregation extends Strategy {
} else {
planAggregateWithOneDistinct(
groupingExpressions,
+aggregateExpressions,
functionsWithDistinct,
functionsWithoutDistinct,
resultExpressions,
@@ -174,12 +175,23 @@
case _ => Nil
}

+def supportCodegen(aggregateExpressions: Seq[AggregateExpression]): Boolean = {
+// ImperativeAggregate is not supported right now in code generation.
+!aggregateExpressions.exists(_.aggregateFunction
+.isInstanceOf[ImperativeAggregate])
+}
+
def planAggregateWithoutDistinct(
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[AggregateExpression],
resultExpressions: Seq[NamedExpression],
child: SparkPlan): Seq[SparkPlan] = {
-// Check if we can use HashAggregate.
+// Check if we can use SnappyHashAggregateExec.
+
+if (!supportCodegen(aggregateExpressions)) {
+return AggUtils.planAggregateWithoutDistinct(groupingExpressions,
+aggregateExpressions, resultExpressions, child)
+}

// 1. Create an Aggregate Operator for partial aggregations.

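Which built-ins trip the new guard (in the Spark 2.0 lineage this fork tracks): Sum, Average and Count are DeclarativeAggregate and keep the generated path, while for example CollectList/CollectSet and HyperLogLogPlusPlus extend ImperativeAggregate and now fall back. A hedged usage sketch follows; a plain SparkSession is shown for brevity, whereas SnappyData would use a SnappySession whose DefaultPlanner registers SnappyAggregation:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, sum}

object FallbackDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("agg-fallback-demo").getOrCreate()
    import spark.implicits._

    val df = Seq(("a", 1), ("a", 2), ("b", 3)).toDF("k", "v")

    // Sum is a DeclarativeAggregate: under a Snappy planner this would be
    // planned as SnappyHashAggregateExec (code generation possible).
    df.groupBy("k").agg(sum("v")).explain()

    // collect_list is an ImperativeAggregate in this Spark lineage: the new
    // guard makes planning fall back to Spark's own aggregate operators.
    df.groupBy("k").agg(collect_list("v")).explain()

    spark.stop()
  }
}
```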
@@ -223,10 +235,17 @@

def planAggregateWithOneDistinct(
groupingExpressions: Seq[NamedExpression],
+aggregateExpressions,
functionsWithDistinct: Seq[AggregateExpression],
functionsWithoutDistinct: Seq[AggregateExpression],
resultExpressions: Seq[NamedExpression],
child: SparkPlan): Seq[SparkPlan] = {
+// Check if we can use SnappyHashAggregateExec.
+
+if (!supportCodegen(aggregateExpressions)) {
+return AggUtils.planAggregateWithoutDistinct(groupingExpressions,
+aggregateExpressions, resultExpressions, child)
+}

// functionsWithDistinct is guaranteed to be non-empty. Even though it
// may contain more than one DISTINCT aggregate function, all of those
@@ -494,9 +494,9 @@ case class ObjectHashMapAccessor(@transient session: SnappySession,
declarations.append(s"final long $nullMaskVar = " +
s"$keyObjVar != null ? $keyObjVar.$nullVar : -1L;\n")
} else {
-declarations.append(s"final long $nullMaskVar = $keyObjVar.$nullVar;")
+declarations.append(s"final long $nullMaskVar = $keyObjVar.$nullVar;\n")
}
-declarations.append(s"long $nullValMaskVar = $nullMaskVar;")
+declarations.append(s"long $nullValMaskVar = $nullMaskVar;\n")
nullValMaskVars(index) = nullValMaskVar
nullVar -> (nullMaskVar, nullValMaskVar)
}.toMap
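Aside on the ObjectHashMapAccessor hunk above: these fragments are concatenated into one generated Java source string, so the missing trailing `\n` merged two declarations onto a single line; that is still valid Java, but noisy when dumping generated code. A tiny sketch with placeholder variable names (not the real accessor state):

```scala
// Minimal sketch of codegen string assembly; the variable names are
// illustrative placeholders, not the real ObjectHashMapAccessor fields.
val declarations = new StringBuilder
val (keyObjVar, nullVar, nullMaskVar, nullValMaskVar) =
  ("keyObj", "nullBits", "nullMask0", "nullValMask0")

// With the trailing '\n', each declaration lands on its own line of the
// generated Java source; without it, both statements share one line.
declarations.append(s"final long $nullMaskVar = $keyObjVar.$nullVar;\n")
declarations.append(s"long $nullValMaskVar = $nullMaskVar;\n")
println(declarations)
```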
@@ -15,7 +15,7 @@
* LICENSE file.
*/
/*
-* Adapted from Spark's HashAggregateExec having the license below.
+* Some code adapted from Spark's HashAggregateExec having the license below.
*/
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -62,6 +62,8 @@ case class LocalJoin(leftKeys: Seq[Expression],
right: SparkPlan)
extends BinaryExecNode with HashJoin with BatchConsumer {

+override def nodeName: String = "LocalJoin"
+
@transient private var mapAccessor: ObjectHashMapAccessor = _
@transient private var hashMapTerm: String = _
@transient private var mapDataTerm: String = _
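On the LocalJoin hunk above: nodeName is the label that explain() output and the SQL UI show for an operator, derived from the class name by default, so overriding it pins the name explicitly. A self-contained sketch of the same idiom on a hypothetical leaf operator (not part of this commit), assuming Spark 2.0-era APIs:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.LeafExecNode

// Hypothetical operator, not part of the commit: overriding nodeName
// controls the label printed by explain() and shown in the SQL UI.
case class DemoScanExec(output: Seq[Attribute]) extends LeafExecNode {
  override def nodeName: String = "DemoScan"

  // Produces no rows; just enough to make the operator well-formed.
  override protected def doExecute(): RDD[InternalRow] =
    sparkContext.emptyRDD[InternalRow]
}
```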