Initial commit for reliable Kafka receiver
jerryshao committed Nov 11, 2014
commit dd9aeebe21a83a2eb75320318f43e183909f48f1
@@ -18,12 +18,8 @@
package org.apache.spark.streaming.kafka

import scala.collection.Map
import scala.reflect.{classTag, ClassTag}
import scala.reflect.ClassTag

import java.util.Properties
import java.util.concurrent.Executors

import kafka.consumer._
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

@@ -51,11 +47,16 @@ class KafkaInputDStream[
@transient ssc_ : StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
reliableStoreEnabled: Boolean,
storageLevel: StorageLevel
) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {

def getReceiver(): Receiver[(K, V)] = {
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
if (!reliableStoreEnabled) {
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
.asInstanceOf[Receiver[(K, V)]]
} else {
new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
.asInstanceOf[Receiver[(K, V)]]
Contributor

If you make the ReliableKafkaReceiver extend Receiver[(K, V)], then this asInstanceOf is not necessary.

}
}
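
A minimal sketch of the change suggested above, assuming ReliableKafkaReceiver takes the same constructor arguments as KafkaReceiver: once both receivers are declared to extend Receiver[(K, V)], getReceiver needs no casts.

// Hypothetical sketch: with both receiver classes extending Receiver[(K, V)],
// the asInstanceOf casts above can be dropped.
def getReceiver(): Receiver[(K, V)] = {
  if (!reliableStoreEnabled) {
    new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
  } else {
    new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
  }
}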
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.kafka

import scala.collection.Map
import scala.reflect.{classTag, ClassTag}

import java.util.Properties
import java.util.concurrent.Executors

import kafka.consumer._
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient._

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

private[streaming]
class KafkaReceiver[
K: ClassTag,
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag](
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
) extends Receiver[Any](storageLevel) with Logging {
Contributor

This should probably be Receiver[(K, V)]. I know that this is how it is in the existing receiver, but this is probably wrong.

Contributor Author

OK, I will fix this.
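
A minimal sketch of the typed header the reviewer suggests; the constructor parameters and method bodies below stay the same, only the receiver's element type changes.

// Hypothetical sketch: the same class, parameterized over (K, V) instead of Any,
// so that store((msgAndMetadata.key, msgAndMetadata.message)) type-checks.
class KafkaReceiver[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[_]: ClassTag,
    T <: Decoder[_]: ClassTag](
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
  ) extends Receiver[(K, V)](storageLevel) with Logging {
  // onStart()/onStop() unchanged from the body below
}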


// Connection to Kafka
var consumerConnector: ConsumerConnector = null

def onStop() {
if (consumerConnector != null) {
consumerConnector.shutdown()
}
}

def onStart() {

logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))

// Kafka connection properties
val props = new Properties()
kafkaParams.foreach(param => props.put(param._1, param._2))

val zkConnect = kafkaParams("zookeeper.connect")
// Create the connection to the cluster
logInfo("Connecting to Zookeeper: " + zkConnect)
val consumerConfig = new ConsumerConfig(props)
consumerConnector = Consumer.create(consumerConfig)
logInfo("Connected to " + zkConnect)

// When auto.offset.reset is defined, it is our responsibility to try and whack the
// consumer group zk node.
if (kafkaParams.contains("auto.offset.reset")) {
tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
}

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[K]]
val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[V]]

// Create Threads for each Topic/Message Stream we are listening
Contributor

Removed unnecessary capitalization in threads, topic, message, stream

val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)

val executorPool = Executors.newFixedThreadPool(topics.values.sum)
try {
// Start the messages handler for each partition
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
}
} finally {
executorPool.shutdown() // Just causes threads to terminate after work is done
}
}

// Handles Kafka Messages
private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
try {
for (msgAndMetadata <- stream) {
Contributor

Can you replace this with a while loop? Scala's for loops are less efficient than a plain old while loop.

http://dynamicsofprogramming.blogspot.co.uk/2013/01/loop-performance-and-local-variables-in.html

Not sure how much of a performance difference it makes, but better to use a while loop anyway.

store((msgAndMetadata.key, msgAndMetadata.message))
}
} catch {
case e: Throwable => logError("Error handling message; exiting", e)
}
}
}
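
A minimal sketch of the while-loop variant the reviewer asks for in run(), assuming the hasNext()/next() API of the ConsumerIterator returned by KafkaStream.iterator():

// Hypothetical sketch: drive the handler with an explicit while loop over the
// stream's iterator instead of Scala's for-comprehension.
def run() {
  logInfo("Starting MessageHandler.")
  try {
    val streamIterator = stream.iterator()
    while (streamIterator.hasNext()) {
      val msgAndMetadata = streamIterator.next()
      store((msgAndMetadata.key, msgAndMetadata.message))
    }
  } catch {
    case e: Throwable => logError("Error handling message; exiting", e)
  }
}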

// It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
// is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
//
// The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
// from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
// 'smallest'/'largest':
// scalastyle:off
// https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
// scalastyle:on
private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
val dir = "/consumers/" + groupId
logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
Contributor

Please use Scala string interpolation.

Contributor Author

Hi TD, this is quite old code and should be removed since we already updated Kafka to 0.8. Would you mind taking a look at PR #1420? It has been pending for a long while.

Contributor

Right, sorry, I should merge that. But if I merge that, you will have to update this PR.

Contributor Author

Yeah, I will.

val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
try {
zk.deleteRecursive(dir)
} catch {
case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
} finally {
zk.close()
}
}
}
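
A minimal sketch of the same cleanup method written with the Scala string interpolation the reviewer asks for above:

// Hypothetical sketch: identical logic, path and log strings built with s-interpolation.
private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
  val dir = s"/consumers/$groupId"
  logInfo(s"Cleaning up temporary Zookeeper data under $dir.")
  val zk = new ZkClient(zkUrl, 30 * 1000, 30 * 1000, ZKStringSerializer)
  try {
    zk.deleteRecursive(dir)
  } catch {
    case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
  } finally {
    zk.close()
  }
}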
@@ -70,7 +70,7 @@ object KafkaUtils {
topics: Map[String, Int],
storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)] = {
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, false, storageLevel)
}

/**
@@ -144,4 +144,71 @@ object KafkaUtils {
createStream[K, V, U, T](
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}

def createReliableStream(
ssc: StreamingContext,
zkQuorum: String,
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)
: ReceiverInputDStream[(String, String)] = {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000")
createReliableStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics, storageLevel)
}

def createReliableStream[
K: ClassTag,
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag](
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)] = {
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, true, storageLevel)
}

def createReliableStream(
jssc: JavaStreamingContext,
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt]
): JavaPairReceiverInputDStream[String, String] = {
createReliableStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
}

def createReliableStream(
jssc: JavaStreamingContext,
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairReceiverInputDStream[String, String] = {
createReliableStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
storageLevel)
}

def createReliableStream[K, V, U <: Decoder[_], T <: Decoder[_]](
jssc: JavaStreamingContext,
keyTypeClass: Class[K],
valueTypeClass: Class[V],
keyDecoderClass: Class[U],
valueDecoderClass: Class[T],
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairReceiverInputDStream[K, V] = {
implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)

implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)

createReliableStream[K, V, U, T](
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}
}
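
For context, a usage sketch of the createReliableStream entry point added in this commit; the Zookeeper quorum, group id, and topic name are made-up placeholders, and the call follows the Scala overload shown above.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReliableKafkaExample {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[4]", "ReliableKafkaExample", Seconds(2))
    // Placeholder connection details; the topic map gives consumer threads per topic.
    val stream = KafkaUtils.createReliableStream(
      ssc, "zkhost:2181", "example-group", Map("example-topic" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.map(_._2).print()   // elements are (key, message) pairs
    ssc.start()
    ssc.awaitTermination()
  }
}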