Commits
45 commits
76913e2
Batch oriented kafka rdd, WIP. todo: cluster metadata / finding leader
koeninger Nov 23, 2014
1d70625
WIP on kafka cluster
koeninger Nov 23, 2014
0b94b33
use dropWhile rather than filter to trim beginning of fetch response
koeninger Nov 24, 2014
4dafd1b
method to get leader offsets, switch rdd bound to being exclusive sta…
koeninger Nov 24, 2014
ce91c59
method to get consumer offsets, explicit error handling
koeninger Nov 24, 2014
7d050bc
methods to set consumer offsets and get topic metadata, switch back t…
koeninger Nov 24, 2014
783b477
update tests for kafka 8.1.1
koeninger Nov 25, 2014
29c6b43
cleanup logging
koeninger Nov 25, 2014
3c2a96a
fix scalastyle errors
koeninger Nov 25, 2014
4b078bf
differentiate between leader and consumer offsets in error message
koeninger Nov 25, 2014
8d7de4a
make sure leader offsets can be found even for leaders that aren't in …
koeninger Nov 25, 2014
979da25
don't allow empty leader offsets to be returned
koeninger Nov 26, 2014
38bb727
give easy access to the parameters of a KafkaRDD
koeninger Dec 3, 2014
326ff3c
add some tests
koeninger Dec 16, 2014
6bf14f2
first attempt at a Kafka dstream that allows for exactly-once semantics
koeninger Dec 24, 2014
bcca8a4
Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
koeninger Dec 24, 2014
37d3053
make KafkaRDDPartition available to users so offsets can be committed…
koeninger Dec 25, 2014
cac63ee
additional testing, fix fencepost error
koeninger Dec 25, 2014
e09045b
[SPARK-4964] add foreachPartitionWithIndex, to avoid doing equivalent…
koeninger Dec 26, 2014
8bfd6c0
[SPARK-4964] configure rate limiting via spark.streaming.receiver.max…
koeninger Dec 30, 2014
1d50749
[SPARK-4964] code cleanup per tdas
koeninger Dec 30, 2014
adf99a6
[SPARK-4964] fix serialization issues for checkpointing
koeninger Jan 5, 2015
356c7cc
[SPARK-4964] code cleanup per helena
koeninger Jan 9, 2015
e93eb72
[SPARK-4964] refactor to add preferredLocations. depends on SPARK-4014
koeninger Jan 9, 2015
e86317b
[SPARK-4964] try seed brokers in random order to spread metadata requ…
koeninger Jan 10, 2015
0458e4e
[SPARK-4964] recovery of generated rdds from checkpoint
koeninger Jan 10, 2015
548d529
Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
koeninger Jan 14, 2015
c1bd6d9
[SPARK-4964] use newly available attemptNumber for correct retry beha…
koeninger Jan 14, 2015
d4a7cf7
[SPARK-4964] allow for use cases that need to override compute for cu…
koeninger Jan 14, 2015
bb80bbe
[SPARK-4964] scalastyle line length
koeninger Jan 26, 2015
2e67117
[SPARK-4964] one potential way of hiding most of the implementation, …
koeninger Jan 28, 2015
19406cc
Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
koeninger Jan 28, 2015
99d2eba
[SPARK-4964] Reduce level of nesting. If beginning is past end, it's …
koeninger Jan 29, 2015
80fd6ae
[SPARK-4964] Rename createExactlyOnceStream so it isn't over-promising…
koeninger Jan 29, 2015
2b340d8
[SPARK-4964] refactor per TD feedback
koeninger Jan 30, 2015
9a838c2
[SPARK-4964] code cleanup, add more tests
koeninger Jan 30, 2015
0090553
[SPARK-4964] javafication of interfaces
koeninger Jan 30, 2015
9adaa0a
[SPARK-4964] formatting
koeninger Jan 30, 2015
4354bce
[SPARK-4964] per td, remove java interfaces, replace with final class…
koeninger Feb 3, 2015
825110f
[SPARK-4964] rename stuff per TD
koeninger Feb 3, 2015
8991017
[SPARK-4964] formatting
koeninger Feb 3, 2015
0df3ebe
[SPARK-4964] add comments per pwendell / dibbhatt
koeninger Feb 3, 2015
8c31855
[SPARK-4964] remove HasOffsetRanges interface from return types
koeninger Feb 4, 2015
59e29f6
[SPARK-4964] settle on "Direct" as a naming convention for the new st…
koeninger Feb 4, 2015
1dc2941
[SPARK-4964] silence ConsumerConfig warnings about broker connection …
koeninger Feb 4, 2015
[SPARK-4964] settle on "Direct" as a naming convention for the new stream
koeninger committed Feb 4, 2015
commit 59e29f61cd6a730eeea4e47a5316cbbe47615618
@@ -53,7 +53,7 @@ import org.apache.spark.streaming.dstream._
* @param maxRetries maximum number of times in a row to retry getting leaders' offsets
*/
private[streaming]
-class DeterministicKafkaInputDStream[
+class DirectKafkaInputDStream[
K: ClassTag,
V: ClassTag,
U <: Decoder[_]: ClassTag,
@@ -68,7 +68,7 @@ class DeterministicKafkaInputDStream[
"spark.streaming.kafka.maxRetries", 1)

protected[streaming] override val checkpointData =
-new DeterministicKafkaInputDStreamCheckpointData
+new DirectKafkaInputDStreamCheckpointData

protected val kc = new KafkaCluster(kafkaParams)

@@ -129,7 +129,7 @@ class DeterministicKafkaInputDStream[
}

private[streaming]
-class DeterministicKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
def batchForTime = data.asInstanceOf[mutable.HashMap[
Time, Array[OffsetRange.OffsetRangeTuple]]]

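The DStream above reads its leader-offset retry count from "spark.streaming.kafka.maxRetries", with what looks like a default of 1 in the @@ -68 hunk. A minimal sketch of overriding it through SparkConf; the value "3" and the app name are illustrative, not part of this PR:

```scala
import org.apache.spark.SparkConf

// Key and default of 1 come from the hunk above; "3" is only an illustrative override.
val conf = new SparkConf()
  .setAppName("direct-kafka-retries")
  .set("spark.streaming.kafka.maxRetries", "3")
```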
@@ -246,7 +246,7 @@ object KafkaUtils {
* starting point of the stream
*/
@Experimental
-def createNewStream[
+def createDirectStream[
K: ClassTag,
V: ClassTag,
U <: Decoder[_]: ClassTag,
@@ -257,7 +257,7 @@ object KafkaUtils {
fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
-new DeterministicKafkaInputDStream[K, V, U, T, R](
+new DirectKafkaInputDStream[K, V, U, T, R](
ssc, kafkaParams, fromOffsets, messageHandler)
}

@@ -289,7 +289,7 @@ object KafkaUtils {
* @param topics names of the topics to consume
*/
@Experimental
-def createNewStream[
+def createDirectStream[
K: ClassTag,
V: ClassTag,
U <: Decoder[_]: ClassTag,
@@ -313,7 +313,7 @@ object KafkaUtils {
val fromOffsets = leaderOffsets.map { case (tp, lo) =>
(tp, lo.offset)
}
-new DeterministicKafkaInputDStream[K, V, U, T, (K, V)](
+new DirectKafkaInputDStream[K, V, U, T, (K, V)](
ssc, kafkaParams, fromOffsets, messageHandler)
}).fold(
errs => throw new SparkException(errs.mkString("\n")),
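The first KafkaUtils hunk above renames the overload that takes explicit starting offsets and a message handler. A minimal sketch of calling it after this commit, assuming a local broker at localhost:9092 and a topic named "events"; the broker address, topic, offsets, app name, and batch interval are illustrative, not taken from the PR:

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamFromOffsets {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("direct-from-offsets")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Broker list used by the direct API to fetch metadata and data (illustrative address).
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

    // Start partition 0 of topic "events" at offset 0 (illustrative values).
    val fromOffsets = Map(TopicAndPartition("events", 0) -> 0L)

    // The message handler decides what each Kafka record becomes in the stream; here, just the value.
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd.message()

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc, kafkaParams, fromOffsets, messageHandler)

    stream.foreachRDD { rdd => println(s"batch had ${rdd.count()} records") }

    ssc.start()
    ssc.awaitTermination()
  }
}
```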
@@ -29,7 +29,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

-class KafkaNewStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
+class KafkaDirectStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
val sparkConf = new SparkConf()
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
@@ -63,7 +63,7 @@ class KafkaNewStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with
createTopic(t)
produceAndSendMessage(t, data)
}
-val stream = KafkaUtils.createNewStream[String, String, StringDecoder, StringDecoder](
+val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
var total = 0L;

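For comparison with the truncated test above, a standalone sketch of the other renamed overload, which takes a set of topic names instead of explicit offsets; the broker address, topic names, batch interval, and "auto.offset.reset" setting are illustrative, not part of the PR:

```scala
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamFromTopics {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("direct-from-topics")
    val ssc = new StreamingContext(sparkConf, Milliseconds(500))

    // "auto.offset.reset" -> "smallest" asks for the earliest available offsets (illustrative setting).
    val kafkaParams = Map(
      "metadata.broker.list" -> "localhost:9092",
      "auto.offset.reset" -> "smallest")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("basic1", "basic2"))

    stream.foreachRDD { rdd => println(s"received ${rdd.count()} records") }

    ssc.start()
    ssc.awaitTermination()
  }
}
```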