@@ -35,13 +35,13 @@ private[feature] trait StandardScalerParams extends Params with HasInputCol with

/**
* Centers the data with mean before scaling.
* It will build a dense output, so this does not work on sparse input
* and will raise an exception.
* Default: false
* @group param
*/
val withMean: BooleanParam = new BooleanParam(this, "withMean", "Center data with mean")

/**
* Scales the data to unit standard deviation.
* Default: true
@@ -68,13 +68,13 @@ class StandardScaler(override val uid: String) extends Estimator[StandardScalerM

/** @group setParam */
def setOutputCol(value: String): this.type = set(outputCol, value)

/** @group setParam */
def setWithMean(value: Boolean): this.type = set(withMean, value)

/** @group setParam */
def setWithStd(value: Boolean): this.type = set(withStd, value)

override def fit(dataset: DataFrame): StandardScalerModel = {
transformSchema(dataset.schema, logging = true)
val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v }
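As context for the withMean / withStd params touched above, here is a minimal usage sketch of this spark.ml StandardScaler (the DataFrame df and its "features" column are illustrative assumptions, not part of this patch). Per the doc comment, withMean = true produces dense output and raises an exception on sparse input.

import org.apache.spark.ml.feature.StandardScaler

// "df" is a hypothetical DataFrame with a vector column named "features".
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithMean(false) // true would densify the output and throw on sparse input
  .setWithStd(true)
val scalerModel = scaler.fit(df)       // Estimator -> StandardScalerModel
val scaled = scalerModel.transform(df) // appends the "scaledFeatures" column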
@@ -321,7 +321,7 @@ private class LeastSquaresAggregator(
}
(weightsArray, -sum + labelMean / labelStd, weightsArray.length)
}

private val effectiveWeightsVector = Vectors.dense(effectiveWeightsArray)

private val gradientSumArray = Array.ofDim[Double](dim)
@@ -399,7 +399,7 @@ private[python] class PythonMLLibAPI extends Serializable {
val sigma = si.map(_.asInstanceOf[DenseMatrix])
val gaussians = Array.tabulate(weight.length){
i => new MultivariateGaussian(mean(i), sigma(i))
}
val model = new GaussianMixtureModel(weight, gaussians)
model.predictSoft(data).map(Vectors.dense)
}
@@ -494,7 +494,7 @@ private[python] class PythonMLLibAPI extends Serializable {
def normalizeVector(p: Double, rdd: JavaRDD[Vector]): JavaRDD[Vector] = {
new Normalizer(p).transform(rdd)
}

/**
* Java stub for StandardScaler.fit(). This stub returns a
* handle to the Java object instead of the content of the Java object.
@@ -36,11 +36,11 @@ import org.apache.spark.util.Utils
* independent Gaussian distributions with associated "mixing" weights
* specifying each one's contribution to the composite.
*
* Given a set of sample points, this class will maximize the log-likelihood
* for a mixture of k Gaussians, iterating until the log-likelihood changes by
* less than convergenceTol, or until it has reached the max number of iterations.
* While this process is generally guaranteed to converge, it is not guaranteed
* to find a global optimum.
*
* Note: For high-dimensional data (with many features), this algorithm may perform poorly.
* This is due to high-dimensional data (a) making it difficult to cluster at all (based
@@ -53,24 +53,24 @@ import org.apache.spark.util.Utils
*/
@Experimental
class GaussianMixture private (
private var k: Int,
private var convergenceTol: Double,
private var maxIterations: Int,
private var seed: Long) extends Serializable {

/**
* Constructs a default instance. The default parameters are {k: 2, convergenceTol: 0.01,
* maxIterations: 100, seed: random}.
*/
def this() = this(2, 0.01, 100, Utils.random.nextLong())
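The defaults listed above can all be overridden through the setters that follow; a minimal end-to-end sketch (the sample points and the existing SparkContext sc are illustrative assumptions, not part of this patch):

import org.apache.spark.mllib.clustering.GaussianMixture
import org.apache.spark.mllib.linalg.Vectors

// Hypothetical input: an RDD[Vector] of sample points drawn from two clusters.
val points = sc.parallelize(Seq(
  Vectors.dense(-5.0, -4.9), Vectors.dense(-5.1, -5.0),
  Vectors.dense(5.0, 5.1), Vectors.dense(4.9, 5.0)))

val gmm = new GaussianMixture()
  .setK(2)                 // number of Gaussians (default 2)
  .setConvergenceTol(0.01) // stop once the log-likelihood change drops below this (default 0.01)
  .setMaxIterations(100)   // default 100
  .run(points)

gmm.weights.zip(gmm.gaussians).foreach { case (w, g) =>
  println(s"weight=$w mu=${g.mu} sigma=${g.sigma}")
}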

// number of samples per cluster to use when initializing Gaussians
private val nSamples = 5

// an initializing GMM can be provided rather than using the
// default random starting point
private var initialModel: Option[GaussianMixtureModel] = None

/** Set the initial GMM starting point, bypassing the random initialization.
* You must call setK() prior to calling this method, and the condition
* (model.k == this.k) must be met; failure will result in an IllegalArgumentException
@@ -83,37 +83,37 @@ class GaussianMixture private (
}
this
}

/** Return the user supplied initial GMM, if supplied */
def getInitialModel: Option[GaussianMixtureModel] = initialModel

/** Set the number of Gaussians in the mixture model. Default: 2 */
def setK(k: Int): this.type = {
this.k = k
this
}

/** Return the number of Gaussians in the mixture model */
def getK: Int = k

/** Set the maximum number of iterations to run. Default: 100 */
def setMaxIterations(maxIterations: Int): this.type = {
this.maxIterations = maxIterations
this
}

/** Return the maximum number of iterations to run */
def getMaxIterations: Int = maxIterations

/**
* Set the largest change in log-likelihood at which convergence is
* considered to have occurred.
*/
def setConvergenceTol(convergenceTol: Double): this.type = {
this.convergenceTol = convergenceTol
this
}

/**
* Return the largest change in log-likelihood at which convergence is
* considered to have occurred.
@@ -132,41 +132,41 @@ class GaussianMixture private (
/** Perform expectation maximization */
def run(data: RDD[Vector]): GaussianMixtureModel = {
val sc = data.sparkContext

// we will operate on the data as breeze data
val breezeData = data.map(_.toBreeze).cache()

// Get length of the input vectors
val d = breezeData.first().length

// Determine initial weights and corresponding Gaussians.
// If the user supplied an initial GMM, we use those values, otherwise
// we start with uniform weights, a random mean from the data, and
// diagonal covariance matrices using component variances
// derived from the samples
val (weights, gaussians) = initialModel match {
case Some(gmm) => (gmm.weights, gmm.gaussians)

case None => {
val samples = breezeData.takeSample(withReplacement = true, k * nSamples, seed)
(Array.fill(k)(1.0 / k), Array.tabulate(k) { i =>
val slice = samples.view(i * nSamples, (i + 1) * nSamples)
new MultivariateGaussian(vectorMean(slice), initCovariance(slice))
})
}
}
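For readers following the initialization above: vectorMean appears further down in this file, but the body of initCovariance is collapsed in this view. Going only by its doc comment further down ("diagonal entries are element-wise variance ... biased variance"), a rough sketch of the idea — not necessarily the exact implementation — using the file's existing breeze aliases (BDV, BV, BreezeMatrix) plus breeze.linalg.diag:

// Sketch only: diagonal covariance whose entries are the biased (1/n)
// element-wise variances of the sampled vectors.
private def initCovariance(x: IndexedSeq[BV[Double]]): BreezeMatrix[Double] = {
  val mu = vectorMean(x)
  val ss = BDV.zeros[Double](x(0).length)
  x.foreach { xi =>
    val d = xi.toDenseVector - mu // deviation from the slice mean
    ss += d :* d                  // accumulate squared deviations per dimension
  }
  diag(ss / x.length.toDouble)    // biased variance on the diagonal
}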

var llh = Double.MinValue // current log-likelihood
var llhp = 0.0 // previous log-likelihood

var iter = 0
while (iter < maxIterations && math.abs(llh-llhp) > convergenceTol) {
// create and broadcast curried cluster contribution function
val compute = sc.broadcast(ExpectationSum.add(weights, gaussians)_)

// aggregate the cluster contribution for all sample points
val sums = breezeData.aggregate(ExpectationSum.zero(k, d))(compute.value, _ += _)

// Create new distributions based on the partial assignments
// (often referred to as the "M" step in literature)
val sumWeights = sums.weights.sum
@@ -179,22 +179,22 @@
gaussians(i) = new MultivariateGaussian(mu, sums.sigmas(i) / sums.weights(i))
i = i + 1
}

llhp = llh // current becomes previous
llh = sums.logLikelihood // this is the freshly computed log-likelihood
iter += 1
}

new GaussianMixtureModel(weights, gaussians)
}

/** Average of dense breeze vectors */
private def vectorMean(x: IndexedSeq[BV[Double]]): BDV[Double] = {
val v = BDV.zeros[Double](x(0).length)
x.foreach(xi => v += xi)
v / x.length.toDouble
}

/**
* Construct matrix where diagonal entries are element-wise
* variance of input vectors (computes biased variance)
@@ -210,14 +210,14 @@
// companion class to provide zero constructor for ExpectationSum
private object ExpectationSum {
def zero(k: Int, d: Int): ExpectationSum = {
new ExpectationSum(0.0, Array.fill(k)(0.0),
Array.fill(k)(BDV.zeros(d)), Array.fill(k)(BreezeMatrix.zeros(d, d)))
}

// compute cluster contributions for each input point
// (U, T) => U for aggregation
def add(
weights: Array[Double],
dists: Array[MultivariateGaussian])
(sums: ExpectationSum, x: BV[Double]): ExpectationSum = {
val p = weights.zip(dists).map {
@@ -235,7 +235,7 @@ private object ExpectationSum {
i = i + 1
}
sums
}
}

// Aggregation class for partial expectation results
@@ -244,9 +244,9 @@ private class ExpectationSum(
val weights: Array[Double],
val means: Array[BDV[Double]],
val sigmas: Array[BreezeMatrix[Double]]) extends Serializable {

val k = weights.length

def +=(x: ExpectationSum): ExpectationSum = {
var i = 0
while (i < k) {
@@ -257,5 +257,5 @@ private class ExpectationSum(
}
logLikelihood += x.logLikelihood
this
}
}
@@ -34,20 +34,20 @@ import org.apache.spark.sql.{SQLContext, Row}
/**
* :: Experimental ::
*
* Multivariate Gaussian Mixture Model (GMM) consisting of k Gaussians, where points
* are drawn from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are
* the respective mean and covariance for each Gaussian distribution i=1..k.
*
* @param weights Weights for each Gaussian distribution in the mixture, where weights(i) is
* the weight for Gaussian i, and weights.sum == 1
* @param gaussians Array of MultivariateGaussian where gaussians(i) represents
* the Multivariate Gaussian (Normal) Distribution for Gaussian i
*/
@Experimental
class GaussianMixtureModel(
val weights: Array[Double],
val gaussians: Array[MultivariateGaussian]) extends Serializable with Saveable{

require(weights.length == gaussians.length, "Length of weight and Gaussian arrays must match")

override protected def formatVersion = "1.0"
Expand All @@ -64,20 +64,20 @@ class GaussianMixtureModel(
val responsibilityMatrix = predictSoft(points)
responsibilityMatrix.map(r => r.indexOf(r.max))
}

/**
* Given the input vectors, return the membership value of each vector
* to all mixture components.
*/
def predictSoft(points: RDD[Vector]): RDD[Array[Double]] = {
val sc = points.sparkContext
val bcDists = sc.broadcast(gaussians)
val bcWeights = sc.broadcast(weights)
points.map { x =>
computeSoftAssignments(x.toBreeze.toDenseVector, bcDists.value, bcWeights.value, k)
}
}

/**
* Compute the partial assignments for each vector
*/
@@ -89,7 +89,7 @@ class GaussianMixtureModel(
val p = weights.zip(dists).map {
case (weight, dist) => MLUtils.EPSILON + weight * dist.pdf(pt)
}
val pSum = p.sum
for (i <- 0 until k) {
p(i) /= pSum
}
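Reading computeSoftAssignments above: each membership value is p(i) = (MLUtils.EPSILON + weights(i) * dists(i).pdf(pt)) / pSum, i.e. the weighted density of component i at the point, normalized over all k components, with EPSILON keeping the normalization well-defined when every density underflows to zero. A short sketch of the two prediction entry points (model and points are assumed to come from a prior GaussianMixture.run call and an existing RDD[Vector]; they are not part of this patch):

// Soft assignments: one Array[Double] of length k per input point.
val memberships = model.predictSoft(points)

// Hard assignments: the index of the most likely component per point,
// i.e. the argmax over each membership array, as predict() above computes.
val clusters = model.predict(points)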
@@ -121,7 +121,7 @@ class PowerIterationClustering private[clustering] (
import org.apache.spark.mllib.clustering.PowerIterationClustering._

/** Constructs a PIC instance with default parameters: {k: 2, maxIterations: 100,
* initMode: "random"}.
*/
def this() = this(k = 2, maxIterations = 100, initMode = "random")
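For context on the defaults documented above, a minimal usage sketch (the similarity triplets and the existing SparkContext sc are illustrative assumptions, not part of this patch; similarities should be non-negative and are interpreted as an undirected affinity graph):

import org.apache.spark.mllib.clustering.PowerIterationClustering

// Hypothetical pairwise similarities: (srcId, dstId, similarity).
val similarities = sc.parallelize(Seq(
  (0L, 1L, 0.9), (1L, 2L, 0.9), (2L, 3L, 0.1), (3L, 4L, 0.9)))

val picModel = new PowerIterationClustering()
  .setK(2)                         // default 2
  .setMaxIterations(100)           // default 100
  .setInitializationMode("degree") // "random" (default) or "degree"
  .run(similarities)

picModel.assignments.collect().foreach(println)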

@@ -243,7 +243,7 @@ object PowerIterationClustering extends Logging {

/**
* Generates random vertex properties (v0) to start power iteration.
*
* @param g a graph representing the normalized affinity matrix (W)
* @return a graph with edges representing W and vertices representing a random vector
* with unit 1-norm
@@ -266,7 +266,7 @@ object PowerIterationClustering extends Logging {
* Generates the degree vector as the vertex properties (v0) to start power iteration.
* It is not exactly the node degrees but just the normalized sum similarities. Call it
* as degree vector because it is used in the PIC paper.
*
* @param g a graph representing the normalized affinity matrix (W)
* @return a graph with edges representing W and vertices representing the degree vector
*/
@@ -276,7 +276,7 @@
val v0 = g.vertices.mapValues(_ / sum)
GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
}

/**
* Runs power iteration.
* @param g input graph with edges representing the normalized affinity matrix (W) and vertices