
Commit ca4b477

Merge branch 'master' of github.com:apache/spark into SPARK-7977

2 parents: 83ab635 + 4528166

11 files changed: +354 −51 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 3 deletions
```diff
@@ -831,7 +831,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * }}}
    *
    * @note Small files are preferred, large file is also allowable, but may cause bad performance.
-   *
+   * @note On some filesystems, `.../path/*` can be a more efficient way to read all files
+   *       in a directory rather than `.../path/` or `.../path`
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
  def wholeTextFiles(
@@ -878,9 +879,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *   (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @param minPartitions A suggestion value of the minimal splitting number for input data.
-   *
    * @note Small files are preferred; very large files may cause bad performance.
+   * @note On some filesystems, `.../path/*` can be a more efficient way to read all files
+   *       in a directory rather than `.../path/` or `.../path`
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
  @Experimental
  def binaryFiles(
```
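The new `@note` recommends a glob over a bare directory path. As a rough PySpark illustration (the HDFS path, app name, and partition count below are placeholders, not from the commit):

```python
from pyspark import SparkContext

sc = SparkContext(appName="WholeTextFilesExample")

# Read with an explicit glob, as the added @note suggests; on some
# filesystems this enumerates the children more efficiently than
# passing the bare directory path.
files = sc.wholeTextFiles("hdfs:///data/logs/*", minPartitions=8)

# wholeTextFiles yields one (path, content) pair per file.
path, content = files.first()
print(path)
```

The same note applies to `binaryFiles`, which reads file contents as binary instead of text.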

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 4 additions & 1 deletion
```diff
@@ -66,7 +66,10 @@ private[spark] class CoarseGrainedExecutorBackend(
      case Success(msg) => Utils.tryLogNonFatalError {
        Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
      }
-      case Failure(e) => logError(s"Cannot register with driver: $driverUrl", e)
+      case Failure(e) => {
+        logError(s"Cannot register with driver: $driverUrl", e)
+        System.exit(1)
+      }
    }(ThreadUtils.sameThread)
  }
```
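This change makes executor registration fail fast: previously a failed registration was only logged, and the process lingered without ever receiving work. A minimal Python sketch of the same log-then-exit pattern (the callback shape and names are invented for illustration; this is not Spark's API):

```python
import logging
import sys

logger = logging.getLogger("executor")

def on_registration_complete(result, driver_url):
    # `result` is assumed to be ("success", msg) or ("failure", exc);
    # this shape is hypothetical.
    status, payload = result
    if status == "success":
        logger.info("Registered with driver, got: %s", payload)
    else:
        # Before this change the failure was only logged, leaving a zombie
        # process; exiting lets the cluster manager notice and recover.
        logger.error("Cannot register with driver: %s: %s", driver_url, payload)
        sys.exit(1)
```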
docs/mllib-linear-methods.md

Lines changed: 52 additions & 0 deletions
```diff
@@ -768,6 +768,58 @@ will get better!
 
 </div>
 
+<div data-lang="python" markdown="1">
+
+First, we import the necessary classes for parsing our input data and creating the model.
+
+{% highlight python %}
+from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.regression import StreamingLinearRegressionWithSGD
+{% endhighlight %}
+
+Then we make input streams for training and testing data. We assume a StreamingContext `ssc`
+has already been created; see the [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing)
+for more info. For this example, we use labeled points in the training and testing streams,
+but in practice you will likely want to use unlabeled vectors for test data.
+
+{% highlight python %}
+def parse(lp):
+    label = float(lp[lp.find('(') + 1: lp.find(',')])
+    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
+    return LabeledPoint(label, vec)
+
+trainingData = ssc.textFileStream("/training/data/dir").map(parse).cache()
+testData = ssc.textFileStream("/testing/data/dir").map(parse)
+{% endhighlight %}
+
+We create our model by initializing the weights to zero.
+
+{% highlight python %}
+numFeatures = 3
+model = StreamingLinearRegressionWithSGD()
+model.setInitialWeights([0.0, 0.0, 0.0])
+{% endhighlight %}
+
+Now we register the streams for training and testing and start the job.
+
+{% highlight python %}
+model.trainOn(trainingData)
+print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))))
+
+ssc.start()
+ssc.awaitTermination()
+{% endhighlight %}
+
+We can now save text files with data to the training or testing folders.
+Each line should be a data point formatted as `(y,[x1,x2,x3])` where `y` is the label
+and `x1,x2,x3` are the features. Any time a text file is placed in `/training/data/dir`
+the model will update, and any time a text file is placed in `/testing/data/dir` you will
+see predictions. As you feed more data to the training directory, the predictions
+will get better!
+
+</div>
+
 </div>
 
 
```
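The new Python section assumes an existing StreamingContext. A self-contained sketch that assembles the snippets (the directory names and the 1-second batch interval are placeholders) might look like the following; note it uses `pprint()` rather than the guide's `print(...)`, since printing a DStream object only shows its repr:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD

sc = SparkContext(appName="StreamingLinearRegressionExample")
ssc = StreamingContext(sc, 1)  # 1-second batches

def parse(lp):
    # Parse "(y,[x1,x2,x3])" lines into LabeledPoints, as in the guide.
    label = float(lp[lp.find('(') + 1: lp.find(',')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

trainingData = ssc.textFileStream("/training/data/dir").map(parse).cache()
testData = ssc.textFileStream("/testing/data/dir").map(parse)

model = StreamingLinearRegressionWithSGD()
model.setInitialWeights([0.0, 0.0, 0.0])

model.trainOn(trainingData)
# Print a few predictions from each batch as (label, prediction) pairs.
model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))).pprint()

ssc.start()
ssc.awaitTermination()
```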
python/pyspark/mllib/classification.py

Lines changed: 5 additions & 45 deletions
```diff
@@ -24,7 +24,9 @@
 from pyspark.streaming import DStream
 from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py
 from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector
-from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
+from pyspark.mllib.regression import (
+    LabeledPoint, LinearModel, _regression_train_wrapper,
+    StreamingLinearAlgorithm)
 from pyspark.mllib.util import Saveable, Loader, inherit_doc
 
 
@@ -585,55 +587,13 @@ def train(cls, data, lambda_=1.0):
         return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
 
 
-class StreamingLinearAlgorithm(object):
-    """
-    Base class that has to be inherited by any StreamingLinearAlgorithm.
-
-    Prevents reimplementation of methods predictOn and predictOnValues.
-    """
-    def __init__(self, model):
-        self._model = model
-
-    def latestModel(self):
-        """
-        Returns the latest model.
-        """
-        return self._model
-
-    def _validate(self, dstream):
-        if not isinstance(dstream, DStream):
-            raise TypeError(
-                "dstream should be a DStream object, got %s" % type(dstream))
-        if not self._model:
-            raise ValueError(
-                "Model must be intialized using setInitialWeights")
-
-    def predictOn(self, dstream):
-        """
-        Make predictions on a dstream.
-
-        :return: Transformed dstream object.
-        """
-        self._validate(dstream)
-        return dstream.map(lambda x: self._model.predict(x))
-
-    def predictOnValues(self, dstream):
-        """
-        Make predictions on a keyed dstream.
-
-        :return: Transformed dstream object.
-        """
-        self._validate(dstream)
-        return dstream.mapValues(lambda x: self._model.predict(x))
-
-
 @inherit_doc
 class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
     """
-    Run LogisticRegression with SGD on a stream of data.
+    Run LogisticRegression with SGD on a batch of data.
 
     The weights obtained at the end of training a stream are used as initial
-    weights for the next stream.
+    weights for the next batch.
 
     :param stepSize: Step size for each iteration of gradient descent.
     :param numIterations: Number of iterations run for each batch of data.
```
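With the base class moved to `regression`, `StreamingLogisticRegressionWithSGD` keeps the same streaming interface. A usage sketch, assuming a StreamingContext `ssc` and two DStreams of three-feature `LabeledPoint` records (`trainingStream`, `testStream`) already exist:

```python
from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD

# trainingStream/testStream are assumed DStreams of LabeledPoint records.
model = StreamingLogisticRegressionWithSGD(stepSize=0.1, numIterations=50)
model.setInitialWeights([0.0, 0.0, 0.0])  # required before trainOn/predictOn

model.trainOn(trainingStream)
predictions = model.predictOnValues(
    testStream.map(lambda lp: (lp.label, lp.features)))
predictions.pprint()

ssc.start()
ssc.awaitTermination()
```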
python/pyspark/mllib/regression.py

Lines changed: 90 additions & 0 deletions
```diff
@@ -19,6 +19,7 @@
 from numpy import array
 
 from pyspark import RDD
+from pyspark.streaming.dstream import DStream
 from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
 from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector
 from pyspark.mllib.util import Saveable, Loader
@@ -570,6 +571,95 @@ def train(cls, data, isotonic=True):
         return IsotonicRegressionModel(boundaries.toArray(), predictions.toArray(), isotonic)
 
 
+class StreamingLinearAlgorithm(object):
+    """
+    Base class that has to be inherited by any StreamingLinearAlgorithm.
+
+    Prevents reimplementation of methods predictOn and predictOnValues.
+    """
+    def __init__(self, model):
+        self._model = model
+
+    def latestModel(self):
+        """
+        Returns the latest model.
+        """
+        return self._model
+
+    def _validate(self, dstream):
+        if not isinstance(dstream, DStream):
+            raise TypeError(
+                "dstream should be a DStream object, got %s" % type(dstream))
+        if not self._model:
+            raise ValueError(
+                "Model must be initialized using setInitialWeights")
+
+    def predictOn(self, dstream):
+        """
+        Make predictions on a dstream.
+
+        :return: Transformed dstream object.
+        """
+        self._validate(dstream)
+        return dstream.map(lambda x: self._model.predict(x))
+
+    def predictOnValues(self, dstream):
+        """
+        Make predictions on a keyed dstream.
+
+        :return: Transformed dstream object.
+        """
+        self._validate(dstream)
+        return dstream.mapValues(lambda x: self._model.predict(x))
+
+
+@inherit_doc
+class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm):
+    """
+    Run LinearRegression with SGD on a batch of data.
+
+    The problem minimized is (1 / n_samples) * (y - weights'X)**2.
+    After training on a batch of data, the weights obtained at the end of
+    training are used as initial weights for the next batch.
+
+    :param stepSize: Step size for each iteration of gradient descent.
+    :param numIterations: Total number of iterations run.
+    :param miniBatchFraction: Fraction of data on which SGD is run for each
+                              iteration.
+    """
+    def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0):
+        self.stepSize = stepSize
+        self.numIterations = numIterations
+        self.miniBatchFraction = miniBatchFraction
+        self._model = None
+        super(StreamingLinearRegressionWithSGD, self).__init__(
+            model=self._model)
+
+    def setInitialWeights(self, initialWeights):
+        """
+        Set the initial value of weights.
+
+        This must be set before running trainOn and predictOn.
+        """
+        initialWeights = _convert_to_vector(initialWeights)
+        self._model = LinearRegressionModel(initialWeights, 0)
+        return self
+
+    def trainOn(self, dstream):
+        """Train the model on the incoming dstream."""
+        self._validate(dstream)
+
+        def update(rdd):
+            # LinearRegressionWithSGD.train raises an error for an empty RDD.
+            if not rdd.isEmpty():
+                self._model = LinearRegressionWithSGD.train(
+                    rdd, self.numIterations, self.stepSize,
+                    self.miniBatchFraction, self._model.weights,
+                    self._model.intercept)
+
+        dstream.foreachRDD(update)
+
+
 def _test():
     import doctest
     from pyspark import SparkContext
```
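Since `trainOn` just registers a `foreachRDD` callback that folds each batch into `self._model`, the new class can be smoke-tested locally with `queueStream`. A sketch under that assumption (the data, timeout, and hyperparameters are arbitrary):

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD

sc = SparkContext(appName="StreamingLRSmokeTest")
ssc = StreamingContext(sc, 1)

# Two tiny batches drawn from y = 2 * x.
batches = [
    sc.parallelize([LabeledPoint(2.0, [1.0]), LabeledPoint(4.0, [2.0])]),
    sc.parallelize([LabeledPoint(6.0, [3.0]), LabeledPoint(8.0, [4.0])]),
]
stream = ssc.queueStream(batches)

model = StreamingLinearRegressionWithSGD(stepSize=0.1, numIterations=25)
model.setInitialWeights([0.0])  # must precede trainOn, per _validate above

model.trainOn(stream)

ssc.start()
ssc.awaitTerminationOrTimeout(10)
ssc.stop(stopSparkContext=False)
print(model.latestModel().weights)  # should have moved toward 2.0
```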