Commits
28 commits
2a1cd27
optimization + test
bogdanrdc Aug 23, 2018
421ee20
debug benchmark + early batch
bogdanrdc Aug 23, 2018
d7e49e7
revert benchmark
bogdanrdc Aug 23, 2018
ba6d91e
Merge remote-tracking branch 'upstream/master' into local-relation-fi…
bogdanrdc Aug 24, 2018
326e5d7
test fix
bogdanrdc Aug 24, 2018
4263bd2
[SPARK-25073][YARN] AM and Executor Memory validation message is not …
sujith71955 Aug 24, 2018
f84d256
[SPARK-25214][SS] Fix the issue that Kafka v2 source may return dupli…
zsxwing Aug 24, 2018
c721895
[SPARK-25174][YARN] Limit the size of diagnostic message for am to un…
yaooqinn Aug 24, 2018
f8536e3
[SPARK-25234][SPARKR] avoid integer overflow in parallelize
mengxr Aug 24, 2018
af6a91e
Correct missing punctuation in the documentation
Aug 25, 2018
c613c6b
[MINOR] Fix Scala 2.12 build
dbtsai Aug 25, 2018
ee1c0e8
[SPARK-24688][EXAMPLES] Modify the comments about LabeledPoint
huangweizhe123 Aug 25, 2018
77fb55e
[SPARK-25214][SS][FOLLOWUP] Fix the issue that Kafka v2 source may re…
zsxwing Aug 25, 2018
b00824c
[SPARK-23792][DOCS] Documentation improvements for datetime functions
abradbury Aug 26, 2018
ee6cb6c
[SPARK-23698][PYTHON][FOLLOWUP] Resolve undefiend names in setup.py
HyukjinKwon Aug 27, 2018
c129176
[SPARK-19355][SQL][FOLLOWUP] Remove the child.outputOrdering check in…
viirya Aug 27, 2018
368b42f
[SPARK-24978][SQL] Add spark.sql.fast.hash.aggregate.row.max.capacity…
heary-cao Aug 27, 2018
0378b1f
[SPARK-25249][CORE][TEST] add a unit test for OpenHashMap
10110346 Aug 27, 2018
d5a953a
[SPARK-24882][FOLLOWUP] Fix flaky synchronization in Kafka tests.
jose-torres Aug 27, 2018
3598483
[SPARK-24149][YARN][FOLLOW-UP] Only get the delegation tokens of the …
wangyum Aug 27, 2018
397fa62
[SPARK-24090][K8S] Update running-on-kubernetes.md
liyinan926 Aug 27, 2018
dcd001b
[SPARK-24721][SQL] Exclude Python UDFs filters in FileSourceStrategy
icexelloss Aug 28, 2018
b23538b
[SPARK-25218][CORE] Fix potential resource leaks in TransportServer a…
zsxwing Aug 28, 2018
f769a94
[SPARK-25005][SS] Support non-consecutive offsets for Kafka
zsxwing Aug 28, 2018
68c41ff
comment
bogdanrdc Aug 28, 2018
dad6a7f
Merge remote-tracking branch 'upstream/master' into local-relation-fi…
bogdanrdc Aug 28, 2018
cb067c3
Merge remote-tracking branch 'upstream/master' into local-relation-fi…
bogdanrdc Aug 28, 2018
d552cc1
space
bogdanrdc Aug 28, 2018
[SPARK-25005][SS] Support non-consecutive offsets for Kafka
## What changes were proposed in this pull request?

When the user writes data with Kafka transactions, the offsets in Kafka will be non-consecutive: the log contains transaction (commit or abort) markers that occupy offsets. In addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not return aborted messages either. Hence, we will see non-consecutive offsets in the data returned by `poll`. Moreover, since `seekToEnd` may move the offset to one of these missing positions, there are four corner cases we need to support:

- The whole batch contains no data messages
- The first offset in a batch is not a committed data message
- The last offset in a batch is not a committed data message
- There is a gap in the middle of a batch

They are all covered by the new unit tests.
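
For illustration only (not part of this patch), the sketch below shows how a plain Kafka consumer configured with `isolation.level=read_committed` observes these gaps; the broker address, topic name, and group id are made up.

```scala
import java.util.{Collections, Properties}

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object ObserveOffsetGaps {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // hypothetical broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "gap-observer")            // hypothetical group id
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    val tp = new TopicPartition("transactional-topic", 0)                // hypothetical topic
    consumer.assign(Collections.singletonList(tp))
    consumer.seekToBeginning(Collections.singletonList(tp))

    // The printed offsets can jump (e.g. 0, 1, 3, 4, 7, ...): transaction markers and aborted
    // messages occupy offsets in the log but are never returned by `poll`.
    consumer.poll(1000L).records(tp).asScala.foreach { r =>
      println(s"offset=${r.offset}")
    }
    consumer.close()
  }
}
```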

## How was this patch tested?

The new unit tests.
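
As a rough illustration of how such gaps can be reproduced (this is not the PR's actual test code), a transactional producer that commits one transaction and aborts another leaves both data records and markers in the log; the topic and `transactional.id` below are hypothetical.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.ByteArraySerializer

object ProduceWithGaps {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // hypothetical broker
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "gap-producer")    // hypothetical id
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)

    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
    producer.initTransactions()

    // Committed transaction: its record is visible, but the commit marker still takes an offset.
    producer.beginTransaction()
    producer.send(new ProducerRecord("transactional-topic", "a".getBytes, "1".getBytes))
    producer.commitTransaction()

    // Aborted transaction: its record and the abort marker take offsets but are never visible
    // to a read_committed consumer, leaving a gap in the returned offsets.
    producer.beginTransaction()
    producer.send(new ProducerRecord("transactional-topic", "b".getBytes, "2".getBytes))
    producer.abortTransaction()

    producer.close()
  }
}
```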

Closes #22042 from zsxwing/kafka-transaction-read.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
zsxwing authored and bogdanrdc committed Aug 28, 2018
commit f769a9410d2c43517cc40c27f678ee92b0f6fbf3
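
To make the diff below easier to follow, here is a small, self-contained toy model of the gap-skipping fetch loop this commit introduces. The names (`fetchRecord`, `get`) and the hard-coded visible offsets are illustrative assumptions; the real logic lives in `InternalKafkaConsumer` in the diff.

```scala
// A toy model of the gap-skipping loop (illustrative names and offsets, not Spark internals).
object GapSkippingFetchLoop {
  // Visible offsets of one partition; offsets 2, 5 and 6 are "missing" (markers / aborted records).
  private val visibleOffsets = Seq(0L, 1L, 3L, 4L, 7L)

  /** Returns Right(offset) if a visible record sits at `offset`, otherwise Left(next offset to try). */
  private def fetchRecord(offset: Long, untilOffset: Long): Either[Long, Long] =
    visibleOffsets.find(o => o >= offset && o < untilOffset) match {
      case Some(o) if o == offset => Right(o)          // visible record exactly at the requested offset
      case Some(o)                => Left(o)           // gap: caller should retry from `o`
      case None                   => Left(untilOffset) // nothing visible left in [offset, untilOffset)
    }

  /** Mirrors the caller loop: keep skipping gaps until a record is found or the range is drained. */
  def get(offset: Long, untilOffset: Long): Option[Long] = {
    var toFetch = offset
    var found: Option[Long] = None
    while (found.isEmpty && toFetch < untilOffset) {
      fetchRecord(toFetch, untilOffset) match {
        case Right(record)   => found = Some(record)
        case Left(nextToTry) => toFetch = nextToTry
      }
    }
    found
  }

  def main(args: Array[String]): Unit = {
    println(get(2L, 8L)) // Some(3): offset 2 is invisible, so the loop skips ahead to 3
    println(get(5L, 7L)) // None: offsets 5 and 6 are invisible and 7 is outside the batch
  }
}
```
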
@@ -227,7 +227,7 @@ class KafkaContinuousPartitionReader(

// This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
// or if it's the endpoint of the data range (i.e. the "true" next offset).
case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
val range = consumer.getAvailableOffsetRange()
if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) {
// retry
@@ -33,9 +33,19 @@ import org.apache.spark.util.UninterruptibleThread

private[kafka010] sealed trait KafkaDataConsumer {
/**
* Get the record for the given offset if available. Otherwise it will either throw error
* (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
* or null.
* Get the record for the given offset if available.
*
* If the record is invisible (either a
* transaction message, or an aborted message when the consumer's `isolation.level` is
* `read_committed`), it will be skipped and this method will try to fetch next available record
* within [offset, untilOffset).
*
* This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will
* throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this
* method will try to fetch next available record within [offset, untilOffset).
*
* When this method tries to skip offsets due to either invisible messages or data loss and
* reaches `untilOffset`, it will return `null`.
*
* @param offset the offset to fetch.
* @param untilOffset the max offset to fetch. Exclusive.
@@ -80,6 +90,83 @@ private[kafka010] case class InternalKafkaConsumer(
kafkaParams: ju.Map[String, Object]) extends Logging {
import InternalKafkaConsumer._

/**
* The internal object to store the fetched data from Kafka consumer and the next offset to poll.
*
* @param _records the pre-fetched Kafka records.
* @param _nextOffsetInFetchedData the next offset in `records`. We use this to verify if we
* should check if the pre-fetched data is still valid.
* @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
* poll when `records` is drained.
*/
private case class FetchedData(
private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
private var _nextOffsetInFetchedData: Long,
private var _offsetAfterPoll: Long) {

def withNewPoll(
records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
offsetAfterPoll: Long): FetchedData = {
this._records = records
this._nextOffsetInFetchedData = UNKNOWN_OFFSET
this._offsetAfterPoll = offsetAfterPoll
this
}

/** Whether there are more elements */
def hasNext: Boolean = _records.hasNext

/** Move `records` forward and return the next record. */
def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
val record = _records.next()
_nextOffsetInFetchedData = record.offset + 1
record
}

/** Move `records` backward and return the previous record. */
def previous(): ConsumerRecord[Array[Byte], Array[Byte]] = {
assert(_records.hasPrevious, "fetchedData cannot move back")
val record = _records.previous()
_nextOffsetInFetchedData = record.offset
record
}

/** Reset the internal pre-fetched data. */
def reset(): Unit = {
_records = ju.Collections.emptyListIterator()
}

/**
* Returns the next offset in `records`. We use this to verify if we should check if the
* pre-fetched data is still valid.
*/
def nextOffsetInFetchedData: Long = _nextOffsetInFetchedData

/**
* Returns the next offset to poll after draining the pre-fetched records.
*/
def offsetAfterPoll: Long = _offsetAfterPoll
}

/**
* The internal object returned by the `fetchRecord` method. If `record` is empty, it means it is
* invisible (either a transaction message, or an aborted message when the consumer's
* `isolation.level` is `read_committed`), and the caller should use `nextOffsetToFetch` to fetch
* instead.
*/
private case class FetchedRecord(
var record: ConsumerRecord[Array[Byte], Array[Byte]],
var nextOffsetToFetch: Long) {

def withRecord(
record: ConsumerRecord[Array[Byte], Array[Byte]],
nextOffsetToFetch: Long): FetchedRecord = {
this.record = record
this.nextOffsetToFetch = nextOffsetToFetch
this
}
}

private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

@volatile private var consumer = createConsumer
@@ -90,10 +177,21 @@ private[kafka010] case class InternalKafkaConsumer(
/** indicate whether this consumer is going to be stopped in the next release */
@volatile var markedForClose = false

/** Iterator to the already fetch data */
@volatile private var fetchedData =
ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
@volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET
/**
* The fetched data returned from Kafka consumer. This is a reusable private object to avoid
* memory allocation.
*/
private val fetchedData = FetchedData(
ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
UNKNOWN_OFFSET,
UNKNOWN_OFFSET)

/**
* The fetched record returned from the `fetchRecord` method. This is a reusable private object to
* avoid memory allocation.
*/
private val fetchedRecord: FetchedRecord = FetchedRecord(null, UNKNOWN_OFFSET)


/** Create a KafkaConsumer to fetch records for `topicPartition` */
private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
@@ -125,20 +223,7 @@ private[kafka010] case class InternalKafkaConsumer(
AvailableOffsetRange(earliestOffset, latestOffset)
}

/**
* Get the record for the given offset if available. Otherwise it will either throw error
* (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
* or null.
*
* @param offset the offset to fetch.
* @param untilOffset the max offset to fetch. Exclusive.
* @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
* @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
* offset if available, or throw exception.when `failOnDataLoss` is `false`,
* this method will either return record at offset if available, or return
* the next earliest available record less than untilOffset, or null. It
* will not throw any exception.
*/
/** @see [[KafkaDataConsumer.get]] */
def get(
offset: Long,
untilOffset: Long,
@@ -147,21 +232,32 @@
ConsumerRecord[Array[Byte], Array[Byte]] = runUninterruptiblyIfPossible {
require(offset < untilOffset,
s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
logDebug(s"Get $groupId $topicPartition nextOffset ${fetchedData.nextOffsetInFetchedData} " +
s"requested $offset")
// The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
// `false`, first, we will try to fetch the record at `offset`. If no such record exists, then
// we will move to the next available offset within `[offset, untilOffset)` and retry.
// If `failOnDataLoss` is `true`, the loop body will be executed only once.
var toFetchOffset = offset
var consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]] = null
var fetchedRecord: FetchedRecord = null
// We want to break out of the while loop on a successful fetch to avoid using "return"
// which may cause a NonLocalReturnControl exception when this method is used as a function.
var isFetchComplete = false

while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
try {
consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
isFetchComplete = true
fetchedRecord = fetchRecord(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
if (fetchedRecord.record != null) {
isFetchComplete = true
} else {
toFetchOffset = fetchedRecord.nextOffsetToFetch
if (toFetchOffset >= untilOffset) {
fetchedData.reset()
toFetchOffset = UNKNOWN_OFFSET
} else {
logDebug(s"Skipped offsets [$offset, $toFetchOffset]")
}
}
} catch {
case e: OffsetOutOfRangeException =>
// When there is some error thrown, it's better to use a new consumer to drop all cached
Expand All @@ -174,9 +270,9 @@ private[kafka010] case class InternalKafkaConsumer(
}

if (isFetchComplete) {
consumerRecord
fetchedRecord.record
} else {
resetFetchedData()
fetchedData.reset()
null
}
}
@@ -239,65 +335,81 @@ private[kafka010] case class InternalKafkaConsumer(
}

/**
* Get the record for the given offset if available. Otherwise it will either throw error
* (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
* or null.
* Get the fetched record for the given offset if available.
*
* If the record is invisible (either a transaction message, or an aborted message when the
* consumer's `isolation.level` is `read_committed`), it will return a `FetchedRecord` with the
* next offset to fetch.
*
* This method also will try the best to detect data loss. If `failOnDataLoss` is `true`, it will
* throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this
* method will return `null` if the next available record is within [offset, untilOffset).
*
* @throws OffsetOutOfRangeException if `offset` is out of range
* @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds.
*/
private def fetchData(
private def fetchRecord(
offset: Long,
untilOffset: Long,
pollTimeoutMs: Long,
failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
// This is the first fetch, or the last pre-fetched data has been drained.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
seek(offset)
poll(pollTimeoutMs)
}

if (!fetchedData.hasNext()) {
// We cannot fetch anything after `poll`. Two possible cases:
// - `offset` is out of range so that Kafka returns nothing. Just throw
// `OffsetOutOfRangeException` to let the caller handle it.
// - Cannot fetch any data before timeout. TimeoutException will be thrown.
val range = getAvailableOffsetRange()
if (offset < range.earliest || offset >= range.latest) {
throw new OffsetOutOfRangeException(
Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
failOnDataLoss: Boolean): FetchedRecord = {
if (offset != fetchedData.nextOffsetInFetchedData) {
// This is the first fetch, or the fetched data has been reset.
// Fetch records from Kafka and update `fetchedData`.
fetchData(offset, pollTimeoutMs)
} else if (!fetchedData.hasNext) { // The last pre-fetched data has been drained.
if (offset < fetchedData.offsetAfterPoll) {
// Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask
// the next call to start from `fetchedData.offsetAfterPoll`.
fetchedData.reset()
return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
} else {
throw new TimeoutException(
s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
// Fetch records from Kafka and update `fetchedData`.
fetchData(offset, pollTimeoutMs)
}
}

if (!fetchedData.hasNext) {
// When we reach here, we have already tried to poll from Kafka. As `fetchedData` is still
// empty, all messages in [offset, fetchedData.offsetAfterPoll) are invisible. Return a
// record to ask the next call to start from `fetchedData.offsetAfterPoll`.
assert(offset <= fetchedData.offsetAfterPoll,
s"seek to $offset and poll but the offset was reset to ${fetchedData.offsetAfterPoll}")
fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
} else {
val record = fetchedData.next()
nextOffsetInFetchedData = record.offset + 1
// In general, Kafka uses the specified offset as the start point, and tries to fetch the next
// available offset. Hence we need to handle offset mismatch.
if (record.offset > offset) {
val range = getAvailableOffsetRange()
if (range.earliest <= offset) {
// `offset` is still valid but the corresponding message is invisible. We should skip it
// and jump to `record.offset`. Here we move `fetchedData` back so that the next call of
// `fetchRecord` can just return `record` directly.
fetchedData.previous()
return fetchedRecord.withRecord(null, record.offset)
}
// This may happen when some records aged out but their offsets already got verified
if (failOnDataLoss) {
reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
// Never happen as "reportDataLoss" will throw an exception
null
throw new IllegalStateException(
"reportDataLoss didn't throw an exception when 'failOnDataLoss' is true")
} else if (record.offset >= untilOffset) {
reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
// Set `nextOffsetToFetch` to `untilOffset` to finish the current batch.
fetchedRecord.withRecord(null, untilOffset)
} else {
if (record.offset >= untilOffset) {
reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
null
} else {
reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
record
}
reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
}
} else if (record.offset < offset) {
// This should not happen. If it does happen, then we probably misunderstand Kafka internal
// mechanism.
throw new IllegalStateException(
s"Tried to fetch $offset but the returned record offset was ${record.offset}")
} else {
record
fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
}
}
}
@@ -306,13 +418,7 @@ private[kafka010] case class InternalKafkaConsumer(
private def resetConsumer(): Unit = {
consumer.close()
consumer = createConsumer
resetFetchedData()
}

/** Reset the internal pre-fetched data. */
private def resetFetchedData(): Unit = {
nextOffsetInFetchedData = UNKNOWN_OFFSET
fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
fetchedData.reset()
}

/**
@@ -346,11 +452,40 @@ private[kafka010] case class InternalKafkaConsumer(
consumer.seek(topicPartition, offset)
}

private def poll(pollTimeoutMs: Long): Unit = {
/**
* Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be
* empty if the Kafka consumer fetches some messages but all of them are not visible messages
* (either transaction messages, or aborted messages when `isolation.level` is `read_committed`).
*
* @throws OffsetOutOfRangeException if `offset` is out of range.
* @throws TimeoutException if the consumer position is not changed after polling. It means the
* consumer polls nothing before timeout.
*/
private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
seek(offset)
val p = consumer.poll(pollTimeoutMs)
val r = p.records(topicPartition)
logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
fetchedData = r.iterator
val offsetAfterPoll = consumer.position(topicPartition)
logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
fetchedData.withNewPoll(r.listIterator, offsetAfterPoll)
if (!fetchedData.hasNext) {
// We cannot fetch anything after `poll`. Two possible cases:
// - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
// be thrown.
// - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
// - Fetched something but all of them are not invisible. This is a valid case and let the
// caller handles this.
val range = getAvailableOffsetRange()
if (offset < range.earliest || offset >= range.latest) {
throw new OffsetOutOfRangeException(
Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
} else if (offset == offsetAfterPoll) {
throw new TimeoutException(
s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
}
}
}
}
