[SPARK-4964] [Streaming] Exactly-once semantics for Kafka #3798
Changes from 1 commit
@@ -18,8 +18,9 @@
 package org.apache.spark.rdd.kafka
 
 import scala.util.control.NonFatal
+import scala.collection.mutable.ArrayBuffer
 import java.util.Properties
-import kafka.api.{OffsetRequest, OffsetResponse, PartitionOffsetRequestInfo, TopicMetadataRequest, TopicMetadataResponse}
+import kafka.api.{OffsetRequest, OffsetResponse, OffsetFetchRequest, OffsetFetchResponse, PartitionOffsetRequestInfo, TopicMetadataRequest, TopicMetadataResponse}
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 
@@ -30,6 +31,8 @@ import kafka.consumer.{ConsumerConfig, SimpleConsumer}
  * NOT zookeeper servers, specified in host1:port1,host2:port2 form
  */
 class KafkaCluster(val kafkaParams: Map[String, String]) {
+  type Err = ArrayBuffer[Throwable]
+
   val brokers: Array[(String, Int)] =
     kafkaParams.get("metadata.broker.list")
       .orElse(kafkaParams.get("bootstrap.servers"))
@@ -47,79 +50,113 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
   def connect(hostAndPort: (String, Int)): SimpleConsumer =
     connect(hostAndPort._1, hostAndPort._2)
 
-  def connectLeader(topic: String, partition: Int): Option[SimpleConsumer] =
-    findLeader(topic, partition).map(connect)
+  def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
+    findLeader(topic, partition).right.map(connect)
 
-  def findLeader(topic: String, partition: Int): Option[(String, Int)] = {
+  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
     val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, Seq(topic))
-    brokers.foreach { hp =>
-      var consumer: SimpleConsumer = null
-      try {
-        consumer = connect(hp)
-        val resp: TopicMetadataResponse = consumer.send(req)
-        resp.topicsMetadata.find(_.topic == topic).flatMap { t =>
-          t.partitionsMetadata.find(_.partitionId == partition)
-        }.foreach { partitionMeta =>
-          partitionMeta.leader.foreach { leader =>
-            return Some((leader.host, leader.port))
-          }
+    val errs = new Err
+    withBrokers(errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      resp.topicsMetadata.find(_.topic == topic).flatMap { t =>
+        t.partitionsMetadata.find(_.partitionId == partition)
+      }.foreach { partitionMeta =>
+        partitionMeta.leader.foreach { leader =>
+          return Right((leader.host, leader.port))
         }
-      } catch {
-        case NonFatal(e) =>
-      } finally {
-        if (consumer != null) consumer.close()
       }
     }
-    None
+    Left(errs)
   }
 
-  def getLatestLeaderOffsets(topicAndPartitions: Set[TopicAndPartition]): Map[TopicAndPartition, Long] =
+  def getLatestLeaderOffsets(topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, Long]] =
     getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)
 
-  def getEarliestLeaderOffsets(topicAndPartitions: Set[TopicAndPartition]): Map[TopicAndPartition, Long] =
+  def getEarliestLeaderOffsets(topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, Long]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime)
 
-  def getLeaderOffsets(topicAndPartitions: Set[TopicAndPartition], before: Long): Map[TopicAndPartition, Long] =
-    getLeaderOffsets(topicAndPartitions, before, 1).map { kv =>
-      // mapValues isnt serializable, see SI-7005
-      kv._1 -> kv._2.head
+  def getLeaderOffsets(topicAndPartitions: Set[TopicAndPartition], before: Long): Either[Err, Map[TopicAndPartition, Long]] =
+    getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
+      r.map { kv =>
+        // mapValues isnt serializable, see SI-7005
+        kv._1 -> kv._2.head
+      }
     }
 
-  def getLeaderOffsets(topicAndPartitions: Set[TopicAndPartition], before: Long, maxNumOffsets: Int): Map[TopicAndPartition, Seq[Long]] = {
+  def getLeaderOffsets(
+      topicAndPartitions: Set[TopicAndPartition],
+      before: Long,
+      maxNumOffsets: Int
+    ): Either[Err, Map[TopicAndPartition, Seq[Long]]] = {
     var result = Map[TopicAndPartition, Seq[Long]]()
     val req = OffsetRequest(
       topicAndPartitions.map(tp => tp -> PartitionOffsetRequestInfo(before, 1)).toMap
     )
+    val errs = new Err
+    withBrokers(errs) { consumer =>
+      val resp: OffsetResponse = consumer.getOffsetsBefore(req)
+      val respMap = resp.partitionErrorAndOffsets
+      val needed = topicAndPartitions.diff(result.keys.toSet)
+      needed.foreach { tp =>
+        respMap.get(tp).foreach { errAndOffsets =>
+          if (errAndOffsets.error == ErrorMapping.NoError) {
+            result += tp -> errAndOffsets.offsets
+          } else {
+            errs.append(ErrorMapping.exceptionFor(errAndOffsets.error))
+          }
+        }
+      }
+      if (result.keys.size == topicAndPartitions.size) {
+        return Right(result)
+      }
+    }
+    val missing = topicAndPartitions.diff(result.keys.toSet)
+    errs.append(new Exception(s"Couldn't find offsets for ${missing}"))
+    Left(errs)
+  }
+
+  def getConsumerOffsets(groupId: String, topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, Long]] = {
+    var result = Map[TopicAndPartition, Long]()
+    val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq)
+    val errs = new Err
+    withBrokers(errs) { consumer =>
+      val resp: OffsetFetchResponse = consumer.fetchOffsets(req)
+      val respMap = resp.requestInfo
+      val needed = topicAndPartitions.diff(result.keys.toSet)
+      needed.foreach { tp =>
+        respMap.get(tp).foreach { offsetMeta =>
+          if (offsetMeta.error == ErrorMapping.NoError) {
+            result += tp -> offsetMeta.offset
+          } else {
+            errs.append(ErrorMapping.exceptionFor(offsetMeta.error))
+          }
+        }
+      }
+      if (result.keys.size == topicAndPartitions.size) {
+        return Right(result)
+      }
+    }
+    val missing = topicAndPartitions.diff(result.keys.toSet)
+    errs.append(new Exception(s"Couldn't find offsets for ${missing}"))
+    Left(errs)
+  }
+
+  def setConsumerOffsets(groupId: String, offsets: Map[TopicAndPartition, Long]): Unit = ???
+
+  private def withBrokers(errs: Err)(fn: SimpleConsumer => Any): Unit = {
     brokers.foreach { hp =>
       var consumer: SimpleConsumer = null
       try {
         consumer = connect(hp)
-        val resp: OffsetResponse = consumer.getOffsetsBefore(req)
-        val respParts = resp.partitionErrorAndOffsets
-        val needed = topicAndPartitions.diff(result.keys.toSet)
-        needed.foreach { tp =>
-          respParts.get(tp).foreach { errAndOffsets =>
-            if (errAndOffsets.error == ErrorMapping.NoError) {
-              result += tp -> errAndOffsets.offsets
-            }
-          }
-        }
-        if (result.keys.size == topicAndPartitions.size) {
-          return result
-        }
+        fn(consumer)
       } catch {
        case NonFatal(e) =>
+          errs.append(e)
      } finally {
        if (consumer != null) consumer.close()
      }
    }
-    val missing = topicAndPartitions.diff(result.keys.toSet)
-    throw new Exception(s"Couldn't find offsets for ${missing}")
   }
-
-  def getConsumerOffsets(topicAndPartitions: Set[TopicAndPartition]): Map[TopicAndPartition, Long] = ???
-
-  def setConsumerOffsets(offsets: Map[TopicAndPartition, Long]): Unit = ???
 }
 
 object KafkaCluster {
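Every public lookup in the new version returns Either[Err, T], where Err accumulates one Throwable per failed broker instead of silently swallowing failures. Here is a minimal sketch of how a caller might consume that result; the broker address, topic name, and partition numbers are illustrative placeholders, not part of the patch:

import kafka.common.TopicAndPartition

// Hypothetical caller. "localhost:9092" and "events" are placeholder values.
val kc = new KafkaCluster(Map("metadata.broker.list" -> "localhost:9092"))
val parts = Set(TopicAndPartition("events", 0), TopicAndPartition("events", 1))

kc.getLatestLeaderOffsets(parts) match {
  case Right(offsets) =>
    // One latest offset per partition, fetched from each partition's leader.
    offsets.foreach { case (tp, off) => println(s"$tp is at offset $off") }
  case Left(errs) =>
    // Err = ArrayBuffer[Throwable]: every broker's failure is reported, not just the last.
    errs.foreach(e => println(s"offset lookup failed: $e"))
}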
Review comment:
It would probably be good to make this private[spark] and keep it as an internal utility.
Reply:
The rdd would be really unpleasant to actually use without the convenience methods exposed by KafkaCluster, especially if you're keeping your offsets in zookeeper and doing idempotent writes.
Reply:
I see - sorry, let me look more. I didn't realize this is necessary for users.
Reply:
For example:
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala#L60
We also use it for doing things like starting a stream at the leader offsets before a given time (see the sketch below).
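A hedged sketch of those two uses against the KafkaCluster API in this patch, reusing kc and parts from the sketch above; the group name and the one-hour window are illustrative assumptions:

// Hypothetical: per-partition leader offsets as of roughly one hour ago.
val oneHourAgo = System.currentTimeMillis - 60 * 60 * 1000
val fallback: Map[TopicAndPartition, Long] =
  kc.getLeaderOffsets(parts, oneHourAgo).fold(
    errs => throw new Exception(errs.mkString("\n")), // no broker answered
    offsets => offsets
  )

// Prefer the consumer group's committed offsets when they exist,
// e.g. to resume an idempotent-output job exactly where it left off.
val startOffsets = kc.getConsumerOffsets("my-group", parts) match {
  case Right(committed) => committed
  case Left(_)          => fallback
}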