Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
removed constructor
  • Loading branch information
rubenljanssen committed May 26, 2017
commit a33ea0d7601d7b14e50536e3c457847145e799ae
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.sql.expressions.Aggregator
// This file defines internal implementations for aggregators.
////////////////////////////////////////////////////////////////////////////////////////////////////

abstract class TypedAggregator[IN, BUF: TypeTag, OUT: TypeTag, FRESULT, JAVA](f: IN => FRESULT)
abstract class TypedAggregator[IN, BUF: TypeTag, OUT: TypeTag, JAVA]
extends Aggregator[IN, BUF, OUT] {

def bufferEncoder: Encoder[BUF] = ExpressionEncoder[BUF]()
Expand All @@ -41,7 +41,7 @@ abstract class TypedAggregator[IN, BUF: TypeTag, OUT: TypeTag, FRESULT, JAVA](f:
}

class TypedSumDouble[IN](val f: IN => Double)
extends TypedAggregator[IN, Double, Double, Double, java.lang.Double](f) {
extends TypedAggregator[IN, Double, Double, java.lang.Double] {

override def zero: Double = 0.0
override def reduce(b: Double, a: IN): Double = b + f(a)
Expand All @@ -54,7 +54,7 @@ class TypedSumDouble[IN](val f: IN => Double)


class TypedSumLong[IN](val f: IN => Long)
extends TypedAggregator[IN, Long, Long, Long, java.lang.Long](f) {
extends TypedAggregator[IN, Long, Long, java.lang.Long] {

override def zero: Long = 0L
override def reduce(b: Long, a: IN): Long = b + f(a)
Expand All @@ -66,7 +66,7 @@ class TypedSumLong[IN](val f: IN => Long)
}

class TypedCount[IN](val f: IN => Any)
extends TypedAggregator[IN, Long, Long, Any, java.lang.Long](f) {
extends TypedAggregator[IN, Long, Long, java.lang.Long] {

override def zero: Long = 0
override def reduce(b: Long, a: IN): Long = if (f(a) == null) b else b + 1
Expand All @@ -79,7 +79,7 @@ class TypedCount[IN](val f: IN => Any)
}

class TypedAverage[IN](val f: IN => Double)
extends TypedAggregator[IN, (Double, Long), Double, Double, java.lang.Double](f) {
extends TypedAggregator[IN, (Double, Long), Double, java.lang.Double] {

override def zero: (Double, Long) = (0.0, 0L)
override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2)
Expand Down