From 25ef2acab961b7c1089704a72932c48bf75ce34c Mon Sep 17 00:00:00 2001 From: Yu ISHIKAWA Date: Thu, 2 Jul 2015 16:53:29 +0900 Subject: [PATCH 01/15] Resolve conflicts with rebasing --- .../mllib/api/python/PythonMLLibAPI.scala | 20 +++++++ python/pyspark/mllib/clustering.py | 58 ++++++++++++++++++- 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index e628059c4af8..160c33a1c6bf 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -502,6 +502,26 @@ private[python] class PythonMLLibAPI extends Serializable { new MatrixFactorizationModelWrapper(model) } + /** + * Java stub for Python mllib LDA.run() + */ + def trainLDAModel( + data: JavaRDD[LabeledPoint], + k: Int, + seed: java.lang.Long): LDAModel = { + val algo = new LDA() + .setK(k) + + if (seed != null) algo.setSeed(seed) + + try { + algo.run(data.rdd.map(x => (x.label.toLong, x.features))) + } finally { + data.rdd.unpersist(blocking = false) + } + } + + /** * Java stub for Python mllib FPGrowth.train(). This stub returns a handle * to the Java object instead of the content of the Java object. Extra care diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index ed4d78a2c678..e261d22d0251 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -31,13 +31,15 @@ from pyspark.rdd import RDD, ignore_unicode_prefix from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector +from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.stat.distribution import MultivariateGaussian from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable from pyspark.streaming import DStream __all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture', 'PowerIterationClusteringModel', 'PowerIterationClustering', - 'StreamingKMeans', 'StreamingKMeansModel'] + 'StreamingKMeans', 'StreamingKMeansModel', + 'LDA', 'LDAModel'] @inherit_doc @@ -574,5 +576,59 @@ def _test(): exit(-1) +class LDAModel(JavaModelWrapper): + + """ A clustering model derived from the LDA method. + + Latent Dirichlet Allocation (LDA), a topic model designed for text documents. + Terminologyu + - "word" = "term": an element of the vocabulary + - "token": instance of a term appearing in a document + - "topic": multinomial distribution over words representing some concept + References: + - Original LDA paper (journal version): + Blei, Ng, and Jordan. "Latent Dirichlet Allocation." JMLR, 2003. + + >>> from pyspark.mllib.linalg import Vectors + >>> from collections import namedtuple + >>> from numpy.testing import assert_almost_equal + >>> data = [ + ... LabeledPoint(1, [0.0, 1.0]), + ... LabeledPoint(2, [1.0, 0.0]), + ... 
] + >>> rdd = sc.parallelize(data) + >>> model = LDA.train(rdd, 2) + >>> model.vocabSize() + 2 + >>> topics = model.topicsMatrix() + >>> topics_expect = array([[0.5, 0.5], [0.5, 0.5]]) + >>> assert_almost_equal(topics, topics_expect, 1) + """ + + def topicsMatrix(self): + """Inferred topics, where each topic is represented by a distribution over terms.""" + return self.call("topicsMatrix").toArray() + + def vocabSize(self): + """Vocabulary size (number of terms or terms in the vocabulary)""" + return self.call("vocabSize") + + def describeTopics(self, maxTermsPerTopic=None): + """Return the topics described by weighted terms. + + TODO: + Implementing this method is a little hard. Since Scala's return value consists of tuples. + """ + raise NotImplementedError("LDAModel.describeTopics() in Python must be implemented.") + + +class LDA(): + + @classmethod + def train(cls, rdd, k, seed=None): + model = callMLlibFunc("trainLDAModel", rdd, k, seed) + return LDAModel(model) + + if __name__ == "__main__": _test() From 68f0653c793f565e4ec26e3666658c71b2c4aa77 Mon Sep 17 00:00:00 2001 From: Yu ISHIKAWA Date: Sat, 13 Jun 2015 06:54:01 +0900 Subject: [PATCH 02/15] Support some parameters for `LDA.train()` in Python --- .../spark/mllib/api/python/PythonMLLibAPI.scala | 16 +++++++++++++--- python/pyspark/mllib/clustering.py | 10 +++++++--- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 160c33a1c6bf..ecaa885967ad 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -506,11 +506,21 @@ private[python] class PythonMLLibAPI extends Serializable { * Java stub for Python mllib LDA.run() */ def trainLDAModel( - data: JavaRDD[LabeledPoint], - k: Int, - seed: java.lang.Long): LDAModel = { + data: JavaRDD[LabeledPoint], + k: Int, + maxIterations: Int, + docConcentration: Double, + topicConcentration: Double, + seed: java.lang.Long, + checkpointInterval: Int, + optimizer: String): LDAModel = { val algo = new LDA() .setK(k) + .setMaxIterations(maxIterations) + .setDocConcentration(docConcentration) + .setTopicConcentration(topicConcentration) + .setCheckpointInterval(checkpointInterval) + .setOptimizer(optimizer) if (seed != null) algo.setSeed(seed) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index e261d22d0251..66a037856dea 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -597,7 +597,7 @@ class LDAModel(JavaModelWrapper): ... LabeledPoint(2, [1.0, 0.0]), ...
] >>> rdd = sc.parallelize(data) - >>> model = LDA.train(rdd, 2) + >>> model = LDA.train(rdd, k=2) >>> model.vocabSize() 2 >>> topics = model.topicsMatrix() @@ -625,8 +625,12 @@ def describeTopics(self, maxTermsPerTopic=None): class LDA(): @classmethod - def train(cls, rdd, k, seed=None): - model = callMLlibFunc("trainLDAModel", rdd, k, seed) + def train(cls, rdd, k=10, maxIterations=20, docConcentration=-1.0, + topicConcentration=-1.0, seed=None, checkpointInterval=10, optimizer="em"): + """Train a LDA model.""" + model = callMLlibFunc("trainLDAModel", rdd, k, maxIterations, + docConcentration, topicConcentration, seed, + checkpointInterval, optimizer) return LDAModel(model) From 77fd1b7b1c4960df79d0a597c0eae0010c35b666 Mon Sep 17 00:00:00 2001 From: Yu ISHIKAWA Date: Fri, 19 Jun 2015 11:11:14 +0900 Subject: [PATCH 03/15] Do not use LabeledPoint --- .../spark/mllib/api/python/PythonMLLibAPI.scala | 13 ++++++++----- python/pyspark/mllib/clustering.py | 4 ++-- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index ecaa885967ad..a957fd86388f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -506,7 +506,7 @@ private[python] class PythonMLLibAPI extends Serializable { * Java stub for Python mllib LDA.run() */ def trainLDAModel( - data: JavaRDD[LabeledPoint], + data: JavaRDD[java.util.List[Any]], k: Int, maxIterations: Int, docConcentration: Double, @@ -524,11 +524,14 @@ private[python] class PythonMLLibAPI extends Serializable { if (seed != null) algo.setSeed(seed) - try { - algo.run(data.rdd.map(x => (x.label.toLong, x.features))) - } finally { - data.rdd.unpersist(blocking = false) + val documents = data.rdd.map(_.asScala.toArray).map { r => + r(0).getClass.getSimpleName match { + case "Integer" => (r(0).asInstanceOf[java.lang.Integer].toLong, r(1).asInstanceOf[Vector]) + case "Long" => (r(0).asInstanceOf[java.lang.Long].toLong, r(1).asInstanceOf[Vector]) + case _ => throw new IllegalArgumentException("input values contains invalid type value.") + } } + algo.run(documents) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 66a037856dea..b00287880c6d 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -593,8 +593,8 @@ class LDAModel(JavaModelWrapper): >>> from collections import namedtuple >>> from numpy.testing import assert_almost_equal >>> data = [ - ... LabeledPoint(1, [0.0, 1.0]), - ... LabeledPoint(2, [1.0, 0.0]), + ... [1, Vectors.dense([0.0, 1.0])], + ... [2, SparseVector(2, {0: 1.0})], ...
] >>> rdd = sc.parallelize(data) >>> model = LDA.train(rdd, k=2) From 8117e18f992724fe4c1c256e2cefbda36e990529 Mon Sep 17 00:00:00 2001 From: Yu ISHIKAWA Date: Wed, 24 Jun 2015 01:25:32 +0900 Subject: [PATCH 04/15] Fix the validation problems by `lint-scala` --- .../org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index a957fd86388f..03d3f2f8eab3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -526,8 +526,8 @@ private[python] class PythonMLLibAPI extends Serializable { val documents = data.rdd.map(_.asScala.toArray).map { r => r(0).getClass.getSimpleName match { - case "Integer" => (r(0).asInstanceOf[java.lang.Integer].toLong, r(1).asInstanceOf[Vector]) - case "Long" => (r(0).asInstanceOf[java.lang.Long].toLong, r(1).asInstanceOf[Vector]) + case "Integer" => (r(0).asInstanceOf[java.lang.Integer].toLong, r(1).asInstanceOf[Vector]) + case "Long" => (r(0).asInstanceOf[java.lang.Long].toLong, r(1).asInstanceOf[Vector]) case _ => throw new IllegalArgumentException("input values contains invalid type value.") } } From 39514ecfe538cb1cc14da7943814d1ceef1db559 Mon Sep 17 00:00:00 2001 From: Yu ISHIKAWA Date: Wed, 24 Jun 2015 07:41:09 +0900 Subject: [PATCH 05/15] Modify how to cast the input data --- .../apache/spark/mllib/api/python/PythonMLLibAPI.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 03d3f2f8eab3..15e7432dafad 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -525,9 +525,11 @@ private[python] class PythonMLLibAPI extends Serializable { if (seed != null) algo.setSeed(seed) val documents = data.rdd.map(_.asScala.toArray).map { r => - r(0).getClass.getSimpleName match { - case "Integer" => (r(0).asInstanceOf[java.lang.Integer].toLong, r(1).asInstanceOf[Vector]) - case "Long" => (r(0).asInstanceOf[java.lang.Long].toLong, r(1).asInstanceOf[Vector]) + r(0) match { + case i: java.lang.Integer => + (r(0).asInstanceOf[java.lang.Integer].toLong, r(1).asInstanceOf[Vector]) + case i: java.lang.Long => + (r(0).asInstanceOf[java.lang.Long].toLong, r(1).asInstanceOf[Vector]) case _ => throw new IllegalArgumentException("input values contains invalid type value.") } } From 22788295feb04531447f1695625bd6ab8a86624c Mon Sep 17 00:00:00 2001 From: Yu ISHIKAWA Date: Wed, 24 Jun 2015 07:46:11 +0900 Subject: [PATCH 06/15] Fix the indentation --- .../spark/mllib/api/python/PythonMLLibAPI.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 15e7432dafad..b0110fec6953 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -515,12 +515,12 @@ private[python] class PythonMLLibAPI extends Serializable { checkpointInterval: Int, optimizer: String): 
LDAModel = { val algo = new LDA() - .setK(k) - .setMaxIterations(maxIterations) - .setDocConcentration(docConcentration) - .setTopicConcentration(topicConcentration) - .setCheckpointInterval(checkpointInterval) - .setOptimizer(optimizer) + .setK(k) + .setMaxIterations(maxIterations) + .setDocConcentration(docConcentration) + .setTopicConcentration(topicConcentration) + .setCheckpointInterval(checkpointInterval) + .setOptimizer(optimizer) if (seed != null) algo.setSeed(seed) From 73412c3c4a9c755ecd074b1b03300770bd18443f Mon Sep 17 00:00:00 2001 From: Yu ISHIKAWA Date: Wed, 24 Jun 2015 07:49:24 +0900 Subject: [PATCH 07/15] Fix the typo --- python/pyspark/mllib/clustering.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index b00287880c6d..2ee42a8a653a 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -581,7 +581,7 @@ class LDAModel(JavaModelWrapper): """ A clustering model derived from the LDA method. Latent Dirichlet Allocation (LDA), a topic model designed for text documents. - Terminologyu + Terminology - "word" = "term": an element of the vocabulary - "token": instance of a term appearing in a document - "topic": multinomial distribution over words representing some concept From 57ac03d899c9e3893e5a4a75f867d20eef02c9d3 Mon Sep 17 00:00:00 2001 From: Yu ISHIKAWA Date: Wed, 24 Jun 2015 07:50:20 +0900 Subject: [PATCH 08/15] Remove the unnecessary import in Python unit testing --- python/pyspark/mllib/clustering.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 2ee42a8a653a..5f33be30676e 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -590,7 +590,6 @@ class LDAModel(JavaModelWrapper): Blei, Ng, and Jordan. "Latent Dirichlet Allocation." JMLR, 2003. >>> from pyspark.mllib.linalg import Vectors - >>> from collections import namedtuple >>> from numpy.testing import assert_almost_equal >>> data = [ ... [1, Vectors.dense([0.0, 1.0])], ... [2, SparseVector(2, {0: 1.0})], ... ] From 98f645a2c3b64ffc86ebdecaaab037cc93e50347 Mon Sep 17 00:00:00 2001 From: Yu ISHIKAWA Date: Wed, 24 Jun 2015 08:03:07 +0900 Subject: [PATCH 09/15] Remove the interface for `describeTopics` because it is not implemented --- python/pyspark/mllib/clustering.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 5f33be30676e..671e2b19472c 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -612,14 +612,6 @@ def vocabSize(self): """Vocabulary size (number of terms or terms in the vocabulary)""" return self.call("vocabSize") - def describeTopics(self, maxTermsPerTopic=None): - """Return the topics described by weighted terms. - - TODO: - Implementing this method is a little hard. Since Scala's return value consists of tuples.
- """ - raise NotImplementedError("LDAModel.describeTopics() in Python must be implemented.") + class LDA(): @classmethod From faa97643676c1a4734cf6ac77648222a0aa9e03b Mon Sep 17 00:00:00 2001 From: Yu ISHIKAWA Date: Wed, 24 Jun 2015 08:28:13 +0900 Subject: [PATCH 10/15] Add some comments for the LDA parameters --- python/pyspark/mllib/clustering.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 671e2b19472c..697a3128d11d 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -618,7 +618,20 @@ class LDA(): @classmethod def train(cls, rdd, k=10, maxIterations=20, docConcentration=-1.0, topicConcentration=-1.0, seed=None, checkpointInterval=10, optimizer="em"): - """Train a LDA model.""" + """Train a LDA model. + + :param rdd: RDD of data points + :param k: Number of clusters you want + :param maxIterations: Number of iterations. Default to 20 + :param docConcentration: Concentration parameter (commonly named "alpha") + for the prior placed on documents' distributions over topics ("theta"). + :param topicConcentration: Concentration parameter (commonly named "beta" or "eta") + for the prior placed on topics' distributions over terms. + :param seed: Random Seed + :param checkpointInterval: Period (in
iterations) between checkpoints. :param optimizer: LDAOptimizer used to perform the actual calculation - (default = EMLDAOptimizer) + (default = EMLDAOptimizer). Currently "em", "online" are supported. Default to "em". """ model = callMLlibFunc("trainLDAModel", rdd, k, maxIterations, docConcentration, topicConcentration, seed, From d7a332ab8b457a3597c0acc477ae37bf17df354b Mon Sep 17 00:00:00 2001 From: Yu ISHIKAWA Date: Thu, 9 Jul 2015 11:11:18 +0900 Subject: [PATCH 13/15] Remove the doc comment about the optimizer's default value --- python/pyspark/mllib/clustering.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 34f2c4521106..5fe0afc5691d 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -629,8 +629,8 @@ def train(cls, rdd, k=10, maxIterations=20, docConcentration=-1.0, for the prior placed on topics' distributions over terms. :param seed: Random Seed :param checkpointInterval: Period (in iterations) between checkpoints. - :param optimizer: LDAOptimizer used to perform the actual calculation - (default = EMLDAOptimizer). Currently "em", "online" are supported. Default to "em". + :param optimizer: LDAOptimizer used to perform the actual calculation. + Currently "em", "online" are supported. Default to "em". """ model = callMLlibFunc("trainLDAModel", rdd, k, maxIterations, docConcentration, topicConcentration, seed, From 28bd165a5da8918a86cb205cd3facc48869b48bf Mon Sep 17 00:00:00 2001 From: Yu ISHIKAWA Date: Wed, 15 Jul 2015 08:28:40 +0900 Subject: [PATCH 14/15] Change the place of testing code --- python/pyspark/mllib/clustering.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 5fe0afc5691d..d0ab0c4be056 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -565,17 +565,6 @@ def predictOnValues(self, dstream): return dstream.mapValues(lambda x: self._model.predict(x)) -def _test(): - import doctest - import pyspark.mllib.clustering - globs = pyspark.mllib.clustering.__dict__.copy() - globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) - globs['sc'].stop() - if failure_count: - exit(-1) - - class LDAModel(JavaModelWrapper): """ A clustering model derived from the LDA method. 
@@ -638,5 +627,16 @@ def train(cls, rdd, k=10, maxIterations=20, docConcentration=-1.0, return LDAModel(model) +def _test(): + import doctest + import pyspark.mllib.clustering + globs = pyspark.mllib.clustering.__dict__.copy() + globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + globs['sc'].stop() + if failure_count: + exit(-1) + + if __name__ == "__main__": _test() From 6855f5904322a57baf06868a23ef48eb3284514a Mon Sep 17 00:00:00 2001 From: Yu ISHIKAWA Date: Wed, 15 Jul 2015 08:32:50 +0900 Subject: [PATCH 15/15] LDA inherits object --- python/pyspark/mllib/clustering.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index d0ab0c4be056..8a92f6911c24 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -602,7 +602,7 @@ def vocabSize(self): return self.call("vocabSize") -class LDA(): +class LDA(object): @classmethod def train(cls, rdd, k=10, maxIterations=20, docConcentration=-1.0,
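Taken together, the series leaves `pyspark.mllib.clustering` with an `LDA.train()` entry point that forwards to the Scala `trainLDAModel` stub, which expects each document as a [doc id, term-count vector] pair and returns an `LDAModel` exposing `vocabSize()` and `topicsMatrix()`. Below is a minimal end-to-end sketch of that final API. It is illustrative only: the three-document corpus, the seed, and the comments are assumptions rather than content from the patches, and it presumes a live SparkContext `sc` such as the one the doctests create.

    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.clustering import LDA

    # Each document is [doc id (int or long), term-count vector]; the Scala
    # stub converts these into the (Long, Vector) pairs that LDA.run() expects.
    # This toy corpus is hypothetical, not taken from the patches.
    corpus = sc.parallelize([
        [1, Vectors.dense([1.0, 2.0, 0.0])],
        [2, Vectors.dense([0.0, 1.0, 3.0])],
        [3, Vectors.sparse(3, {0: 2.0, 2: 1.0})],
    ])

    model = LDA.train(corpus, k=2, maxIterations=20, seed=42, optimizer="em")
    print(model.vocabSize())     # number of terms, here 3
    print(model.topicsMatrix())  # vocabSize x k matrix of per-topic term weights

Keyword arguments left out keep the defaults from the Python signature (docConcentration=-1.0, topicConcentration=-1.0, checkpointInterval=10), where -1.0 lets the Scala side choose its own values for the concentration parameters.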