From fa68a22424bacedfed023f141787773e6d2bbb79 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 23 Jul 2015 13:49:59 -0700 Subject: [PATCH 1/3] Make eval() and genCode() final --- .../apache/spark/sql/catalyst/expressions/Expression.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 29ae47e842dd..3f72e6e184db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -184,10 +184,10 @@ abstract class Expression extends TreeNode[Expression] { */ trait Unevaluable extends Expression { - override def eval(input: InternalRow = null): Any = + final override def eval(input: InternalRow = null): Any = throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") - override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = + final override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") } From 65329c2fb97f8a864ad199e02089a80ad29596c9 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 23 Jul 2015 14:08:11 -0700 Subject: [PATCH 2/3] Do not have AggregateFunction1 inherit from AggregateExpression1 --- .../catalyst/expressions/aggregate/interfaces.scala | 4 ---- .../spark/sql/catalyst/expressions/aggregates.scala | 11 +++++------ 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala index 577ede73cb01..37b57cffd82b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala @@ -63,10 +63,6 @@ private[sql] case object Complete extends AggregateMode */ private[sql] case object NoOp extends Expression with Unevaluable { override def nullable: Boolean = true - override def eval(input: InternalRow): Any = { - throw new TreeNodeException( - this, s"No function to evaluate expression. type: ${this.nodeName}") - } override def dataType: DataType = NullType override def children: Seq[Expression] = Nil } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index e07c920a41d0..d3295b8bafa8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.expressions import com.clearspring.analytics.stream.cardinality.HyperLogLog import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode} import org.apache.spark.sql.catalyst.util.TypeUtils import org.apache.spark.sql.types._ import org.apache.spark.util.collection.OpenHashSet @@ -71,8 +71,7 @@ trait PartialAggregate1 extends AggregateExpression1 { * A specific implementation of an aggregate function. Used to wrap a generic * [[AggregateExpression1]] with an algorithm that will be used to compute one specific result. */ -abstract class AggregateFunction1 - extends LeafExpression with AggregateExpression1 with Serializable { +abstract class AggregateFunction1 extends LeafExpression with Serializable { /** Base should return the generic aggregate expression that this function is computing */ val base: AggregateExpression1 @@ -82,9 +81,9 @@ abstract class AggregateFunction1 def update(input: InternalRow): Unit - // Do we really need this? - override def newInstance(): AggregateFunction1 = { - makeCopy(productIterator.map { case a: AnyRef => a }.toArray) + override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + throw new UnsupportedOperationException( + "AggregateFunction1 should not be used for generated aggregates") } } From 8d9ed22fb07fcc1fcb794f843981bd933b0376ef Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 23 Jul 2015 15:39:09 -0700 Subject: [PATCH 3/3] AlgebraicAggregate should extend Unevaluable --- .../catalyst/expressions/aggregate/interfaces.scala | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala index 37b57cffd82b..d3fee1ade05e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala @@ -147,8 +147,7 @@ abstract class AggregateFunction2 /** * A helper class for aggregate functions that can be implemented in terms of catalyst expressions. */ -abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable { - self: Product => +abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable with Unevaluable { val initialValues: Seq[Expression] val updateExpressions: Seq[Expression] @@ -184,19 +183,15 @@ abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable { } } - override def update(buffer: MutableRow, input: InternalRow): Unit = { + override final def update(buffer: MutableRow, input: InternalRow): Unit = { throw new UnsupportedOperationException( "AlgebraicAggregate's update should not be called directly") } - override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = { + override final def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = { throw new UnsupportedOperationException( "AlgebraicAggregate's merge should not be called directly") } - override def eval(buffer: InternalRow): Any = { - throw new UnsupportedOperationException( - "AlgebraicAggregate's eval should not be called directly") - } }