Add FetchedData
zsxwing committed Aug 21, 2018
commit a06742fd3d19c3ee6d9c957b446bc5017be009bc
@@ -90,6 +90,53 @@ private[kafka010] case class InternalKafkaConsumer(
kafkaParams: ju.Map[String, Object]) extends Logging {
import InternalKafkaConsumer._

/**
* The internal object to store the fetched data from the Kafka consumer and the next offset to poll.
*
* @param records the pre-fetched Kafka records.
* @param nextOffsetInFetchedData the next offset in `records`. We use this to check whether the
* pre-fetched data is still valid.
* @param offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to poll
* when `records` is drained.
*/
private case class FetchedData(
private var records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
var nextOffsetInFetchedData: Long,
Contributor: Make this public getter, private setter.
var offsetAfterPoll: Long) {

def withNewPoll(
records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
offsetAfterPoll: Long): FetchedData = {
this.records = records
this.nextOffsetInFetchedData = UNKNOWN_OFFSET
this.offsetAfterPoll = offsetAfterPoll
this
}

/** Whether there are more elements */
def hasNext: Boolean = records.hasNext

/** Move `records` forward and return the next record. */
def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
val record = records.next()
nextOffsetInFetchedData = record.offset + 1
record
}

/** Move `records` backward and return the previous record. */
def previous(): ConsumerRecord[Array[Byte], Array[Byte]] = {
assert(records.hasPrevious, "fetchedData cannot move back")
val record = records.previous()
nextOffsetInFetchedData = record.offset
record
}

/** Reset the internal pre-fetched data. */
def reset(): Unit = {
records = ju.Collections.emptyListIterator()
}
}
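For intuition, here is a minimal, self-contained Scala sketch of the drain/advance pattern above. Rec and the UNKNOWN_OFFSET value are illustrative stand-ins (Rec replaces Kafka's ConsumerRecord[Array[Byte], Array[Byte]]); this mirrors the class rather than reproducing the patch.

import java.{util => ju}

object FetchedDataSketch {
  val UNKNOWN_OFFSET = -2L // assumed sentinel; the real file defines its own

  // Illustrative stand-in for ConsumerRecord[Array[Byte], Array[Byte]]
  final case class Rec(offset: Long, value: String)

  final case class FetchedData(
      private var records: ju.ListIterator[Rec],
      var nextOffsetInFetchedData: Long,
      var offsetAfterPoll: Long) {

    def withNewPoll(records: ju.ListIterator[Rec], offsetAfterPoll: Long): FetchedData = {
      this.records = records
      this.nextOffsetInFetchedData = UNKNOWN_OFFSET
      this.offsetAfterPoll = offsetAfterPoll
      this
    }

    def hasNext: Boolean = records.hasNext

    def next(): Rec = {
      val r = records.next()
      nextOffsetInFetchedData = r.offset + 1 // remember where the cache stands
      r
    }

    def reset(): Unit = records = ju.Collections.emptyListIterator[Rec]()
  }

  def main(args: Array[String]): Unit = {
    val data = FetchedData(ju.Collections.emptyListIterator[Rec](), UNKNOWN_OFFSET, UNKNOWN_OFFSET)
    // Simulate a poll that returned offsets 5 and 7 (6 is invisible, e.g. an
    // aborted transactional message) and left the consumer position at 8.
    val polled = ju.Arrays.asList(Rec(5, "a"), Rec(7, "b"))
    data.withNewPoll(polled.listIterator(), 8L)
    while (data.hasNext) println(data.next()) // Rec(5,a) then Rec(7,b)
    println(data.offsetAfterPoll) // 8: where the next poll starts once drained
  }
}

Draining updates nextOffsetInFetchedData, which is what lets fetchRecord below detect whether a requested offset can be served from the cache or needs a fresh poll.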

/**
* The internal object returned by the `fetchData` method. If `record` is empty, it means it is
* invisible (either a transaction message, or an aborted message when the consumer's
@@ -119,22 +166,21 @@ private[kafka010] case class InternalKafkaConsumer(
/** Indicates whether this consumer is going to be stopped in the next release. */
@volatile var markedForClose = false

/** Iterator to the already fetch data */
@volatile private var fetchedData =
ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
@volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET
/**
* The fetched data returned from Kafka consumer. This is a reusable private object to avoid
* memory allocation.
*/
private val fetchedData = FetchedData(
ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
UNKNOWN_OFFSET,
UNKNOWN_OFFSET)

/**
* The fetched record returned from the `fetchData` method. This is a reusable private object to
* avoid memory allocation.
*/
private val fetchedRecord: FetchedRecord = FetchedRecord(null, UNKNOWN_OFFSET)
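fetchedRecord applies the same reuse idea to the method's return value. A hedged sketch of the holder pattern — Rec is the illustrative record type from the sketch above, and withRecord mirrors the calls visible later in this diff:

// One instance per consumer: mutated and returned on every call, so the
// per-record hot path allocates no new result objects.
final case class FetchedRecord(var record: Rec, var nextOffsetToFetch: Long) {
  def withRecord(record: Rec, nextOffsetToFetch: Long): FetchedRecord = {
    this.record = record
    this.nextOffsetToFetch = nextOffsetToFetch
    this // only valid until the next fetch overwrites it
  }
}

The usual caveat for reusable holders applies: each returned instance is invalidated by the next fetch, which is why it stays private to the consumer.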

/**
* The next available offset returned by Kafka after polling. This is the next offset after
* draining `fetchedData`.
*/
@volatile private var offsetAfterPoll: Long = UNKNOWN_OFFSET

/** Create a KafkaConsumer to fetch records for `topicPartition` */
private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
@@ -175,7 +221,8 @@ private[kafka010] case class InternalKafkaConsumer(
ConsumerRecord[Array[Byte], Array[Byte]] = runUninterruptiblyIfPossible {
require(offset < untilOffset,
s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
logDebug(s"Get $groupId $topicPartition nextOffset ${fetchedData.nextOffsetInFetchedData} " +
s"requested $offset")
// The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
// `false`, first, we will try to fetch the record at `offset`. If no such record exists, then
// we will move to the next available offset within `[offset, untilOffset)` and retry.
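A shape-only sketch of that loop, under assumptions: fetchRecord is abstracted to a function returning an optional record plus the next offset to try, Rec is the illustrative record type from the earlier sketch, and UNKNOWN_OFFSET = -2 is an assumed sentinel. This illustrates the retry logic, not the patch's exact control flow.

// Skip-and-retry over [offset, untilOffset) when failOnDataLoss = false:
// a missing record advances toFetchOffset; reaching untilOffset ends the
// search with no record found.
def getRecordSketch(
    offset: Long,
    untilOffset: Long,
    fetchRecord: Long => (Option[Rec], Long)): Option[Rec] = {
  val UNKNOWN_OFFSET = -2L
  var toFetchOffset = offset
  while (toFetchOffset != UNKNOWN_OFFSET) {
    val (record, nextOffsetToFetch) = fetchRecord(toFetchOffset)
    if (record.isDefined) return record
    toFetchOffset =
      if (nextOffsetToFetch >= untilOffset) UNKNOWN_OFFSET // range exhausted
      else nextOffsetToFetch // skip the missing offsets and retry
  }
  None
}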
@@ -194,7 +241,7 @@
} else {
toFetchOffset = fetchedRecord.nextOffsetToFetch
if (toFetchOffset >= untilOffset) {
resetFetchedData()
fetchedData.reset()
toFetchOffset = UNKNOWN_OFFSET
} else {
logDebug(s"Skipped offsets [$offset, $toFetchOffset]")
@@ -214,7 +261,7 @@
if (isFetchComplete) {
fetchedRecord.record
} else {
resetFetchedData()
fetchedData.reset()
null
}
}
@@ -295,28 +342,27 @@
untilOffset: Long,
Contributor: Update docs of this method saying that it can throw MissingOffsetException and what it means?
pollTimeoutMs: Long,
failOnDataLoss: Boolean): FetchedRecord = {
if (offset != nextOffsetInFetchedData) {
if (offset != fetchedData.nextOffsetInFetchedData) {
// This is the first fetch, or the fetched data has been reset.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
poll(offset, pollTimeoutMs)
Contributor: Comment that this method updates fetchedData.
} else if (!fetchedData.hasNext) {
// The last pre-fetched data has been drained.
Contributor: nit: I was confused about whether the above comment was for the else if above it or for the if below it. Maybe inline it with the else if, or leave a line after it, before the if below.
if (offset < offsetAfterPoll) {
if (offset < fetchedData.offsetAfterPoll) {
// Offsets in [offset, offsetAfterPoll) are missing. We should skip them.
Contributor: "skip them" is confusing. What does it mean to skip? Why are we still returning something?
resetFetchedData()
return fetchedRecord.withRecord(null, offsetAfterPoll)
fetchedData.reset()
return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
} else {
poll(offset, pollTimeoutMs)
}
}

if (!fetchedData.hasNext()) {
assert(offset <= offsetAfterPoll,
s"seek to $offset and poll but the offset was reset to $offsetAfterPoll")
fetchedRecord.withRecord(null, offsetAfterPoll)
if (!fetchedData.hasNext) {
assert(offset <= fetchedData.offsetAfterPoll,
s"seek to $offset and poll but the offset was reset to ${fetchedData.offsetAfterPoll}")
Contributor: Add comments here on what this case means.
fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
} else {
Contributor: Let's remove this else and reduce the condition nesting. The previous if statement always ends in an exception, so we can remove this else.
val record = fetchedData.next()
nextOffsetInFetchedData = record.offset + 1
// In general, Kafka uses the specified offset as the start point, and tries to fetch the next
// available offset. Hence we need to handle offset mismatch.
if (record.offset > offset) {
@@ -325,9 +371,7 @@
// `offset` is still valid but the corresponding message is invisible. We should skip it
// and jump to `record.offset`. Here we move `fetchedData` back so that the next call of
// `fetchData` can just return `record` directly.
assert(fetchedData.hasPrevious, "fetchedData cannot move back")
fetchedData.previous()
nextOffsetInFetchedData = record.offset
return fetchedRecord.withRecord(null, record.offset)
}
// This may happen when some records aged out but their offsets already got verified
@@ -341,7 +385,7 @@
null
Contributor: We should not be returning null EVER when we are using FetchedRecord.record = null to signify lack of record.
} else {
reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
fetchedRecord.withRecord(record, nextOffsetInFetchedData)
fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
}
Contributor: nit: This can be unnested: if ... else { if ... else ... } -> if ... else if ... else.
}
} else if (record.offset < offset) {
@@ -350,7 +394,7 @@
throw new IllegalStateException(
s"Tried to fetch $offset but the returned record offset was ${record.offset}")
} else {
fetchedRecord.withRecord(record, nextOffsetInFetchedData)
fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
}
}
}
@@ -359,13 +403,7 @@
private def resetConsumer(): Unit = {
consumer.close()
consumer = createConsumer
resetFetchedData()
}

/** Reset the internal pre-fetched data. */
private def resetFetchedData(): Unit = {
nextOffsetInFetchedData = UNKNOWN_OFFSET
fetchedData = ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
fetchedData.reset()
}

/**
@@ -414,9 +452,9 @@ private[kafka010] case class InternalKafkaConsumer(
val p = consumer.poll(pollTimeoutMs)
val r = p.records(topicPartition)
logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
offsetAfterPoll = consumer.position(topicPartition)
val offsetAfterPoll = consumer.position(topicPartition)
logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
fetchedData = r.listIterator
fetchedData.withNewPoll(r.listIterator, offsetAfterPoll)
if (!fetchedData.hasNext) {
// We cannot fetch anything after `poll`. Two possible cases:
// - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will