[SPARK-25005][SS]Support non-consecutive offsets for Kafka #22042
Conversation
    offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray
    }

    override def count(): Long = offsetRanges.map(_.size).sum
The assumption in these methods is no longer right, so remove them.
Goooood catch. That would have never occurred to me!
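For context, a hedged illustration of why the size-based assumption breaks once offsets can have gaps (the numbers below are made up, not from the PR):

```scala
// Hypothetical numbers: a range of 5 offsets where 2 of them are transaction markers.
object CountAssumptionExample {
  def main(args: Array[String]): Unit = {
    val fromOffset = 10L
    val untilOffset = 15L
    val assumedCount = untilOffset - fromOffset // 5: what a size-based count() would report
    val actualRecords = 3                       // what poll() actually returns after skipping markers
    assert(assumedCount != actualRecords)       // the old count()/isEmpty shortcut over-counts
  }
}
```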
cc @tdas

Test build #94441 has finished for PR 22042 at commit

Test build #94446 has finished for PR 22042 at commit
tdas left a comment
Review Round 1: I reviewed the non-test code, and I think it deserves a bit of refactoring. My most important point: why create a new exception type when you can handle that condition in the fetchData method itself?
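A minimal sketch of what that suggestion could look like (all names and the in-memory "log" are hypothetical, not the PR's actual API):

```scala
// Hypothetical: offsets 0, 1 and 4 hold data; 2 and 3 are transaction markers.
object FetchSketch {
  private val log = Map(0L -> "a", 1L -> "b", 4L -> "c")

  // fetchData skips a gap internally instead of throwing a dedicated exception
  // and making the caller retry.
  def fetchData(offset: Long, untilOffset: Long): Option[String] = {
    var current = offset
    while (current < untilOffset) {
      log.get(current) match {
        case some @ Some(_) => return some  // found a committed data record
        case None           => current += 1 // gap: advance in place
      }
    }
    None                                    // nothing available in the requested range
  }

  def main(args: Array[String]): Unit = {
    assert(fetchData(2L, 5L).contains("c")) // skips the markers at offsets 2 and 3
  }
}
```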
    ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
    @volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET

    @volatile private var offsetBeforePoll: Long = UNKNOWN_OFFSET
Can you add some docs to explain what these 2 vars signify and why they are needed?
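A sketch of the kind of documentation being asked for (the wording is illustrative, not the PR's; the class wrapper and UNKNOWN_OFFSET sentinel exist only so the snippet compiles on its own):

```scala
class OffsetTrackingSketch {
  private val UNKNOWN_OFFSET = -2L

  /** Consumer position recorded just before the last poll; comparing it with
   * `offsetAfterPoll` tells us whether the poll actually moved the consumer. */
  @volatile private var offsetBeforePoll: Long = UNKNOWN_OFFSET

  /** Consumer position right after the last poll, i.e. the next offset the consumer
   * will fetch. Offsets below it that the poll did not return are gaps left by
   * transaction markers or aborted records. */
  @volatile private var offsetAfterPoll: Long = UNKNOWN_OFFSET
}
```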
| * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are | ||
| * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch. | ||
| */ | ||
| private[kafka010] class MissingOffsetException( |
nit: Is this meant to be used outside this KafkaDataConsumer class? If not, then maybe make it an inner class to KafkaDataConsumer.
    * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch.
    */
    private[kafka010] class MissingOffsetException(
    val offset: Long,
Maybe rename offset to something like missingOffset. It's weird to have a generically named field "offset" next to a specifically named field "nextOffsetToFetch".
    */
    private def fetchData(
    offset: Long,
    untilOffset: Long,
Update the docs of this method to say that it can throw MissingOffsetException and what that means?
    poll(pollTimeoutMs)
    } else if (!fetchedData.hasNext) {
    // The last pre-fetched data has been drained.
    if (offset < offsetAfterPoll) {
It's hard to understand this condition because it's hard to understand what offsetAfterPoll means. Does it refer to the offset that will be fetched next by the KafkaConsumer?
    }

    private def poll(pollTimeoutMs: Long): Unit = {
    offsetBeforePoll = consumer.position(topicPartition)
This variable offsetBeforePoll seems to be used only to identify whether data was actually fetched in a poll and nothing else. Rather than define another var (there are already many that are confusing), why not just return a boolean from poll which is true or false depending on whether the poll moved anything?
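A simplified sketch of that suggestion (field names mirror the class under review, but this is not the PR's implementation): poll itself reports whether the consumer position moved, so no offsetBeforePoll var is needed.

```scala
import java.time.Duration
import java.{util => ju}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

class PollSketch(
    consumer: KafkaConsumer[Array[Byte], Array[Byte]],
    topicPartition: TopicPartition) {

  private var fetchedData: ju.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] =
    ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]

  def poll(pollTimeoutMs: Long): Boolean = {
    val before = consumer.position(topicPartition)        // position prior to the poll
    val records = consumer.poll(Duration.ofMillis(pollTimeoutMs)).records(topicPartition)
    fetchedData = records.iterator()
    consumer.position(topicPartition) != before           // true iff the poll advanced the position
  }
}
```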
    throw new OffsetOutOfRangeException(
    Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
    } else {
    } else if (offsetBeforePoll == offsetAfterPoll) {
Just to be clear, can this happen only if there is a timeout? And if so, why not push this condition and exception into the poll() method, thus simplifying this method?
| s"seek to $offset and poll but the offset was reset to $offsetAfterPoll") | ||
| throw new MissingOffsetException(offset, offsetAfterPoll) | ||
| } | ||
| } else { |
Let's remove this else and reduce the condition nesting. The previous if statement always ends in an exception, so we can remove this else.
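A generic illustration of that unnesting point (not the PR's actual code): when the first `if` branch always throws, the trailing `else` wrapper can be dropped and its body de-indented.

```scala
object UnnestExample {
  def check(offset: Long, earliest: Long, latest: Long): Unit = {
    if (offset < earliest || offset >= latest) {
      throw new IllegalArgumentException(s"offset $offset out of range [$earliest, $latest)")
    }
    // Previously wrapped in `else { ... }`; now sits at the top level.
    println(s"offset $offset is in range")
  }

  def main(args: Array[String]): Unit = check(5L, 0L, 10L)
}
```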
    if (offset < offsetAfterPoll) {
    // Offsets in [offset, offsetAfterPoll) are missing. We should skip them.
    resetFetchedData()
    throw new MissingOffsetException(offset, offsetAfterPoll)
So MissingOffsetException is only used to signal that some offset may be missing due to control messages and nothing else. And the higher-level function (i.e. get) just handles it by resetting the fetched offsets. Why not let this fetchData method handle the situation instead of creating a new exception just for this?
Test build #94808 has finished for PR 22042 at commit

Test build #94809 has finished for PR 22042 at commit
tdas left a comment
This first level of refactoring looks much better. But I think we can do more.
    * `read_committed`), it will be skipped and this method will try to fetch next available record
    * within [offset, untilOffset).
    *
    * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will
If failOnDataLoss is true then it should throw an exception... isn't it?
nit: "try the best" -> "try its best"
Good catch
    * within [offset, untilOffset).
    *
    * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will
    * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `true`, this
Will we throw an exception even when it's a control message and there is no real data loss?
> Will we throw an exception even when it's a control message and there is no real data loss?

No. It will be skipped and this method will try to fetch the next available record within [offset, untilOffset).
    * instead.
    */
    private case class FetchedRecord(
    record: Option[ConsumerRecord[Array[Byte], Array[Byte]]],
Can't we reuse the objects here? And do we need to have an Option, thus creating a lot of Option objects all the time?
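A minimal sketch of that reuse idea (hypothetical shape, not necessarily the PR's final class): keep one mutable FetchedRecord per consumer and overwrite it on every fetch instead of allocating a new case class plus an Option each time; a null record stands in for "no record found".

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord

class FetchedRecordSketch(
    var record: ConsumerRecord[Array[Byte], Array[Byte]],
    var nextOffsetToFetch: Long) {

  def withRecord(
      r: ConsumerRecord[Array[Byte], Array[Byte]],
      nextOffset: Long): FetchedRecordSketch = {
    record = r
    nextOffsetToFetch = nextOffset
    this                              // return the same instance, nothing new allocated
  }
}
```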
    val r = p.records(topicPartition)
    logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
    fetchedData = r.iterator
    offsetAfterPoll = consumer.position(topicPartition)
I strongly think that this should not be a var, but rather a clear return value. We have been burnt by too many mutable vars/defs (see all the flakiness caused by the structured ProgressReporter) and we should consciously try to improve this everywhere by not having vars all over the place.
    poll(offset, pollTimeoutMs)
    } else if (!fetchedData.hasNext) {
    // The last pre-fetched data has been drained.
    if (offset < offsetAfterPoll) {
This is the place preventing me from making offsetAfterPoll a local var.
Test build #95056 has finished for PR 22042 at commit

Test build #95063 has finished for PR 22042 at commit
tdas left a comment
The refactor with FetchedData looks cleaner. But this needs a bit more work, especially on the test side. Left a whole bunch of questions and comments.
    */
    private case class FetchedData(
    private var records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
    var nextOffsetInFetchedData: Long,
Make this a public getter with a private setter.
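An illustration of the public-getter/private-setter idiom in Scala (the field name is borrowed from the diff above; the wrapping class is a stand-in):

```scala
class FetchedDataSketch(initialOffset: Long) {
  private var _nextOffsetInFetchedData: Long = initialOffset

  // Anyone can read the value...
  def nextOffsetInFetchedData: Long = _nextOffsetInFetchedData

  // ...but only the class itself can update it.
  private def nextOffsetInFetchedData_=(v: Long): Unit = {
    _nextOffsetInFetchedData = v
  }
}
```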
    }

    if (!fetchedData.hasNext) {
    assert(offset <= fetchedData.offsetAfterPoll,
Add comments here on what this case means.
    * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds.
    */
    private def fetchData(
    offset: Long,
Maybe rename this method to fetchRecord, to make it consistent with the return type.
    * @throws TimeoutException if the consumer position is not changed after polling. It means the
    * consumer polls nothing before timeout.
    */
    private def poll(offset: Long, pollTimeoutMs: Long): Unit = {
Maybe rename this method to be consistent with what it does ... fetch data.
    if (offset < range.earliest || offset >= range.latest) {
    throw new OffsetOutOfRangeException(
    Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
    poll(offset, pollTimeoutMs)
comment that this method updates fetchedData
    CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
    AdvanceManualClock(100),
    waitUntilBatchProcessed,
    CheckAnswer((1 to 5) ++ (11 to 15): _*), // offset: 15, 16, 17*
Use CheckNewAnswer instead of the cumulative CheckAnswer.
    true
    }

    val producer = testUtils.createProducer(usingTrascation = true)
You could define a testWithProducer method and wrap the finally in it.
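A sketch of what such a helper could look like (hypothetical; `createProducer` stands in for the test suite's producer factory): build the producer, run the test body, and always close the producer in a finally block so individual tests don't repeat the cleanup.

```scala
import org.apache.kafka.clients.producer.KafkaProducer

object ProducerTestHelpers {
  def testWithProducer(createProducer: () => KafkaProducer[String, String])(
      body: KafkaProducer[String, String] => Unit): Unit = {
    val producer = createProducer()
    try {
      body(producer)
    } finally {
      producer.close()   // cleanup happens even if the test body throws
    }
  }
}
```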
    // 1 from smallest, 1 from middle, 8 from biggest
    CheckAnswer(),
    WithKafkaProducer(topic, producer) { producer =>
    // Send 5 messages. They should be visible only after being committed.
Why so? This is read_uncommitted, right?
    },
    AdvanceManualClock(100),
    waitUntilBatchProcessed,
    CheckAnswer(1 to 3: _*), // offset 0, 1, 2
Why only 3 records when 1 to 5 have been sent already and we are reading uncommitted data?
> Why only 3 records when 1 to 5 have been sent already and we are reading uncommitted data?

I'm using maxOffsetsPerTrigger = 3 to cut the batches on purpose. Otherwise, it's really hard to cover all of the cases.
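A sketch of the kind of source configuration being described (values are illustrative and `spark` is an existing SparkSession; this is not the suite's code): maxOffsetsPerTrigger caps each micro-batch at 3 offsets so batch boundaries land on the interesting corner cases, and the kafka.-prefixed option passes isolation.level through to the underlying consumer.

```scala
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic-1")
  .option("maxOffsetsPerTrigger", "3")
  .option("kafka.isolation.level", "read_uncommitted")
  .load()
```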
    props
    }

    def createProducer(usingTrascation: Boolean): KafkaProducer[String, String] = {
nit: usingTrascation -> usingTransaction
Test build #95115 has finished for PR 22042 at commit
tdas left a comment
Just a few nits here and there. This looks pretty good. Thank you for doing this. It was tricky to deal with Kafka transactions.
    offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)

    private val rangeToRead = resolveRange(offsetRange)
unnecessary
    object WithKafkaProducer {
    def apply(
    topic: String,
    producer: KafkaProducer[String, String])(
Ping on this comment. Maybe you missed this?
    reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
    record
    fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
    }
nit: This can be unnested: if ... else { if ... else ... } -> if ... else if ... else
    def apply(
    topic: String,
    producer: KafkaProducer[String, String])(
    func: KafkaProducer[String, String] => Unit): AssertOnQuery = {
nit: AssertOnQuery -> StreamAction
| s"AddKafkaData(topics = $topics, data = $data, message = $message)" | ||
| } | ||
|
|
||
| object WithKafkaProducer { |
nit: This is not creating a KafkaProducer as most With*** methods do. The point of this is to force synchronization of the consumer. So maybe rename it to WithOffsetSync { ... }?
Test build #95236 has finished for PR 22042 at commit
This is the flaky test I fixed in #22230. Retest this please.
retest this please
Test build #95297 has finished for PR 22042 at commit

Test build #95319 has finished for PR 22042 at commit
LGTM.
Thanks! Merging to master.
What changes were proposed in this pull request?

As the user uses Kafka transactions to write data, the offsets in Kafka will be non-consecutive: the log will contain some transaction (commit or abort) markers. In addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not return aborted messages either. Hence, we will see non-consecutive offsets in the data returned by `poll`. However, as `seekToEnd` may move the offset point to these missing offsets, there are 4 possible corner cases we need to support:

- The whole batch contains no data messages
- The first offset in a batch is not a committed data message
- The last offset in a batch is not a committed data message
- There is a gap in the middle of a batch

They are all covered by the new unit tests.

How was this patch tested?

The new unit tests.

Closes apache#22042 from zsxwing/kafka-transaction-read.
Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
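For readers unfamiliar with how these gaps arise, here is an illustrative sketch (the broker address, topic, and transactional.id are placeholders): a transactional producer whose commitTransaction() writes a control marker into the partition log, so the next data record's offset is not consecutive with the last committed record.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TransactionalGapExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("transactional.id", "example-txn")

    val producer = new KafkaProducer[String, String](props)
    producer.initTransactions()
    try {
      producer.beginTransaction()
      (1 to 5).foreach { i =>
        producer.send(new ProducerRecord[String, String]("topic-1", i.toString))
      }
      producer.commitTransaction()  // the commit marker occupies one offset, leaving a gap
    } finally {
      producer.close()
    }
  }
}
```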