Refactored KafkaStreamSuiteBase to eliminate KafkaTestUtils and made Java more robust.
tdas committed Nov 13, 2014
commit eae4ad606e60f940a1537feea01e0c2cc4fb6ae8
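
In outline, the commit folds the old KafkaTestUtils helpers into KafkaStreamSuiteBase as private methods and replaces the hand-rolled waitUntilTrue polling with ScalaTest's Eventually trait. Below is a minimal sketch (not part of the patch) of the eventually idiom the refactor leans on, assuming ScalaTest's Eventually, Assertions, and SpanSugar imports; EventuallySketch and waitForSize are illustrative names only.

import org.scalatest.Assertions
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

object EventuallySketch extends Assertions with Eventually {
  // Re-runs the assertion every 100 ms and fails if it still does not hold
  // after 10 seconds, mirroring the timeouts used in the diff below.
  def waitForSize(expected: Int, currentSize: () => Int): Unit = {
    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
      assert(currentSize() === expected)
    }
  }
}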
@@ -127,11 +127,16 @@ public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
);

ssc.start();
ssc.awaitTermination(3000);

long startTime = System.currentTimeMillis();
boolean sizeMatches = false;
while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) {
sizeMatches = sent.size() == result.size();
Thread.sleep(200);
}
Assert.assertEquals(sent.size(), result.size());
for (String k : sent.keySet()) {
Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue());
}
ssc.stop();
}
}
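
A note on the Java change above: ScalaTest's Eventually is awkward to call from Java, so instead of the fixed ssc.awaitTermination(3000) the test now polls the collected counts every 200 ms for up to 20 seconds before asserting, which is presumably what the commit message means by making the Java side more robust on slow machines.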
@@ -46,8 +46,7 @@ import org.apache.spark.util.Utils
* This is an abstract base class for Kafka testsuites. This has the functionality to set up
* and tear down local Kafka servers, and to push data using Kafka producers.
*/
abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
import KafkaTestUtils._
abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {

var zkAddress: String = _
var zkClient: ZkClient = _
@@ -78,7 +77,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
var bindSuccess: Boolean = false
while(!bindSuccess) {
try {
val brokerProps = getBrokerConfig(brokerPort, zkAddress)
val brokerProps = getBrokerConfig()
brokerConf = new KafkaConfig(brokerProps)
server = new KafkaServer(brokerConf)
logInfo("==================== 2 ====================")
@@ -134,111 +133,43 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
logInfo("==================== 5 ====================")
// wait until metadata is propagated
waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
waitUntilMetadataIsPropagated(topic, 0)
}

def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
producer = new Producer[String, String](new ProducerConfig(getProducerConfig()))
producer.send(createTestMessage(topic, sent): _*)
producer.close()
logInfo("==================== 6 ====================")
}
}

class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
var ssc: StreamingContext = _

before {
setupKafka()
}

after {
if (ssc != null) {
ssc.stop()
ssc = null
}
tearDownKafka()
}

test("Kafka input stream") {
val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
ssc = new StreamingContext(sparkConf, Milliseconds(500))
val topic = "topic1"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
createTopic(topic)
produceAndSendMessage(topic, sent)

val kafkaParams = Map("zookeeper.connect" -> zkAddress,
"group.id" -> s"test-consumer-${Random.nextInt(10000)}",
"auto.offset.reset" -> "smallest")

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
Map(topic -> 1),
StorageLevel.MEMORY_ONLY)
val result = new mutable.HashMap[String, Long]()
stream.map { case (k, v) => v }
.countByValue()
.foreachRDD { r =>
val ret = r.collect()
ret.toMap.foreach { kv =>
val count = result.getOrElseUpdate(kv._1, 0) + kv._2
result.put(kv._1, count)
}
}
ssc.start()
eventually(timeout(3000 milliseconds), interval(100 milliseconds)) {
assert(sent.size === result.size)
sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
}

ssc.stop()
}
}


object KafkaTestUtils {

def getBrokerConfig(port: Int, zkConnect: String): Properties = {
private def getBrokerConfig(): Properties = {
val props = new Properties()
props.put("broker.id", "0")
props.put("host.name", "localhost")
props.put("port", port.toString)
props.put("port", brokerPort.toString)
props.put("log.dir", Utils.createTempDir().getAbsolutePath)
props.put("zookeeper.connect", zkConnect)
props.put("zookeeper.connect", zkAddress)
props.put("log.flush.interval.messages", "1")
props.put("replica.socket.timeout.ms", "1500")
props
}

def getProducerConfig(brokerList: String): Properties = {
private def getProducerConfig(): Properties = {
val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
val props = new Properties()
props.put("metadata.broker.list", brokerList)
props.put("metadata.broker.list", brokerAddr)
props.put("serializer.class", classOf[StringEncoder].getName)
props
}

def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
val startTime = System.currentTimeMillis()
while (true) {
if (condition())
return true
if (System.currentTimeMillis() > startTime + waitTime)
return false
Thread.sleep(waitTime.min(100L))
private def waitUntilMetadataIsPropagated(topic: String, partition: Int) {
eventually(timeout(1000 milliseconds), interval(100 milliseconds)) {
assert(
server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)),
s"Partition [$topic, $partition] metadata not propagated after timeout"
)
}
// Should never go to here
throw new RuntimeException("unexpected error")
}

def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int,
timeout: Long) {
assert(waitUntilTrue(() =>
servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(
TopicAndPartition(topic, partition))), timeout),
s"Partition [$topic, $partition] metadata not propagated after timeout")
}

class EmbeddedZookeeper(val zkConnect: String) {
@@ -264,3 +195,53 @@ object KafkaTestUtils {
}
}
}


class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
var ssc: StreamingContext = _

before {
setupKafka()
}

after {
if (ssc != null) {
ssc.stop()
ssc = null
}
tearDownKafka()
}

test("Kafka input stream") {
val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
ssc = new StreamingContext(sparkConf, Milliseconds(500))
val topic = "topic1"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
createTopic(topic)
produceAndSendMessage(topic, sent)

val kafkaParams = Map("zookeeper.connect" -> zkAddress,
"group.id" -> s"test-consumer-${Random.nextInt(10000)}",
"auto.offset.reset" -> "smallest")

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
val result = new mutable.HashMap[String, Long]()
stream.map(_._2).countByValue().foreachRDD { r =>
val ret = r.collect()
ret.toMap.foreach { kv =>
val count = result.getOrElseUpdate(kv._1, 0) + kv._2
result.put(kv._1, count)
}
}
ssc.start()
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
assert(sent.size === result.size)
sent.keys.foreach { k =>
assert(sent(k) === result(k).toInt)
}
}
ssc.stop()
}
}
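
For reference, a hypothetical suite reusing the helpers that KafkaStreamSuiteBase now exposes might look like the sketch below. It assumes the setup, teardown, and messaging methods shown in the diff (setupKafka, tearDownKafka, createTopic, produceAndSendMessage); MyKafkaSuite, the topic name, and the message counts are illustrative only.

import org.scalatest.BeforeAndAfter

class MyKafkaSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
  before { setupKafka() }      // start the embedded ZooKeeper and Kafka broker
  after { tearDownKafka() }    // shut them down and clean up temporary state

  test("messages can be produced to a topic") {
    val topic = "my-topic"                 // hypothetical topic name
    val sent = Map("x" -> 1, "y" -> 2)
    createTopic(topic)                     // create the topic and wait for metadata to propagate
    produceAndSendMessage(topic, sent)     // push the messages through a Kafka producer
    // A real test would now consume from the topic (e.g. via KafkaUtils.createStream)
    // and assert on the received counts, as KafkaStreamSuite does above.
  }
}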