Update the ReliableKafkaReceiver unit test
jerryshao committed Nov 11, 2014
commit 96c7a1dd97a93d2abb4daad99b7ec24e5114356a
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming.kafka
 
+import java.io.File
+
 import scala.collection.mutable
 
 import kafka.serializer.StringDecoder
@@ -25,6 +27,7 @@ import kafka.utils.{ZkUtils, ZKGroupTopicDirs}
 import org.apache.spark.SparkConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.util.Utils
 
 class ReliableKafkaStreamSuite extends KafkaStreamSuite {
   import KafkaTestUtils._
@@ -35,6 +38,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       .setAppName(framework)
       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     val ssc = new StreamingContext(sparkConf, batchDuration)
+    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
+      s"test-checkpoint${random.nextInt(10000)}"
+    Utils.registerShutdownDeleteDir(new File(checkpointDir))
+    ssc.checkpoint(checkpointDir)
+
     val topic = "test"
     val sent = Map("a" -> 1, "b" -> 1, "c" -> 1)
     createTopic(topic)
@@ -73,6 +81,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       .setAppName(framework)
       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     val ssc = new StreamingContext(sparkConf, batchDuration)
+    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
+      s"test-checkpoint${random.nextInt(10000)}"
+    Utils.registerShutdownDeleteDir(new File(checkpointDir))
+    ssc.checkpoint(checkpointDir)
+
     val topic = "test"
     val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
     createTopic(topic)
@@ -105,6 +118,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       .setAppName(framework)
       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     val ssc = new StreamingContext(sparkConf, batchDuration)
+    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
+      s"test-checkpoint${random.nextInt(10000)}"
+    Utils.registerShutdownDeleteDir(new File(checkpointDir))
+    ssc.checkpoint(checkpointDir)
+
     val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
     val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
     topics.foreach { case (t, _) =>
@@ -133,61 +151,6 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
     topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
   }
 
-  test("Verify offset commit when exception is met") {
-    val sparkConf = new SparkConf()
-      .setMaster(master)
-      .setAppName(framework)
-      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
-    var ssc = new StreamingContext(
-      sparkConf.clone.set("spark.streaming.blockInterval", "10000"),
-      batchDuration)
-    val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
-    val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
-    topics.foreach { case (t, _) =>
-      createTopic(t)
-      produceAndSendMessage(t, sent)
-    }
-
-    val groupId = s"test-consumer-${random.nextInt(10000)}"
-
-    val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
-      "group.id" -> groupId,
-      "auto.offset.reset" -> "smallest")
-
-    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY)
-      .foreachRDD(_ => throw new Exception)
-    try {
-      ssc.start()
-      ssc.awaitTermination(1000)
-    } catch {
-      case e: Exception =>
-        if (ssc != null) {
-          ssc.stop()
-          ssc = null
-        }
-    }
-    // Failed before putting to BM, so offset is not updated.
-    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }
-
-    // Restart to see if data is consumed from last checkpoint.
-    ssc = new StreamingContext(sparkConf, batchDuration)
-    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY)
-      .foreachRDD(_ => Unit)
-    ssc.start()
-    ssc.awaitTermination(3000)
-    ssc.stop()
-
-    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
-  }
-
   private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = {
     assert(zkClient != null, "Zookeeper client is not initialized")
 
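Note on the change above: with spark.streaming.receiver.writeAheadLog.enable set to true, the receiver persists received blocks to a write-ahead log under the StreamingContext's checkpoint directory, so each test now has to call ssc.checkpoint(...) before starting; registering the directory with Utils.registerShutdownDeleteDir makes Spark delete it when the test JVM exits. Below is a minimal sketch of the same setup, not part of the commit; the object name is illustrative, and Utils.registerShutdownDeleteDir is private[spark], so this compiles only inside a Spark-internal package such as the test's own org.apache.spark.streaming.kafka.

// Sketch only; mirrors the pattern the commit adds to each test.
// The package makes the private[spark] Utils helper visible.
package org.apache.spark.streaming.kafka

import java.io.File

import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.util.Utils

object CheckpointDirSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("CheckpointDirSketch")
      // With the WAL enabled, received data is also written to a log under
      // the checkpoint directory, so a checkpoint directory is mandatory.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(sparkConf, Milliseconds(500))

    // A unique per-run directory under java.io.tmpdir, registered so that
    // Spark's shutdown hook deletes it when the JVM exits.
    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
      s"test-checkpoint${Random.nextInt(10000)}"
    Utils.registerShutdownDeleteDir(new File(checkpointDir))
    ssc.checkpoint(checkpointDir)

    // ... create input streams and register output operations here,
    // then ssc.start() / ssc.awaitTermination(...) as usual ...
    ssc.stop()
  }
}

The random suffix keeps concurrent test runs from colliding on the same directory, which is exactly what the repeated checkpointDir block in the diff provides for each test.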