Addressed comments
tdas committed Mar 2, 2018
commit 3eae3f188deaf82d6384ebf698f09c9bf0d0b735
@@ -179,10 +179,10 @@ private[kafka010] class KafkaMicroBatchReader(
// Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
// Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
// (KAFKA-1894).
require(Thread.currentThread().isInstanceOf[UninterruptibleThread])
assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])

// SparkSession is required for getting Hadoop configuration for writing to checkpoints
require(SparkSession.getActiveSession.nonEmpty)
assert(SparkSession.getActiveSession.nonEmpty)

val metadataLog =
new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
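The change above from `require` to `assert` follows the usual Scala convention for this kind of check; a minimal sketch of the distinction (illustration only, not part of the patch):

```scala
// `require` signals a bad argument from the caller and throws IllegalArgumentException;
// `assert` checks an invariant of our own logic, throws AssertionError, and can be
// stripped from production builds with scalac's -Xdisable-assertions.
def doubleNonNegative(n: Int): Int = {
  require(n >= 0, "caller must pass a non-negative value") // precondition on input
  val doubled = n * 2
  assert(doubled % 2 == 0, "doubling broke evenness")      // internal sanity check
  doubled
}
```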
@@ -323,8 +323,8 @@ private[kafka010] case class KafkaMicroBatchDataReader(

private val consumer = {
if (!reuseKafkaConsumer) {
// If we can't reuse CachedKafkaConsumers, creating a new CachedKafkaConsumer. As here we
// uses `assign`, we don't need to worry about the "group.id" conflicts.
// If we can't reuse CachedKafkaConsumers, create a new CachedKafkaConsumer. We
// use `assign` here, hence we don't need to worry about "group.id" conflicts.
CachedKafkaConsumer.createUncached(
offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
} else {
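The comment above relies on a property of the Kafka consumer API: a consumer that is handed explicit partitions via `assign` never joins a consumer group, so "group.id" collisions are not a concern. A minimal sketch of that difference (the broker address, topic name, and deserializers below are placeholders, not values from this patch):

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)

// `assign` pins the consumer to explicit partitions and bypasses the group protocol,
// so concurrent consumers cannot clash on a shared "group.id".
consumer.assign(Collections.singletonList(new TopicPartition("some-topic", 0)))

// By contrast, `subscribe` joins a consumer group and does depend on "group.id":
// consumer.subscribe(Collections.singletonList("some-topic"))
```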
@@ -360,7 +360,6 @@ private[kafka010] case class KafkaMicroBatchDataReader(
}

override def close(): Unit = {
// Indicate that we're no longer using this consumer
if (!reuseKafkaConsumer) {
// Don't forget to close non-reuse KafkaConsumers. You may take down your cluster!
consumer.close()
@@ -26,12 +26,12 @@ import org.apache.spark.sql.sources.v2.DataSourceOptions
* Class to calculate offset ranges to process based on the from and until offsets, and
* the configured `minPartitions`.
*/
private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) {
require(minPartitions >= 0)
private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int]) {
require(minPartitions.isEmpty || minPartitions.get > 0)

import KafkaOffsetRangeCalculator._
/**
* Calculate the offset ranges that we are going to process this batch. If `numPartitions`
* Calculate the offset ranges that we are going to process this batch. If `minPartitions`
* is not set or is set less than or equal to the number of `topicPartitions` that we're going to
* consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If
* `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up
@@ -50,32 +50,30 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) {
}

// If minPartitions not set or there are enough partitions to satisfy minPartitions
if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > minPartitions) {
if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
// Assign preferred executor locations to each range such that the same topic-partition is
// always read from the same executor and the KafkaConsumer can be reused
// preferentially read from the same executor and the KafkaConsumer can be reused.
offsetRanges.map { range =>
range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations))
}
} else {

// Splits offset ranges with a relatively large amount of data into smaller ones.
val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum
Contributor: nit: map(_.size).sum
offsetRanges.flatMap { offsetRange =>
val tp = offsetRange.topicPartition
val size = offsetRange.untilOffset - offsetRange.fromOffset
// number of partitions to divvy up this topic partition to
val parts = math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt
var remaining = size
var startOffset = offsetRange.fromOffset
(0 until parts).map { part =>
// Fine to do integer division. Last partition will consume all the round off errors
val thisPartition = remaining / (parts - part)
remaining -= thisPartition
val endOffset = startOffset + thisPartition
val offsetRange = KafkaOffsetRange(tp, startOffset, endOffset, preferredLoc = None)
startOffset = endOffset
offsetRange
val idealRangeSize = totalSize.toDouble / minPartitions.get

offsetRanges.flatMap { range =>
// Split the current range into subranges as close to the ideal range size as possible
val rangeSize = range.untilOffset - range.fromOffset
val numSplitsInRange = math.round(rangeSize.toDouble / idealRangeSize).toInt
Contributor: nit: range.size, you may remove rangeSize above

(0 until numSplitsInRange).map { i =>
val splitStart = range.fromOffset + rangeSize * (i.toDouble / numSplitsInRange)
val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) / numSplitsInRange)
KafkaOffsetRange(
range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None)
}

Contributor: nit: extra line
}
}
}
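To make the arithmetic in the new splitting branch concrete, here is a self-contained sketch with made-up numbers (two topic-partitions with 600 and 400 pending offsets and `minPartitions = 5`); `Range` is a simplified stand-in for `KafkaOffsetRange`:

```scala
// Simplified stand-in for KafkaOffsetRange, for illustration only.
case class Range(fromOffset: Long, untilOffset: Long) {
  def size: Long = untilOffset - fromOffset
}

val offsetRanges   = Seq(Range(0, 600), Range(1000, 1400))
val minPartitions  = 5
val totalSize      = offsetRanges.map(_.size).sum       // 1000
val idealRangeSize = totalSize.toDouble / minPartitions // 200.0

val splits = offsetRanges.flatMap { range =>
  // Same proportional rounding as the patch: larger ranges get more splits.
  val numSplitsInRange = math.round(range.size / idealRangeSize).toInt // 3, then 2
  (0 until numSplitsInRange).map { i =>
    val splitStart = range.fromOffset + range.size * (i.toDouble / numSplitsInRange)
    val splitEnd   = range.fromOffset + range.size * ((i.toDouble + 1) / numSplitsInRange)
    Range(splitStart.toLong, splitEnd.toLong)
  }
}
// splits: Range(0,200), Range(200,400), Range(400,600), Range(1000,1200), Range(1200,1400)
```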
@@ -94,10 +92,9 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) {

private[kafka010] object KafkaOffsetRangeCalculator {

private val DEFAULT_MIN_PARTITIONS = 0

def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator = {
new KafkaOffsetRangeCalculator(options.getInt("minPartitions", DEFAULT_MIN_PARTITIONS))
val optionalValue = Option(options.get("minPartitions").orElse(null)).map(_.toInt)
Contributor: nit: .orNull instead of .orElse(null). Why don't you actually do:
options.get("minPartitions").map(_.toInt)
Contributor Author: Because it returns java Optional and not scala Option.

new KafkaOffsetRangeCalculator(optionalValue)
}
}
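A short sketch of the java `Optional` vs scala `Option` point raised in the thread above (the literal "10" is made up for illustration):

```scala
import java.util.Optional

// DataSourceOptions.get returns a java.util.Optional rather than a scala Option, which
// is why the value is unwrapped with orElse(null) and re-wrapped in a scala Option.
val javaOpt: Optional[String] = Optional.of("10")

val minPartitions: Option[Int] = Option(javaOpt.orElse(null)).map(_.toInt) // Some(10)

// Note: orNull is defined on scala.Option, not on java.util.Optional, and mapping on
// the java Optional directly would yield another Optional rather than a scala Option.
```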
