address
zsxwing committed Aug 15, 2018
commit baef29f2983560c8010681c9bb7e74f711c8f2e7
@@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.util.UninterruptibleThread

/**
* An exception to indicate there is a missing offset in the records returned by Kafka consumer.
* This means it's either a transaction (commit or abort) marker, or an aborted message if
* "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are
* missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch.
*/
private[kafka010] class MissingOffsetException(
val offset: Long,
val nextOffsetToFetch: Long) extends Exception(
s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch")

private[kafka010] sealed trait KafkaDataConsumer {
/**
* Get the record for the given offset if available. Otherwise it will either throw error
* (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
* or null.
* Get the record for the given offset if available.
*
* If the record is invisible (either a
* transaction message, or an aborted message when the consumer's `isolation.level` is
* `read_committed`), it will be skipped and this method will try to fetch next available record
* within [offset, untilOffset).
*
* This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will
Contributor:
if failOnDataLoss is true then it should throw exception... isn't it?

nit: try its best

Member Author:
Good catch

* throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `true`, this
Contributor:
Will we throw an exception even when it's a control message and there is no real data loss?

Member Author:
Will we throw an exception even when it's a control message and there is no real data loss?

No. It will be skipped and this method will try to fetch next available record within [offset, untilOffset).

* method will try to fetch next available record within [offset, untilOffset).
*
* When this method tries to skip offsets due to either invisible messages or data loss and
* reaches `untilOffset`, it will return `null`.
*
* @param offset the offset to fetch.
* @param untilOffset the max offset to fetch. Exclusive.
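
For illustration, a minimal sketch of how a caller could drive this contract; `readRange` below is a hypothetical helper, not part of this patch (the real callers in Spark loop over a partition's offset range in a similar way).

// Hypothetical caller sketch: read every visible record in [fromOffset, untilOffset),
// honoring the contract that get() returns null once skipping reaches untilOffset.
import scala.collection.mutable.ArrayBuffer
import org.apache.kafka.clients.consumer.ConsumerRecord

def readRange(
    consumer: KafkaDataConsumer,
    fromOffset: Long,
    untilOffset: Long,
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
  val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
  var nextOffset = fromOffset
  while (nextOffset < untilOffset) {
    val record = consumer.get(nextOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
    if (record == null) {
      // Everything left in [nextOffset, untilOffset) was invisible or lost; stop reading.
      nextOffset = untilOffset
    } else {
      records += record
      // The returned record may sit past the requested offset if offsets were skipped;
      // resume after it.
      nextOffset = record.offset + 1
    }
  }
  records
}
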
@@ -91,6 +90,17 @@ private[kafka010] case class InternalKafkaConsumer(
kafkaParams: ju.Map[String, Object]) extends Logging {
import InternalKafkaConsumer._

/**
* The internal object returned by the `fetchData` method. If `record` is empty, it means it is
* invisible (either a transaction message, or an aborted message when the consumer's
* `isolation.level` is `read_committed`), and the caller should use `nextOffsetToFetch` to fetch
* instead.
*/
private case class FetchedRecord(
record: Option[ConsumerRecord[Array[Byte], Array[Byte]]],
Contributor:
Can't we reuse the objects here? And do we need to have an Option, thus creating a lot of Option objects all the time? (See the sketch below.)

nextOffsetToFetch: Long
)
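
A minimal sketch of the reviewer's suggestion above (hypothetical, not what this commit does): reuse one mutable holder and drop the `Option` so a fetch does not allocate a new `FetchedRecord`/`Some` per record.

// Hypothetical alternative: a single reusable holder with a nullable record field
// (null meaning "invisible, continue from nextOffsetToFetch") instead of an Option.
private class ReusableFetchedRecord {
  var record: ConsumerRecord[Array[Byte], Array[Byte]] = null
  var nextOffsetToFetch: Long = UNKNOWN_OFFSET

  def set(
      record: ConsumerRecord[Array[Byte], Array[Byte]],
      nextOffsetToFetch: Long): ReusableFetchedRecord = {
    this.record = record
    this.nextOffsetToFetch = nextOffsetToFetch
    this
  }
}
// fetchData would then return a single instance via fetchedRecord.set(null, offsetAfterPoll)
// instead of allocating FetchedRecord(None, offsetAfterPoll) on every call.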

private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

@volatile private var consumer = createConsumer
@@ -103,11 +113,13 @@

/** Iterator to the already fetched data */
@volatile private var fetchedData =
ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
@volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET

@volatile private var offsetBeforePoll: Long = UNKNOWN_OFFSET

/**
* The next available offset returned by Kafka after polling. This is the next offset after
* draining `fetchedData`.
*/
@volatile private var offsetAfterPoll: Long = UNKNOWN_OFFSET

/** Create a KafkaConsumer to fetch records for `topicPartition` */
@@ -140,20 +152,7 @@ private[kafka010] case class InternalKafkaConsumer(
AvailableOffsetRange(earliestOffset, latestOffset)
}

/**
* Get the record for the given offset if available. Otherwise it will either throw error
* (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
* or null.
*
* @param offset the offset to fetch.
* @param untilOffset the max offset to fetch. Exclusive.
* @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
* @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
* offset if available, or throw exception.when `failOnDataLoss` is `false`,
* this method will either return record at offset if available, or return
* the next earliest available record less than untilOffset, or null. It
* will not throw any exception.
*/
/** @see [[KafkaDataConsumer.get]] */
def get(
offset: Long,
untilOffset: Long,
@@ -168,15 +167,25 @@
// we will move to the next available offset within `[offset, untilOffset)` and retry.
// If `failOnDataLoss` is `true`, the loop body will be executed only once.
var toFetchOffset = offset
var consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]] = null
var fetchedRecord: FetchedRecord = null
// We want to break out of the while loop on a successful fetch to avoid using "return"
// which may cause a NonLocalReturnControl exception when this method is used as a function.
var isFetchComplete = false

while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
try {
consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
isFetchComplete = true
fetchedRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
if (fetchedRecord.record.nonEmpty) {
isFetchComplete = true
} else {
toFetchOffset = fetchedRecord.nextOffsetToFetch
if (toFetchOffset >= untilOffset) {
resetFetchedData()
toFetchOffset = UNKNOWN_OFFSET
} else {
logDebug(s"Skipped offsets [$offset, $toFetchOffset]")
}
}
} catch {
case e: OffsetOutOfRangeException =>
// When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -185,19 +194,11 @@
resetConsumer()
reportDataLoss(failOnDataLoss, s"Cannot fetch offset $toFetchOffset", e)
toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset)
case e: MissingOffsetException =>
toFetchOffset = e.nextOffsetToFetch
if (toFetchOffset >= untilOffset) {
resetFetchedData()
toFetchOffset = UNKNOWN_OFFSET
} else {
logDebug(s"Skipped offsets [$offset, $toFetchOffset]")
}
}
}

if (isFetchComplete) {
consumerRecord
fetchedRecord.record.get
} else {
resetFetchedData()
null
@@ -262,7 +263,8 @@ private[kafka010] case class InternalKafkaConsumer(
}

/**
* Get the record for the given offset if available. Otherwise it will either throw error
* Get the fetched record for the given offset if available. If the record is invisible, this method
* will return a `FetchedRecord` with an empty `record` and `nextOffsetToFetch` set to the next
* offset to read. Otherwise it will either throw an error
* (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
* or null.
*
@@ -273,43 +275,26 @@
offset: Long,
Contributor:
Maybe rename this method to fetchRecord, to make it consistent with return type.

untilOffset: Long,
Contributor:
Update docs of this method saying that it can throw MissingOffsetException and what it means?

pollTimeoutMs: Long,
failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
failOnDataLoss: Boolean): FetchedRecord = {
if (offset != nextOffsetInFetchedData) {
// This is the first fetch, or the fetched data has been reset.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
seek(offset)
poll(pollTimeoutMs)
poll(offset, pollTimeoutMs)
Contributor:
comment that this method updates fetchedData

} else if (!fetchedData.hasNext) {
// The last pre-fetched data has been drained.
Contributor:
nit: I was confused with whether the above comment was for the else if above it or for the if below it. Maybe inline it with the else if. Or leave a line after it, before the if below.

if (offset < offsetAfterPoll) {
Contributor:
It's hard to understand this condition because it's hard to understand what offsetAfterPoll means. Does it refer to the offset that will be fetched next by the KafkaConsumer?

Member Author:
this is the place preventing me from making offsetAfterPoll be a local var.

// Offsets in [offset, offsetAfterPoll) are missing. We should skip them.
Contributor:
"skip them" is confusing. What does it mean to skip? Why are we still returning something.

resetFetchedData()
throw new MissingOffsetException(offset, offsetAfterPoll)
return FetchedRecord(None, offsetAfterPoll)
} else {
seek(offset)
poll(pollTimeoutMs)
poll(offset, pollTimeoutMs)
}
}

if (!fetchedData.hasNext()) {
// We cannot fetch anything after `poll`. Three possible cases:
// - `offset` is out of range so that Kafka returns nothing. Just throw
// `OffsetOutOfRangeException` to let the caller handle it.
// - Cannot fetch any data before timeout. TimeoutException will be thrown.
// - Fetched something but all of them are not valid date messages. In this case, the position
// will be changed and we can use it to determine this case.
val range = getAvailableOffsetRange()
if (offset < range.earliest || offset >= range.latest) {
throw new OffsetOutOfRangeException(
Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
} else if (offsetBeforePoll == offsetAfterPoll) {
throw new TimeoutException(
s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
} else {
assert(offset <= offsetAfterPoll,
s"seek to $offset and poll but the offset was reset to $offsetAfterPoll")
throw new MissingOffsetException(offset, offsetAfterPoll)
}
assert(offset <= offsetAfterPoll,
s"seek to $offset and poll but the offset was reset to $offsetAfterPoll")
FetchedRecord(None, offsetAfterPoll)
} else {
Contributor:
Let's remove this else and reduce the condition nesting. The previous if statement always ends in an exception, so we can remove this else.

val record = fetchedData.next()
nextOffsetInFetchedData = record.offset + 1
@@ -318,8 +303,13 @@
if (record.offset > offset) {
val range = getAvailableOffsetRange()
if (range.earliest <= offset) {
resetFetchedData()
throw new MissingOffsetException(offset, record.offset)
// `offset` is still valid but the corresponding message is invisible. We should skip it
// and jump to `record.offset`. Here we move `fetchedData` back so that the next call of
// `fetchData` can just return `record` directly.
assert(fetchedData.hasPrevious, "fetchedData cannot move back")
fetchedData.previous()
nextOffsetInFetchedData = record.offset
return FetchedRecord(None, record.offset)
}
// This may happen when some records aged out but their offsets already got verified
if (failOnDataLoss) {
@@ -332,7 +322,7 @@
null
Contributor:
We should not be returning null EVER when we are using FetchedRecord.record = null to signify lack of record.

} else {
reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
record
FetchedRecord(Some(record), nextOffsetInFetchedData)
}
Contributor:
nit: This can be unnested.
if ... else { if ... else ... } -> if ... else if .. else

}
} else if (record.offset < offset) {
@@ -341,7 +331,7 @@
throw new IllegalStateException(
s"Tried to fetch $offset but the returned record offset was ${record.offset}")
} else {
record
FetchedRecord(Some(record), nextOffsetInFetchedData)
}
}
}
@@ -356,7 +346,7 @@
/** Reset the internal pre-fetched data. */
private def resetFetchedData(): Unit = {
nextOffsetInFetchedData = UNKNOWN_OFFSET
fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
fetchedData = ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
}

/**
@@ -390,14 +380,40 @@
consumer.seek(topicPartition, offset)
}

private def poll(pollTimeoutMs: Long): Unit = {
offsetBeforePoll = consumer.position(topicPartition)
/**
* Poll messages from Kafka starting from `offset` and set `fetchedData` and `offsetAfterPoll`.
* `fetchedData` may be empty if Kafka fetches some messages but none of them are visible
* messages (either transaction messages, or aborted messages when `isolation.level` is
* `read_committed`).
*
* @throws OffsetOutOfRangeException if `offset` is out of range.
* @throws TimeoutException if the consumer position is not changed after polling. It means the
* consumer polls nothing before timeout.
*/
private def poll(offset: Long, pollTimeoutMs: Long): Unit = {
Contributor:
Maybe rename this method to be consistent with what it does.... fetch data.

seek(offset)
val p = consumer.poll(pollTimeoutMs)
val r = p.records(topicPartition)
logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
offsetAfterPoll = consumer.position(topicPartition)
Contributor:
I strongly think that this should not be a var, but rather a clear return value. We have been burnt by too many mutable vars/defs (see all the flakiness caused by the structured ProgressReporter) and we should consciously try to improve this everywhere by not having vars all over the place. (See the sketch after this method.)

logDebug(s"Offset changed from $offsetBeforePoll to $offsetAfterPoll after polling")
fetchedData = r.iterator
logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
fetchedData = r.listIterator
if (!fetchedData.hasNext) {
// We cannot fetch anything after `poll`. Three possible cases:
// - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
// be thrown.
// - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
// - Fetched something but all of them are invisible messages. This is a valid case and we let
// the caller handle it.
val range = getAvailableOffsetRange()
if (offset < range.earliest || offset >= range.latest) {
throw new OffsetOutOfRangeException(
Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
} else if (offset == offsetAfterPoll) {
throw new TimeoutException(
s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
}
}
}
}
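
A minimal sketch of the reviewer's suggestion above (hypothetical, not what this commit does): have the poll helper return its results instead of updating the `@volatile` vars `fetchedData` and `offsetAfterPoll`, so callers thread the state through explicitly.

// Hypothetical return-value shape for poll; the out-of-range and timeout checks above would
// still run before returning, exactly as in the committed version.
private case class PolledData(
    records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
    offsetAfterPoll: Long)

private def pollRecords(offset: Long, pollTimeoutMs: Long): PolledData = {
  seek(offset)
  val p = consumer.poll(pollTimeoutMs)
  val r = p.records(topicPartition)
  logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
  PolledData(r.listIterator, consumer.position(topicPartition))
}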
