Closed
23 commits
e768164
#2808 update kafka to version 0.8.2
Dec 7, 2014
d9dc2bc
Merge remote-tracking branch 'upstream/master' into wip-2808-kafka-0.…
Dec 23, 2014
2e67c66
#SPARK-2808 Update to Kafka 0.8.2.0 GA from beta.
Feb 5, 2015
6953429
[SPARK-2808][Streaming][Kafka] update kafka to 0.8.2
koeninger Feb 11, 2015
77de6c2
Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
koeninger Mar 18, 2015
407382e
[SPARK-2808][Streaming][Kafka] update kafka to 0.8.2.1
koeninger Mar 18, 2015
ed02d2c
[SPARK-2808][Streaming][Kafka] move default argument for api version …
koeninger Apr 15, 2015
1d10751
Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
koeninger Apr 27, 2015
c70ee43
[SPARK-2808][Streaming][Kafka] add more asserts to test, try to figur…
koeninger Apr 28, 2015
9edab4c
[SPARK-2808][Streaming][Kafka] more shots in the dark on jenkins fail…
koeninger Apr 28, 2015
af6f3ec
[SPARK-2808][Streaming][Kafka] delay test until latest leader offset …
koeninger Apr 29, 2015
61b3464
[SPARK-2808][Streaming][Kafka] delay for second send in boundary cond…
koeninger Apr 29, 2015
3824ce3
[SPARK-2808][Streaming][Kafka] naming / comments per tdas
koeninger Apr 29, 2015
2b92d3f
[SPARK-2808][Streaming][Kafka] wait for leader offsets in the java te…
koeninger Apr 29, 2015
2712649
[SPARK-2808][Streaming][Kafka] add more logging to python test, see w…
koeninger Apr 29, 2015
115aeee
Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
koeninger Apr 29, 2015
4c4557f
[SPARK-2808][Streaming][Kafka] add even more logging to python test
koeninger Apr 30, 2015
1d896e2
[SPARK-2808][Streaming][Kafka] add even even more logging to python test
koeninger Apr 30, 2015
30d991d
[SPARK-2808][Streaming][Kafka] remove stderr prints since it breaks p…
koeninger May 1, 2015
d4267e9
[SPARK-2808][Streaming][Kafka] fix stderr redirect in python test script
koeninger May 1, 2015
1770abc
[SPARK-2808][Streaming][Kafka] make waitUntilLeaderOffset easier to c…
koeninger May 1, 2015
e6dfaf6
[SPARK-2808][Streaming][Kafka] pointless whitespace change to trigger…
koeninger May 1, 2015
803aa2c
[SPARK-2808][Streaming][Kafka] code cleanup per TD
koeninger May 1, 2015
[SPARK-2808][Streaming][Kafka] add even even more logging to python test
koeninger committed Apr 30, 2015
commit 1d896e2a262569089b80b5cc018b3dff8121368e
14 changes: 9 additions & 5 deletions python/pyspark/streaming/tests.py
@@ -612,7 +612,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
duration = 1

def setUp(self):
print >> sys.stderr, "KafkaStreamTests setUp started"
super(KafkaStreamTests, self).setUp()

kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
@@ -640,9 +639,9 @@ def _validateStreamResult(self, sendData, stream):

def _validateRddResult(self, sendData, rdd):
result = {}
print >> sys.stderr, "_validateRddResult started"
for i in rdd.map(lambda x: x[1]).collect():
result[i] = result.get(i, 0) + 1

self.assertEqual(sendData, result)

def test_kafka_stream(self):
@@ -655,7 +654,7 @@ def test_kafka_stream(self):
print >> sys.stderr, "test_kafka_stream created topic"
self._kafkaTestUtils.sendMessages(topic, sendData)
print >> sys.stderr, "test_kafka_stream sent messages"
time.sleep(1)
time.sleep(5)
Contributor
If the waitUntilLeaderOffset helper has been added on the Scala side, shouldn't it be exposed to Python as well? Maybe then this arbitrary sleep won't be needed.
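As a rough illustration of this suggestion (not part of the commit shown here), the Python test could block on the Scala-side test helper through the same py4j handle it already uses for createTopic and sendMessages, rather than sleeping for a fixed interval. The waitUntilLeaderOffset name and its (topic, partition, offset) signature are assumed from the Scala commits in this PR and are not verified against the Python bindings:

```python
# Sketch only: a possible replacement for the arbitrary time.sleep(5),
# written as it would appear inside the existing test method.
# Assumes the Scala KafkaTestUtils added in this PR exposes
# waitUntilLeaderOffset(topic, partition, offset) via the py4j gateway.
sendData = {"a": 3, "b": 5, "c": 10}

self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)

# Block until the partition leader reports the expected end offset,
# instead of sleeping for an arbitrary number of seconds.
self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
```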


stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
"test-streaming-consumer", {topic: 1},
@@ -702,8 +701,10 @@ def test_kafka_rdd(self):
kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}

self._kafkaTestUtils.createTopic(topic)
print >> sys.stderr, "test_kafka_rdd created topic"
self._kafkaTestUtils.sendMessages(topic, sendData)

print >> sys.stderr, "test_kafka_rdd sent data"
time.sleep(5)
rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
self._validateRddResult(sendData, rdd)

@@ -719,9 +720,12 @@ def test_kafka_rdd_with_leaders(self):
leaders = {TopicAndPartition(topic, 0): Broker(address[0], int(address[1]))}

self._kafkaTestUtils.createTopic(topic)
print >> sys.stderr, "test_kafka_rdd_with_leaders created topic"
self._kafkaTestUtils.sendMessages(topic, sendData)

print >> sys.stderr, "test_kafka_rdd_with_leaders sent data"
time.sleep(5)
rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
print >> sys.stderr, "test_kafka_rdd_with_leaders created rdd"
self._validateRddResult(sendData, rdd)

if __name__ == "__main__":