From 652c3178ce8205836d8c0f300e5f8c248b2bd13f Mon Sep 17 00:00:00 2001
From: Sung Chung
Date: Tue, 20 May 2014 11:25:53 -0700
Subject: [PATCH 1/2] Adding an OWL-QN optimizer for L1 regularization.

It can also handle L2 regularization at the same time, and it extends LBFGS.
It uses the OWL-QN implementation from breeze (which did not work correctly
before, but was fixed prior to this change), so it requires the latest
version of breeze to work correctly.
---
 .../spark/mllib/optimization/LBFGS.scala      |  12 +-
 .../spark/mllib/optimization/OWLQN.scala      | 135 ++++++++++++
 .../spark/mllib/optimization/OWLQNSuite.scala | 202 ++++++++++++++++++
 3 files changed, 343 insertions(+), 6 deletions(-)
 create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/optimization/OWLQN.scala
 create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/optimization/OWLQNSuite.scala

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
index 26a2b62e76ed..b0d0e31ad16c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
@@ -36,13 +36,13 @@ import org.apache.spark.mllib.rdd.RDDFunctions._
  * @param updater Updater to be used to update weights after every iteration.
  */
 @DeveloperApi
-class LBFGS(private var gradient: Gradient, private var updater: Updater)
+class LBFGS(protected var gradient: Gradient, protected var updater: Updater)
   extends Optimizer with Logging {
 
-  private var numCorrections = 10
-  private var convergenceTol = 1E-4
-  private var maxNumIterations = 100
-  private var regParam = 0.0
+  protected var numCorrections = 10
+  protected var convergenceTol = 1E-4
+  protected var maxNumIterations = 100
+  protected var regParam = 0.0
 
   /**
    * Set the number of corrections used in the LBFGS update. Default 10.
@@ -185,7 +185,7 @@ object LBFGS extends Logging {
    * CostFun implements Breeze's DiffFunction[T], which returns the loss and gradient
    * at a particular point (weights). It's used in Breeze's convex optimization routines.
    */
-  private class CostFun(
+  class CostFun(
     data: RDD[(Double, Vector)],
     gradient: Gradient,
     updater: Updater,
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/OWLQN.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/OWLQN.scala
new file mode 100644
index 000000000000..4b20adeee0e2
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/OWLQN.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.optimization
+
+import scala.collection.mutable.ArrayBuffer
+
+import breeze.linalg.{DenseVector => BDV}
+import breeze.optimize.{OWLQN => BreezeOWLQN, CachedDiffFunction}
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
+
+/**
+ * :: DeveloperApi ::
+ * Class used to solve an optimization problem with both L1 and L2 regularization.
+ * Spark is used to compute and aggregate the statistics needed for the OWL-QN steps.
+ * The OWLQN class from the breeze library does the orthant projections and stepping.
+ * Reference: [[http://machinelearning.wustl.edu/mlpapers/paper_files/icml2007_AndrewG07.pdf]]
+ * @param gradient Gradient function to be used.
+ */
+@DeveloperApi
+class OWLQN(gradient: Gradient)
+  extends LBFGS(gradient, new SquaredL2Updater) {
+
+  // This has to be between 0.0 and 1.0.
+  // 1.0 == pure L1 regularization; 0.0 == pure L2 regularization.
+  private var alpha = 0.0
+
+  def setAlpha(alpha: Double): this.type = {
+    this.alpha = alpha
+    this
+  }
+
+  override def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
+    val (weights, _) = OWLQN.runOWLQN(
+      data,
+      gradient,
+      updater,
+      numCorrections,
+      convergenceTol,
+      maxNumIterations,
+      regParam,
+      alpha,
+      initialWeights)
+    weights
+  }
+}
+
+/**
+ * :: DeveloperApi ::
+ * Top-level method to run OWL-QN.
+ */
+@DeveloperApi
+object OWLQN extends Logging {
+  /**
+   * Run OWL-QN in parallel.
+   * The cost function used here is exactly the same as the one used by L-BFGS (which
+   * can handle L2 regularization as well). The only difference is that instead of
+   * L-BFGS from breeze we use OWL-QN from breeze, and we let the user specify an
+   * alpha that splits the regularization weight between L1 and L2.
+   *
+   * @param data - Input data for OWL-QN. RDD of the set of data examples, each of
+   *               the form (label, [feature values]).
+   * @param gradient - Gradient object (used to compute the gradient of the loss function of
+   *                   one single data example).
+   * @param updater - Updater function to actually perform a gradient step in a given direction.
+   * @param numCorrections - The number of corrections used in the OWL-QN update.
+   * @param convergenceTol - The convergence tolerance of iterations for OWL-QN.
+   * @param maxNumIterations - Maximum number of iterations for which OWL-QN can be run.
+   * @param regParam - Regularization parameter.
+   * @param alpha - Between 0.0 and 1.0. The L1 weight becomes alpha * regParam, and
+   *                the L2 weight becomes (1 - alpha) * regParam.
+   * @param initialWeights - Initial weights to start the optimization process from.
+   *
+   * @return A tuple containing two elements. The first element is a column matrix containing
+   *         weights for every feature, and the second element is an array containing the loss
+   *         computed for every iteration.
+   */
+  def runOWLQN(
+      data: RDD[(Double, Vector)],
+      gradient: Gradient,
+      updater: Updater,
+      numCorrections: Int,
+      convergenceTol: Double,
+      maxNumIterations: Int,
+      regParam: Double,
+      alpha: Double,
+      initialWeights: Vector): (Vector, Array[Double]) = {
+
+    val lossHistory = new ArrayBuffer[Double](maxNumIterations)
+
+    val numExamples = data.count()
+
+    val l1RegParam = alpha * regParam
+    val l2RegParam = (1.0 - alpha) * regParam
+
+    // The cost function doesn't change from LBFGS, because breeze's OWLQN code
+    // handles all the L1-related work.
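+    // OWL-QN applies the L1 penalty itself, through orthant-wise pseudo-gradients
+    // and projection of each step, so only the smooth L2 term is folded into the
+    // differentiable cost function below.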
+    val costFun =
+      new LBFGS.CostFun(data, gradient, updater, l2RegParam, numExamples)
+
+    val owlqn = new BreezeOWLQN[BDV[Double]](maxNumIterations, numCorrections, l1RegParam, convergenceTol)
+
+    val states =
+      owlqn.iterations(new CachedDiffFunction(costFun), initialWeights.toBreeze.toDenseVector)
+
+    var state = states.next()
+    while (states.hasNext) {
+      lossHistory.append(state.adjustedValue)
+      state = states.next()
+    }
+
+    lossHistory.append(state.adjustedValue)
+    val weights = Vectors.fromBreeze(state.x)
+
+    logInfo("OWLQN.runOWLQN finished. Last 10 losses: %s".format(
+      lossHistory.takeRight(10).mkString(", ")))
+
+    (weights, lossHistory.toArray)
+  }
+}
\ No newline at end of file
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/OWLQNSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/OWLQNSuite.scala
new file mode 100644
index 000000000000..104e6a368671
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/OWLQNSuite.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.optimization
+
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.util.LocalSparkContext
+
+class OWLQNSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
+
+  val nPoints = 10000
+  val A = 2.0
+  val B = -1.5
+
+  val initialB = -1.0
+  val initialWeights = Array(initialB)
+
+  val gradient = new LogisticGradient()
+  val numCorrections = 10
+  val miniBatchFrac = 1.0
+
+  val l1Updater = new L1Updater()
+  val squaredL2Updater = new SquaredL2Updater()
+
+  val testData = GradientDescentSuite.generateGDInput(A, B, nPoints, 42)
+  val data = testData.map { case LabeledPoint(label, features) =>
+    label -> Vectors.dense(1.0, features.toArray: _*)
+  }
+
+  lazy val dataRDD = sc.parallelize(data, 2).cache()
+
+  def compareDouble(x: Double, y: Double, tol: Double = 1E-3): Boolean = {
+    math.abs(x - y) / (math.abs(y) + 1e-15) < tol
+  }
+
+  test("OWLQN loss should be decreasing and match the result of Gradient Descent.") {
+    val regParam = 0.3
+    val alpha = 1.0
+
+    val initialWeightsWithIntercept = Vectors.dense(1.0, initialWeights: _*)
+    val convergenceTol = 1e-12
+    val maxNumIterations = 10
+
+    val (weights1, loss) = OWLQN.runOWLQN(
+      dataRDD,
+      gradient,
+      squaredL2Updater,
+      numCorrections,
+      convergenceTol,
+      maxNumIterations,
+      regParam,
+      alpha,
+      initialWeightsWithIntercept)
+
+    // Since the cost function is convex, the loss is guaranteed to be monotonically
+    // decreasing with the OWLQN optimizer.
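+    // ((loss, loss.tail).zipped pairs each recorded loss with its successor, so
+    // forall(_ > _) asserts a strict decrease at every step.)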
+    assert((loss, loss.tail).zipped.forall(_ > _), "loss should be monotonically decreasing.")
+
+    val stepSize = 1.0
+
+    // GD converges more slowly, so it requires more iterations.
+    val numGDIterations = 100
+    val (weights2, lossGD) = GradientDescent.runMiniBatchSGD(
+      dataRDD,
+      gradient,
+      l1Updater,
+      stepSize,
+      numGDIterations,
+      regParam,
+      miniBatchFrac,
+      initialWeightsWithIntercept)
+
+    // GD converges far more slowly than OWLQN. To achieve a 1% difference, GD needs
+    // 90 iterations, and no matter how much we increase the number of GD iterations
+    // here, lossGD will always be larger than the OWLQN loss. This is based on
+    // observation, not theoretically guaranteed.
+    assert(Math.abs((lossGD.last - loss.last) / loss.last) < 0.02,
+      "OWLQN should match GD result within 2% difference.")
+  }
+
+  test("OWLQN with L2 regularization should get the same result as LBFGS with L2 regularization.") {
+    val regParam = 0.2
+
+    // Prepare non-zero initial weights so the losses of the first iteration can be compared.
+    val initialWeightsWithIntercept = Vectors.dense(0.3, 0.12)
+    val convergenceTol = 1e-12
+    val maxNumIterations = 10
+
+    val (weightLBFGS, lossLBFGS) = LBFGS.runLBFGS(
+      dataRDD,
+      gradient,
+      squaredL2Updater,
+      numCorrections,
+      convergenceTol,
+      maxNumIterations,
+      regParam,
+      initialWeightsWithIntercept)
+
+    val (weightOWLQN, lossOWLQN) = OWLQN.runOWLQN(
+      dataRDD,
+      gradient,
+      squaredL2Updater,
+      numCorrections,
+      convergenceTol,
+      maxNumIterations,
+      regParam,
+      0.0,
+      initialWeightsWithIntercept)
+
+    assert(compareDouble(lossOWLQN(0), lossLBFGS(0)),
+      "The first losses of LBFGS and OWLQN should be the same.")
+
+    // OWLQN and LBFGS employ different line searches, so the results might differ slightly.
+    assert(compareDouble(lossOWLQN.last, lossLBFGS.last, 0.02),
+      "The last losses of LBFGS and OWLQN should be within 2% of each other.")
+
+    assert(compareDouble(weightLBFGS(0), weightOWLQN(0), 0.02) &&
+      compareDouble(weightLBFGS(1), weightOWLQN(1), 0.02),
+      "The weight differences between LBFGS and OWLQN should be within 2%.")
+  }
+
+  test("The convergence criteria should work as expected.") {
+    val regParam = 0.01
+    val alpha = 0.5
+
+    /**
+     * For the first run, we set convergenceTol to 0.0, so that the algorithm will
+     * run up to maxNumIterations, which is 8 here.
+     */
+    val initialWeightsWithIntercept = Vectors.dense(0.0, 0.0)
+    val maxNumIterations = 8
+    var convergenceTol = 0.0
+
+    val (weights1, lossOWLQN1) = OWLQN.runOWLQN(
+      dataRDD,
+      gradient,
+      squaredL2Updater,
+      numCorrections,
+      convergenceTol,
+      maxNumIterations,
+      regParam,
+      alpha,
+      initialWeightsWithIntercept)
+
+    // Note that the first loss is computed with the initial weights, so the total
+    // number of losses is the number of iterations + 1.
+    assert(lossOWLQN1.length == 9)
+
+    convergenceTol = 0.1
+    val (_, lossOWLQN2) = OWLQN.runOWLQN(
+      dataRDD,
+      gradient,
+      squaredL2Updater,
+      numCorrections,
+      convergenceTol,
+      maxNumIterations,
+      regParam,
+      alpha,
+      initialWeightsWithIntercept)
+
+    // Based on observation, this run (lossOWLQN2) takes 3 iterations; this is not
+    // theoretically guaranteed.
+    assert(lossOWLQN2.length == 4)
+    assert((lossOWLQN2(2) - lossOWLQN2(3)) / lossOWLQN2(2) < convergenceTol)
+
+    convergenceTol = 0.01
+    val (_, lossOWLQN3) = OWLQN.runOWLQN(
+      dataRDD,
+      gradient,
+      squaredL2Updater,
+      numCorrections,
+      convergenceTol,
+      maxNumIterations,
+      regParam,
+      alpha,
+      initialWeightsWithIntercept)
+
+    // With a smaller convergenceTol, it takes more steps.
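+    // (As the assertions below check, iteration stops once the relative improvement
+    // (loss(i) - loss(i + 1)) / loss(i) falls below convergenceTol.)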
+    assert(lossOWLQN3.length > lossOWLQN2.length)
+
+    // Based on observation, this run (lossOWLQN3) takes 6 iterations; this is not
+    // theoretically guaranteed.
+    assert(lossOWLQN3.length == 7)
+    assert((lossOWLQN3(4) - lossOWLQN3(5)) / lossOWLQN3(4) < convergenceTol)
+  }
+}

From 6e833e0dd16acd2063b252fb5e8adb808d893286 Mon Sep 17 00:00:00 2001
From: "Sung H. Chung"
Date: Sun, 3 Aug 2014 15:34:48 +0900
Subject: [PATCH 2/2] Updating the breeze version to 0.8.1.

---
 mllib/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/pom.xml b/mllib/pom.xml
index 9a33bd1cf6ad..22341efc5809 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -57,7 +57,7 @@
       <groupId>org.scalanlp</groupId>
       <artifactId>breeze_${scala.binary.version}</artifactId>
-      <version>0.7</version>
+      <version>0.8.1</version>
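
For reviewers, here is a minimal usage sketch of the new optimizer (not part of
the patch). It assumes the setter names inherited from LBFGS (setRegParam,
setMaxNumIterations), a LogisticGradient as in the test suite, and a
hypothetical trainingData: RDD[(Double, Vector)] of (label, features) pairs
prepared elsewhere:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.optimization.{LogisticGradient, OWLQN}

    // Elastic-net style mix: with alpha = 0.5 and regParam = 0.1, the L1 weight
    // is 0.5 * 0.1 = 0.05 and the L2 weight is (1 - 0.5) * 0.1 = 0.05.
    val owlqn = new OWLQN(new LogisticGradient())
      .setAlpha(0.5)
      .setRegParam(0.1)
      .setMaxNumIterations(100)

    // Two weights (intercept plus one feature), matching the test setup above.
    val weights = owlqn.optimize(trainingData, Vectors.dense(0.0, 0.0))

With alpha = 1.0 this reduces to pure L1 (lasso) and with alpha = 0.0 to pure
L2 (ridge), matching the alpha semantics documented on runOWLQN.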