Commit 821ea67

Author: Andrew Or
Merge branch 'master' of github.com:apache/spark into sql-tests-refactor
2 parents: 0b60325 + a8d2f4c

6 files changed: +100 -4 lines changed


mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala

Lines changed: 25 additions & 2 deletions

@@ -26,7 +26,7 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
 import org.apache.spark.graphx.{Edge, EdgeContext, Graph, VertexId}
 import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors}
 import org.apache.spark.mllib.util.{Loader, Saveable}
@@ -228,6 +228,11 @@ class LocalLDAModel private[clustering] (
     docConcentration, topicConcentration, topicsMatrix.toBreeze.toDenseMatrix, gammaShape, k,
     vocabSize)
 
+  /** Java-friendly version of [[logLikelihood]] */
+  def logLikelihood(documents: JavaPairRDD[java.lang.Long, Vector]): Double = {
+    logLikelihood(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
+  }
+
   /**
    * Calculate an upper bound on perplexity. (Lower is better.)
    * See Equation (16) in original Online LDA paper.
@@ -242,6 +247,11 @@ class LocalLDAModel private[clustering] (
     -logLikelihood(documents) / corpusTokenCount
   }
 
+  /** Java-friendly version of [[logPerplexity]] */
+  def logPerplexity(documents: JavaPairRDD[java.lang.Long, Vector]): Double = {
+    logPerplexity(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
+  }
+
   /**
    * Estimate the variational likelihood bound from `documents`:
    * log p(documents) >= E_q[log p(documents)] - E_q[log q(documents)]
@@ -341,8 +351,14 @@ class LocalLDAModel private[clustering] (
     }
   }
 
-}
+  /** Java-friendly version of [[topicDistributions]] */
+  def topicDistributions(
+      documents: JavaPairRDD[java.lang.Long, Vector]): JavaPairRDD[java.lang.Long, Vector] = {
+    val distributions = topicDistributions(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
+    JavaPairRDD.fromRDD(distributions.asInstanceOf[RDD[(java.lang.Long, Vector)]])
+  }
 
+}
 
 @Experimental
 object LocalLDAModel extends Loader[LocalLDAModel] {
@@ -657,6 +673,13 @@ class DistributedLDAModel private[clustering] (
     }
   }
 
+  /** Java-friendly version of [[topTopicsPerDocument]] */
+  def javaTopTopicsPerDocument(
+      k: Int): JavaRDD[(java.lang.Long, Array[Int], Array[java.lang.Double])] = {
+    val topics = topTopicsPerDocument(k)
+    topics.asInstanceOf[RDD[(java.lang.Long, Array[Int], Array[java.lang.Double])]].toJavaRDD()
+  }
+
   // TODO:
   // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ???
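The new overloads simply delegate to the existing Scala methods, unwrapping the `JavaPairRDD` via `.rdd` plus a cast. A minimal sketch of how they might be called from Java; the class name, the `jsc` context, and the pre-trained `model` are illustrative assumptions, not part of this commit:

import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.clustering.LocalLDAModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

public class LocalLDAModelJavaExample {
  // `model` is assumed to be trained already, e.g. by LDA with an online optimizer.
  static void demo(JavaSparkContext jsc, LocalLDAModel model) {
    JavaPairRDD<Long, Vector> docs = JavaPairRDD.fromJavaRDD(jsc.parallelize(Arrays.asList(
      new Tuple2<Long, Vector>(0L, Vectors.dense(1.0, 0.0, 2.0)),
      new Tuple2<Long, Vector>(1L, Vectors.dense(0.0, 3.0, 1.0)))));

    // Each wrapper accepts the JavaPairRDD directly; no manual RDD casting in user code.
    double logLikelihood = model.logLikelihood(docs);
    double logPerplexity = model.logPerplexity(docs);
    JavaPairRDD<Long, Vector> topicDistributions = model.topicDistributions(docs);
  }
}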

mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala

Lines changed: 15 additions & 1 deletion

@@ -20,7 +20,7 @@ package org.apache.spark.mllib.stat
 import scala.annotation.varargs
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.{JavaRDD, JavaDoubleRDD}
 import org.apache.spark.mllib.linalg.distributed.RowMatrix
 import org.apache.spark.mllib.linalg.{Matrix, Vector}
 import org.apache.spark.mllib.regression.LabeledPoint
@@ -178,6 +178,9 @@ object Statistics {
     ChiSqTest.chiSquaredFeatures(data)
   }
 
+  /** Java-friendly version of [[chiSqTest()]] */
+  def chiSqTest(data: JavaRDD[LabeledPoint]): Array[ChiSqTestResult] = chiSqTest(data.rdd)
+
   /**
    * Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a
    * continuous distribution. By comparing the largest difference between the empirical cumulative
@@ -212,4 +215,15 @@ object Statistics {
     : KolmogorovSmirnovTestResult = {
     KolmogorovSmirnovTest.testOneSample(data, distName, params: _*)
   }
+
+  /** Java-friendly version of [[kolmogorovSmirnovTest()]] */
+  @varargs
+  def kolmogorovSmirnovTest(
+      data: JavaDoubleRDD,
+      distName: String,
+      params: java.lang.Double*): KolmogorovSmirnovTestResult = {
+    val javaParams = params.asInstanceOf[Seq[Double]]
+    KolmogorovSmirnovTest.testOneSample(data.rdd.asInstanceOf[RDD[Double]],
+      distName, javaParams: _*)
+  }
 }
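The `@varargs` annotation on the new overload is what makes it pleasant from Java: it directs scalac to also emit a real Java varargs signature, so callers can pass distribution parameters inline instead of building a Scala `Seq`. A rough sketch of a Java call site; the class name and `jsc` context are illustrative assumptions:

import java.util.Arrays;

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.stat.Statistics;
import org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult;

public class KSTestJavaExample {
  static void demo(JavaSparkContext jsc) {
    JavaDoubleRDD data = jsc.parallelizeDoubles(Arrays.asList(0.2, 1.0, -1.0, 2.0));
    // "norm" with explicit mean 0.0 and standard deviation 1.0, passed as plain varargs.
    KolmogorovSmirnovTestResult result =
      Statistics.kolmogorovSmirnovTest(data, "norm", 0.0, 1.0);
  }
}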

mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java

Lines changed: 24 additions & 0 deletions

@@ -124,6 +124,10 @@ public Boolean call(Tuple2<Long, Vector> tuple2) {
       }
     });
     assertEquals(topicDistributions.count(), nonEmptyCorpus.count());
+
+    // Check: javaTopTopicsPerDocument
+    JavaRDD<scala.Tuple3<java.lang.Long, int[], java.lang.Double[]>> topTopics =
+      model.javaTopTopicsPerDocument(3);
   }
 
   @Test
@@ -160,11 +164,31 @@ public void OnlineOptimizerCompatibility() {
     assertEquals(roundedLocalTopicSummary.length, k);
   }
 
+  @Test
+  public void localLdaMethods() {
+    JavaRDD<Tuple2<Long, Vector>> docs = sc.parallelize(toyData, 2);
+    JavaPairRDD<Long, Vector> pairedDocs = JavaPairRDD.fromJavaRDD(docs);
+
+    // check: topicDistributions
+    assertEquals(toyModel.topicDistributions(pairedDocs).count(), pairedDocs.count());
+
+    // check: logPerplexity
+    double logPerplexity = toyModel.logPerplexity(pairedDocs);
+
+    // check: logLikelihood
+    ArrayList<Tuple2<Long, Vector>> docsSingleWord = new ArrayList<Tuple2<Long, Vector>>();
+    docsSingleWord.add(new Tuple2<Long, Vector>(Long.valueOf(0), Vectors.dense(1.0, 0.0, 0.0)));
+    JavaPairRDD<Long, Vector> single = JavaPairRDD.fromJavaRDD(sc.parallelize(docsSingleWord));
+    double logLikelihood = toyModel.logLikelihood(single);
+  }
+
   private static int tinyK = LDASuite$.MODULE$.tinyK();
   private static int tinyVocabSize = LDASuite$.MODULE$.tinyVocabSize();
   private static Matrix tinyTopics = LDASuite$.MODULE$.tinyTopics();
   private static Tuple2<int[], double[]>[] tinyTopicDescription =
     LDASuite$.MODULE$.tinyTopicDescription();
   private JavaPairRDD<Long, Vector> corpus;
+  private LocalLDAModel toyModel = LDASuite$.MODULE$.toyModel();
+  private ArrayList<Tuple2<Long, Vector>> toyData = LDASuite$.MODULE$.javaToyData();
 
 }

mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java

Lines changed: 22 additions & 0 deletions

@@ -27,7 +27,12 @@
 import static org.junit.Assert.assertEquals;
 
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaDoubleRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.mllib.linalg.Vectors;
+import org.apache.spark.mllib.regression.LabeledPoint;
+import org.apache.spark.mllib.stat.test.ChiSqTestResult;
+import org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult;
 
 public class JavaStatisticsSuite implements Serializable {
   private transient JavaSparkContext sc;
@@ -53,4 +58,21 @@ public void testCorr() {
     // Check default method
     assertEquals(corr1, corr2);
   }
+
+  @Test
+  public void kolmogorovSmirnovTest() {
+    JavaDoubleRDD data = sc.parallelizeDoubles(Lists.newArrayList(0.2, 1.0, -1.0, 2.0));
+    KolmogorovSmirnovTestResult testResult1 = Statistics.kolmogorovSmirnovTest(data, "norm");
+    KolmogorovSmirnovTestResult testResult2 = Statistics.kolmogorovSmirnovTest(
+      data, "norm", 0.0, 1.0);
+  }
+
+  @Test
+  public void chiSqTest() {
+    JavaRDD<LabeledPoint> data = sc.parallelize(Lists.newArrayList(
+      new LabeledPoint(0.0, Vectors.dense(0.1, 2.3)),
+      new LabeledPoint(1.0, Vectors.dense(1.5, 5.1)),
+      new LabeledPoint(0.0, Vectors.dense(2.4, 8.1))));
+    ChiSqTestResult[] testResults = Statistics.chiSqTest(data);
+  }
 }

mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala

Lines changed: 13 additions & 0 deletions

@@ -17,6 +17,8 @@
 
 package org.apache.spark.mllib.clustering
 
+import java.util.{ArrayList => JArrayList}
+
 import breeze.linalg.{DenseMatrix => BDM, argtopk, max, argmax}
 
 import org.apache.spark.SparkFunSuite
@@ -575,6 +577,17 @@ private[clustering] object LDASuite {
     Vectors.sparse(6, Array(4, 5), Array(1, 1))
   ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
 
+  /** Used in the Java Test Suite */
+  def javaToyData: JArrayList[(java.lang.Long, Vector)] = {
+    val javaData = new JArrayList[(java.lang.Long, Vector)]
+    var i = 0
+    while (i < toyData.size) {
+      javaData.add((toyData(i)._1, toyData(i)._2))
+      i += 1
+    }
+    javaData
+  }
+
  def toyModel: LocalLDAModel = {
    val k = 2
    val vocabSize = 6
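One detail worth noting about the helper: `toyData` is keyed by Scala `Long`, while the Java suite needs `java.lang.Long` keys, so `javaToyData` copies each pair into a `java.util.ArrayList` and lets the keys be boxed on the way in.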

python/pyspark/sql/context.py

Lines changed: 1 addition & 1 deletion

@@ -39,7 +39,7 @@
 try:
     import pandas
     has_pandas = True
-except ImportError:
+except Exception:
     has_pandas = False
 
 __all__ = ["SQLContext", "HiveContext", "UDFRegistration"]
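The broader `except Exception` guard means any failure while importing pandas, not just a missing package, now falls back to `has_pandas = False` instead of breaking the `pyspark.sql` import.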
