Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Avoid using "return" inside CachedKafkaConsumer.get as it is passed…
… to `org.apache.spark.util.UninterruptibleThread.runUninterruptibly` as a function value, which causes a NonLocalReturnControl exception to be thrown on every call
  • Loading branch information
Yuval Itzchakov committed Aug 26, 2017
commit c20bd14a4bed34644efc11de420a1caeccea329e
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,15 @@ private[kafka010] case class CachedKafkaConsumer private(
// we will move to the next available offset within `[offset, untilOffset)` and retry.
// If `failOnDataLoss` is `true`, the loop body will be executed only once.
var toFetchOffset = offset
while (toFetchOffset != UNKNOWN_OFFSET) {
var consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]] = null
// We want to break out of the while loop on a successful fetch to avoid using "return"
// which may cause a NonLocalReturnControl exception when this method is used as a function.
var isFetchComplete = false

while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
try {
return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
isFetchComplete = true
} catch {
case e: OffsetOutOfRangeException =>
// When there is some error thrown, it's better to use a new consumer to drop all cached
Expand All @@ -125,8 +131,11 @@ private[kafka010] case class CachedKafkaConsumer private(
toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset)
}
}
resetFetchedData()
null

if (isFetchComplete) consumerRecord else {
resetFetchedData()
null
}
}

/**
Expand Down