177 changes: 129 additions & 48 deletions python/pyspark/mllib/tests.py
@@ -32,6 +32,9 @@

from py4j.protocol import Py4JJavaError

if sys.version > '3':
basestring = str

if sys.version_info[:2] <= (2, 6):
try:
import unittest2 as unittest
@@ -86,9 +89,42 @@ def tearDown(self):
self.ssc.stop(False)

@staticmethod
def _ssc_wait(start_time, end_time, sleep_time):
while time() - start_time < end_time:
def _eventually(condition, timeout=30.0, catch_assertions=False):
"""
Wait a given amount of time for a condition to pass, else fail with an error.
This is a helper utility for streaming ML tests.
:param condition: Function that checks for termination conditions.
condition() can return:
- True: Conditions met. Return without error.
- other value: Conditions not met yet. Continue. Upon timeout,
include last such value in error message.
Note that this method may be called at any time during
streaming execution (e.g., even before any results
have been created).
:param timeout: Number of seconds to wait. Default 30 seconds.
:param catch_assertions: If False (default), do not catch AssertionErrors.
If True, catch AssertionErrors; continue, but save
error to throw upon timeout.
"""
start_time = time()
lastValue = None
while time() - start_time < timeout:
if catch_assertions:
try:
lastValue = condition()
except AssertionError as e:
lastValue = e
else:
lastValue = condition()
if lastValue is True:
return
sleep(0.01)
if isinstance(lastValue, AssertionError):
raise lastValue
else:
raise AssertionError(
"Test failed due to timeout after %g sec, with last condition returning: %s"
% (timeout, lastValue))
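
A minimal usage sketch of _eventually (an editor's illustration, not part of the diff). It assumes the surrounding MLLibStreamingTestCase test method, with results standing in for a hypothetical list that a foreachRDD action appends to:

results = []

def condition():
    # Any value other than True is remembered and included in the
    # timeout error message, which helps diagnose a stalled stream.
    if len(results) >= 3:
        return True
    return "only %d batches so far" % len(results)

self._eventually(condition)  # polls every 10 ms for up to 30 s

# With catch_assertions=True, an AssertionError raised by the condition
# is caught and retried until the timeout, then re-raised:
def strict_condition():
    self.assertEqual(len(results), 3)
    return True

self._eventually(strict_condition, timeout=60.0, catch_assertions=True)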


def _squared_distance(a, b):
@@ -999,10 +1035,13 @@ def test_accuracy_for_single_center(self):
[self.sc.parallelize(batch, 1) for batch in batches])
stkm.trainOn(input_stream)

t = time()
self.ssc.start()
self._ssc_wait(t, 10.0, 0.01)
self.assertEquals(stkm.latestModel().clusterWeights, [25.0])

def condition():
self.assertEquals(stkm.latestModel().clusterWeights, [25.0])
return True
self._eventually(condition, catch_assertions=True)

realCenters = array_sum(array(centers), axis=0)
for i in range(5):
modelCenters = stkm.latestModel().centers[0][i]
@@ -1027,7 +1066,7 @@ def test_trainOn_model(self):
stkm.setInitialCenters(
centers=initCenters, weights=[1.0, 1.0, 1.0, 1.0])

# Create a toy dataset by setting a tiny offest for each point.
# Create a toy dataset by setting a tiny offset for each point.
offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]]
batches = []
for offset in offsets:
@@ -1037,14 +1076,15 @@
batches = [self.sc.parallelize(batch, 1) for batch in batches]
input_stream = self.ssc.queueStream(batches)
stkm.trainOn(input_stream)
t = time()
self.ssc.start()

# Give enough time to train the model.
self._ssc_wait(t, 6.0, 0.01)
finalModel = stkm.latestModel()
self.assertTrue(all(finalModel.centers == array(initCenters)))
self.assertEquals(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0])
def condition():
finalModel = stkm.latestModel()
self.assertTrue(all(finalModel.centers == array(initCenters)))
self.assertEquals(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0])
return True
self._eventually(condition, catch_assertions=True)

def test_predictOn_model(self):
"""Test that the model predicts correctly on toy data."""
@@ -1066,10 +1106,13 @@ def update(rdd):
result.append(rdd_collect)

predict_val.foreachRDD(update)
t = time()
self.ssc.start()
self._ssc_wait(t, 6.0, 0.01)
self.assertEquals(result, [[0], [1], [2], [3]])

def condition():
self.assertEquals(result, [[0], [1], [2], [3]])
return True

self._eventually(condition, catch_assertions=True)

def test_trainOn_predictOn(self):
"""Test that prediction happens on the updated model."""
@@ -1095,10 +1138,13 @@ def collect(rdd):
predict_stream = stkm.predictOn(input_stream)
predict_stream.foreachRDD(collect)

t = time()
self.ssc.start()
self._ssc_wait(t, 6.0, 0.01)
self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])

def condition():
self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
return True

self._eventually(condition, catch_assertions=True)
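
The refactored tests above all follow the same shape: queue a few toy batches, start the streaming context, then poll with _eventually instead of sleeping for a fixed interval. A condensed, hypothetical sketch of that pattern (it assumes the StreamingKMeans import and the sc/ssc fixtures that tests.py already provides):

from pyspark.mllib.clustering import StreamingKMeans

def test_pattern_sketch(self):
    # One center at the origin with zero initial weight.
    stkm = StreamingKMeans(k=1, decayFactor=1.0)
    stkm.setInitialCenters([[0.0, 0.0]], [0.0])

    # Four single-point batches; with decayFactor=1.0 the cluster
    # weight accumulates to 4.0 once every batch has been processed.
    batches = [self.sc.parallelize([[1.0, 1.0]], 1) for _ in range(4)]
    stkm.trainOn(self.ssc.queueStream(batches))
    self.ssc.start()

    def condition():
        # Fails (and is retried) until the last batch is consumed.
        self.assertEqual(stkm.latestModel().clusterWeights, [4.0])
        return True

    self._eventually(condition, catch_assertions=True)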


class LinearDataGeneratorTests(MLlibTestCase):
@@ -1156,11 +1202,14 @@ def test_parameter_accuracy(self):
slr.setInitialWeights([0.0])
slr.trainOn(input_stream)

t = time()
self.ssc.start()
self._ssc_wait(t, 20.0, 0.01)
rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5
self.assertAlmostEqual(rel, 0.1, 1)

def condition():
rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5
self.assertAlmostEqual(rel, 0.1, 1)
return True

self._eventually(condition, catch_assertions=True)

def test_convergence(self):
"""
@@ -1179,13 +1228,18 @@ def test_convergence(self):
input_stream.foreachRDD(
lambda x: models.append(slr.latestModel().weights[0]))

t = time()
self.ssc.start()
self._ssc_wait(t, 15.0, 0.01)

def condition():
self.assertEquals(len(models), len(input_batches))
return True

# We want all batches to finish for this test.
self._eventually(condition, 60.0, catch_assertions=True)

t_models = array(models)
diff = t_models[1:] - t_models[:-1]

# Test that weights improve with a small tolerance,
# Test that weights improve with a small tolerance
self.assertTrue(all(diff >= -0.1))
self.assertTrue(array_sum(diff > 0) > 1)

@@ -1208,9 +1262,13 @@ def test_predictions(self):
predict_stream = slr.predictOnValues(input_stream)
true_predicted = []
predict_stream.foreachRDD(lambda x: true_predicted.append(x.collect()))
t = time()
self.ssc.start()
self._ssc_wait(t, 5.0, 0.01)

def condition():
self.assertEquals(len(true_predicted), len(input_batches))
return True

self._eventually(condition, catch_assertions=True)

# Test that the accuracy error is no more than 0.4 on each batch.
for batch in true_predicted:
@@ -1242,12 +1300,17 @@ def collect_errors(rdd):
ps = slr.predictOnValues(predict_stream)
ps.foreachRDD(lambda x: collect_errors(x))

t = time()
self.ssc.start()
self._ssc_wait(t, 20.0, 0.01)

# Test that the improvement in error is at least 0.3
self.assertTrue(errors[1] - errors[-1] > 0.3)
def condition():
# Test that the improvement in error is > 0.3
if len(errors) == len(predict_batches):
self.assertGreater(errors[1] - errors[-1], 0.3)
if len(errors) >= 3 and errors[1] - errors[-1] > 0.3:
return True
return "Latest errors: " + ", ".join(map(lambda x: str(x), errors))

self._eventually(condition)
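
Note the two condition styles used in this file: assertion-based conditions run with catch_assertions=True, so a transiently failing assertion is retried and the final AssertionError is re-raised on timeout, while value-returning conditions like the one above run without it, so the last returned value (here the latest errors) is embedded in the timeout message instead.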


class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
@@ -1274,13 +1337,16 @@ def test_parameter_accuracy(self):
batches.append(sc.parallelize(batch))

input_stream = self.ssc.queueStream(batches)
t = time()
slr.trainOn(input_stream)
self.ssc.start()
self._ssc_wait(t, 10, 0.01)
self.assertArrayAlmostEqual(
slr.latestModel().weights.array, [10., 10.], 1)
self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1)

def condition():
self.assertArrayAlmostEqual(
slr.latestModel().weights.array, [10., 10.], 1)
self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1)
return True

self._eventually(condition, catch_assertions=True)

def test_parameter_convergence(self):
"""Test that the model parameters improve with streaming data."""
@@ -1298,13 +1364,18 @@ def test_parameter_convergence(self):
input_stream = self.ssc.queueStream(batches)
input_stream.foreachRDD(
lambda x: model_weights.append(slr.latestModel().weights[0]))
t = time()
slr.trainOn(input_stream)
self.ssc.start()
self._ssc_wait(t, 10, 0.01)

model_weights = array(model_weights)
diff = model_weights[1:] - model_weights[:-1]
def condition():
self.assertEquals(len(model_weights), len(batches))
return True

# We want all batches to finish for this test.
self._eventually(condition, catch_assertions=True)

w = array(model_weights)
diff = w[1:] - w[:-1]
self.assertTrue(all(diff >= -0.1))

def test_prediction(self):
@@ -1323,13 +1394,18 @@ def test_prediction(self):
sc.parallelize(batch).map(lambda lp: (lp.label, lp.features)))

input_stream = self.ssc.queueStream(batches)
t = time()
output_stream = slr.predictOnValues(input_stream)
samples = []
output_stream.foreachRDD(lambda x: samples.append(x.collect()))

self.ssc.start()
self._ssc_wait(t, 5, 0.01)

def condition():
self.assertEquals(len(samples), len(batches))
return True

# We want all batches to finish for this test.
self._eventually(condition, catch_assertions=True)

# Test that mean absolute error on each batch is less than 0.1
for batch in samples:
Expand All @@ -1350,22 +1426,27 @@ def test_train_prediction(self):

predict_batches = [
b.map(lambda lp: (lp.label, lp.features)) for b in batches]
mean_absolute_errors = []
errors = []

def func(rdd):
true, predicted = zip(*rdd.collect())
mean_absolute_errors.append(mean(abs(true) - abs(predicted)))
errors.append(mean(abs(true) - abs(predicted)))

model_weights = []
input_stream = self.ssc.queueStream(batches)
output_stream = self.ssc.queueStream(predict_batches)
t = time()
slr.trainOn(input_stream)
output_stream = slr.predictOnValues(output_stream)
output_stream.foreachRDD(func)
self.ssc.start()
self._ssc_wait(t, 10, 0.01)
self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2)

def condition():
if len(errors) == len(predict_batches):
self.assertGreater(errors[1] - errors[-1], 2)
if len(errors) >= 3 and errors[1] - errors[-1] > 2:
return True
return "Latest errors: " + ", ".join(map(lambda x: str(x), errors))

self._eventually(condition)


class MLUtilsTests(MLlibTestCase):