@@ -33,9 +33,19 @@ import org.apache.spark.util.UninterruptibleThread
 
 private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available. Otherwise it will either throw error
-   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
-   * or null.
+   * Get the record for the given offset if available.
+   *
+   * If the record is invisible (either a transaction message, or an aborted message when the
+   * consumer's `isolation.level` is `read_committed`), it will be skipped and this method will
+   * try to fetch the next available record within [offset, untilOffset).
+   *
+   * This method will also try its best to detect data loss. If `failOnDataLoss` is `true`, it
+   * will throw an exception when it detects an unavailable offset. If `failOnDataLoss` is
+   * `false`, this method will try to fetch the next available record within
+   * [offset, untilOffset).
+   *
+   * When this method tries to skip offsets due to either invisible messages or data loss and
+   * reaches `untilOffset`, it will return `null`.
    *
    * @param offset the offset to fetch.
    * @param untilOffset the max offset to fetch. Exclusive.
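As an aside (illustration only, not part of the patch), a minimal caller-side sketch of the contract described above; `drainRange` and `process` are hypothetical names, and the consumer instance is assumed to be obtained elsewhere:

    import org.apache.kafka.clients.consumer.ConsumerRecord

    // Hypothetical helper: consume one offset range through `get`, honoring the contract above.
    def drainRange(
        consumer: KafkaDataConsumer,
        startOffset: Long,
        untilOffset: Long,
        pollTimeoutMs: Long)(process: ConsumerRecord[Array[Byte], Array[Byte]] => Unit): Unit = {
      var offset = startOffset
      while (offset < untilOffset) {
        val record = consumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss = false)
        if (record == null) {
          // Remaining offsets were invisible or lost and `untilOffset` was reached.
          offset = untilOffset
        } else {
          // `record.offset` may be greater than the requested offset if messages were skipped.
          process(record)
          offset = record.offset + 1
        }
      }
    }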
@@ -80,6 +90,83 @@ private[kafka010] case class InternalKafkaConsumer(
     kafkaParams: ju.Map[String, Object]) extends Logging {
   import InternalKafkaConsumer._
 
+  /**
+   * The internal object to store the fetched data from Kafka consumer and the next offset to poll.
+   *
+   * @param _records the pre-fetched Kafka records.
+   * @param _nextOffsetInFetchedData the next offset in `records`. We use this to check whether the
+   *                                 pre-fetched data can still serve the requested offset.
+   * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
+   *                         poll when `records` is drained.
+   */
+  private case class FetchedData(
+      private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
+      private var _nextOffsetInFetchedData: Long,
+      private var _offsetAfterPoll: Long) {
+
+    def withNewPoll(
+        records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
+        offsetAfterPoll: Long): FetchedData = {
+      this._records = records
+      this._nextOffsetInFetchedData = UNKNOWN_OFFSET
+      this._offsetAfterPoll = offsetAfterPoll
+      this
+    }
+
+    /** Whether there are more elements */
+    def hasNext: Boolean = _records.hasNext
+
+    /** Move `records` forward and return the next record. */
+    def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
+      val record = _records.next()
+      _nextOffsetInFetchedData = record.offset + 1
+      record
+    }
+
+    /** Move `records` backward and return the previous record. */
+    def previous(): ConsumerRecord[Array[Byte], Array[Byte]] = {
+      assert(_records.hasPrevious, "fetchedData cannot move back")
+      val record = _records.previous()
+      _nextOffsetInFetchedData = record.offset
+      record
+    }
+
+    /** Reset the internal pre-fetched data. */
+    def reset(): Unit = {
+      _records = ju.Collections.emptyListIterator()
+    }
+
+    /**
+     * Returns the next offset in `records`. We use this to check whether the pre-fetched data can
+     * still serve the requested offset.
+     */
+    def nextOffsetInFetchedData: Long = _nextOffsetInFetchedData
+
+    /**
+     * Returns the next offset to poll after draining the pre-fetched records.
+     */
+    def offsetAfterPoll: Long = _offsetAfterPoll
+  }
+
+  /**
+   * The internal object returned by the `fetchRecord` method. If `record` is null, it means the
+   * record at the requested offset is invisible (either a transaction message, or an aborted
+   * message when the consumer's `isolation.level` is `read_committed`), and the caller should use
+   * `nextOffsetToFetch` to fetch instead.
+   */
+  private case class FetchedRecord(
+      var record: ConsumerRecord[Array[Byte], Array[Byte]],
+      var nextOffsetToFetch: Long) {
+
+    def withRecord(
+        record: ConsumerRecord[Array[Byte], Array[Byte]],
+        nextOffsetToFetch: Long): FetchedRecord = {
+      this.record = record
+      this.nextOffsetToFetch = nextOffsetToFetch
+      this
+    }
+  }
+
   private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
   @volatile private var consumer = createConsumer
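A quick standalone illustration (not from the patch) of the `java.util.ListIterator` behaviour that `FetchedData.previous()` relies on: `previous()` rewinds the cursor by one element, so the following `next()` returns the same element again.

    import java.{util => ju}

    val it: ju.ListIterator[String] = ju.Arrays.asList("a", "b", "c").listIterator()
    assert(it.next() == "a")
    assert(it.next() == "b")
    it.previous()             // rewind one element, as FetchedData.previous() does with a record
    assert(it.next() == "b")  // the pushed-back element is delivered again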
@@ -90,10 +177,21 @@ private[kafka010] case class InternalKafkaConsumer(
   /** indicate whether this consumer is going to be stopped in the next release */
   @volatile var markedForClose = false
 
-  /** Iterator to the already fetch data */
-  @volatile private var fetchedData =
-    ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  @volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET
+  /**
+   * The fetched data returned from Kafka consumer. This is a reusable private object to avoid
+   * memory allocation.
+   */
+  private val fetchedData = FetchedData(
+    ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
+    UNKNOWN_OFFSET,
+    UNKNOWN_OFFSET)
+
+  /**
+   * The fetched record returned from the `fetchRecord` method. This is a reusable private object
+   * to avoid memory allocation.
+   */
+  private val fetchedRecord: FetchedRecord = FetchedRecord(null, UNKNOWN_OFFSET)
+
 
   /** Create a KafkaConsumer to fetch records for `topicPartition` */
   private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
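For context (illustration only), the allocation-saving pattern used by these two fields: a single mutable holder is updated in place by its `with*` method and returned, so the hot fetch path never allocates a new wrapper per record. A trimmed-down, hypothetical version:

    // Hypothetical stand-in for FetchedRecord; not part of the patch.
    final class ReusableHolder[A](var value: A) {
      def withValue(v: A): ReusableHolder[A] = { value = v; this }
    }

    val holder = new ReusableHolder[String](null)
    def produce(v: String): ReusableHolder[String] = holder.withValue(v)
    assert(produce("x") eq produce("y"))  // always the same instance, only the contents change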
@@ -125,20 +223,7 @@ private[kafka010] case class InternalKafkaConsumer(
     AvailableOffsetRange(earliestOffset, latestOffset)
   }
 
-  /**
-   * Get the record for the given offset if available. Otherwise it will either throw error
-   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
-   * or null.
-   *
-   * @param offset the offset to fetch.
-   * @param untilOffset the max offset to fetch. Exclusive.
-   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
-   *                       offset if available, or throw exception.when `failOnDataLoss` is `false`,
-   *                       this method will either return record at offset if available, or return
-   *                       the next earliest available record less than untilOffset, or null. It
-   *                       will not throw any exception.
-   */
+  /** @see [[KafkaDataConsumer.get]] */
   def get(
       offset: Long,
       untilOffset: Long,
@@ -147,21 +232,32 @@ private[kafka010] case class InternalKafkaConsumer(
     ConsumerRecord[Array[Byte], Array[Byte]] = runUninterruptiblyIfPossible {
     require(offset < untilOffset,
       s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
-    logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
+    logDebug(s"Get $groupId $topicPartition nextOffset ${fetchedData.nextOffsetInFetchedData} " +
+      s"requested $offset")
     // The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
     // `false`, first, we will try to fetch the record at `offset`. If no such record exists, then
     // we will move to the next available offset within `[offset, untilOffset)` and retry.
     // If `failOnDataLoss` is `true`, the loop body will be executed only once.
     var toFetchOffset = offset
-    var consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]] = null
+    var fetchedRecord: FetchedRecord = null
     // We want to break out of the while loop on a successful fetch to avoid using "return"
     // which may cause a NonLocalReturnControl exception when this method is used as a function.
     var isFetchComplete = false
 
     while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
       try {
-        consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
-        isFetchComplete = true
+        fetchedRecord = fetchRecord(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
+        if (fetchedRecord.record != null) {
+          isFetchComplete = true
+        } else {
+          toFetchOffset = fetchedRecord.nextOffsetToFetch
+          if (toFetchOffset >= untilOffset) {
+            fetchedData.reset()
+            toFetchOffset = UNKNOWN_OFFSET
+          } else {
+            logDebug(s"Skipped offsets [$offset, $toFetchOffset]")
+          }
+        }
       } catch {
         case e: OffsetOutOfRangeException =>
           // When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -174,9 +270,9 @@ private[kafka010] case class InternalKafkaConsumer(
     }
 
     if (isFetchComplete) {
-      consumerRecord
+      fetchedRecord.record
     } else {
-      resetFetchedData()
+      fetchedData.reset()
       null
     }
   }
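To make the new control flow concrete, a hypothetical trace of the loop above (offsets invented): with `offset = 10`, `untilOffset = 13`, and offsets 10..12 occupied by aborted transactional messages, `fetchRecord` returns a `FetchedRecord` whose `record` is `null` and whose `nextOffsetToFetch` is 13. `toFetchOffset` then reaches `untilOffset`, so `fetchedData` is reset, `toFetchOffset` becomes `UNKNOWN_OFFSET`, the loop exits, and `get` returns `null` to signal the end of the range.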
@@ -239,65 +335,81 @@ private[kafka010] case class InternalKafkaConsumer(
   }
 
   /**
-   * Get the record for the given offset if available. Otherwise it will either throw error
-   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
-   * or null.
+   * Get the fetched record for the given offset if available.
+   *
+   * If the record is invisible (either a transaction message, or an aborted message when the
+   * consumer's `isolation.level` is `read_committed`), it will return a `FetchedRecord` with the
+   * next offset to fetch.
+   *
+   * This method will also try its best to detect data loss. If `failOnDataLoss` is `true`, it will
+   * throw an exception when it detects an unavailable offset. If `failOnDataLoss` is `false`, it
+   * will skip ahead to the next available record within [offset, untilOffset) instead.
    *
    * @throws OffsetOutOfRangeException if `offset` is out of range
    * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds.
    */
-  private def fetchData(
+  private def fetchRecord(
       offset: Long,
       untilOffset: Long,
       pollTimeoutMs: Long,
-      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
-    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
-      // This is the first fetch, or the last pre-fetched data has been drained.
-      // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
-      seek(offset)
-      poll(pollTimeoutMs)
-    }
-
-    if (!fetchedData.hasNext()) {
-      // We cannot fetch anything after `poll`. Two possible cases:
-      // - `offset` is out of range so that Kafka returns nothing. Just throw
-      //   `OffsetOutOfRangeException` to let the caller handle it.
-      // - Cannot fetch any data before timeout. TimeoutException will be thrown.
-      val range = getAvailableOffsetRange()
-      if (offset < range.earliest || offset >= range.latest) {
-        throw new OffsetOutOfRangeException(
-          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+      failOnDataLoss: Boolean): FetchedRecord = {
+    if (offset != fetchedData.nextOffsetInFetchedData) {
+      // This is the first fetch, or the fetched data has been reset.
+      // Fetch records from Kafka and update `fetchedData`.
+      fetchData(offset, pollTimeoutMs)
+    } else if (!fetchedData.hasNext) { // The last pre-fetched data has been drained.
+      if (offset < fetchedData.offsetAfterPoll) {
+        // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask
+        // the next call to start from `fetchedData.offsetAfterPoll`.
+        fetchedData.reset()
+        return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
       } else {
-        throw new TimeoutException(
-          s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
+        // Fetch records from Kafka and update `fetchedData`.
+        fetchData(offset, pollTimeoutMs)
       }
+    }
+
+    if (!fetchedData.hasNext) {
+      // When we reach here, we have already tried to poll from Kafka. As `fetchedData` is still
+      // empty, all messages in [offset, fetchedData.offsetAfterPoll) are invisible. Return a
+      // record to ask the next call to start from `fetchedData.offsetAfterPoll`.
+      assert(offset <= fetchedData.offsetAfterPoll,
+        s"seek to $offset and poll but the offset was reset to ${fetchedData.offsetAfterPoll}")
+      fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
     } else {
       val record = fetchedData.next()
-      nextOffsetInFetchedData = record.offset + 1
       // In general, Kafka uses the specified offset as the start point, and tries to fetch the next
       // available offset. Hence we need to handle offset mismatch.
       if (record.offset > offset) {
+        val range = getAvailableOffsetRange()
+        if (range.earliest <= offset) {
+          // `offset` is still valid but the corresponding message is invisible. We should skip it
+          // and jump to `record.offset`. Here we move `fetchedData` back so that the next call of
+          // `fetchRecord` can just return `record` directly.
+          fetchedData.previous()
+          return fetchedRecord.withRecord(null, record.offset)
+        }
         // This may happen when some records aged out but their offsets already got verified
         if (failOnDataLoss) {
           reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
           // Never happen as "reportDataLoss" will throw an exception
-          null
+          throw new IllegalStateException(
+            "reportDataLoss didn't throw an exception when 'failOnDataLoss' is true")
+        } else if (record.offset >= untilOffset) {
+          reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
+          // Set `nextOffsetToFetch` to `untilOffset` to finish the current batch.
+          fetchedRecord.withRecord(null, untilOffset)
         } else {
-          if (record.offset >= untilOffset) {
-            reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
-            null
-          } else {
-            reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
-            record
-          }
+          reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
+          fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
         }
       } else if (record.offset < offset) {
         // This should not happen. If it does happen, then we probably misunderstand Kafka internal
         // mechanism.
         throw new IllegalStateException(
           s"Tried to fetch $offset but the returned record offset was ${record.offset}")
       } else {
-        record
+        fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
       }
     }
   }
@@ -306,13 +418,7 @@ private[kafka010] case class InternalKafkaConsumer(
   private def resetConsumer(): Unit = {
     consumer.close()
     consumer = createConsumer
-    resetFetchedData()
-  }
-
-  /** Reset the internal pre-fetched data. */
-  private def resetFetchedData(): Unit = {
-    nextOffsetInFetchedData = UNKNOWN_OFFSET
-    fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
+    fetchedData.reset()
   }
 
   /**
@@ -346,11 +452,40 @@ private[kafka010] case class InternalKafkaConsumer(
     consumer.seek(topicPartition, offset)
   }
 
-  private def poll(pollTimeoutMs: Long): Unit = {
+  /**
+   * Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be
+   * empty if the Kafka consumer fetches some messages but none of them are visible messages
+   * (either transaction messages, or aborted messages when `isolation.level` is `read_committed`).
+   *
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed after polling, which means
+   *                          the consumer polled nothing before the timeout.
+   */
+  private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
+    // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
+    seek(offset)
     val p = consumer.poll(pollTimeoutMs)
     val r = p.records(topicPartition)
     logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
-    fetchedData = r.iterator
+    val offsetAfterPoll = consumer.position(topicPartition)
+    logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
+    fetchedData.withNewPoll(r.listIterator, offsetAfterPoll)
+    if (!fetchedData.hasNext) {
+      // We cannot fetch anything after `poll`. Three possible cases:
+      // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
+      //   be thrown.
+      // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
+      // - Fetched something but none of it is visible. This is a valid case and we let the
+      //   caller handle it.
+      val range = getAvailableOffsetRange()
+      if (offset < range.earliest || offset >= range.latest) {
+        throw new OffsetOutOfRangeException(
+          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+      } else if (offset == offsetAfterPoll) {
+        throw new TimeoutException(
+          s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
+      }
+    }
   }
 }
 
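As a closing illustration (assumed setup, not part of the patch), the post-poll check that `fetchData` builds on can be reproduced with a plain `KafkaConsumer`: if `position` advanced past the requested offset yet no records were returned, the gap consisted of invisible (control or aborted) messages rather than a timeout.

    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    // `consumer` is assumed to be an already-configured KafkaConsumer assigned to `tp`.
    def pollOnce(
        consumer: KafkaConsumer[Array[Byte], Array[Byte]],
        tp: TopicPartition,
        offset: Long,
        pollTimeoutMs: Long): Unit = {
      consumer.seek(tp, offset)
      val records = consumer.poll(pollTimeoutMs).records(tp)
      val offsetAfterPoll = consumer.position(tp)
      if (records.isEmpty && offsetAfterPoll == offset) {
        println(s"nothing fetched for $offset within $pollTimeoutMs ms (fetchData would time out)")
      } else if (records.isEmpty && offsetAfterPoll > offset) {
        println(s"offsets [$offset, $offsetAfterPoll) were invisible; resume from $offsetAfterPoll")
      }
    }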