Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add some comments
  • Loading branch information
jerryshao committed Nov 11, 2014
commit 0894aefaccf3e095107896b86964dcf628b66792
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ object KafkaUtils {
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
*
*/
def createStream(
jssc: JavaStreamingContext,
Expand Down Expand Up @@ -145,6 +144,16 @@ object KafkaUtils {
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}

/**
* Create an reliable input stream that pulls messages from a Kafka Broker.
* @param ssc StreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
* @param groupId The group id for this consumer
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def createReliableStream(
ssc: StreamingContext,
zkQuorum: String,
Expand All @@ -159,6 +168,15 @@ object KafkaUtils {
ssc, kafkaParams, topics, storageLevel)
}

/**
* Create an reliable input stream that pulls messages from a Kafka Broker.
* @param ssc StreamingContext object
* @param kafkaParams Map of kafka configuration parameters,
* see http://kafka.apache.org/08/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
*/
def createReliableStream[
K: ClassTag,
V: ClassTag,
Expand All @@ -172,7 +190,16 @@ object KafkaUtils {
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, true, storageLevel)
}

def createReliableStream(
/**
* Create an reliable Java input stream that pulls messages form a Kafka Broker.
* Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
* @param groupId The group id for this consumer
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread
*/
def createReliableStream(
jssc: JavaStreamingContext,
zkQuorum: String,
groupId: String,
Expand All @@ -181,6 +208,15 @@ object KafkaUtils {
createReliableStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
}

/**
* Create an reliable Java input stream that pulls messages form a Kafka Broker.
* @param jssc JavaStreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
*/
def createReliableStream(
jssc: JavaStreamingContext,
zkQuorum: String,
Expand All @@ -192,6 +228,19 @@ object KafkaUtils {
storageLevel)
}

/**
* Create an reliable Java input stream that pulls messages form a Kafka Broker.
* @param jssc JavaStreamingContext object
* @param keyTypeClass Key type of RDD
* @param valueTypeClass value type of RDD
* @param keyDecoderClass Type of kafka key decoder
* @param valueDecoderClass Type of kafka value decoder
* @param kafkaParams Map of kafka configuration parameters,
* see http://kafka.apache.org/08/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread
* @param storageLevel RDD storage level.
*/
def createReliableStream[K, V, U <: Decoder[_], T <: Decoder[_]](
jssc: JavaStreamingContext,
keyTypeClass: Class[K],
Expand Down