Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Fix typo
  • Loading branch information
zsxwing committed Apr 25, 2017
commit bde6120320bb8a4abc68597b0c021d9284998f93
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private[kafka010] case class CachedKafkaConsumer private(

case class AvailableOffsetRange(earliest: Long, latest: Long)

private def runUninterruptiblyIfPossiable[T](body: => T): T = Thread.currentThread match {
private def runUninterruptiblyIfPossible[T](body: => T): T = Thread.currentThread match {
case ut: UninterruptibleThread =>
ut.runUninterruptibly(body)
case _ =>
Expand All @@ -76,7 +76,7 @@ private[kafka010] case class CachedKafkaConsumer private(
* Return the available offset range of the current partition. It's a pair of the earliest offset
* and the latest offset.
*/
def getAvailableOffsetRange(): AvailableOffsetRange = runUninterruptiblyIfPossiable {
def getAvailableOffsetRange(): AvailableOffsetRange = runUninterruptiblyIfPossible {
consumer.seekToBeginning(Set(topicPartition).asJava)
val earliestOffset = consumer.position(topicPartition)
consumer.seekToEnd(Set(topicPartition).asJava)
Expand All @@ -103,7 +103,7 @@ private[kafka010] case class CachedKafkaConsumer private(
untilOffset: Long,
pollTimeoutMs: Long,
failOnDataLoss: Boolean):
ConsumerRecord[Array[Byte], Array[Byte]] = runUninterruptiblyIfPossiable {
ConsumerRecord[Array[Byte], Array[Byte]] = runUninterruptiblyIfPossible {
require(offset < untilOffset,
s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
Expand Down