Skip to content

Commit 4528166

Browse files
MechCoder authored and mengxr committed
[SPARK-4127] [MLLIB] [PYSPARK] Python bindings for StreamingLinearRegressionWithSGD
Python bindings for StreamingLinearRegressionWithSGD Author: MechCoder <[email protected]> Closes apache#6744 from MechCoder/spark-4127 and squashes the following commits: d8f6457 [MechCoder] Moved StreamingLinearAlgorithm to pyspark.mllib.regression d47cc24 [MechCoder] Inherit from StreamingLinearAlgorithm 1b4ddd6 [MechCoder] minor 4de6c68 [MechCoder] Minor refactor 5e85a3b [MechCoder] Add tests for simultaneous training and prediction fb27889 [MechCoder] Add example and docs 505380b [MechCoder] Add tests d42bdae [MechCoder] [SPARK-4127] Python bindings for StreamingLinearRegressionWithSGD
1 parent ada384b commit 4528166

File tree

4 files changed

+269
-47
lines changed

4 files changed

+269
-47
lines changed

docs/mllib-linear-methods.md

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -768,6 +768,58 @@ will get better!
768768

769769
</div>
770770

771+
<div data-lang="python" markdown="1">
772+
773+
First, we import the necessary classes for parsing our input data and creating the model.
774+
775+
{% highlight python %}
776+
from pyspark.mllib.linalg import Vectors
777+
from pyspark.mllib.regression import LabeledPoint
778+
from pyspark.mllib.regression import StreamingLinearRegressionWithSGD
779+
{% endhighlight %}
780+
781+
Then we make input streams for training and testing data. We assume a StreamingContext `ssc`
has already been created; see the [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing)
for more information. For this example, we use labeled points in the training and testing streams,
but in practice you will likely want to use unlabeled vectors for test data.
785+
786+
{% highlight python %}
787+
def parse(lp):
788+
label = float(lp[lp.find('(') + 1: lp.find(',')])
789+
vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
790+
return LabeledPoint(label, vec)
791+
792+
trainingData = ssc.textFileStream("/training/data/dir").map(parse).cache()
793+
testData = ssc.textFileStream("/testing/data/dir").map(parse)
794+
{% endhighlight %}
795+
796+
We create our model by initializing the weights to zero.
797+
798+
{% highlight python %}
799+
numFeatures = 3
800+
model = StreamingLinearRegressionWithSGD()
801+
model.setInitialWeights([0.0, 0.0, 0.0])
802+
{% endhighlight %}
803+
804+
Now we register the streams for training and testing and start the job.
805+
806+
{% highlight python %}
807+
model.trainOn(trainingData)
808+
print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))))
809+
810+
ssc.start()
811+
ssc.awaitTermination()
812+
{% endhighlight %}
813+
814+
We can now save text files with data to the training or testing folders.
815+
Each line should be a data point formatted as `(y,[x1,x2,x3])` where `y` is the label
816+
and `x1,x2,x3` are the features. Anytime a text file is placed in `/training/data/dir`
817+
the model will update. Anytime a text file is placed in `/testing/data/dir` you will see predictions.
818+
As you feed more data to the training directory, the predictions
819+
will get better!
820+
821+
</div>
822+
771823
</div>
772824

773825

python/pyspark/mllib/classification.py

Lines changed: 5 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
from pyspark.streaming import DStream
2525
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py
2626
from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector
27-
from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
27+
from pyspark.mllib.regression import (
28+
LabeledPoint, LinearModel, _regression_train_wrapper,
29+
StreamingLinearAlgorithm)
2830
from pyspark.mllib.util import Saveable, Loader, inherit_doc
2931

3032

@@ -585,55 +587,13 @@ def train(cls, data, lambda_=1.0):
585587
return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
586588

587589

588-
class StreamingLinearAlgorithm(object):
589-
"""
590-
Base class that has to be inherited by any StreamingLinearAlgorithm.
591-
592-
Prevents reimplementation of methods predictOn and predictOnValues.
593-
"""
594-
def __init__(self, model):
595-
self._model = model
596-
597-
def latestModel(self):
598-
"""
599-
Returns the latest model.
600-
"""
601-
return self._model
602-
603-
def _validate(self, dstream):
604-
if not isinstance(dstream, DStream):
605-
raise TypeError(
606-
"dstream should be a DStream object, got %s" % type(dstream))
607-
if not self._model:
608-
raise ValueError(
609-
"Model must be intialized using setInitialWeights")
610-
611-
def predictOn(self, dstream):
612-
"""
613-
Make predictions on a dstream.
614-
615-
:return: Transformed dstream object.
616-
"""
617-
self._validate(dstream)
618-
return dstream.map(lambda x: self._model.predict(x))
619-
620-
def predictOnValues(self, dstream):
621-
"""
622-
Make predictions on a keyed dstream.
623-
624-
:return: Transformed dstream object.
625-
"""
626-
self._validate(dstream)
627-
return dstream.mapValues(lambda x: self._model.predict(x))
628-
629-
630590
@inherit_doc
631591
class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
632592
"""
633-
Run LogisticRegression with SGD on a stream of data.
593+
Run LogisticRegression with SGD on a batch of data.
634594
635595
The weights obtained at the end of training a stream are used as initial
636-
weights for the next stream.
596+
weights for the next batch.
637597
638598
:param stepSize: Step size for each iteration of gradient descent.
639599
:param numIterations: Number of iterations run for each batch of data.

python/pyspark/mllib/regression.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from numpy import array
2020

2121
from pyspark import RDD
22+
from pyspark.streaming.dstream import DStream
2223
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
2324
from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector
2425
from pyspark.mllib.util import Saveable, Loader
@@ -570,6 +571,95 @@ def train(cls, data, isotonic=True):
570571
return IsotonicRegressionModel(boundaries.toArray(), predictions.toArray(), isotonic)
571572

572573

574+
class StreamingLinearAlgorithm(object):
    """
    Base class that has to be inherited by any StreamingLinearAlgorithm.

    Prevents reimplementation of methods predictOn and predictOnValues.
    """
    def __init__(self, model):
        # Latest trained model; subclasses replace this as new batches
        # of streaming data are trained on.
        self._model = model

    def latestModel(self):
        """
        Returns the latest model.
        """
        return self._model

    def _validate(self, dstream):
        # Shared guard for predictOn/predictOnValues: the input must be a
        # DStream and the model must have been initialized beforehand.
        if not isinstance(dstream, DStream):
            raise TypeError(
                "dstream should be a DStream object, got %s" % type(dstream))
        if not self._model:
            # Fixed typo in the original message ("intialized").
            raise ValueError(
                "Model must be initialized using setInitialWeights")

    def predictOn(self, dstream):
        """
        Make predictions on a dstream.

        :return: Transformed dstream object.
        """
        self._validate(dstream)
        return dstream.map(lambda x: self._model.predict(x))

    def predictOnValues(self, dstream):
        """
        Make predictions on a keyed dstream.

        :return: Transformed dstream object.
        """
        self._validate(dstream)
        return dstream.mapValues(lambda x: self._model.predict(x))
614+
615+
616+
@inherit_doc
class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm):
    """
    Run LinearRegression with SGD on a batch of data.

    The problem minimized is (1 / n_samples) * (y - weights'X)**2.
    After training on a batch of data, the weights obtained at the end of
    training are used as initial weights for the next batch.

    :param stepSize: Step size for each iteration of gradient descent.
    :param numIterations: Total number of iterations run.
    :param miniBatchFraction: Fraction of data on which SGD is run for each
                              iteration.
    """
    def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0):
        self.stepSize = stepSize
        self.numIterations = numIterations
        self.miniBatchFraction = miniBatchFraction
        # No model exists until setInitialWeights is called.
        self._model = None
        super(StreamingLinearRegressionWithSGD, self).__init__(
            model=self._model)

    def setInitialWeights(self, initialWeights):
        """
        Set the initial value of weights.

        This must be set before running trainOn and predictOn.

        :param initialWeights: Vector-like initial weights (converted via
                               _convert_to_vector); the intercept starts at 0.
        :return: self, to allow call chaining.
        """
        initialWeights = _convert_to_vector(initialWeights)
        self._model = LinearRegressionModel(initialWeights, 0)
        return self

    def trainOn(self, dstream):
        """Train the model on the incoming dstream."""
        self._validate(dstream)

        def update(rdd):
            # LinearRegressionWithSGD.train raises an error for an empty RDD.
            if not rdd.isEmpty():
                # Warm-start each batch from the previous batch's weights
                # and intercept.
                self._model = LinearRegressionWithSGD.train(
                    rdd, self.numIterations, self.stepSize,
                    self.miniBatchFraction, self._model.weights,
                    self._model.intercept)

        dstream.foreachRDD(update)
661+
662+
573663
def _test():
574664
import doctest
575665
from pyspark import SparkContext

python/pyspark/mllib/tests.py

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@
2727
from shutil import rmtree
2828

2929
from numpy import (
30-
array, array_equal, zeros, inf, random, exp, dot, all, mean)
30+
array, array_equal, zeros, inf, random, exp, dot, all, mean, abs)
3131
from numpy import sum as array_sum
32+
3233
from py4j.protocol import Py4JJavaError
3334

3435
if sys.version_info[:2] <= (2, 6):
@@ -45,8 +46,8 @@
4546
from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
4647
from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
4748
DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
48-
from pyspark.mllib.regression import LabeledPoint
4949
from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
50+
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
5051
from pyspark.mllib.random import RandomRDDs
5152
from pyspark.mllib.stat import Statistics
5253
from pyspark.mllib.feature import Word2Vec
@@ -56,6 +57,7 @@
5657
from pyspark.serializers import PickleSerializer
5758
from pyspark.streaming import StreamingContext
5859
from pyspark.sql import SQLContext
60+
from pyspark.streaming import StreamingContext
5961

6062
_have_scipy = False
6163
try:
@@ -1170,6 +1172,124 @@ def collect_errors(rdd):
11701172
self.assertTrue(errors[1] - errors[-1] > 0.3)
11711173

11721174

1175+
class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):

    def assertArrayAlmostEqual(self, array1, array2, dec):
        """Assert two sequences agree element-wise to `dec` decimal places."""
        # Bug fix: the original wrote `for i, j in array1, array2`, which
        # iterates over the 2-tuple (array1, array2) and unpacks each array
        # itself — i.e. it compared array1[0] to array1[1], never pairing
        # elements across the two arrays.
        for i, j in zip(array1, array2):
            self.assertAlmostEqual(i, j, dec)

    def test_parameter_accuracy(self):
        """Test that coefs are predicted accurately by fitting on toy data."""

        # Test that fitting (10*X1 + 10*X2), (X1, X2) gives coefficients
        # (10, 10)
        slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
        slr.setInitialWeights([0.0, 0.0])
        xMean = [0.0, 0.0]
        xVariance = [1.0 / 3.0, 1.0 / 3.0]

        # Create ten batches with 100 sample points in each.
        batches = []
        for i in range(10):
            batch = LinearDataGenerator.generateLinearInput(
                0.0, [10.0, 10.0], xMean, xVariance, 100, 42 + i, 0.1)
            batches.append(sc.parallelize(batch))

        input_stream = self.ssc.queueStream(batches)
        t = time()
        slr.trainOn(input_stream)
        self.ssc.start()
        self._ssc_wait(t, 10, 0.01)
        self.assertArrayAlmostEqual(
            slr.latestModel().weights.array, [10., 10.], 1)
        self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1)

    def test_parameter_convergence(self):
        """Test that the model parameters improve with streaming data."""
        slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
        slr.setInitialWeights([0.0])

        # Create ten batches with 100 sample points in each.
        batches = []
        for i in range(10):
            batch = LinearDataGenerator.generateLinearInput(
                0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
            batches.append(sc.parallelize(batch))

        # Record the first weight after every batch so we can check the
        # sequence is (approximately) non-decreasing toward the true coef.
        model_weights = []
        input_stream = self.ssc.queueStream(batches)
        input_stream.foreachRDD(
            lambda x: model_weights.append(slr.latestModel().weights[0]))
        t = time()
        slr.trainOn(input_stream)
        self.ssc.start()
        self._ssc_wait(t, 10, 0.01)

        model_weights = array(model_weights)
        diff = model_weights[1:] - model_weights[:-1]
        # Allow small regressions (-0.1) due to SGD noise.
        self.assertTrue(all(diff >= -0.1))

    def test_prediction(self):
        """Test prediction on a model with weights already set."""
        # Create a model with initial Weights equal to coefs
        slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
        slr.setInitialWeights([10.0, 10.0])

        # Create ten batches with 100 sample points in each.
        batches = []
        for i in range(10):
            batch = LinearDataGenerator.generateLinearInput(
                0.0, [10.0, 10.0], [0.0, 0.0], [1.0 / 3.0, 1.0 / 3.0],
                100, 42 + i, 0.1)
            batches.append(
                sc.parallelize(batch).map(lambda lp: (lp.label, lp.features)))

        input_stream = self.ssc.queueStream(batches)
        t = time()
        output_stream = slr.predictOnValues(input_stream)
        samples = []
        output_stream.foreachRDD(lambda x: samples.append(x.collect()))

        self.ssc.start()
        self._ssc_wait(t, 5, 0.01)

        # Test that mean absolute error on each batch is less than 0.1
        for batch in samples:
            true, predicted = zip(*batch)
            self.assertTrue(mean(abs(array(true) - array(predicted))) < 0.1)

    def test_train_prediction(self):
        """Test that error on test data improves as model is trained."""
        slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
        slr.setInitialWeights([0.0])

        # Create ten batches with 100 sample points in each.
        batches = []
        for i in range(10):
            batch = LinearDataGenerator.generateLinearInput(
                0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
            batches.append(sc.parallelize(batch))

        predict_batches = [
            b.map(lambda lp: (lp.label, lp.features)) for b in batches]
        mean_absolute_errors = []

        def func(rdd):
            true, predicted = zip(*rdd.collect())
            # Bug fix: mean absolute error is mean(|y - y_hat|); the original
            # computed mean(|y| - |y_hat|), which largely cancels for data
            # symmetric around zero and does not measure error.
            mean_absolute_errors.append(
                mean(abs(array(true) - array(predicted))))

        input_stream = self.ssc.queueStream(batches)
        output_stream = self.ssc.queueStream(predict_batches)
        t = time()
        slr.trainOn(input_stream)
        output_stream = slr.predictOnValues(output_stream)
        output_stream.foreachRDD(func)
        self.ssc.start()
        self._ssc_wait(t, 10, 0.01)
        # Error on early batches should exceed error after training.
        self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2)
1291+
1292+
11731293
if __name__ == "__main__":
11741294
if not _have_scipy:
11751295
print("NOTE: Skipping SciPy tests as it does not seem to be installed")

0 commit comments

Comments
 (0)