Closed
Changes from 1 commit
Commits (30)
f3fa658
work
thunterdb Mar 3, 2017
7539835
work on the test suite
thunterdb Mar 6, 2017
673943f
last work
thunterdb Mar 7, 2017
202b672
work on using imperative aggregators
thunterdb Mar 13, 2017
be01981
Merge remote-tracking branch 'upstream/master' into 19634
thunterdb Mar 17, 2017
a983284
more work on summarizer
thunterdb Mar 18, 2017
647a4fe
work
thunterdb Mar 21, 2017
3c4bef7
changes
thunterdb Mar 21, 2017
56390cc
Merge remote-tracking branch 'upstream/master' into 19634
thunterdb Mar 21, 2017
c3f236c
cleanup
thunterdb Mar 21, 2017
ef955c0
debugging
thunterdb Mar 21, 2017
a04f923
work
thunterdb Mar 21, 2017
946d490
Merge remote-tracking branch 'upstream/master' into 19634
thunterdb Mar 22, 2017
201eb77
debug
thunterdb Mar 22, 2017
f4dec88
trying to debug serialization issue
thunterdb Mar 23, 2017
4af0f47
better tests
thunterdb Mar 23, 2017
9f29030
changes
thunterdb Mar 24, 2017
e9877dc
debugging
thunterdb Mar 24, 2017
3a11d02
more tests and debugging
thunterdb Mar 24, 2017
6d26c17
fixed tests
thunterdb Mar 24, 2017
35eaeb0
doc
thunterdb Mar 24, 2017
58b17dc
cleanups
thunterdb Mar 24, 2017
18078c1
cleanups
thunterdb Mar 24, 2017
ffe5cfe
Cleanups
thunterdb Mar 24, 2017
41f4be6
Cleanups
thunterdb Mar 24, 2017
ba200bb
Cleanups
thunterdb Mar 24, 2017
2f809ef
Merge remote-tracking branch 'upstream/master' into 19634
thunterdb Mar 27, 2017
662f62c
small test to find perf issues
thunterdb Mar 28, 2017
96be071
Current speed:
thunterdb Mar 30, 2017
a569dac
BLAS calls for dense interface
thunterdb Mar 30, 2017
better tests
thunterdb committed Mar 23, 2017
commit 4af0f47d326ef91d7cf9ccaf6a45ee3f904b191f
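The `fillBufferWithRow` hunk below updates the buffer with Breeze bulk operations in the dense case (a later commit in the list above, "BLAS calls for dense interface", moves this path to BLAS). A minimal, self-contained sketch of the Breeze pattern follows; the `b(...)` wrapper is a stand-in for the diff's helper of the same name and is an assumption, not code from the PR:

```scala
import breeze.linalg.{DenseVector => BDV}
import breeze.numerics

object DenseBulkUpdateSketch {
  // Hypothetical helper standing in for the diff's `b(...)`: wrap a mutable
  // Array[Double] as a Breeze vector so `:=` writes through to the array.
  private def b(arr: Array[Double]): BDV[Double] = new BDV[Double](arr)

  def main(args: Array[String]): Unit = {
    val arr = Array(1.0, -2.0, 3.0) // one incoming dense row
    val w = 0.5                     // its weight

    val m2 = Array.ofDim[Double](arr.length)
    val l1 = Array.ofDim[Double](arr.length)

    // Bulk updates for the dense case, mirroring fillBufferWithRow:
    b(m2) := w * (b(arr) :* b(arr)) // weighted sum of squares
    b(l1) := numerics.abs(b(arr))   // element-wise absolute values

    println(m2.mkString(", ")) // 0.5, 2.0, 4.5
    println(l1.mkString(", ")) // 1.0, 2.0, 3.0
  }
}
```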
151 changes: 98 additions & 53 deletions mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
@@ -23,8 +23,7 @@ import breeze.numerics
import org.apache.spark.SparkException
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.linalg.{BLAS, SQLDataTypes, Vector, Vectors}
import org.apache.spark.ml.linalg.VectorUDT
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors, VectorUDT}
import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeArrayData, UnsafeProjection, UnsafeRow}
@@ -191,7 +190,7 @@ object SummaryBuilderImpl extends Logging {
("max", Max, arrayDType, Seq(ComputeMax)),
("min", Min, arrayDType, Seq(ComputeMin)),
("normL2", NormL2, arrayDType, Seq(ComputeTotalWeightSum, ComputeM2)),
("normL1", Min, arrayDType, Seq(ComputeTotalWeightSum, ComputeL1))
("normL1", NormL1, arrayDType, Seq(ComputeTotalWeightSum, ComputeL1))
)

/**
@@ -277,30 +276,32 @@ object SummaryBuilderImpl extends Logging {
}
}

def bufferSchema: StructType = {
val at = ArrayType(DoubleType, containsNull = false)
val al = ArrayType(LongType, containsNull = false)
val bufferSchema: StructType = {
val fields = Seq(
"n" -> IntegerType,
"mean" -> at,
"m2n" -> at,
"m2" -> at,
"l1" -> at,
"mean" -> arrayDType,
"m2n" -> arrayDType,
"m2" -> arrayDType,
"l1" -> arrayDType,
"totalCount" -> LongType,
"totalWeightSum" -> DoubleType,
"totalWeightSquareSum" -> DoubleType,
"weightSum" -> at,
"nnz" -> al,
"max" -> at,
"min" -> at
"weightSum" -> arrayDType,
"nnz" -> arrayLType,
"max" -> arrayDType,
"min" -> arrayDType
)
StructType(fields.map { case (name, t) => StructField(name, t, nullable = true)})
}

val numFields = bufferSchema.fields.length

def updateInPlace(buffer: Buffer, v: Vector, w: Double): Unit = {
val startN = buffer.n
if (startN == -1) {
resizeArraysInPlace(buffer, v.size)
// The buffer was not initialized, we initialize it with the incoming row.
fillBufferWithRow(buffer, v, w)
return
} else {
require(startN == v.size,
s"Trying to insert a vector of size $v into a buffer that " +
@@ -368,7 +369,7 @@ object SummaryBuilderImpl extends Logging {
if (other.n == -1) {
buffer
} else {
mergeBuffers0(buffer, other)
mergeInitializedBuffers(buffer, other)
buffer
}
}
@@ -379,7 +380,7 @@
*/
def read(bytes: Array[Byte], row2: UnsafeRow): Buffer = {
row2.pointTo(bytes, bytes.length)
val row = row2.getStruct(0, 12)
val row = row2.getStruct(0, numFields)
new Buffer(
n = row.getInt(0),
mean = nullableArrayD(row, 1),
@@ -471,39 +472,6 @@ object SummaryBuilderImpl extends Logging {
buffer.l1
}

private def resizeArraysInPlace(buffer: Buffer, n: Int): Unit = {
// It should either be unsized or of the same size.
require(buffer.n == -1 || buffer.n == n, (buffer.n, n))
if (buffer.n == n) {
return
}
buffer.n = n
// Conditional resize of the non-null arrays
if (buffer.mean != null) {
buffer.mean = Array.ofDim(n)
}
if (buffer.m2n != null) {
buffer.m2n = Array.ofDim(n)
}
if (buffer.m2 != null) {
buffer.m2 = Array.ofDim(n)
}
if (buffer.l1 != null) {
buffer.l1 = Array.ofDim(n)
}
if (buffer.weightSum != null) {
buffer.weightSum = Array.ofDim(n)
}
if (buffer.nnz != null) {
buffer.nnz = Array.ofDim(n)
}
if (buffer.max != null) {
buffer.max = Array.ofDim(n)
}
if (buffer.min != null) {
buffer.min = Array.ofDim(n)
}
}

private def gadD(arr: Array[Double]): UnsafeArrayData = {
if (arr == null) {
@@ -563,11 +531,88 @@
}
}

/**
* Sets the content of a buffer based on a single row (initialization).
*
* The buffer must be uninitialized first.
*/
private def fillBufferWithRow(buffer: Buffer, v: Vector, w: Double): Unit = {
require(buffer.n == -1, (buffer.n, buffer))
buffer.n = v.size
buffer.totalCount = 1L
buffer.totalWeightSum = w
buffer.totalWeightSquareSum = w * w

val arr = v.toArray
if (buffer.mean != null) {
buffer.mean = arr.clone()
}
if (buffer.m2n != null) {
buffer.m2n = Array.ofDim(buffer.n)
}
if (buffer.max != null) {
buffer.max = arr.clone()
}
if (buffer.min != null) {
buffer.min = arr.clone()
}

// The rest of these operations have efficient bulk versions.
v match {
case dv: DenseVector =>
if (buffer.m2 != null) {
buffer.m2 = Array.ofDim(buffer.n)
b(buffer.m2) := w * (b(arr) :* b(arr))
}
if (buffer.l1 != null) {
buffer.l1 = Array.ofDim(buffer.n)
b(buffer.l1) := numerics.abs(b(arr))
}

case sv: SparseVector =>
if (buffer.m2 != null) {
buffer.m2 = Array.ofDim(buffer.n)
v.foreachActive { (index, value) =>
buffer.weightSum(index) = w * value * value
}
}

if (buffer.l1 != null) {
buffer.l1 = Array.ofDim(buffer.n)
v.foreachActive { (index, value) =>
buffer.weightSum(index) = w * math.abs(value)
}
}


// In the case of the weightSum and NNZ, we also have to account for the value of
// the elements.
if (buffer.weightSum != null) {
buffer.weightSum = Array.ofDim(buffer.n)
v.foreachActive { (index, value) =>
if (value != 0.0) {
buffer.weightSum(index) = w
}
}
}

if (buffer.nnz != null) {
buffer.nnz = Array.ofDim(buffer.n)
v.foreachActive { (index, value) =>
if (value != 0.0) {
buffer.nnz(index) = 1L
}
}
}

}
}


/**
* Merges other into buffer.
*/
private def mergeBuffers0(buffer: Buffer, other: Buffer): Unit = {
private def mergeInitializedBuffers(buffer: Buffer, other: Buffer): Unit = {
// Each buffer needs to be properly initialized.
require(buffer.n > 0 && other.n > 0, (buffer.n, other.n))
require(buffer.n == other.n, (buffer.n, other.n))
@@ -647,7 +692,7 @@ object SummaryBuilderImpl extends Logging {
inputAggBufferOffset: Int)
extends TypedImperativeAggregate[Buffer] {

// private lazy val row = new UnsafeRow(1)
private lazy val row = new UnsafeRow(Buffer.numFields)

override def eval(buff: Buffer): InternalRow = {
val metrics = requested.map({
Expand Down Expand Up @@ -699,7 +744,7 @@ object SummaryBuilderImpl extends Logging {

override def deserialize(bytes: Array[Byte]): Buffer = {
// Buffer.read(bytes, row)
Buffer.read(bytes, new UnsafeRow(1))
Buffer.read(bytes, new UnsafeRow(Buffer.numFields))
}

override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): MetricsAggregate = {
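The changes above from `new UnsafeRow(1)` to `new UnsafeRow(Buffer.numFields)` and from `getStruct(0, 12)` to `getStruct(0, numFields)` keep the reader's field count in sync with the serialized buffer schema. Below is a minimal round-trip sketch of that idea, independent of the PR's `Buffer` class; the two-field schema is chosen purely for illustration:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types._

// A minimal sketch (not the PR's Buffer) of serializing a struct to bytes with
// UnsafeProjection and reading it back: the UnsafeRow used for reading must be
// constructed with the schema's exact number of top-level fields.
object UnsafeRowRoundTripSketch {
  private val schema = StructType(Seq(
    StructField("n", IntegerType, nullable = false),
    StructField("mean", ArrayType(DoubleType, containsNull = false), nullable = true)))

  def serialize(n: Int, mean: Array[Double]): Array[Byte] = {
    val projection = UnsafeProjection.create(schema)
    val row = InternalRow(n, UnsafeArrayData.fromPrimitiveArray(mean))
    projection.apply(row).getBytes
  }

  def deserialize(bytes: Array[Byte]): (Int, Array[Double]) = {
    // The field count must match schema.fields.length, or offsets are misread.
    val row = new UnsafeRow(schema.fields.length)
    row.pointTo(bytes, bytes.length)
    (row.getInt(0), row.getArray(1).toDoubleArray())
  }

  def main(args: Array[String]): Unit = {
    val bytes = serialize(3, Array(1.0, 2.0, 3.0))
    val (n, mean) = deserialize(bytes)
    println(s"n=$n mean=${mean.mkString(", ")}") // n=3 mean=1.0, 2.0, 3.0
  }
}
```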
118 changes: 71 additions & 47 deletions mllib/src/test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala
@@ -52,52 +52,75 @@ class SummarizerSuite extends SparkFunSuite with MLlibTestSparkContext {
// are expected for this input. They currently test against some fixed subset of the
// metrics, but should be made fuzzy in the future.
Member commented: fuzzy?
private def testExample(input: Seq[Any], exp: ExpectedMetrics): Unit = {
val inputVec: Seq[Vector] = input.map {
private def testExample(name: String, input: Seq[Any], exp: ExpectedMetrics): Unit = {
def inputVec: Seq[Vector] = input.map {
case x: Array[Double] => Vectors.dense(x)
case x: Seq[Double] => Vectors.dense(x.toArray)
case x: Vector => x
case x => throw new Exception(x.toString)
}
val df = sc.parallelize(inputVec).map(Tuple1.apply).toDF("features")
val c = df.col("features")

compare(df.select(metrics("mean").summary(c), mean(c)), Seq(Row(exp.mean), exp.mean))

compare(df.select(metrics("variance").summary(c), variance(c)),
Seq(Row(exp.variance), exp.variance))

compare(df.select(metrics("count").summary(c), count(c)),
Seq(Row(exp.count), exp.count))

compare(df.select(metrics("numNonZeros").summary(c), numNonZeros(c)),
Seq(Row(exp.numNonZeros), exp.numNonZeros))
def wrapped() = {
Member commented: rename wrapped -> wrappedInit
val df = sc.parallelize(inputVec).map(Tuple1.apply).toDF("features")
val c = df.col("features")
df -> c
}

println("STARTING MIN")
registerTest(s"$name - mean only") {
val (df, c) = wrapped()
compare(df.select(metrics("mean").summary(c), mean(c)), Seq(Row(exp.mean), exp.mean))
}

compare(df.select(metrics("min").summary(c), min(c)),
Seq(Row(exp.min), exp.min))
registerTest(s"$name - variance only") {
val (df, c) = wrapped()
compare(df.select(metrics("variance").summary(c), variance(c)),
Seq(Row(exp.variance), exp.variance))
}

println("STARTING MAX")
registerTest(s"$name - count only") {
val (df, c) = wrapped()
compare(df.select(metrics("count").summary(c), count(c)),
Seq(Row(exp.count), exp.count))
}

compare(df.select(metrics("max").summary(c), max(c)),
Seq(Row(exp.max), exp.max))
registerTest(s"$name - numNonZeros only") {
val (df, c) = wrapped()
compare(df.select(metrics("numNonZeros").summary(c), numNonZeros(c)),
Seq(Row(exp.numNonZeros), exp.numNonZeros))
}

println("STARTING NORML1")
registerTest(s"$name - min only") {
val (df, c) = wrapped()
compare(df.select(metrics("min").summary(c), min(c)),
Seq(Row(exp.min), exp.min))
}

compare(df.select(metrics("normL1").summary(c), normL1(c)),
Seq(Row(exp.normL1), exp.normL1))
registerTest(s"$name - max only") {
val (df, c) = wrapped()
compare(df.select(metrics("max").summary(c), max(c)),
Seq(Row(exp.max), exp.max))
}

println("STARTING NORML2")
registerTest(s"$name - normL1 only") {
val (df, c) = wrapped()
compare(df.select(metrics("normL1").summary(c), normL1(c)),
Seq(Row(exp.normL1), exp.normL1))
}

compare(df.select(metrics("normL2").summary(c), normL2(c)),
Seq(Row(exp.normL2), exp.normL2))
registerTest(s"$name - normL2 only") {
val (df, c) = wrapped()
compare(df.select(metrics("normL2").summary(c), normL2(c)),
Seq(Row(exp.normL2), exp.normL2))
}

compare(df.select(
metrics("mean", "variance", "count", "numNonZeros").summary(c),
mean(c), variance(c), count(c), numNonZeros(c)),
Seq(Row(exp.mean, exp.variance, exp.count, exp.numNonZeros),
exp.mean, exp.variance, exp.count, exp.numNonZeros))
registerTest(s"$name - all metrics at once") {
val (df, c) = wrapped()
compare(df.select(
metrics("mean", "variance", "count", "numNonZeros").summary(c),
mean(c), variance(c), count(c), numNonZeros(c)),
Seq(Row(exp.mean, exp.variance, exp.count, exp.numNonZeros),
exp.mean, exp.variance, exp.count, exp.numNonZeros))
}
}

private def denseData(input: Seq[Seq[Double]]): DataFrame = {
@@ -145,6 +168,8 @@ class SummarizerSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => throw new Exception(s"$name: ${x1.getClass} ${x2.getClass} $x1 $x2")
}



test("debugging test") {
val df = denseData(Nil)
println(s">>> df=${df.collect().toSeq}")
@@ -175,32 +200,31 @@ class SummarizerSuite extends SparkFunSuite with MLlibTestSparkContext {
compare(res, Seq(Row(0L), 0L))
}

test("single element in vector, all metrics") {
val x = Seq(1.0, 2.0)
testExample(Seq(x), ExpectedMetrics(
{
val x = Seq(0.0, 1.0, 2.0)
testExample("single element", Seq(x), ExpectedMetrics(
mean = x,
variance = Seq(0.0, 0.0),
variance = Seq(0.0, 0.0, 0.0),
count = 1,
numNonZeros = Seq(1, 1),
numNonZeros = Seq(0, 1, 1),
max = x,
min = x,
normL1 = x,
normL2 = x
))
}

test("repeated metrics") {
testExample("two elements", Seq(Seq(0.0, 1.0, 2.0), Seq(0.0, -1.0, -2.0)), ExpectedMetrics(
mean = Seq(0.0, 0.0, 0.0),
variance = Seq(0.0, 1.0, 2.0),
count = 2,
numNonZeros = Seq(0, 2, 2),
max = Seq(0.0, 1.0, 2.0),
min = Seq(0.0, -1.0, -2.0),
normL1 = Seq(0.0, 2.0, 4.0),
normL2 = Seq(0.0, 2.0, 4.0)
))

}

test("dense vector input") {

}

test("sparse vector input") {

}

test("merging summarizer when one side has zero mean (SPARK-4355)") {
}
}
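The suite change above replaces one monolithic test with cases built through `registerTest`, so each metric becomes a separately reported test. A minimal sketch of that ScalaTest pattern with purely illustrative class and metric names (no Spark or `SparkFunSuite` mixins):

```scala
import org.scalatest.FunSuite // the FunSuite style used here; newer ScalaTest names it org.scalatest.funsuite.AnyFunSuite

// Minimal sketch of the registerTest pattern used by testExample above:
// a helper registers one independently reported test per metric, so a failure
// in one metric does not mask the results for the others.
class PerMetricSuiteSketch extends FunSuite {

  private val metricNames = Seq("mean", "variance", "count", "numNonZeros")

  private def testExample(name: String, input: Seq[Double]): Unit = {
    metricNames.foreach { metric =>
      registerTest(s"$name - $metric only") {
        assert(input.nonEmpty, s"cannot compute $metric of an empty input")
      }
    }
  }

  // Registration happens at construction time, just like the calls to
  // testExample("single element", ...) and testExample("two elements", ...) above.
  testExample("single element", Seq(0.0, 1.0, 2.0))
  testExample("two elements", Seq(0.0, -1.0, -2.0))
}
```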