Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
76913e2
Batch oriented kafka rdd, WIP. todo: cluster metadata / finding leader
koeninger Nov 23, 2014
1d70625
WIP on kafka cluster
koeninger Nov 23, 2014
0b94b33
use dropWhile rather than filter to trim beginning of fetch response
koeninger Nov 24, 2014
4dafd1b
method to get leader offsets, switch rdd bound to being exclusive sta…
koeninger Nov 24, 2014
ce91c59
method to get consumer offsets, explicit error handling
koeninger Nov 24, 2014
7d050bc
methods to set consumer offsets and get topic metadata, switch back t…
koeninger Nov 24, 2014
783b477
update tests for kafka 8.1.1
koeninger Nov 25, 2014
29c6b43
cleanup logging
koeninger Nov 25, 2014
3c2a96a
fix scalastyle errors
koeninger Nov 25, 2014
4b078bf
differentiate between leader and consumer offsets in error message
koeninger Nov 25, 2014
8d7de4a
make sure leader offsets can be found even for leaders that arent in …
koeninger Nov 25, 2014
979da25
dont allow empty leader offsets to be returned
koeninger Nov 26, 2014
38bb727
give easy access to the parameters of a KafkaRDD
koeninger Dec 3, 2014
326ff3c
add some tests
koeninger Dec 16, 2014
6bf14f2
first attempt at a Kafka dstream that allows for exactly-once semantics
koeninger Dec 24, 2014
bcca8a4
Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
koeninger Dec 24, 2014
37d3053
make KafkaRDDPartition available to users so offsets can be committed…
koeninger Dec 25, 2014
cac63ee
additional testing, fix fencepost error
koeninger Dec 25, 2014
e09045b
[SPARK-4964] add foreachPartitionWithIndex, to avoid doing equivalent…
koeninger Dec 26, 2014
8bfd6c0
[SPARK-4964] configure rate limiting via spark.streaming.receiver.max…
koeninger Dec 30, 2014
1d50749
[SPARK-4964] code cleanup per tdas
koeninger Dec 30, 2014
adf99a6
[SPARK-4964] fix serialization issues for checkpointing
koeninger Jan 5, 2015
356c7cc
[SPARK-4964] code cleanup per helena
koeninger Jan 9, 2015
e93eb72
[SPARK-4964] refactor to add preferredLocations. depends on SPARK-4014
koeninger Jan 9, 2015
e86317b
[SPARK-4964] try seed brokers in random order to spread metadata requ…
koeninger Jan 10, 2015
0458e4e
[SPARK-4964] recovery of generated rdds from checkpoint
koeninger Jan 10, 2015
548d529
Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
koeninger Jan 14, 2015
c1bd6d9
[SPARK-4964] use newly available attemptNumber for correct retry beha…
koeninger Jan 14, 2015
d4a7cf7
[SPARK-4964] allow for use cases that need to override compute for cu…
koeninger Jan 14, 2015
bb80bbe
[SPARK-4964] scalastyle line length
koeninger Jan 26, 2015
2e67117
[SPARK-4964] one potential way of hiding most of the implementation, …
koeninger Jan 28, 2015
19406cc
Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
koeninger Jan 28, 2015
99d2eba
[SPARK-4964] Reduce level of nesting. If beginning is past end, its …
koeninger Jan 29, 2015
80fd6ae
[SPARK-4964] Rename createExactlyOnceStream so it isnt over-promising…
koeninger Jan 29, 2015
2b340d8
[SPARK-4964] refactor per TD feedback
koeninger Jan 30, 2015
9a838c2
[SPARK-4964] code cleanup, add more tests
koeninger Jan 30, 2015
0090553
[SPARK-4964] javafication of interfaces
koeninger Jan 30, 2015
9adaa0a
[SPARK-4964] formatting
koeninger Jan 30, 2015
4354bce
[SPARK-4964] per td, remove java interfaces, replace with final class…
koeninger Feb 3, 2015
825110f
[SPARK-4964] rename stuff per TD
koeninger Feb 3, 2015
8991017
[SPARK-4964] formatting
koeninger Feb 3, 2015
0df3ebe
[SPARK-4964] add comments per pwendell / dibbhatt
koeninger Feb 3, 2015
8c31855
[SPARK-4964] remove HasOffsetRanges interface from return types
koeninger Feb 4, 2015
59e29f6
[SPARK-4964] settle on "Direct" as a naming convention for the new st…
koeninger Feb 4, 2015
1dc2941
[SPARK-4964] silence ConsumerConfig warnings about broker connection …
koeninger Feb 4, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-4964] javafication of interfaces
  • Loading branch information
koeninger committed Jan 30, 2015
commit 0090553eba09240b6ad4cf508ea33503705b12d9
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.kafka;

/**
 * Implemented by anything that carries a collection of OffsetRanges
 * and can hand that collection back to a caller.
 */
public interface HasOffsetRanges {
  /** @return the OffsetRanges held by this object, as an array */
  OffsetRange[] offsetRanges();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,54 +15,19 @@
* limitations under the License.
*/

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
package org.apache.spark.streaming.kafka;

/** Host info for the leader of a Kafka TopicAndPartition */

trait Leader {
/** kafka topic name */
def topic: String
public interface Leader {
/** kafka topic name */
public String topic();

/** kafka partition id */
def partition: Int
public int partition();

/** kafka hostname */
def host: String
public String host();

/** kafka host's port */
def port: Int
}

private class LeaderImpl(
override val topic: String,
override val partition: Int,
override val host: String,
override val port: Int
) extends Leader

object Leader {
def create(
topic: String,
partition: Int,
host: String,
port: Int): Leader =
new LeaderImpl(
topic,
partition,
host,
port)

def create(
topicAndPartition: TopicAndPartition,
host: String,
port: Int): Leader =
new LeaderImpl(
topicAndPartition.topic,
topicAndPartition.partition,
host,
port)

public int port();
}

Original file line number Diff line number Diff line change
Expand Up @@ -15,57 +15,19 @@
* limitations under the License.
*/

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
package org.apache.spark.streaming.kafka;

/** Represents a range of offsets from a single Kafka TopicAndPartition */
trait OffsetRange {
public interface OffsetRange {
/** kafka topic name */
def topic: String
public String topic();

/** kafka partition id */
def partition: Int
public int partition();

/** inclusive starting offset */
def fromOffset: Long
public long fromOffset();

/** exclusive ending offset */
def untilOffset: Long
}

/** Something that has a collection of OffsetRanges */
trait HasOffsetRanges {
def offsetRanges: Array[OffsetRange]
}

private class OffsetRangeImpl(
override val topic: String,
override val partition: Int,
override val fromOffset: Long,
override val untilOffset: Long
) extends OffsetRange

object OffsetRange {
def create(
topic: String,
partition: Int,
fromOffset: Long,
untilOffset: Long): OffsetRange =
new OffsetRangeImpl(
topic,
partition,
fromOffset,
untilOffset)

def create(
topicAndPartition: TopicAndPartition,
fromOffset: Long,
untilOffset: Long): OffsetRange =
new OffsetRangeImpl(
topicAndPartition.topic,
topicAndPartition.partition,
fromOffset,
untilOffset)

public long untilOffset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -330,4 +330,75 @@ object KafkaUtils {
ok => ok
)
}

/**
 * Private implementation of OffsetRange. Each constructor argument is exposed
 * unchanged through the member of the same name.
 */
private class OffsetRangeImpl(
    override val topic: String,
    override val partition: Int,
    override val fromOffset: Long,
    override val untilOffset: Long)
  extends OffsetRange

/**
 * Builds an OffsetRange: a behaviorless container describing a range of offsets
 * from a single Kafka TopicAndPartition.
 * @param topic kafka topic name
 * @param partition kafka partition id
 * @param fromOffset inclusive starting offset
 * @param untilOffset exclusive ending offset
 * @return an OffsetRange holding the four supplied values
 */
@Experimental
def createOffsetRange(
    topic: String,
    partition: Int,
    fromOffset: Long,
    untilOffset: Long): OffsetRange = {
  val range: OffsetRange = new OffsetRangeImpl(topic, partition, fromOffset, untilOffset)
  range
}

/**
 * Builds an OffsetRange: a behaviorless container describing a range of offsets
 * from a single Kafka TopicAndPartition. The topic name and partition id are
 * read from the supplied TopicAndPartition.
 * @param topicAndPartition kafka TopicAndPartition
 * @param fromOffset inclusive starting offset
 * @param untilOffset exclusive ending offset
 * @return an OffsetRange holding the topic, partition and the two offsets
 */
@Experimental
def createOffsetRange(
    topicAndPartition: TopicAndPartition,
    fromOffset: Long,
    untilOffset: Long): OffsetRange = {
  val topicName = topicAndPartition.topic
  val partitionId = topicAndPartition.partition
  new OffsetRangeImpl(topicName, partitionId, fromOffset, untilOffset)
}

/**
 * Private implementation of Leader. Each constructor argument is exposed
 * unchanged through the member of the same name.
 */
private class LeaderImpl(
    override val topic: String,
    override val partition: Int,
    override val host: String,
    override val port: Int)
  extends Leader

/**
 * Builds a Leader: a behaviorless container of host info for the leader of a
 * Kafka TopicAndPartition.
 * @param topic kafka topic name
 * @param partition kafka partition id
 * @param host kafka hostname
 * @param port kafka host's port
 * @return a Leader holding the four supplied values
 */
@Experimental
def createLeader(
    topic: String,
    partition: Int,
    host: String,
    port: Int): Leader =
  new LeaderImpl(topic, partition, host, port)

/**
 * Builds a Leader: a behaviorless container of host info for the leader of a
 * Kafka TopicAndPartition. The topic name and partition id are read from the
 * supplied TopicAndPartition.
 * @param topicAndPartition kafka TopicAndPartition
 * @param host kafka hostname
 * @param port kafka host's port
 * @return a Leader holding the topic, partition, host and port
 */
@Experimental
def createLeader(
    topicAndPartition: TopicAndPartition,
    host: String,
    port: Int): Leader = {
  val topicName = topicAndPartition.topic
  val partitionId = topicAndPartition.partition
  new LeaderImpl(topicName, partitionId, host, port)
}
}