@@ -227,7 +227,7 @@ class KafkaContinuousPartitionReader(

// This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
// or if it's the endpoint of the data range (i.e. the "true" next offset).
case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
val range = consumer.getAvailableOffsetRange()
if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) {
// retry
@@ -33,9 +33,19 @@ import org.apache.spark.util.UninterruptibleThread

private[kafka010] sealed trait KafkaDataConsumer {
/**
* Get the record for the given offset if available. Otherwise it will either throw error
* (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
* or null.
* Get the record for the given offset if available.
*
* If the record is invisible (either a
* transaction message, or an aborted message when the consumer's `isolation.level` is
* `read_committed`), it will be skipped and this method will try to fetch the next available
* record within [offset, untilOffset).
*
* This method will also try its best to detect data loss. If `failOnDataLoss` is `true`, it will
* throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this
* method will try to fetch the next available record within [offset, untilOffset).
*
* When this method tries to skip offsets due to either invisible messages or data loss and
* reaches `untilOffset`, it will return `null`.
*
* @param offset the offset to fetch.
* @param untilOffset the max offset to fetch. Exclusive.
@@ -80,6 +90,83 @@ private[kafka010] case class InternalKafkaConsumer(
kafkaParams: ju.Map[String, Object]) extends Logging {
import InternalKafkaConsumer._

/**
* The internal object to store the fetched data from the Kafka consumer and the next offset to
* poll.
*
* @param _records the pre-fetched Kafka records.
* @param _nextOffsetInFetchedData the next offset in `records`. We use this to check whether the
*                                 pre-fetched data is still valid for the requested offset.
* @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
*                         poll when `records` is drained.
*/
private case class FetchedData(
private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
private var _nextOffsetInFetchedData: Long,
private var _offsetAfterPoll: Long) {

def withNewPoll(
records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
offsetAfterPoll: Long): FetchedData = {
this._records = records
this._nextOffsetInFetchedData = UNKNOWN_OFFSET
this._offsetAfterPoll = offsetAfterPoll
this
}

/** Whether there are more elements */
def hasNext: Boolean = _records.hasNext

/** Move `records` forward and return the next record. */
def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
val record = _records.next()
_nextOffsetInFetchedData = record.offset + 1
record
}

/** Move `records` backward and return the previous record. */
def previous(): ConsumerRecord[Array[Byte], Array[Byte]] = {
assert(_records.hasPrevious, "fetchedData cannot move back")
val record = _records.previous()
_nextOffsetInFetchedData = record.offset
record
}

/** Reset the internal pre-fetched data. */
def reset(): Unit = {
_records = ju.Collections.emptyListIterator()
}

/**
* Returns the next offset in `records`. We use this to check whether the pre-fetched data is
* still valid for the requested offset.
*/
def nextOffsetInFetchedData: Long = _nextOffsetInFetchedData

/**
* Returns the next offset to poll after draining the pre-fetched records.
*/
def offsetAfterPoll: Long = _offsetAfterPoll
}
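The `previous()` helper above relies on `java.util.ListIterator` cursor semantics: after `next()` returns an element, a subsequent `previous()` returns that same element again, which is what lets a later fetch re-read a record that was pushed back. A minimal standalone sketch of just that behavior, using plain Java collections rather than the connector's types:

```scala
import java.{util => ju}

object ListIteratorCursorSketch extends App {
  // The cursor sits between elements: next() moves it forward and returns the
  // element it passed; previous() moves it back and returns that same element.
  val it: ju.ListIterator[String] = ju.Arrays.asList("r5", "r6", "r7").listIterator()

  assert(it.next() == "r5")
  assert(it.next() == "r6")
  // Step back: the element next() just returned is delivered again ...
  assert(it.previous() == "r6")
  // ... so the following next() re-reads it, which is what FetchedData.previous relies on.
  assert(it.next() == "r6")
}
```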

/**
* The internal object returned by the `fetchRecord` method. If `record` is null, the record at
* the requested offset is invisible (either a transaction message, or an aborted message when
* the consumer's `isolation.level` is `read_committed`), and the caller should use
* `nextOffsetToFetch` to fetch instead.
*/
private case class FetchedRecord(
var record: ConsumerRecord[Array[Byte], Array[Byte]],
var nextOffsetToFetch: Long) {

def withRecord(
record: ConsumerRecord[Array[Byte], Array[Byte]],
nextOffsetToFetch: Long): FetchedRecord = {
this.record = record
this.nextOffsetToFetch = nextOffsetToFetch
this
}
}
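`FetchedRecord.withRecord` follows the same reuse idea as `FetchedData.withNewPoll`: the hot path overwrites one long-lived instance instead of allocating a result object per record. A generic sketch of that pattern under assumed names (nothing below comes from the connector):

```scala
object ReusableResultSketch extends App {
  // Mutable result holder: every call hands back the same instance with new
  // contents, so callers must consume it before the next call.
  final case class FetchResult(var value: String, var nextOffset: Long) {
    def withValue(value: String, nextOffset: Long): FetchResult = {
      this.value = value
      this.nextOffset = nextOffset
      this
    }
  }

  private val reused = FetchResult(null, -1L)

  def fetch(offset: Long): FetchResult =
    reused.withValue(s"record-$offset", offset + 1)

  val r = fetch(5L)
  println(s"${r.value} next=${r.nextOffset}")   // record-5 next=6
}
```

The trade-off is the one stated in the field docs below: no per-record garbage, at the cost of results that are only valid until the next call.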

private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

@volatile private var consumer = createConsumer
@@ -90,10 +177,21 @@ private[kafka010] case class InternalKafkaConsumer(
/** Indicates whether this consumer is going to be stopped in the next release. */
@volatile var markedForClose = false

/** Iterator over the already fetched data */
@volatile private var fetchedData =
ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
@volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET
/**
* The fetched data returned from Kafka consumer. This is a reusable private object to avoid
* memory allocation.
*/
private val fetchedData = FetchedData(
ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
UNKNOWN_OFFSET,
UNKNOWN_OFFSET)

/**
* The fetched record returned from the `fetchRecord` method. This is a reusable private object to
* avoid memory allocation.
*/
private val fetchedRecord: FetchedRecord = FetchedRecord(null, UNKNOWN_OFFSET)


/** Create a KafkaConsumer to fetch records for `topicPartition` */
private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
@@ -125,20 +223,7 @@ private[kafka010] case class InternalKafkaConsumer(
AvailableOffsetRange(earliestOffset, latestOffset)
}

/**
* Get the record for the given offset if available. Otherwise it will either throw error
* (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
* or null.
*
* @param offset the offset to fetch.
* @param untilOffset the max offset to fetch. Exclusive.
* @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
* @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
*                       offset if available, or throw an exception. When `failOnDataLoss` is `false`,
* this method will either return record at offset if available, or return
* the next earliest available record less than untilOffset, or null. It
* will not throw any exception.
*/
/** @see [[KafkaDataConsumer.get]] */
def get(
offset: Long,
untilOffset: Long,
@@ -147,21 +232,32 @@ private[kafka010] case class InternalKafkaConsumer(
ConsumerRecord[Array[Byte], Array[Byte]] = runUninterruptiblyIfPossible {
require(offset < untilOffset,
s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
logDebug(s"Get $groupId $topicPartition nextOffset ${fetchedData.nextOffsetInFetchedData} " +
s"requested $offset")
// The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
// `false`, first, we will try to fetch the record at `offset`. If no such record exists, then
// we will move to the next available offset within `[offset, untilOffset)` and retry.
// If `failOnDataLoss` is `true`, the loop body will be executed only once.
var toFetchOffset = offset
var consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]] = null
var fetchedRecord: FetchedRecord = null
// We want to break out of the while loop on a successful fetch to avoid using "return"
// which may cause a NonLocalReturnControl exception when this method is used as a function.
var isFetchComplete = false

while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
try {
consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
isFetchComplete = true
fetchedRecord = fetchRecord(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
if (fetchedRecord.record != null) {
isFetchComplete = true
} else {
toFetchOffset = fetchedRecord.nextOffsetToFetch
if (toFetchOffset >= untilOffset) {
fetchedData.reset()
toFetchOffset = UNKNOWN_OFFSET
} else {
logDebug(s"Skipped offsets [$offset, $toFetchOffset]")
}
}
} catch {
case e: OffsetOutOfRangeException =>
// When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -174,9 +270,9 @@ private[kafka010] case class InternalKafkaConsumer(
}

if (isFetchComplete) {
consumerRecord
fetchedRecord.record
} else {
resetFetchedData()
fetchedData.reset()
null
}
}
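Pulling the pieces of `get` together, here is a self-contained sketch of the caller-side contract described in the `KafkaDataConsumer` docs above: follow the offset of whatever record comes back rather than assuming contiguity, and treat `null` as the end of the requested range. The `get` stub and the `Rec` type are simplified stand-ins, not the connector's implementation:

```scala
object GetContractSketch extends App {
  // Simplified stand-in for the documented `get` contract: return the record at
  // `offset` if available, otherwise the next available record before
  // `untilOffset`, otherwise null. Offsets 3 and 4 are "missing" to mimic
  // skipped or invisible data when failOnDataLoss = false.
  final case class Rec(offset: Long, value: String)
  private val available = Map(1L -> "a", 2L -> "b", 5L -> "c")

  def get(offset: Long, untilOffset: Long): Rec =
    (offset until untilOffset).find(available.contains)
      .map(o => Rec(o, available(o)))
      .orNull

  // Caller-side loop: advance from record.offset + 1 and stop on null.
  var nextOffset = 1L
  val untilOffset = 7L
  var done = false
  while (!done && nextOffset < untilOffset) {
    val record = get(nextOffset, untilOffset)
    if (record == null) {
      done = true                              // nothing left before untilOffset
    } else {
      println(s"offset ${record.offset} -> ${record.value}")
      nextOffset = record.offset + 1
    }
  }
  // Prints offsets 1, 2 and 5, then stops.
}
```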
@@ -239,65 +335,81 @@ private[kafka010] case class InternalKafkaConsumer(
}

/**
* Get the record for the given offset if available. Otherwise it will either throw error
* (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
* or null.
* Get the fetched record for the given offset if available.
*
* If the record is invisible (either a transaction message, or an aborted message when the
* consumer's `isolation.level` is `read_committed`), it will return a `FetchedRecord` with the
* next offset to fetch.
*
* This method will also try its best to detect data loss. If `failOnDataLoss` is `true`, it will
* throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this
* method will try to fetch the next available record within [offset, untilOffset).
*
* @throws OffsetOutOfRangeException if `offset` is out of range.
* @throws TimeoutException if it cannot fetch the record in `pollTimeoutMs` milliseconds.
*/
private def fetchData(
private def fetchRecord(
offset: Long,
Review comment (Contributor): Maybe rename this method to fetchRecord, to make it consistent with return type.

untilOffset: Long,
Review comment (Contributor): Update docs of this method saying that it can throw MissingOffsetException and what it means?

pollTimeoutMs: Long,
failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
// This is the first fetch, or the last pre-fetched data has been drained.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
seek(offset)
poll(pollTimeoutMs)
}

if (!fetchedData.hasNext()) {
// We cannot fetch anything after `poll`. Two possible cases:
// - `offset` is out of range so that Kafka returns nothing. Just throw
// `OffsetOutOfRangeException` to let the caller handle it.
// - Cannot fetch any data before timeout. TimeoutException will be thrown.
val range = getAvailableOffsetRange()
if (offset < range.earliest || offset >= range.latest) {
throw new OffsetOutOfRangeException(
Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
failOnDataLoss: Boolean): FetchedRecord = {
if (offset != fetchedData.nextOffsetInFetchedData) {
// This is the first fetch, or the fetched data has been reset.
// Fetch records from Kafka and update `fetchedData`.
fetchData(offset, pollTimeoutMs)
} else if (!fetchedData.hasNext) { // The last pre-fetched data has been drained.
if (offset < fetchedData.offsetAfterPoll) {
// Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask
// the next call to start from `fetchedData.offsetAfterPoll`.
fetchedData.reset()
return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
} else {
throw new TimeoutException(
s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
// Fetch records from Kafka and update `fetchedData`.
fetchData(offset, pollTimeoutMs)
}
}

if (!fetchedData.hasNext) {
// When we reach here, we have already tried to poll from Kafka. As `fetchedData` is still
// empty, all messages in [offset, fetchedData.offsetAfterPoll) are invisible. Return a
// record to ask the next call to start from `fetchedData.offsetAfterPoll`.
assert(offset <= fetchedData.offsetAfterPoll,
s"seek to $offset and poll but the offset was reset to ${fetchedData.offsetAfterPoll}")
fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
} else {
val record = fetchedData.next()
nextOffsetInFetchedData = record.offset + 1
// In general, Kafka uses the specified offset as the start point, and tries to fetch the next
// available offset. Hence we need to handle offset mismatch.
if (record.offset > offset) {
val range = getAvailableOffsetRange()
if (range.earliest <= offset) {
// `offset` is still valid but the corresponding message is invisible. We should skip it
// and jump to `record.offset`. Here we move `fetchedData` back so that the next call of
// `fetchRecord` can just return `record` directly.
fetchedData.previous()
return fetchedRecord.withRecord(null, record.offset)
}
// This may happen when some records aged out but their offsets already got verified
if (failOnDataLoss) {
reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
// Never happen as "reportDataLoss" will throw an exception
null
throw new IllegalStateException(
"reportDataLoss didn't throw an exception when 'failOnDataLoss' is true")
} else if (record.offset >= untilOffset) {
reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
// Set `nextOffsetToFetch` to `untilOffset` to finish the current batch.
fetchedRecord.withRecord(null, untilOffset)
} else {
if (record.offset >= untilOffset) {
reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
null
} else {
reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
record
}
reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
}
} else if (record.offset < offset) {
// This should not happen. If it does happen, then we probably misunderstand Kafka internal
// mechanism.
throw new IllegalStateException(
s"Tried to fetch $offset but the returned record offset was ${record.offset}")
} else {
record
fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
}
}
}
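The offset-mismatch handling in `fetchRecord` above reduces to a small decision table. Below is a standalone sketch of just that logic under assumed names; a simple `Outcome` ADT stands in for `FetchedRecord` and for the exceptions thrown in the real code, and the empty-poll branches are left out:

```scala
object FetchOutcomeSketch extends App {
  // Condensed model of the offset-mismatch branches in `fetchRecord`.
  sealed trait Outcome
  case class Emit(offset: Long) extends Outcome                // visible record returned
  case class SkipTo(nextOffsetToFetch: Long) extends Outcome   // null record; caller refetches
  case class Fail(reason: String) extends Outcome              // an exception in the real code

  def decide(
      requested: Long,
      returnedOffset: Long,
      untilOffset: Long,
      earliestAvailable: Long,
      failOnDataLoss: Boolean): Outcome = {
    if (returnedOffset == requested) {
      Emit(returnedOffset)
    } else if (returnedOffset > requested) {
      if (earliestAvailable <= requested) {
        // The requested offset still exists but is invisible (transactional or
        // aborted): ask the caller to refetch starting at the returned offset.
        SkipTo(returnedOffset)
      } else if (failOnDataLoss) {
        Fail(s"Cannot fetch records in [$requested, $returnedOffset)")
      } else if (returnedOffset >= untilOffset) {
        SkipTo(untilOffset)          // data lost; nothing left in this batch
      } else {
        Emit(returnedOffset)         // data lost; continue from the next available record
      }
    } else {
      Fail(s"Tried to fetch $requested but got earlier offset $returnedOffset")
    }
  }

  // Invisible record at offset 10 while offsets >= 5 are still available:
  assert(decide(10, 12, 20, earliestAvailable = 5, failOnDataLoss = false) == SkipTo(12))
  // Offsets [10, 12) aged out and failOnDataLoss = false: keep going from 12.
  assert(decide(10, 12, 20, earliestAvailable = 11, failOnDataLoss = false) == Emit(12))
}
```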
@@ -306,13 +418,7 @@ private[kafka010] case class InternalKafkaConsumer(
private def resetConsumer(): Unit = {
consumer.close()
consumer = createConsumer
resetFetchedData()
}

/** Reset the internal pre-fetched data. */
private def resetFetchedData(): Unit = {
nextOffsetInFetchedData = UNKNOWN_OFFSET
fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
fetchedData.reset()
}

/**
@@ -346,11 +452,40 @@ private[kafka010] case class InternalKafkaConsumer(
consumer.seek(topicPartition, offset)
}

private def poll(pollTimeoutMs: Long): Unit = {
/**
* Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be
* empty if the Kafka consumer fetches some messages but none of them are visible (either
* transaction messages, or aborted messages when `isolation.level` is `read_committed`).
*
* @throws OffsetOutOfRangeException if `offset` is out of range.
* @throws TimeoutException if the consumer position does not change after polling, which means
*                          the consumer polled nothing before the timeout.
*/
private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
seek(offset)
val p = consumer.poll(pollTimeoutMs)
val r = p.records(topicPartition)
logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
fetchedData = r.iterator
val offsetAfterPoll = consumer.position(topicPartition)
logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
fetchedData.withNewPoll(r.listIterator, offsetAfterPoll)
if (!fetchedData.hasNext) {
// We cannot fetch anything after `poll`. Three possible cases:
// - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
//   be thrown.
// - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
// - Fetched something but all of them are invisible. This is a valid case and we let the
//   caller handle it.
val range = getAvailableOffsetRange()
if (offset < range.earliest || offset >= range.latest) {
throw new OffsetOutOfRangeException(
Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
} else if (offset == offsetAfterPoll) {
throw new TimeoutException(
s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
}
}
}
}
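For reference, the seek/poll/position pattern that `fetchData` builds on, as a standalone sketch against the plain Kafka consumer API. The broker address, topic, and group id are placeholders, a reachable broker is assumed, and it uses the same long-timeout `poll` overload as the code above:

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object PollPositionSketch extends App {
  val props = new ju.Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "poll-position-sketch")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
  props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

  val tp = new TopicPartition("some-topic", 0)
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  try {
    consumer.assign(ju.Collections.singletonList(tp))

    val offset = 42L
    consumer.seek(tp, offset)
    val polled = consumer.poll(2000L)             // same long-timeout overload as above
    val visible = polled.records(tp)
    val positionAfterPoll = consumer.position(tp)

    // Even when `visible` is empty, a position that moved past `offset` means the
    // consumer walked over records it will never hand back (aborted or transaction
    // markers under read_committed), which is what fetchData records as offsetAfterPoll.
    println(s"visible=${visible.size()} position moved $offset -> $positionAfterPoll")
  } finally {
    consumer.close()
  }
}
```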
