Minor updates.
tdas committed Nov 13, 2014
commit d9a452cd3ffefc390622c147816d18a57d9428e4
@@ -80,8 +80,8 @@ public void testKafkaStream() throws InterruptedException {
suiteBase.createTopic(topic);
HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
suiteBase.produceAndSendMessage(topic,
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
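Note: in the hunk above, the removed and added calls are textually identical once leading whitespace is stripped by this view, so the change appears to be indentation only. For readers unfamiliar with the call itself, here is a minimal standalone sketch of the Java-to-Scala map conversion it performs, assuming the Scala 2.10-era JavaConverters and Predef.conforms API used in the hunk; the class and method names below are hypothetical.

import java.util.HashMap;

import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConverters;

public class MapConversionSketch {
  // Hypothetical helper mirroring the call in the hunk above: wrap a mutable
  // java.util.HashMap with asScala(), then copy it into an immutable Scala Map.
  // Predef.conforms() supplies the implicit <:< evidence that toMap requires.
  public static scala.collection.immutable.Map<String, Object> toScalaMap(
      HashMap<String, Object> javaMap) {
    return JavaConverters.mapAsScalaMapConverter(javaMap).asScala().toMap(
        Predef.<Tuple2<String, Object>>conforms());
  }
}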
@@ -44,15 +44,14 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val data = Map("a" -> 10, "b" -> 10, "c" -> 10)

var topic: String = _

var groupId: String = _
var kafkaParams: Map[String, String] = _
var ssc: StreamingContext = _
var tempDirectory: File = null

before {
setupKafka()
topic = s"test-topic-${Random.nextInt(10000)}"
groupId = s"test-consumer-${Random.nextInt(10000)}"
kafkaParams = Map(
"zookeeper.connect" -> zkAddress,
@@ -78,6 +77,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter


test("Reliable Kafka input stream with single topic") {
var topic = "test-topic"
createTopic(topic)
produceAndSendMessage(topic, data)

@@ -95,7 +95,6 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
}
}
ssc.start()

eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
// A basic sanity check for ReliableKafkaReceiver.
// Verify that the number of messages received equals the number of messages sent.
@@ -104,26 +103,10 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
data.keys.foreach { k => assert(data(k) === result(k).toInt) }
// Verify that the committed offset matches the total number of messages sent.
assert(getCommitOffset(groupId, topic, 0) === Some(29L))

}
ssc.stop()
}
/*
test("Verify the offset commit") {
// Verify the correctness of offset commit mechanism.
createTopic(topic)
produceAndSendMessage(topic, data)

// Do this to consume all the message of this group/topic.
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
stream.foreachRDD(_ => Unit)
ssc.start()
eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
}
ssc.stop()
}
*/
test("Reliable Kafka input stream with multiple topics") {
val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
topics.foreach { case (t, _) =>
@@ -152,7 +135,6 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
assert(zkClient != null, "Zookeeper client is not initialized")
val topicDirs = new ZKGroupTopicDirs(groupId, topic)
val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
val offset = ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong)
offset
ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong)
}
}