Skip to content

Commit af40769

Browse files

Committed: Merge remote-tracking branch 'apache/master' into readerRddMaster
2 parents (b5d1008 + bfd75cd) → commit af40769

File tree

30 files changed

+465
-356
lines changed

Summary: 30 files changed, with 465 lines added and 356 lines removed.

dev/create-release/release-build.sh

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,11 +371,18 @@ if [[ "$1" == "publish-release" ]]; then
371371
find . -type f |grep -v \.jar |grep -v \.pom | xargs rm
372372

373373
echo "Creating hash and signature files"
374-
# this must have .asc and .sha1 - it really doesn't like anything else there
374+
# this must have .asc, .md5 and .sha1 - it really doesn't like anything else there
375375
for file in $(find . -type f)
376376
do
377377
echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --output $file.asc \
378378
--detach-sig --armour $file;
379+
if [ $(command -v md5) ]; then
380+
# Available on OS X; -q to keep only hash
381+
md5 -q $file > $file.md5
382+
else
383+
# Available on Linux; cut to keep only hash
384+
md5sum $file | cut -f1 -d' ' > $file.md5
385+
fi
379386
sha1sum $file | cut -f1 -d' ' > $file.sha1
380387
done
381388

mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -569,10 +569,14 @@ abstract class LDAModel private[ml] (
569569
class LocalLDAModel private[ml] (
570570
uid: String,
571571
vocabSize: Int,
572-
@Since("1.6.0") override private[clustering] val oldLocalModel: OldLocalLDAModel,
572+
private[clustering] val oldLocalModel_ : OldLocalLDAModel,
573573
sparkSession: SparkSession)
574574
extends LDAModel(uid, vocabSize, sparkSession) {
575575

576+
override private[clustering] def oldLocalModel: OldLocalLDAModel = {
577+
oldLocalModel_.setSeed(getSeed)
578+
}
579+
576580
@Since("1.6.0")
577581
override def copy(extra: ParamMap): LocalLDAModel = {
578582
val copied = new LocalLDAModel(uid, vocabSize, oldLocalModel, sparkSession)

mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors}
3232
import org.apache.spark.mllib.util.{Loader, Saveable}
3333
import org.apache.spark.rdd.RDD
3434
import org.apache.spark.sql.{Row, SparkSession}
35-
import org.apache.spark.util.BoundedPriorityQueue
35+
import org.apache.spark.util.{BoundedPriorityQueue, Utils}
3636

3737
/**
3838
* Latent Dirichlet Allocation (LDA) model.
@@ -194,6 +194,8 @@ class LocalLDAModel private[spark] (
194194
override protected[spark] val gammaShape: Double = 100)
195195
extends LDAModel with Serializable {
196196

197+
private var seed: Long = Utils.random.nextLong()
198+
197199
@Since("1.3.0")
198200
override def k: Int = topics.numCols
199201

@@ -216,6 +218,21 @@ class LocalLDAModel private[spark] (
216218

217219
override protected def formatVersion = "1.0"
218220

221+
/**
222+
* Random seed for cluster initialization.
223+
*/
224+
@Since("2.4.0")
225+
def getSeed: Long = seed
226+
227+
/**
228+
* Set the random seed for cluster initialization.
229+
*/
230+
@Since("2.4.0")
231+
def setSeed(seed: Long): this.type = {
232+
this.seed = seed
233+
this
234+
}
235+
219236
@Since("1.5.0")
220237
override def save(sc: SparkContext, path: String): Unit = {
221238
LocalLDAModel.SaveLoadV1_0.save(sc, path, topicsMatrix, docConcentration, topicConcentration,
@@ -298,6 +315,7 @@ class LocalLDAModel private[spark] (
298315
// by topic (columns of lambda)
299316
val Elogbeta = LDAUtils.dirichletExpectation(lambda.t).t
300317
val ElogbetaBc = documents.sparkContext.broadcast(Elogbeta)
318+
val gammaSeed = this.seed
301319

302320
// Sum bound components for each document:
303321
// component for prob(tokens) + component for prob(document-topic distribution)
@@ -306,7 +324,7 @@ class LocalLDAModel private[spark] (
306324
val localElogbeta = ElogbetaBc.value
307325
var docBound = 0.0D
308326
val (gammad: BDV[Double], _, _) = OnlineLDAOptimizer.variationalTopicInference(
309-
termCounts, exp(localElogbeta), brzAlpha, gammaShape, k)
327+
termCounts, exp(localElogbeta), brzAlpha, gammaShape, k, gammaSeed + id)
310328
val Elogthetad: BDV[Double] = LDAUtils.dirichletExpectation(gammad)
311329

312330
// E[log p(doc | theta, beta)]
@@ -352,6 +370,7 @@ class LocalLDAModel private[spark] (
352370
val docConcentrationBrz = this.docConcentration.asBreeze
353371
val gammaShape = this.gammaShape
354372
val k = this.k
373+
val gammaSeed = this.seed
355374

356375
documents.map { case (id: Long, termCounts: Vector) =>
357376
if (termCounts.numNonzeros == 0) {
@@ -362,7 +381,8 @@ class LocalLDAModel private[spark] (
362381
expElogbetaBc.value,
363382
docConcentrationBrz,
364383
gammaShape,
365-
k)
384+
k,
385+
gammaSeed + id)
366386
(id, Vectors.dense(normalize(gamma, 1.0).toArray))
367387
}
368388
}
@@ -376,6 +396,7 @@ class LocalLDAModel private[spark] (
376396
val docConcentrationBrz = this.docConcentration.asBreeze
377397
val gammaShape = this.gammaShape
378398
val k = this.k
399+
val gammaSeed = this.seed
379400

380401
(termCounts: Vector) =>
381402
if (termCounts.numNonzeros == 0) {
@@ -386,7 +407,8 @@ class LocalLDAModel private[spark] (
386407
expElogbeta,
387408
docConcentrationBrz,
388409
gammaShape,
389-
k)
410+
k,
411+
gammaSeed)
390412
Vectors.dense(normalize(gamma, 1.0).toArray)
391413
}
392414
}
@@ -403,6 +425,7 @@ class LocalLDAModel private[spark] (
403425
*/
404426
@Since("2.0.0")
405427
def topicDistribution(document: Vector): Vector = {
428+
val gammaSeed = this.seed
406429
val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.asBreeze.toDenseMatrix.t).t)
407430
if (document.numNonzeros == 0) {
408431
Vectors.zeros(this.k)
@@ -412,7 +435,8 @@ class LocalLDAModel private[spark] (
412435
expElogbeta,
413436
this.docConcentration.asBreeze,
414437
gammaShape,
415-
this.k)
438+
this.k,
439+
gammaSeed)
416440
Vectors.dense(normalize(gamma, 1.0).toArray)
417441
}
418442
}

mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
3030
import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors}
3131
import org.apache.spark.rdd.RDD
3232
import org.apache.spark.storage.StorageLevel
33+
import org.apache.spark.util.Utils
3334

3435
/**
3536
* :: DeveloperApi ::
@@ -464,6 +465,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
464465
val alpha = this.alpha.asBreeze
465466
val gammaShape = this.gammaShape
466467
val optimizeDocConcentration = this.optimizeDocConcentration
468+
val seed = randomGenerator.nextLong()
467469
// If and only if optimizeDocConcentration is set true,
468470
// we calculate logphat in the same pass as other statistics.
469471
// No calculation of loghat happens otherwise.
@@ -473,20 +475,21 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
473475
None
474476
}
475477

476-
val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
477-
val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
478-
479-
val stat = BDM.zeros[Double](k, vocabSize)
480-
val logphatPartOption = logphatPartOptionBase()
481-
var nonEmptyDocCount: Long = 0L
482-
nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
483-
nonEmptyDocCount += 1
484-
val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
485-
termCounts, expElogbetaBc.value, alpha, gammaShape, k)
486-
stat(::, ids) := stat(::, ids) + sstats
487-
logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
488-
}
489-
Iterator((stat, logphatPartOption, nonEmptyDocCount))
478+
val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitionsWithIndex {
479+
(index, docs) =>
480+
val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
481+
482+
val stat = BDM.zeros[Double](k, vocabSize)
483+
val logphatPartOption = logphatPartOptionBase()
484+
var nonEmptyDocCount: Long = 0L
485+
nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
486+
nonEmptyDocCount += 1
487+
val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
488+
termCounts, expElogbetaBc.value, alpha, gammaShape, k, seed + index)
489+
stat(::, ids) := stat(::, ids) + sstats
490+
logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
491+
}
492+
Iterator((stat, logphatPartOption, nonEmptyDocCount))
490493
}
491494

492495
val elementWiseSum = (
@@ -578,7 +581,8 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
578581
}
579582

580583
override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
581-
new LocalLDAModel(Matrices.fromBreeze(lambda).transpose, alpha, eta, gammaShape)
584+
new LocalLDAModel(Matrices.fromBreeze(lambda).transpose, alpha, eta)
585+
.setSeed(randomGenerator.nextLong())
582586
}
583587

584588
}
@@ -605,18 +609,20 @@ private[clustering] object OnlineLDAOptimizer {
605609
expElogbeta: BDM[Double],
606610
alpha: breeze.linalg.Vector[Double],
607611
gammaShape: Double,
608-
k: Int): (BDV[Double], BDM[Double], List[Int]) = {
612+
k: Int,
613+
seed: Long): (BDV[Double], BDM[Double], List[Int]) = {
609614
val (ids: List[Int], cts: Array[Double]) = termCounts match {
610615
case v: DenseVector => ((0 until v.size).toList, v.values)
611616
case v: SparseVector => (v.indices.toList, v.values)
612617
}
613618
// Initialize the variational distribution q(theta|gamma) for the mini-batch
619+
val randBasis = new RandBasis(new org.apache.commons.math3.random.MersenneTwister(seed))
614620
val gammad: BDV[Double] =
615-
new Gamma(gammaShape, 1.0 / gammaShape).samplesVector(k) // K
621+
new Gamma(gammaShape, 1.0 / gammaShape)(randBasis).samplesVector(k) // K
616622
val expElogthetad: BDV[Double] = exp(LDAUtils.dirichletExpectation(gammad)) // K
617623
val expElogbetad = expElogbeta(ids, ::).toDenseMatrix // ids * K
618624

619-
val phiNorm: BDV[Double] = expElogbetad * expElogthetad +:+ 1e-100 // ids
625+
val phiNorm: BDV[Double] = expElogbetad * expElogthetad +:+ 1e-100 // ids
620626
var meanGammaChange = 1D
621627
val ctsVector = new BDV[Double](cts) // ids
622628

mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,12 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
253253
val lda = new LDA()
254254
testEstimatorAndModelReadWrite(lda, dataset, LDASuite.allParamSettings,
255255
LDASuite.allParamSettings, checkModelData)
256+
257+
// Make sure the result is deterministic after saving and loading the model
258+
val model = lda.fit(dataset)
259+
val model2 = testDefaultReadWrite(model)
260+
assert(model.logLikelihood(dataset) ~== model2.logLikelihood(dataset) absTol 1e-6)
261+
assert(model.logPerplexity(dataset) ~== model2.logPerplexity(dataset) absTol 1e-6)
256262
}
257263

258264
test("read/write DistributedLDAModel") {

python/pyspark/ml/classification.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1222,6 +1222,10 @@ class GBTClassifier(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol
12221222
True
12231223
>>> model.trees
12241224
[DecisionTreeRegressionModel (uid=...) of depth..., DecisionTreeRegressionModel...]
1225+
>>> validation = spark.createDataFrame([(0.0, Vectors.dense(-1.0),)],
1226+
... ["indexed", "features"])
1227+
>>> model.evaluateEachIteration(validation)
1228+
[0.25..., 0.23..., 0.21..., 0.19..., 0.18...]
12251229
12261230
.. versionadded:: 1.4.0
12271231
"""
@@ -1319,6 +1323,17 @@ def trees(self):
13191323
"""Trees in this ensemble. Warning: These have null parent Estimators."""
13201324
return [DecisionTreeRegressionModel(m) for m in list(self._call_java("trees"))]
13211325

1326+
@since("2.4.0")
1327+
def evaluateEachIteration(self, dataset):
1328+
"""
1329+
Method to compute error or loss for every iteration of gradient boosting.
1330+
1331+
:param dataset:
1332+
Test dataset to evaluate model on, where dataset is an
1333+
instance of :py:class:`pyspark.sql.DataFrame`
1334+
"""
1335+
return self._call_java("evaluateEachIteration", dataset)
1336+
13221337

13231338
@inherit_doc
13241339
class NaiveBayes(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, HasProbabilityCol,

python/pyspark/ml/regression.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1056,6 +1056,10 @@ class GBTRegressor(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol,
10561056
True
10571057
>>> model.trees
10581058
[DecisionTreeRegressionModel (uid=...) of depth..., DecisionTreeRegressionModel...]
1059+
>>> validation = spark.createDataFrame([(0.0, Vectors.dense(-1.0))],
1060+
... ["label", "features"])
1061+
>>> model.evaluateEachIteration(validation, "squared")
1062+
[0.0, 0.0, 0.0, 0.0, 0.0]
10591063
10601064
.. versionadded:: 1.4.0
10611065
"""
@@ -1156,6 +1160,20 @@ def trees(self):
11561160
"""Trees in this ensemble. Warning: These have null parent Estimators."""
11571161
return [DecisionTreeRegressionModel(m) for m in list(self._call_java("trees"))]
11581162

1163+
@since("2.4.0")
1164+
def evaluateEachIteration(self, dataset, loss):
1165+
"""
1166+
Method to compute error or loss for every iteration of gradient boosting.
1167+
1168+
:param dataset:
1169+
Test dataset to evaluate model on, where dataset is an
1170+
instance of :py:class:`pyspark.sql.DataFrame`
1171+
:param loss:
1172+
The loss function used to compute error.
1173+
Supported options: squared, absolute
1174+
"""
1175+
return self._call_java("evaluateEachIteration", dataset, loss)
1176+
11591177

11601178
@inherit_doc
11611179
class AFTSurvivalRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol,

python/pyspark/ml/tests.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1595,6 +1595,44 @@ def test_default_read_write(self):
15951595
self.assertEqual(lr.uid, lr3.uid)
15961596
self.assertEqual(lr.extractParamMap(), lr3.extractParamMap())
15971597

1598+
def test_default_read_write_default_params(self):
1599+
lr = LogisticRegression()
1600+
self.assertFalse(lr.isSet(lr.getParam("threshold")))
1601+
1602+
lr.setMaxIter(50)
1603+
lr.setThreshold(.75)
1604+
1605+
# `threshold` is set by user, default param `predictionCol` is not set by user.
1606+
self.assertTrue(lr.isSet(lr.getParam("threshold")))
1607+
self.assertFalse(lr.isSet(lr.getParam("predictionCol")))
1608+
self.assertTrue(lr.hasDefault(lr.getParam("predictionCol")))
1609+
1610+
writer = DefaultParamsWriter(lr)
1611+
metadata = json.loads(writer._get_metadata_to_save(lr, self.sc))
1612+
self.assertTrue("defaultParamMap" in metadata)
1613+
1614+
reader = DefaultParamsReadable.read()
1615+
metadataStr = json.dumps(metadata, separators=[',', ':'])
1616+
loadedMetadata = reader._parseMetaData(metadataStr, )
1617+
reader.getAndSetParams(lr, loadedMetadata)
1618+
1619+
self.assertTrue(lr.isSet(lr.getParam("threshold")))
1620+
self.assertFalse(lr.isSet(lr.getParam("predictionCol")))
1621+
self.assertTrue(lr.hasDefault(lr.getParam("predictionCol")))
1622+
1623+
# manually create metadata without `defaultParamMap` section.
1624+
del metadata['defaultParamMap']
1625+
metadataStr = json.dumps(metadata, separators=[',', ':'])
1626+
loadedMetadata = reader._parseMetaData(metadataStr, )
1627+
with self.assertRaisesRegexp(AssertionError, "`defaultParamMap` section not found"):
1628+
reader.getAndSetParams(lr, loadedMetadata)
1629+
1630+
# Prior to 2.4.0, metadata doesn't have `defaultParamMap`.
1631+
metadata['sparkVersion'] = '2.3.0'
1632+
metadataStr = json.dumps(metadata, separators=[',', ':'])
1633+
loadedMetadata = reader._parseMetaData(metadataStr, )
1634+
reader.getAndSetParams(lr, loadedMetadata)
1635+
15981636

15991637
class LDATest(SparkSessionTestCase):
16001638

0 commit comments

Comments (0)