[SPARK-4964] [Streaming] Exactly-once semantics for Kafka #3798
Changes from 1 commit
Commit: …o inclusive start / exclusive end to match typical kafka consumer behavior
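The commit's point is the half-open range convention: a batch now covers [fromOffset, untilOffset), so its size is simply untilOffset - fromOffset and consecutive batches chain with no gap and no overlap, matching how Kafka's own consumers treat fetch offsets. A small illustrative sketch of that arithmetic (the OffsetRange helper below is hypothetical, not part of this patch):

```scala
// Illustrative only: the half-open [fromOffset, untilOffset) convention this commit adopts.
// `OffsetRange` is a hypothetical helper for the example, not a class in this diff.
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset // messages the batch will read
}

object OffsetRangeExample extends App {
  val batch1 = OffsetRange("events", 0, fromOffset = 100L, untilOffset = 150L) // reads offsets 100..149
  val batch2 = OffsetRange("events", 0, fromOffset = batch1.untilOffset, untilOffset = 200L)
  assert(batch1.count == 50L)
  assert(batch1.untilOffset == batch2.fromOffset) // chaining leaves no gap and no overlap
  println(s"batch1 reads ${batch1.count} messages; batch2 starts at offset ${batch2.fromOffset}")
}
```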
KafkaCluster.scala
@@ -20,8 +20,8 @@ package org.apache.spark.rdd.kafka
 import scala.util.control.NonFatal
 import scala.collection.mutable.ArrayBuffer
 import java.util.Properties
-import kafka.api.{OffsetRequest, OffsetResponse, OffsetFetchRequest, OffsetFetchResponse, PartitionOffsetRequestInfo, TopicMetadataRequest, TopicMetadataResponse}
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.api.{OffsetCommitRequest, OffsetRequest, OffsetFetchRequest, PartitionOffsetRequestInfo, TopicMetadata, TopicMetadataRequest, TopicMetadataResponse}
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
 import kafka.consumer.{ConsumerConfig, SimpleConsumer}

 /**
@@ -69,6 +69,27 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
     Left(errs)
   }

+  def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] =
+    getPartitionMetadata(topics).right.map { r =>
+      r.flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.map { pm =>
+          TopicAndPartition(tm.topic, pm.partitionId)
+        }
+      }
+    }
+
+  def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
+    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
+    val errs = new Err
+    withBrokers(errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      // error codes here indicate missing / just created topic,
+      // repeating on a different broker wont be useful
+      return Right(resp.topicsMetadata.toSet)
+    }
+    Left(errs)
+  }
+
   def getLatestLeaderOffsets(topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, Long]] =
     getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)
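Taken together, getPartitions and getLatestLeaderOffsets let the driver discover a topic's partitions and pick per-partition end offsets directly from the brokers, with no ZooKeeper round trip. A rough usage sketch, assuming kafkaParams carries "metadata.broker.list" and that Err is an appendable collection of Throwables as elsewhere in this class (broker addresses and topic are made up):

```scala
// Sketch only: driver-side lookup of partitions and their latest offsets.
val kc = new KafkaCluster(Map("metadata.broker.list" -> "broker1:9092,broker2:9092"))

val untilOffsets: Map[TopicAndPartition, Long] =
  kc.getPartitions(Set("events")).right.flatMap { partitions =>
    kc.getLatestLeaderOffsets(partitions)             // highest available offset per partition
  }.fold(
    errs => throw new Exception(errs.mkString("\n")), // surface any accumulated broker errors
    offsets => offsets
  )
```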
@@ -94,7 +115,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
     )
     val errs = new Err
     withBrokers(errs) { consumer =>
-      val resp: OffsetResponse = consumer.getOffsetsBefore(req)
+      val resp = consumer.getOffsetsBefore(req)
       val respMap = resp.partitionErrorAndOffsets
       val needed = topicAndPartitions.diff(result.keys.toSet)
       needed.foreach { tp =>
@@ -116,17 +137,28 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
   }

   def getConsumerOffsets(groupId: String, topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, Long]] = {
-    var result = Map[TopicAndPartition, Long]()
+    getConsumerOffsetMetadata(groupId, topicAndPartitions).right.map { r =>
+      r.map { kv =>
+        kv._1 -> kv._2.offset
+      }
+    }
+  }
+
+  def getConsumerOffsetMetadata(
+    groupId: String,
+    topicAndPartitions: Set[TopicAndPartition]
+  ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
+    var result = Map[TopicAndPartition, OffsetMetadataAndError]()
     val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq)
     val errs = new Err
     withBrokers(errs) { consumer =>
-      val resp: OffsetFetchResponse = consumer.fetchOffsets(req)
+      val resp = consumer.fetchOffsets(req)
       val respMap = resp.requestInfo
       val needed = topicAndPartitions.diff(result.keys.toSet)
       needed.foreach { tp =>
         respMap.get(tp).foreach { offsetMeta =>
           if (offsetMeta.error == ErrorMapping.NoError) {
-            result += tp -> offsetMeta.offset
+            result += tp -> offsetMeta
           } else {
             errs.append(ErrorMapping.exceptionFor(offsetMeta.error))
           }
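The fetch path is now split in two: getConsumerOffsetMetadata returns Kafka's full OffsetMetadataAndError per partition, and getConsumerOffsets keeps the old Map[TopicAndPartition, Long] shape by projecting out just the offset. A small sketch of that projection (the sample value is made up; OffsetMetadataAndError is Kafka's own case class, constructed here the same way the patch does):

```scala
// Sketch only: the Long-offset view is a projection of the metadata view.
val meta: Map[TopicAndPartition, OffsetMetadataAndError] =
  Map(TopicAndPartition("events", 0) -> OffsetMetadataAndError(42L))

val offsets: Map[TopicAndPartition, Long] =
  meta.map { kv => kv._1 -> kv._2.offset } // == Map(TopicAndPartition("events", 0) -> 42L)
```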
@@ -141,7 +173,41 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
     Left(errs)
   }

-  def setConsumerOffsets(groupId: String, offsets: Map[TopicAndPartition, Long]): Unit = ???
+  def setConsumerOffsets(groupId: String, offsets: Map[TopicAndPartition, Long]): Unit = {
+    setConsumerOffsetMetadata(groupId, offsets.map { kv =>
+      kv._1 -> OffsetMetadataAndError(kv._2)
+    })
+  }
+
+  def setConsumerOffsetMetadata(
+    groupId: String,
+    metadata: Map[TopicAndPartition, OffsetMetadataAndError]
+  ): Either[Err, Map[TopicAndPartition, Short]] = {
+    var result = Map[TopicAndPartition, Short]()
+    val req = OffsetCommitRequest(groupId, metadata)
+    val errs = new Err
+    val topicAndPartitions = metadata.keys.toSet
+    withBrokers(errs) { consumer =>
+      val resp = consumer.commitOffsets(req)
+      val respMap = resp.requestInfo
+      val needed = topicAndPartitions.diff(result.keys.toSet)
+      needed.foreach { tp =>
+        respMap.get(tp).foreach { err =>
+          if (err == ErrorMapping.NoError) {
+            result += tp -> err
+          } else {
+            errs.append(ErrorMapping.exceptionFor(err))
+          }
+        }
+      }
+      if (result.keys.size == topicAndPartitions.size) {
+        return Right(result)
+      }
+    }
+    val missing = topicAndPartitions.diff(result.keys.toSet)
+    errs.append(new Exception(s"Couldn't set offsets for ${missing}"))
+    Left(errs)
+  }

   private def withBrokers(errs: Err)(fn: SimpleConsumer => Any): Unit = {
     brokers.foreach { hp =>
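With setConsumerOffsets now implemented (it was ??? before), the driver can both read and commit group offsets against the brokers, which is the hook exactly-once output needs: commit the batch's ending offsets only after the results are safely written. A hedged sketch of that round trip, with a made-up group name and offsets:

```scala
// Sketch only: read committed offsets, process a batch, then commit the new end offsets.
val kc = new KafkaCluster(Map("metadata.broker.list" -> "broker1:9092"))
val groupId = "my-streaming-job"
val parts = Set(TopicAndPartition("events", 0))

val fromOffsets: Map[TopicAndPartition, Long] =
  kc.getConsumerOffsets(groupId, parts).fold(
    errs => parts.map(_ -> 0L).toMap, // e.g. start from 0 if nothing has been committed yet
    offsets => offsets
  )

// ... choose untilOffsets, run the batch over [fromOffsets(tp), untilOffsets(tp)),
// and persist the results ...

// only then record progress, so a restart replays at most the unacknowledged batch
kc.setConsumerOffsets(groupId, Map(TopicAndPartition("events", 0) -> 150L))
```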
KafkaRDD.scala
@@ -35,8 +35,8 @@ private[spark] case class KafkaRDDPartition(
   override val index: Int,
   topic: String,
   partition: Int,
-  afterOffset: Long,
-  throughOffset: Long
+  fromOffset: Long,
+  untilOffset: Long
 ) extends Partition

 /** A batch-oriented interface for consuming from Kafka.
@@ -46,8 +46,8 @@ private[spark] case class KafkaRDDPartition(
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">configuration parameters</a>.
  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param afterOffsets per-topic/partition Kafka offsets defining the (exclusive) starting point of the batch
- * @param throughOffsets per-topic/partition Kafka offsets defining the (inclusive) ending point of the batch
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) starting point of the batch
+ * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive) ending point of the batch
 * @param messageHandler function for translating each message into the desired type
 */
class KafkaRDD[
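Given the renamed parameters, constructing a KafkaRDD over an explicit offset range might look roughly like the sketch below. The order of the decoder type parameters is assumed from how compute uses them (U for the key decoder, T for the value decoder); the broker address and offsets are made up:

```scala
// Sketch only: a KafkaRDD over the half-open range [100, 150) of one partition.
import kafka.serializer.StringDecoder

val tp = TopicAndPartition("events", 0)
val rdd = new KafkaRDD[String, String, StringDecoder, StringDecoder, (String, String)](
  sc,
  Map("metadata.broker.list" -> "broker1:9092"),
  fromOffsets = Map(tp -> 100L),  // inclusive
  untilOffsets = Map(tp -> 150L), // exclusive
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
)
// rdd.count() would be 50: offsets 100 through 149
```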
@@ -58,71 +58,80 @@ class KafkaRDD[
   R: ClassTag](
     sc: SparkContext,
     kafkaParams: Map[String, String],
-    afterOffsets: Map[TopicAndPartition, Long],
-    throughOffsets: Map[TopicAndPartition, Long],
+    fromOffsets: Map[TopicAndPartition, Long],
+    untilOffsets: Map[TopicAndPartition, Long],
     messageHandler: MessageAndMetadata[K, V] => R
   ) extends RDD[R](sc, Nil) with Logging {

-  assert(afterOffsets.keys == throughOffsets.keys,
+  assert(fromOffsets.keys == untilOffsets.keys,
     "Must provide both from and until offsets for each topic/partition")

-  override def getPartitions: Array[Partition] = afterOffsets.zipWithIndex.map { kvi =>
+  override def getPartitions: Array[Partition] = fromOffsets.zipWithIndex.map { kvi =>
     val ((tp, from), index) = kvi
-    new KafkaRDDPartition(index, tp.topic, tp.partition, from, throughOffsets(tp))
+    new KafkaRDDPartition(index, tp.topic, tp.partition, from, untilOffsets(tp))
   }.toArray

-  override def compute(thePart: Partition, context: TaskContext) = new NextIterator[R] {
-    context.addTaskCompletionListener{ context => closeIfNeeded() }
-
-    val kc = new KafkaCluster(kafkaParams)
+  override def compute(thePart: Partition, context: TaskContext) = {
     val part = thePart.asInstanceOf[KafkaRDDPartition]
-    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(kc.config.props)
-      .asInstanceOf[Decoder[K]]
-    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(kc.config.props)
-      .asInstanceOf[Decoder[V]]
-    val consumer: SimpleConsumer = kc.connectLeader(part.topic, part.partition).fold(
-      errs => throw new Exception(s"""Couldn't connect to leader for topic ${part.topic} ${part.partition}: ${errs.mkString("\n")}"""),
-      consumer => consumer
-    )
-    var requestOffset = part.afterOffset + 1
-    var iter: Iterator[MessageAndOffset] = null
+    if (part.fromOffset >= part.untilOffset) {
+      log.warn(s"Beginning offset is same or after ending offset, skipping ${part.topic} ${part.partition}")
+      Iterator.empty
+    } else {
+      new NextIterator[R] {
+        context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+        val kc = new KafkaCluster(kafkaParams)
+        log.info(s"Computing partition ${part.topic} ${part.partition} ${part.fromOffset} -> ${part.untilOffset}")
+        val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+          .newInstance(kc.config.props)
+          .asInstanceOf[Decoder[K]]
+        val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+          .newInstance(kc.config.props)
+          .asInstanceOf[Decoder[V]]
+        val consumer: SimpleConsumer = kc.connectLeader(part.topic, part.partition).fold(
+          errs => throw new Exception(s"""Couldn't connect to leader for topic ${part.topic} ${part.partition}: ${errs.mkString("\n")}"""),
+          consumer => consumer
+        )
+        var requestOffset = part.fromOffset
+        var iter: Iterator[MessageAndOffset] = null

-    override def getNext: R = {
-      if (iter == null || !iter.hasNext) {
-        val req = new FetchRequestBuilder().
-          addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes).
-          build()
-        val resp = consumer.fetch(req)
-        if (resp.hasError) {
-          val err = resp.errorCode(part.topic, part.partition)
-          if (err == ErrorMapping.LeaderNotAvailableCode ||
-            err == ErrorMapping.NotLeaderForPartitionCode) {
-            log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
-            Thread.sleep(kc.config.refreshLeaderBackoffMs)
+        override def close() = consumer.close()

+        override def getNext: R = {
+          if (iter == null || !iter.hasNext) {
+            log.info(s"Fetching ${part.topic}, ${part.partition}, ${requestOffset}")
+            val req = new FetchRequestBuilder().
+              addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes).
+              build()
+            val resp = consumer.fetch(req)
+            if (resp.hasError) {
+              val err = resp.errorCode(part.topic, part.partition)
+              if (err == ErrorMapping.LeaderNotAvailableCode ||
+                err == ErrorMapping.NotLeaderForPartitionCode) {
+                log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
+                Thread.sleep(kc.config.refreshLeaderBackoffMs)
+              }
+              // Let normal rdd retry sort out reconnect attempts
+              throw ErrorMapping.exceptionFor(err)
+            }
+            iter = resp.messageSet(part.topic, part.partition)
+              .iterator
+              .dropWhile(_.offset < requestOffset)
+          }
+          if (!iter.hasNext) {
+            finished = true
+            null.asInstanceOf[R]
+          } else {
+            val item = iter.next
+            if (item.offset > part.untilOffset) {
+              finished = true
+            }
+            requestOffset = item.nextOffset
+            messageHandler(new MessageAndMetadata(part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
+          }
-          // Let normal rdd retry sort out reconnect attempts
-          throw ErrorMapping.exceptionFor(err)
-        }
-        iter = resp.messageSet(part.topic, part.partition)
-          .iterator
-          .dropWhile(_.offset < requestOffset)
-      }
-      if (!iter.hasNext) {
-        finished = true
-        null.asInstanceOf[R]
-      } else {
-        val item = iter.next
-        if (item.offset > part.throughOffset) {
-          finished = true
-        }
-        requestOffset = item.nextOffset
-        messageHandler(new MessageAndMetadata(part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
-      }
-    }
         }

-    override def close() = consumer.close()
       }

+    }
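The rewritten compute leans on Spark's NextIterator helper: getNext either produces the next record or sets finished, and close (here, closing the SimpleConsumer) runs once via closeIfNeeded when iteration ends or the task completes. A stripped-down sketch of that contract, which mirrors but is not org.apache.spark.util.NextIterator:

```scala
// Simplified sketch of the NextIterator pattern used by compute():
// subclasses implement getNext() (setting `finished` when exhausted) and close();
// closeIfNeeded() keeps cleanup idempotent.
abstract class SimpleNextIterator[U] extends Iterator[U] {
  private var gotNext = false
  private var nextValue: U = _
  private var closed = false
  protected var finished = false

  protected def getNext(): U   // return the next value, or set finished = true
  protected def close(): Unit  // release resources, e.g. a SimpleConsumer

  def closeIfNeeded(): Unit = if (!closed) { closed = true; close() }

  override def hasNext: Boolean = {
    if (!finished && !gotNext) {
      nextValue = getNext()
      if (finished) closeIfNeeded()
      gotNext = true
    }
    !finished
  }

  override def next(): U = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    gotNext = false
    nextValue
  }
}
```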
can you turn all scaladoc style into javadoc for this pr?
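For readers unfamiliar with the distinction: the request is purely about comment formatting. The same doc block in the two styles would look roughly like this (text taken from the KafkaRDD doc above):

```scala
// Scaladoc-style wrapping: text starts on the /** line, continuation asterisks indented two spaces.
/** A batch-oriented interface for consuming from Kafka.
  * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) starting point of the batch
  * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive) ending point of the batch
  */

// Javadoc-style wrapping: text starts on the second line, asterisks aligned in one column.
/**
 * A batch-oriented interface for consuming from Kafka.
 * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) starting point of the batch
 * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive) ending point of the batch
 */
```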