76913e2
Batch oriented kafka rdd, WIP. todo: cluster metadata / finding leader
koeninger Nov 23, 2014
1d70625
WIP on kafka cluster
koeninger Nov 23, 2014
0b94b33
use dropWhile rather than filter to trim beginning of fetch response
koeninger Nov 24, 2014
4dafd1b
method to get leader offsets, switch rdd bound to being exclusive sta…
koeninger Nov 24, 2014
ce91c59
method to get consumer offsets, explicit error handling
koeninger Nov 24, 2014
7d050bc
methods to set consumer offsets and get topic metadata, switch back t…
koeninger Nov 24, 2014
783b477
update tests for kafka 8.1.1
koeninger Nov 25, 2014
29c6b43
cleanup logging
koeninger Nov 25, 2014
3c2a96a
fix scalastyle errors
koeninger Nov 25, 2014
4b078bf
differentiate between leader and consumer offsets in error message
koeninger Nov 25, 2014
8d7de4a
make sure leader offsets can be found even for leaders that arent in …
koeninger Nov 25, 2014
979da25
dont allow empty leader offsets to be returned
koeninger Nov 26, 2014
38bb727
give easy access to the parameters of a KafkaRDD
koeninger Dec 3, 2014
326ff3c
add some tests
koeninger Dec 16, 2014
6bf14f2
first attempt at a Kafka dstream that allows for exactly-once semantics
koeninger Dec 24, 2014
bcca8a4
Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
koeninger Dec 24, 2014
37d3053
make KafkaRDDPartition available to users so offsets can be committed…
koeninger Dec 25, 2014
cac63ee
additional testing, fix fencepost error
koeninger Dec 25, 2014
e09045b
[SPARK-4964] add foreachPartitionWithIndex, to avoid doing equivalent…
koeninger Dec 26, 2014
8bfd6c0
[SPARK-4964] configure rate limiting via spark.streaming.receiver.max…
koeninger Dec 30, 2014
1d50749
[SPARK-4964] code cleanup per tdas
koeninger Dec 30, 2014
adf99a6
[SPARK-4964] fix serialization issues for checkpointing
koeninger Jan 5, 2015
356c7cc
[SPARK-4964] code cleanup per helena
koeninger Jan 9, 2015
e93eb72
[SPARK-4964] refactor to add preferredLocations. depends on SPARK-4014
koeninger Jan 9, 2015
e86317b
[SPARK-4964] try seed brokers in random order to spread metadata requ…
koeninger Jan 10, 2015
0458e4e
[SPARK-4964] recovery of generated rdds from checkpoint
koeninger Jan 10, 2015
548d529
Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
koeninger Jan 14, 2015
c1bd6d9
[SPARK-4964] use newly available attemptNumber for correct retry beha…
koeninger Jan 14, 2015
d4a7cf7
[SPARK-4964] allow for use cases that need to override compute for cu…
koeninger Jan 14, 2015
bb80bbe
[SPARK-4964] scalastyle line length
koeninger Jan 26, 2015
2e67117
[SPARK-4964] one potential way of hiding most of the implementation, …
koeninger Jan 28, 2015
19406cc
Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
koeninger Jan 28, 2015
99d2eba
[SPARK-4964] Reduce level of nesting. If beginning is past end, its …
koeninger Jan 29, 2015
80fd6ae
[SPARK-4964] Rename createExactlyOnceStream so it isnt over-promising…
koeninger Jan 29, 2015
2b340d8
[SPARK-4964] refactor per TD feedback
koeninger Jan 30, 2015
9a838c2
[SPARK-4964] code cleanup, add more tests
koeninger Jan 30, 2015
0090553
[SPARK-4964] javafication of interfaces
koeninger Jan 30, 2015
9adaa0a
[SPARK-4964] formatting
koeninger Jan 30, 2015
4354bce
[SPARK-4964] per td, remove java interfaces, replace with final class…
koeninger Feb 3, 2015
825110f
[SPARK-4964] rename stuff per TD
koeninger Feb 3, 2015
8991017
[SPARK-4964] formatting
koeninger Feb 3, 2015
0df3ebe
[SPARK-4964] add comments per pwendell / dibbhatt
koeninger Feb 3, 2015
8c31855
[SPARK-4964] remove HasOffsetRanges interface from return types
koeninger Feb 4, 2015
59e29f6
[SPARK-4964] settle on "Direct" as a naming convention for the new st…
koeninger Feb 4, 2015
1dc2941
[SPARK-4964] silence ConsumerConfig warnings about broker connection …
koeninger Feb 4, 2015
WIP on kafka cluster
koeninger committed Nov 23, 2014
commit 1d706257ac848d37caeaff0409bf60b080d66e48
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rdd.kafka

import scala.util.control.NonFatal
import java.util.Properties
import kafka.api.{TopicMetadataRequest, TopicMetadataResponse}
import kafka.consumer.{ConsumerConfig, SimpleConsumer}

/**
Contributor

can you turn all scaladoc style into javadoc for this pr?

* Convenience methods for interacting with a Kafka cluster.
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">configuration parameters</a>.
* Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
* NOT zookeeper servers, specified in host1:port1,host2:port2 form
*/
class KafkaCluster(val kafkaParams: Map[String, String]) {
Contributor

It would probably be good to make this private[spark] and keep it as an internal utility.

Contributor Author

The rdd would be really unpleasant to actually use without the convenience methods exposed by KafkaCluster, especially if you're keeping your offsets in zookeeper and doing idempotent writes.

Contributor

I see - sorry let me look more, I didn't realize this is necessary for users.

Contributor Author

for example

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala#L60

We also use it for things like starting a stream at the leader offsets before a given time.

val brokers: Array[(String, Int)] =
kafkaParams.get("metadata.broker.list")
.orElse(kafkaParams.get("bootstrap.servers"))
.getOrElse(throw new Exception("Must specify metadata.broker.list or bootstrap.servers"))
.split(",").map { hp =>
val hpa = hp.split(":")
(hpa(0), hpa(1).toInt)
}

val config: ConsumerConfig = KafkaCluster.consumerConfig(kafkaParams)

def connect(host: String, port: Int): SimpleConsumer =
new SimpleConsumer(host, port, config.socketTimeoutMs, config.socketReceiveBufferBytes, config.clientId)

def connect(hostAndPort: (String, Int)): SimpleConsumer =
Contributor

better remove this method since it doesn't do much ...
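
If the tuple overload were dropped as suggested, the `findLeader(topic, partition).map(connect)` call in `connectLeader` would need to destructure the pair itself. A minimal sketch of that shape follows; `StubConsumer` and the fixed `findLeader` result are hypothetical stand-ins so the snippet runs without Kafka, and are not part of this PR.

```scala
// Hypothetical stand-in for kafka.consumer.SimpleConsumer, so the sketch runs without Kafka.
final case class StubConsumer(host: String, port: Int)

object ConnectSketch {
  def connect(host: String, port: Int): StubConsumer = StubConsumer(host, port)

  // Stand-in for findLeader: pretends only partition 0 has a known leader.
  def findLeader(topic: String, partition: Int): Option[(String, Int)] =
    if (partition == 0) Some(("broker1", 9092)) else None

  // Without the tuple overload, destructure the (host, port) pair at the call site.
  def connectLeader(topic: String, partition: Int): Option[StubConsumer] =
    findLeader(topic, partition).map { case (host, port) => connect(host, port) }
}
```

The cost of removing the overload is one pattern match at each call site that holds a `(String, Int)` pair.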

connect(hostAndPort._1, hostAndPort._2)

def connectLeader(topic: String, partition: Int): Option[SimpleConsumer] =
findLeader(topic, partition).map(connect)

def findLeader(topic: String, partition: Int): Option[(String, Int)] = {
brokers.foreach { hp =>
var consumer: SimpleConsumer = null
try {
consumer = connect(hp)
val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, Seq(topic))
val resp: TopicMetadataResponse = consumer.send(req)
resp.topicsMetadata.find(_.topic == topic).flatMap { t =>
t.partitionsMetadata.find(_.partitionId == partition)
}.foreach { partitionMeta =>
partitionMeta.leader.foreach { leader =>
return Some((leader.host, leader.port))
}
}
} catch {
case NonFatal(e) =>
} finally {
if (consumer != null) consumer.close()
}

Or you can stay away from java nulls

var consumer: Option[SimpleConsumer] = None
try {
  consumer = Some(connect(hp))
  consumer map (c => fn(c)) 
} catch {
  case NonFatal(e) => errs.append(e)
} finally consumer map (_.close())

Contributor Author

I appreciate the feedback, but it's impossible to "stay away from java nulls" in a jvm language, without runtime checks.

Despite propaganda to the contrary, option.map is not a replacement for null checks.

The code you wrote can still throw a null pointer exception (if SimpleConsumer returns null, for instance).

You can hide the null pointer check inside of Option.apply instead of using Some, but at that point I'd rather just be explicit about what is going on.
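
The distinction drawn above between `Some` and `Option.apply` can be sketched as follows. `mayReturnNull` is a hypothetical factory standing in for any Java API that might hand back null; it is not a Kafka call.

```scala
object OptionNullSketch {
  // Hypothetical factory standing in for a Java API that may return null.
  def mayReturnNull(fail: Boolean): String = if (fail) null else "consumer"

  // Some(...) wraps whatever it is given, including null.
  val wrapped: Option[String] = Some(mayReturnNull(fail = true))

  // Option(...) performs the null check and yields None for null.
  val checked: Option[String] = Option(mayReturnNull(fail = true))

  // Mapping over Some(null) still dereferences the null inside.
  def lengthOf(o: Option[String]): Option[Int] = o.map(_.length)
}
```

Here `wrapped` is defined even though it holds null, so `lengthOf(wrapped)` throws, while `checked` is `None` and maps safely.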


True, it could. My suggestion was incomplete: it should also return None where applicable, e.g. catch => None

Contributor

the use of null here is fine, and very clear. the pattern matching with the finally actually makes it much harder to understand what is going on.

one nitpick, you need to put curly braces around consumer.close(), i.e.

if (consumer != null) {
  consumer.close()
}

}
None
}
}

object KafkaCluster {
/** Make a consumer config without requiring group.id or zookeeper.connect,
* since communicating with brokers also needs common settings such as timeout
*/
def consumerConfig(kafkaParams: Map[String, String]): ConsumerConfig = {
val props = new Properties()
kafkaParams.foreach(param => props.put(param._1, param._2))
Seq("zookeeper.connect", "group.id").foreach { s =>
if (!props.contains(s))
props.setProperty(s, "")
}

containsKey is less expensive than contains

 props.putAll(scala.collection.JavaConversions.mapAsJavaMap (kafkaParams)) 
 Seq("zookeeper.connect", "group.id").collect { 
  case s if props containsKey s => props.setProperty(s, "") 
}

Contributor

I wouldn't use the collect (even though it allows you to do the partial function) because it is actually obfuscating the intention here, because it is perceived as a filter. foreach makes more sense.
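
A sketch of the `foreach` form with `containsKey`, for comparison. As far as I can tell the difference is about correctness rather than cost: `java.util.Properties` inherits `contains` from `Hashtable`, where it tests values rather than keys, and the guard has to be negated so that only missing keys get the empty default. The object and method names below are illustrative only.

```scala
import java.util.Properties

object ConsumerDefaultsSketch {
  // Copy the caller's params, then fill in empty defaults for keys that are absent.
  def withDefaults(params: Map[String, String]): Properties = {
    val props = new Properties()
    params.foreach { case (k, v) => props.setProperty(k, v) }
    Seq("zookeeper.connect", "group.id").foreach { key =>
      if (!props.containsKey(key)) {
        props.setProperty(key, "")
      }
    }
    props
  }

  // Hashtable.contains(value) looks at values, so a present key is not found this way.
  def keyFoundByContains(props: Properties, key: String): Boolean = props.contains(key)
}
```

With `Map("group.id" -> "g")` as input, `group.id` keeps its value and `zookeeper.connect` is set to the empty string, whereas `keyFoundByContains(props, "group.id")` is false.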

new ConsumerConfig(props)
}
}
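
One note on `findLeader` in the file above: the `return` inside the `foreach` closure is a non-local return, which Scala implements with a control-flow throwable. To my understanding `NonFatal` deliberately does not match those, so the `catch` does not swallow it, but the interaction is easy to miss. A possible alternative that avoids it is sketched below; `probe` is a hypothetical stand-in for the metadata request against a single broker, not an API from this PR.

```scala
import scala.util.control.NonFatal

object FindLeaderSketch {
  type HostPort = (String, Int)

  // `probe` is a hypothetical stand-in for the per-broker metadata request:
  // it returns the leader if that broker knows it, None otherwise, and may throw.
  def findLeader(brokers: Seq[HostPort])(probe: HostPort => Option[HostPort]): Option[HostPort] =
    brokers.iterator.map { hp =>
      try {
        probe(hp)
      } catch {
        case NonFatal(_) => None // broker unreachable or bad response: try the next one
      }
    }.collectFirst { case Some(leader) => leader }
}
```

Because the iterator is lazy, brokers after the first one that yields a leader are never contacted, which matches the early-exit behavior of the original loop.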
@@ -39,7 +39,7 @@ private[spark] case class KafkaRDDPartition(
  untilOffset: Long
  ) extends Partition

- /** A batch-oriented interface to Kafka.
+ /** A batch-oriented interface for consuming from Kafka.
  * Each given Kafka topic/partition corresponds to an RDD partition.
  * Starting and ending offsets are specified in advance, so that you can control exactly-once semantics.
  * For an easy interface to Kafka-managed offsets, see {@link org.apache.spark.rdd.kafka.KafkaCluster}
@@ -74,34 +74,31 @@ class KafkaRDD[
  override def compute(thePart: Partition, context: TaskContext) = new NextIterator[R] {
  context.addTaskCompletionListener{ context => closeIfNeeded() }

+ val kc = new KafkaCluster(kafkaParams)
  val part = thePart.asInstanceOf[KafkaRDDPartition]
- val props = new Properties()
- kafkaParams.foreach(param => props.put(param._1, param._2))
- val fetchSize = Option(props.getProperty("fetch.message.max.bytes")).map(_.toInt).getOrElse(1024*1024)
- val leaderBackoff = Option(props.getProperty("refresh.leader.backoff.ms")).map(_.toLong).getOrElse(200L)
- val consumerConfig = new ConsumerConfig(props)
  val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
- .newInstance(consumerConfig.props)
+ .newInstance(kc.config.props)
  .asInstanceOf[Decoder[K]]
  val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
- .newInstance(consumerConfig.props)
+ .newInstance(kc.config.props)
  .asInstanceOf[Decoder[V]]
- val consumer: SimpleConsumer = ???
+ val consumer: SimpleConsumer = kc.connectLeader(part.topic, part.partition)
+ .getOrElse(throw new Exception(s"Couldn't connect to leader for topic ${part.topic} ${part.partition}"))
  var requestOffset = part.fromOffset
  var iter: Iterator[MessageAndOffset] = null

  override def getNext: R = {
  if (iter == null || !iter.hasNext) {
  val req = new FetchRequestBuilder().
- addFetch(part.topic, part.partition, requestOffset, fetchSize).
+ addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes).
  build()
  val resp = consumer.fetch(req)
  if (resp.hasError) {
  val err = resp.errorCode(part.topic, part.partition)
  if (err == ErrorMapping.LeaderNotAvailableCode ||
  err == ErrorMapping.NotLeaderForPartitionCode) {
- log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, sleeping for ${leaderBackoff}ms")
- Thread.sleep(leaderBackoff)
+ log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
+ Thread.sleep(kc.config.refreshLeaderBackoffMs)
  }
  // Let normal rdd retry sort out reconnect attempts
  throw ErrorMapping.exceptionFor(err)