Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class KafkaContinuousReadSupport(
override def initialOffset(): Offset = {
val offsets = initialOffsets match {
case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
}
logInfo(s"Initial offsets: $offsets")
Expand Down Expand Up @@ -107,7 +107,7 @@ class KafkaContinuousReadSupport(

override def needsReconfiguration(config: ScanConfig): Boolean = {
val knownPartitions = config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions
offsetReader.fetchLatestOffsets().keySet != knownPartitions
offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
}

override def toString(): String = s"KafkaSource[$offsetReader]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(

override def latestOffset(start: Offset): Offset = {
val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
}.getOrElse {
Expand Down Expand Up @@ -138,6 +138,15 @@ private[kafka010] class KafkaMicroBatchReadSupport(
fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
untilOffsets = endPartitionOffsets,
executorLocations = getSortedExecutorList())
offsetRanges.filter { range =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was missing when we wrote Kafka source v2.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's my bad. But no tests caught it :(
Is it possible to write a test for this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tdas Very good call. I found the fix here was not correct when writing a unit test.

if (range.untilOffset < range.fromOffset) {
reportDataLoss(s"Partition ${range.topicPartition}'s offset was changed from " +
s"${range.fromOffset} to ${range.untilOffset}, some data may have been missed")
false
} else {
true
}
}

// Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
// that is, concurrent tasks will not read the same TopicPartitions.
Expand Down Expand Up @@ -186,7 +195,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
case EarliestOffsetRangeLimit =>
KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
case LatestOffsetRangeLimit =>
KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) =>
kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.{util => ju}
import java.util.concurrent.{Executors, ThreadFactory}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
Expand Down Expand Up @@ -144,16 +145,22 @@ private[kafka010] class KafkaOffsetReader(
s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")

partitionOffsets.foreach {
case (tp, KafkaOffsetRangeLimit.LATEST) =>
consumer.seekToEnd(ju.Arrays.asList(tp))
case (tp, KafkaOffsetRangeLimit.EARLIEST) =>
consumer.seekToBeginning(ju.Arrays.asList(tp))
case (tp, off) => consumer.seek(tp, off)
}
partitionOffsets.map {
case (tp, _) => tp -> consumer.position(tp)
def _fetchOffsets(): PartitionOffsetMap = {
partitionOffsets.foreach {
case (tp, KafkaOffsetRangeLimit.LATEST) =>
consumer.seekToEnd(ju.Arrays.asList(tp))
case (tp, KafkaOffsetRangeLimit.EARLIEST) =>
consumer.seekToBeginning(ju.Arrays.asList(tp))
case (tp, off) => consumer.seek(tp, off)
}
partitionOffsets.map {
case (tp, _) => tp -> consumer.position(tp)
}
}

// Fetch the offsets twice to reduce the chance to hit KAFKA-7703.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add more info why calling this twice works.

_fetchOffsets()
_fetchOffsets()
}
}

Expand Down Expand Up @@ -192,19 +199,76 @@ private[kafka010] class KafkaOffsetReader(
/**
* Fetch the latest offsets for the topic partitions that are indicated
* in the [[ConsumerStrategy]].
*
* Kafka may return earliest offsets when we are requesting latest offsets (KAFKA-7703). To avoid
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, Kafka returns earliest offset only if there is another call (poll I guess) right before the seekToEnd.

* hitting this issue, we will use the given `knownOffsets` to audit the latest offsets returned
* by Kafka, if we find some incorrect offsets (a latest offset is less than an offset in
* `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. If `knownOffsets` is not
* provided, we simply fetch the latest offsets twice and use the second result which is more
* likely correct.
*
* When a topic is recreated, the latest offsets may be less than offsets in `knownOffsets`. We
* cannot distinguish this with KAFKA-7703, so we just return whatever we get from Kafka after
* retrying.
*/
def fetchLatestOffsets(): Map[TopicPartition, Long] = runUninterruptibly {
def fetchLatestOffsets(
knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap = runUninterruptibly {
withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()
consumer.pause(partitions)
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")

consumer.seekToEnd(partitions)
val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
logDebug(s"Got latest offsets for partition : $partitionOffsets")
partitionOffsets
if (knownOffsets.isEmpty) {
// Fetch the latest offsets twice and use the second result which is more likely correct.
consumer.seekToEnd(partitions)
partitions.asScala.map(p => p -> consumer.position(p)).toMap
consumer.seekToEnd(partitions)
partitions.asScala.map(p => p -> consumer.position(p)).toMap
} else {
var partitionOffsets: PartitionOffsetMap = Map.empty

/**
* Compare `knownOffsets` and `partitionOffsets`. Returns all partitions that have incorrect
* latest offset (offset in `knownOffsets` is great than the one in `partitionOffsets`).
*/
def findIncorrectOffsets: Seq[(TopicPartition, Long, Long)] = {
var incorrectOffsets = ArrayBuffer[(TopicPartition, Long, Long)]()
partitionOffsets.foreach { case (tp, offset) =>
knownOffsets.foreach(_.get(tp).foreach { knownOffset =>
if (knownOffset > offset) {
val incorrectOffset = (tp, knownOffset, offset)
incorrectOffsets += incorrectOffset
}
})
}
incorrectOffsets
}

// Retry to fetch latest offsets when detecting incorrect offsets. We don't use
// `withRetriesWithoutInterrupt` to retry because:
//
// - `withRetriesWithoutInterrupt` will reset the consumer for each attempt but a fresh
// consumer has a much bigger chance to hit KAFKA-7703.
// - Avoid calling `consumer.poll(0)` which may cause KAFKA-7703.
var incorrectOffsets: Seq[(TopicPartition, Long, Long)] = Nil
var attempt = 0
do {
consumer.seekToEnd(partitions)
partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
attempt += 1

incorrectOffsets = findIncorrectOffsets
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: findIncorrectOffsets doesnt look like a property so better to call this with ()

if (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts) {
logWarning("Retrying to fetch latest offsets because of incorrect offsets " +
"(partition, previous offset, fetched offset): " + incorrectOffsets)
}
} while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)

logDebug(s"Got latest offsets for partition : $partitionOffsets")
partitionOffsets
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private[kafka010] class KafkaSource(
metadataLog.get(0).getOrElse {
val offsets = startingOffsets match {
case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss)
}
metadataLog.add(0, offsets)
Expand All @@ -148,7 +148,8 @@ private[kafka010] class KafkaSource(
// Make sure initialPartitionOffsets is initialized
initialPartitionOffsets

val latest = kafkaReader.fetchLatestOffsets()
val latest = kafkaReader.fetchLatestOffsets(
currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
val offsets = maxOffsetsPerTrigger match {
case None =>
latest
Expand Down