Changes from 1 commit (23 commits in this pull request):
e768164  #2808 update kafka to version 0.8.2 (Dec 7, 2014)
d9dc2bc  Merge remote-tracking branch 'upstream/master' into wip-2808-kafka-0.… (Dec 23, 2014)
2e67c66  #SPARK-2808 Update to Kafka 0.8.2.0 GA from beta. (Feb 5, 2015)
6953429  [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2 (koeninger, Feb 11, 2015)
77de6c2  Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade (koeninger, Mar 18, 2015)
407382e  [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2.1 (koeninger, Mar 18, 2015)
ed02d2c  [SPARK-2808][Streaming][Kafka] move default argument for api version … (koeninger, Apr 15, 2015)
1d10751  Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade (koeninger, Apr 27, 2015)
c70ee43  [SPARK-2808][Streaming][Kafka] add more asserts to test, try to figur… (koeninger, Apr 28, 2015)
9edab4c  [SPARK-2808][Streaming][Kafka] more shots in the dark on jenkins fail… (koeninger, Apr 28, 2015)
af6f3ec  [SPARK-2808][Streaming][Kafka] delay test until latest leader offset … (koeninger, Apr 29, 2015)
61b3464  [SPARK-2808][Streaming][Kafka] delay for second send in boundary cond… (koeninger, Apr 29, 2015)
3824ce3  [SPARK-2808][Streaming][Kafka] naming / comments per tdas (koeninger, Apr 29, 2015)
2b92d3f  [SPARK-2808][Streaming][Kafka] wait for leader offsets in the java te… (koeninger, Apr 29, 2015)
2712649  [SPARK-2808][Streaming][Kafka] add more logging to python test, see w… (koeninger, Apr 29, 2015)
115aeee  Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade (koeninger, Apr 29, 2015)
4c4557f  [SPARK-2808][Streaming][Kafka] add even more logging to python test (koeninger, Apr 30, 2015)
1d896e2  [SPARK-2808][Streaming][Kafka] add even even more logging to python test (koeninger, Apr 30, 2015)
30d991d  [SPARK-2808][Streaming][Kafka] remove stderr prints since it breaks p… (koeninger, May 1, 2015)
d4267e9  [SPARK-2808][Streaming][Kafka] fix stderr redirect in python test script (koeninger, May 1, 2015)
1770abc  [SPARK-2808][Streaming][Kafka] make waitUntilLeaderOffset easier to c… (koeninger, May 1, 2015)
e6dfaf6  [SPARK-2808][Streaming][Kafka] pointless whitespace change to trigger… (koeninger, May 1, 2015)
803aa2c  [SPARK-2808][Streaming][Kafka] code cleanup per TD (koeninger, May 1, 2015)
[SPARK-2808][Streaming][Kafka] remove stderr prints since it breaks python 3 syntax
koeninger committed May 1, 2015
commit 30d991da695b700352e0250995367238b5127424
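For context: `print >> sys.stderr, ...` is Python 2's chevron syntax for redirecting print, and it is a SyntaxError under Python 3, so a module containing it fails to parse before a single test can run. A minimal sketch of the incompatibility and the version-neutral alternative (illustrative, not part of this diff):

```python
from __future__ import print_function  # must precede other statements; gives py2 the py3 print()

import sys

# Python 2 only. Under Python 3 the chevron form is a SyntaxError, so the
# whole test module fails to import before any test executes:
#     print >> sys.stderr, "test_map started"

# Version-neutral form, valid on Python 2 (with the future import) and Python 3:
print("test_map started", file=sys.stderr)
```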
49 changes: 0 additions & 49 deletions python/pyspark/streaming/tests.py
@@ -142,7 +142,6 @@ class BasicOperationTests(PySparkStreamingTestCase):
 
     def test_map(self):
         """Basic operation test for DStream.map."""
-        print >> sys.stderr, "test_map started"
         input = [range(1, 5), range(5, 9), range(9, 13)]
 
         def func(dstream):
@@ -152,7 +151,6 @@ def func(dstream):
 
     def test_flatMap(self):
         """Basic operation test for DStream.faltMap."""
-        print >> sys.stderr, "test_flatMap started"
         input = [range(1, 5), range(5, 9), range(9, 13)]
 
         def func(dstream):
@@ -163,7 +161,6 @@ def func(dstream):
 
     def test_filter(self):
         """Basic operation test for DStream.filter."""
-        print >> sys.stderr, "test_filter started"
         input = [range(1, 5), range(5, 9), range(9, 13)]
 
         def func(dstream):
@@ -173,7 +170,6 @@ def func(dstream):
 
     def test_count(self):
         """Basic operation test for DStream.count."""
-        print >> sys.stderr, "test_count started"
         input = [range(5), range(10), range(20)]
 
         def func(dstream):
@@ -183,7 +179,6 @@ def func(dstream):
 
     def test_reduce(self):
         """Basic operation test for DStream.reduce."""
-        print >> sys.stderr, "test_reduce started"
         input = [range(1, 5), range(5, 9), range(9, 13)]
 
         def func(dstream):
@@ -193,7 +188,6 @@ def func(dstream):
 
     def test_reduceByKey(self):
         """Basic operation test for DStream.reduceByKey."""
-        print >> sys.stderr, "test_reduceByKey started"
         input = [[("a", 1), ("a", 1), ("b", 1), ("b", 1)],
                  [("", 1), ("", 1), ("", 1), ("", 1)],
                  [(1, 1), (1, 1), (2, 1), (2, 1), (3, 1)]]
@@ -205,7 +199,6 @@ def func(dstream):
 
     def test_mapValues(self):
         """Basic operation test for DStream.mapValues."""
-        print >> sys.stderr, "test_mapValues started"
         input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)],
                  [(0, 4), (1, 1), (2, 2), (3, 3)],
                  [(1, 1), (2, 1), (3, 1), (4, 1)]]
@@ -219,7 +212,6 @@ def func(dstream):
 
     def test_flatMapValues(self):
         """Basic operation test for DStream.flatMapValues."""
-        print >> sys.stderr, "test_flatMapValues started"
         input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)],
                  [(0, 4), (1, 1), (2, 1), (3, 1)],
                  [(1, 1), (2, 1), (3, 1), (4, 1)]]
@@ -234,7 +226,6 @@ def func(dstream):
 
     def test_glom(self):
         """Basic operation test for DStream.glom."""
-        print >> sys.stderr, "test_glom started"
         input = [range(1, 5), range(5, 9), range(9, 13)]
         rdds = [self.sc.parallelize(r, 2) for r in input]
 
@@ -245,7 +236,6 @@ def func(dstream):
 
     def test_mapPartitions(self):
         """Basic operation test for DStream.mapPartitions."""
-        print >> sys.stderr, "test_mapPartitions started"
         input = [range(1, 5), range(5, 9), range(9, 13)]
         rdds = [self.sc.parallelize(r, 2) for r in input]
 
@@ -258,7 +248,6 @@ def f(iterator):
 
     def test_countByValue(self):
         """Basic operation test for DStream.countByValue."""
-        print >> sys.stderr, "test_countByValue started"
         input = [list(range(1, 5)) * 2, list(range(5, 7)) + list(range(5, 9)), ["a", "a", "b", ""]]
 
         def func(dstream):
@@ -268,7 +257,6 @@ def func(dstream):
 
     def test_groupByKey(self):
         """Basic operation test for DStream.groupByKey."""
-        print >> sys.stderr, "test_groupByKey started"
         input = [[(1, 1), (2, 1), (3, 1), (4, 1)],
                  [(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)],
                  [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]]
@@ -283,7 +271,6 @@ def func(dstream):
 
     def test_combineByKey(self):
         """Basic operation test for DStream.combineByKey."""
-        print >> sys.stderr, "test_combineByKey started"
         input = [[(1, 1), (2, 1), (3, 1), (4, 1)],
                  [(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)],
                  [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]]
@@ -298,7 +285,6 @@ def add(a, b):
         self._test_func(input, func, expected, sort=True)
 
     def test_repartition(self):
-        print >> sys.stderr, "test_repartition started"
         input = [range(1, 5), range(5, 9)]
         rdds = [self.sc.parallelize(r, 2) for r in input]
 
@@ -308,7 +294,6 @@ def func(dstream):
         self._test_func(rdds, func, expected)
 
     def test_union(self):
-        print >> sys.stderr, "test_union started"
         input1 = [range(3), range(5), range(6)]
         input2 = [range(3, 6), range(5, 6)]
 
@@ -319,7 +304,6 @@ def func(d1, d2):
         self._test_func(input1, func, expected, input2=input2)
 
     def test_cogroup(self):
-        print >> sys.stderr, "test_cogroup started"
         input = [[(1, 1), (2, 1), (3, 1)],
                  [(1, 1), (1, 1), (1, 1), (2, 1)],
                  [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1)]]
@@ -336,7 +320,6 @@ def func(d1, d2):
         self._test_func(input, func, expected, sort=True, input2=input2)
 
     def test_join(self):
-        print >> sys.stderr, "test_join started"
         input = [[('a', 1), ('b', 2)]]
         input2 = [[('b', 3), ('c', 4)]]
 
@@ -347,7 +330,6 @@ def func(a, b):
         self._test_func(input, func, expected, True, input2)
 
     def test_left_outer_join(self):
-        print >> sys.stderr, "test_left_outer_join started"
         input = [[('a', 1), ('b', 2)]]
         input2 = [[('b', 3), ('c', 4)]]
 
@@ -358,7 +340,6 @@ def func(a, b):
         self._test_func(input, func, expected, True, input2)
 
     def test_right_outer_join(self):
-        print >> sys.stderr, "test_right_outer_join started"
         input = [[('a', 1), ('b', 2)]]
         input2 = [[('b', 3), ('c', 4)]]
 
@@ -369,7 +350,6 @@ def func(a, b):
         self._test_func(input, func, expected, True, input2)
 
     def test_full_outer_join(self):
-        print >> sys.stderr, "test_full_outer_join started"
         input = [[('a', 1), ('b', 2)]]
         input2 = [[('b', 3), ('c', 4)]]
 
@@ -380,7 +360,6 @@ def func(a, b):
         self._test_func(input, func, expected, True, input2)
 
     def test_update_state_by_key(self):
-        print >> sys.stderr, "test_update_state_by_key started"
 
         def updater(vs, s):
             if not s:
@@ -403,7 +382,6 @@ class WindowFunctionTests(PySparkStreamingTestCase):
     timeout = 5
 
     def test_window(self):
-        print >> sys.stderr, "test_window started"
         input = [range(1), range(2), range(3), range(4), range(5)]
 
         def func(dstream):
@@ -413,7 +391,6 @@ def func(dstream):
         self._test_func(input, func, expected)
 
     def test_count_by_window(self):
-        print >> sys.stderr, "test_count_by_window started"
         input = [range(1), range(2), range(3), range(4), range(5)]
 
         def func(dstream):
@@ -423,7 +400,6 @@ def func(dstream):
         self._test_func(input, func, expected)
 
     def test_count_by_window_large(self):
-        print >> sys.stderr, "test_count_by_window_large started"
         input = [range(1), range(2), range(3), range(4), range(5), range(6)]
 
         def func(dstream):
@@ -433,7 +409,6 @@ def func(dstream):
         self._test_func(input, func, expected)
 
     def test_count_by_value_and_window(self):
-        print >> sys.stderr, "test_count_by_value_and_window started"
         input = [range(1), range(2), range(3), range(4), range(5), range(6)]
 
         def func(dstream):
@@ -443,7 +418,6 @@ def func(dstream):
         self._test_func(input, func, expected)
 
     def test_group_by_key_and_window(self):
-        print >> sys.stderr, "test_group_by_key_and_window started"
         input = [[('a', i)] for i in range(5)]
 
         def func(dstream):
@@ -454,7 +428,6 @@ def func(dstream):
         self._test_func(input, func, expected)
 
     def test_reduce_by_invalid_window(self):
-        print >> sys.stderr, "test_reduce_by_invalid_window started"
         input1 = [range(3), range(5), range(1), range(6)]
         d1 = self.ssc.queueStream(input1)
         self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 0.1, 0.1))
@@ -471,28 +444,24 @@ def _add_input_stream(self):
         self._collect(stream, 1, block=False)
 
     def test_stop_only_streaming_context(self):
-        print >> sys.stderr, "test_stop_only_streaming_context started"
         self._add_input_stream()
         self.ssc.start()
         self.ssc.stop(False)
         self.assertEqual(len(self.sc.parallelize(range(5), 5).glom().collect()), 5)
 
     def test_stop_multiple_times(self):
-        print >> sys.stderr, "test_stop_multiple_times started"
         self._add_input_stream()
         self.ssc.start()
         self.ssc.stop(False)
         self.ssc.stop(False)
 
     def test_queue_stream(self):
-        print >> sys.stderr, "test_queue_stream started"
         input = [list(range(i + 1)) for i in range(3)]
         dstream = self.ssc.queueStream(input)
         result = self._collect(dstream, 3)
         self.assertEqual(input, result)
 
     def test_text_file_stream(self):
-        print >> sys.stderr, "test_text_file_stream started"
         d = tempfile.mkdtemp()
         self.ssc = StreamingContext(self.sc, self.duration)
         dstream2 = self.ssc.textFileStream(d).map(int)
@@ -506,7 +475,6 @@ def test_text_file_stream(self):
         self.assertEqual([list(range(10)), list(range(10))], result)
 
     def test_binary_records_stream(self):
-        print >> sys.stderr, "test_binary_records_stream started"
         d = tempfile.mkdtemp()
         self.ssc = StreamingContext(self.sc, self.duration)
         dstream = self.ssc.binaryRecordsStream(d, 10).map(
@@ -521,7 +489,6 @@ def test_binary_records_stream(self):
         self.assertEqual([list(range(10)), list(range(10))], [list(v[0]) for v in result])
 
     def test_union(self):
-        print >> sys.stderr, "test_union started"
         input = [list(range(i + 1)) for i in range(3)]
         dstream = self.ssc.queueStream(input)
         dstream2 = self.ssc.queueStream(input)
@@ -531,7 +498,6 @@ def test_union(self):
         self.assertEqual(expected, result)
 
     def test_transform(self):
-        print >> sys.stderr, "test_transform started"
         dstream1 = self.ssc.queueStream([[1]])
         dstream2 = self.ssc.queueStream([[2]])
         dstream3 = self.ssc.queueStream([[3]])
@@ -548,7 +514,6 @@ def func(rdds):
 class CheckpointTests(unittest.TestCase):
 
     def test_get_or_create(self):
-        print >> sys.stderr, "test_get_or_create started"
         inputd = tempfile.mkdtemp()
         outputd = tempfile.mkdtemp() + "/"
 
@@ -639,32 +604,26 @@ def _validateStreamResult(self, sendData, stream):
 
     def _validateRddResult(self, sendData, rdd):
         result = {}
-        print >> sys.stderr, "_validateRddResult started"
         for i in rdd.map(lambda x: x[1]).collect():
             result[i] = result.get(i, 0) + 1
         self.assertEqual(sendData, result)
 
     def test_kafka_stream(self):
         """Test the Python Kafka stream API."""
-        print >> sys.stderr, "test_kafka_stream started"
         topic = self._randomTopic()
         sendData = {"a": 3, "b": 5, "c": 10}
 
         self._kafkaTestUtils.createTopic(topic)
-        print >> sys.stderr, "test_kafka_stream created topic"
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        print >> sys.stderr, "test_kafka_stream sent messages"
         time.sleep(5)
Review comment (Contributor), on the time.sleep(5) line above:

    If waitUntilLeaderOffset has been added to Scala, shouldn't it be added to Python as well? Maybe then this arbitrary sleep won't be needed.
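One way the suggestion could look on the Python side is a polling helper in place of the fixed `time.sleep(5)`. The sketch below is hypothetical and not part of this diff: the `latestLeaderOffset(topic, partition)` accessor it assumes on the JVM-backed `KafkaTestUtils` object is an invented name, not a confirmed API.

```python
import time


def _wait_until_leader_offset(kafka_test_utils, topic, partition, offset, timeout=30):
    """Poll until the partition leader's latest offset reaches `offset`.

    Hypothetical helper: assumes the py4j-backed KafkaTestUtils exposes a
    latestLeaderOffset(topic, partition) accessor. Replaces a fixed sleep
    with a bounded poll, failing loudly instead of passing flakily.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        if kafka_test_utils.latestLeaderOffset(topic, partition) >= offset:
            return
        time.sleep(0.1)  # short poll interval instead of one blind 5s sleep
    raise AssertionError("leader offset for %s/%d did not reach %d within %ds"
                         % (topic, partition, offset, timeout))
```

With such a helper, the sleep above could become `_wait_until_leader_offset(self._kafkaTestUtils, topic, 0, sum(sendData.values()))`.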
         stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                          "test-streaming-consumer", {topic: 1},
                                          {"auto.offset.reset": "smallest"})
-        print >> sys.stderr, "test_kafka_stream created stream"
         self._validateStreamResult(sendData, stream)
 
     def test_kafka_direct_stream(self):
         """Test the Python direct Kafka stream API."""
-        print >> sys.stderr, "test_kafka_direct_stream started"
         topic = self._randomTopic()
         sendData = {"a": 1, "b": 2, "c": 3}
         kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(),
@@ -679,7 +638,6 @@ def test_kafka_direct_stream(self):
     @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_direct_stream_from_offset(self):
         """Test the Python direct Kafka stream API with start offset specified."""
-        print >> sys.stderr, "test_kafka_direct_stream_from_offset started"
         topic = self._randomTopic()
         sendData = {"a": 1, "b": 2, "c": 3}
         fromOffsets = {TopicAndPartition(topic, 0): long(0)}
@@ -694,24 +652,20 @@ def test_kafka_direct_stream_from_offset(self):
     @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_rdd(self):
         """Test the Python direct Kafka RDD API."""
-        print >> sys.stderr, "test_kafka_rdd started"
         topic = self._randomTopic()
         sendData = {"a": 1, "b": 2}
         offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))]
         kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
 
         self._kafkaTestUtils.createTopic(topic)
-        print >> sys.stderr, "test_kafka_rdd created topic"
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        print >> sys.stderr, "test_kafka_rdd sent data"
         time.sleep(5)
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
         self._validateRddResult(sendData, rdd)
 
     @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_rdd_with_leaders(self):
         """Test the Python direct Kafka RDD API with leaders."""
-        print >> sys.stderr, "test_kafka_rdd_with_leaders started"
         topic = self._randomTopic()
         sendData = {"a": 1, "b": 2, "c": 3}
         offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))]
@@ -720,12 +674,9 @@ def test_kafka_rdd_with_leaders(self):
         leaders = {TopicAndPartition(topic, 0): Broker(address[0], int(address[1]))}
 
         self._kafkaTestUtils.createTopic(topic)
-        print >> sys.stderr, "test_kafka_rdd_with_leaders created topic"
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        print >> sys.stderr, "test_kafka_rdd_with_leaders sent data"
         time.sleep(5)
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
-        print >> sys.stderr, "test_kafka_rdd_with_leaders created rdd"
         self._validateRddResult(sendData, rdd)
 
 if __name__ == "__main__":
2 changes: 1 addition & 1 deletion python/run-tests
@@ -36,7 +36,7 @@ rm -f $LOG_FILE
 rm -rf metastore warehouse
 
 function run_test() {
-    echo -en "Running test: $1 ... " | tee -a $LOG_FILE
+    echo -en "Running test: $1 ... " | tee -a $LOG_FILE 2>&1
     start=$(date +"%s")
     SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE