Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
address
  • Loading branch information
zsxwing committed Dec 17, 2018
commit e686f10d21294848001b8b4cee02bfb22f09a5a4
Original file line number Diff line number Diff line change
Expand Up @@ -138,29 +138,29 @@ private[kafka010] class KafkaOffsetReader(
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()

// Call `position` to wait until the potential offset request triggered by `poll(0)` is
// done. This is a workaround for KAFKA-7703, in which an async `seekToBeginning` triggered by
// `poll(0)` may reset offsets that should have been set by another request.
partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})

consumer.pause(partitions)
assert(partitions.asScala == partitionOffsets.keySet,
"If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
"Use -1 for latest, -2 for earliest, if you don't care.\n" +
s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")

def _fetchOffsets(): PartitionOffsetMap = {
partitionOffsets.foreach {
case (tp, KafkaOffsetRangeLimit.LATEST) =>
consumer.seekToEnd(ju.Arrays.asList(tp))
case (tp, KafkaOffsetRangeLimit.EARLIEST) =>
consumer.seekToBeginning(ju.Arrays.asList(tp))
case (tp, off) => consumer.seek(tp, off)
}
partitionOffsets.map {
case (tp, _) => tp -> consumer.position(tp)
}
partitionOffsets.foreach {
case (tp, KafkaOffsetRangeLimit.LATEST) =>
consumer.seekToEnd(ju.Arrays.asList(tp))
case (tp, KafkaOffsetRangeLimit.EARLIEST) =>
consumer.seekToBeginning(ju.Arrays.asList(tp))
case (tp, off) => consumer.seek(tp, off)
}
partitionOffsets.map {
case (tp, _) => tp -> consumer.position(tp)
}

// Fetch the offsets twice to reduce the chance to hit KAFKA-7703.
_fetchOffsets()
_fetchOffsets()
}
}

Expand Down Expand Up @@ -200,30 +200,32 @@ private[kafka010] class KafkaOffsetReader(
* Fetch the latest offsets for the topic partitions that are indicated
* in the [[ConsumerStrategy]].
*
* Kafka may return earliest offsets when we are requesting latest offsets (KAFKA-7703). To avoid
* hitting this issue, we will use the given `knownOffsets` to audit the latest offsets returned
* by Kafka, if we find some incorrect offsets (a latest offset is less than an offset in
* `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. If `knownOffsets` is not
* provided, we simply fetch the latest offsets twice and use the second result which is more
* likely correct.
* Kafka may return earliest offsets when we are requesting latest offsets if `poll` is called
* right before `seekToEnd` (KAFKA-7703). As a workaround, we will call `position` right after
* `poll` to wait until the potential offset request triggered by `poll(0)` is done.
*
* When a topic is recreated, the latest offsets may be less than offsets in `knownOffsets`. We
* cannot distinguish this with KAFKA-7703, so we just return whatever we get from Kafka after
* retrying.
* In addition, to avoid other unknown issues, we also use the given `knownOffsets` to audit the
* latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset is less
* than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. When
* a topic is recreated, the latest offsets may be less than offsets in `knownOffsets`. We cannot
* distinguish this with KAFKA-7703, so we just return whatever we get from Kafka after retrying.
*/
def fetchLatestOffsets(
knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap = runUninterruptibly {
withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()

// Call `position` to wait until the potential offset request triggered by `poll(0)` is
// done. This is a workaround for KAFKA-7703, in which an async `seekToBeginning` triggered by
// `poll(0)` may reset offsets that should have been set by another request.
partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})

consumer.pause(partitions)
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")

if (knownOffsets.isEmpty) {
// Fetch the latest offsets twice and use the second result which is more likely correct.
consumer.seekToEnd(partitions)
partitions.asScala.map(p => p -> consumer.position(p)).toMap
consumer.seekToEnd(partitions)
partitions.asScala.map(p => p -> consumer.position(p)).toMap
} else {
Expand All @@ -233,7 +235,7 @@ private[kafka010] class KafkaOffsetReader(
* Compare `knownOffsets` and `partitionOffsets`. Returns all partitions that have incorrect
* latest offset (offset in `knownOffsets` is greater than the one in `partitionOffsets`).
*/
def findIncorrectOffsets: Seq[(TopicPartition, Long, Long)] = {
def findIncorrectOffsets(): Seq[(TopicPartition, Long, Long)] = {
var incorrectOffsets = ArrayBuffer[(TopicPartition, Long, Long)]()
partitionOffsets.foreach { case (tp, offset) =>
knownOffsets.foreach(_.get(tp).foreach { knownOffset =>
Expand All @@ -259,10 +261,14 @@ private[kafka010] class KafkaOffsetReader(
partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
attempt += 1

incorrectOffsets = findIncorrectOffsets
if (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts) {
logWarning("Retrying to fetch latest offsets because of incorrect offsets " +
"(partition, previous offset, fetched offset): " + incorrectOffsets)
incorrectOffsets = findIncorrectOffsets()
if (incorrectOffsets.nonEmpty) {
logWarning("Found incorrect offsets in some partitions " +
s"(partition, previous offset, fetched offset): $incorrectOffsets")
}
if (attempt < maxOffsetFetchAttempts) {
logWarning("Retrying to fetch latest offsets because of incorrect offsets")
Thread.sleep(offsetFetchAttemptIntervalMs)
}
} while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)

Expand Down