@@ -24,7 +24,6 @@ import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._

import org.apache.commons.io.IOUtils
import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
@@ -64,8 +63,6 @@ private[kafka010] class KafkaMicroBatchReader(
failOnDataLoss: Boolean)
extends MicroBatchReader with SupportsScanUnsafeRow with Logging {

type PartitionOffsetMap = Map[TopicPartition, Long]

private var startPartitionOffsets: PartitionOffsetMap = _
private var endPartitionOffsets: PartitionOffsetMap = _

@@ -76,6 +73,7 @@ private[kafka010] class KafkaMicroBatchReader(
private val maxOffsetsPerTrigger =
Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)

private val rangeCalculator = KafkaOffsetRangeCalculator(options)
/**
* Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
* called in StreamExecutionThread. Otherwise, interrupting a thread while running
@@ -106,15 +104,15 @@ private[kafka010] class KafkaMicroBatchReader(
override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
// Find the new partitions, and get their earliest offsets
val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
if (newPartitionOffsets.keySet != newPartitions) {
val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
if (newPartitionInitialOffsets.keySet != newPartitions) {
// We cannot get from offsets for some partitions. It means they got deleted.
val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
reportDataLoss(
s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
}
logInfo(s"Partitions added: $newPartitionOffsets")
newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
logInfo(s"Partitions added: $newPartitionInitialOffsets")
newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
reportDataLoss(
s"Added partition $p starts from $o instead of 0. Some data may have been missed")
}
@@ -125,46 +123,28 @@ private[kafka010] class KafkaMicroBatchReader(
reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
}

// Use the until partitions to calculate offset ranges to ignore partitions that have
// Use the end partitions to calculate offset ranges to ignore partitions that have
// been deleted
val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
// Ignore partitions that we don't know the from offsets.
newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
newPartitionInitialOffsets.contains(tp) || startPartitionOffsets.contains(tp)
}.toSeq
logDebug("TopicPartitions: " + topicPartitions.mkString(", "))

val sortedExecutors = getSortedExecutorList()
val numExecutors = sortedExecutors.length
logDebug("Sorted executors: " + sortedExecutors.mkString(", "))

// Calculate offset ranges
val factories = topicPartitions.flatMap { tp =>
val fromOffset = startPartitionOffsets.get(tp).getOrElse {
newPartitionOffsets.getOrElse(
tp, {
// This should not happen since newPartitionOffsets contains all partitions not in
// fromPartitionOffsets
throw new IllegalStateException(s"$tp doesn't have a from offset")
})
}
val untilOffset = endPartitionOffsets(tp)

if (untilOffset >= fromOffset) {
// This allows cached KafkaConsumers in the executors to be re-used to read the same
// partition in every batch.
val preferredLoc = if (numExecutors > 0) {
Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
} else None
val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
Some(
new KafkaMicroBatchDataReaderFactory(
range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
} else {
reportDataLoss(
s"Partition $tp's offset was changed from " +
s"$fromOffset to $untilOffset, some data may have been missed")
None
}
val offsetRanges = rangeCalculator.getRanges(
fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
untilOffsets = endPartitionOffsets,
executorLocations = getSortedExecutorList())

// Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
// that is, concurrent tasks will not read the same TopicPartitions.
val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size

// Generate factories based on the offset ranges
val factories = offsetRanges.map { range =>
new KafkaMicroBatchDataReaderFactory(
range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
}
factories.map(_.asInstanceOf[DataReaderFactory[UnsafeRow]]).asJava
}
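
As a side note on the consumer-reuse check above, here is a minimal sketch with made-up ranges (it assumes the new four-field KafkaOffsetRange introduced in this PR): once minPartitions splits a TopicPartition into several ranges, that TopicPartition appears more than once and reuse is turned off.

import org.apache.kafka.common.TopicPartition

// Hypothetical ranges: the same TopicPartition t-0 split into two subranges.
val sampleRanges = Seq(
  KafkaOffsetRange(new TopicPartition("t", 0), 0L, 50L, preferredLoc = None),
  KafkaOffsetRange(new TopicPartition("t", 0), 50L, 100L, preferredLoc = None))
// 1 distinct TopicPartition vs. 2 ranges: concurrent tasks would hit the same cached
// consumer, so reuse is disabled.
val reuse = sampleRanges.map(_.topicPartition).toSet.size == sampleRanges.size  // false
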
@@ -320,28 +300,39 @@ private[kafka010] class KafkaMicroBatchReader(
}

/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
private[kafka010] class KafkaMicroBatchDataReaderFactory(
range: KafkaOffsetRange,
preferredLoc: Option[String],
private[kafka010] case class KafkaMicroBatchDataReaderFactory(
offsetRange: KafkaOffsetRange,
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
failOnDataLoss: Boolean,
reuseKafkaConsumer: Boolean) extends DataReaderFactory[UnsafeRow] {

override def preferredLocations(): Array[String] = preferredLoc.toArray
override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray

override def createDataReader(): DataReader[UnsafeRow] = new KafkaMicroBatchDataReader(
range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
offsetRange, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
}

/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
private[kafka010] class KafkaMicroBatchDataReader(
private[kafka010] case class KafkaMicroBatchDataReader(
offsetRange: KafkaOffsetRange,
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean) extends DataReader[UnsafeRow] with Logging {
failOnDataLoss: Boolean,
reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with Logging {

private val consumer = {
if (!reuseKafkaConsumer) {
// If we can't reuse CachedKafkaConsumers, create a new CachedKafkaConsumer. We use
// `assign` here, hence we don't need to worry about "group.id" conflicts.
CachedKafkaConsumer.createUncached(
offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
} else {
CachedKafkaConsumer.getOrCreate(
offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
}
}

private val consumer = CachedKafkaConsumer.getOrCreate(
offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
private val rangeToRead = resolveRange(offsetRange)
private val converter = new KafkaRecordToUnsafeRowConverter

@@ -369,9 +360,14 @@ private[kafka010] class KafkaMicroBatchDataReader(
}

override def close(): Unit = {
// Indicate that we're no longer using this consumer
CachedKafkaConsumer.releaseKafkaConsumer(
offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
if (!reuseKafkaConsumer) {
// Don't forget to close non-reuse KafkaConsumers. You may take down your cluster!
consumer.close()
} else {
// Indicate that we're no longer using this consumer
CachedKafkaConsumer.releaseKafkaConsumer(
offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
}
}

private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
@@ -392,12 +388,9 @@ private[kafka010] class KafkaMicroBatchDataReader(
} else {
range.untilOffset
}
KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset)
KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset, None)
} else {
range
}
}
}

private[kafka010] case class KafkaOffsetRange(
topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long)
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

import org.apache.spark.sql.sources.v2.DataSourceOptions


/**
* Class to calculate offset ranges to process based on the from and until offsets, and
* the configured `minPartitions`.
*/
private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int]) {
require(minPartitions.isEmpty || minPartitions.get > 0)

import KafkaOffsetRangeCalculator._
/**
* Calculate the offset ranges that we are going to process this batch. If `minPartitions`
* is not set or is set less than or equal to the number of `topicPartitions` that we're going to
* consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If
* `minPartitions` is set higher than the number of our `topicPartitions`, then we will split up
* the read tasks of the skewed partitions into multiple Spark tasks.
* The number of Spark tasks will be *approximately* `minPartitions`. It can be less or more
* depending on rounding errors or Kafka partitions that didn't receive any new data.
*/
def getRanges(
fromOffsets: PartitionOffsetMap,
untilOffsets: PartitionOffsetMap,
executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = {
val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet)
Contributor:

was this check here before? What if there are new topic partitions? Are we missing those, because they may not exist in fromOffsets?

Contributor Author:

fromOffsets here will contain the initial offsets of new partitions. See how fromOffsets is set: startPartitionOffsets ++ newPartitionInitialOffsets.
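
As a rough illustration with made-up offsets: a partition that is new in this batch appears only in newPartitionInitialOffsets, so it still survives the intersect inside getRanges.

import org.apache.kafka.common.TopicPartition

val t0 = new TopicPartition("t", 0)  // already being read
val t1 = new TopicPartition("t", 1)  // newly discovered partition
val fromOffsets = Map(t0 -> 42L) ++ Map(t1 -> 0L)   // startPartitionOffsets ++ newPartitionInitialOffsets
val untilOffsets = Map(t0 -> 100L, t1 -> 10L)
untilOffsets.keySet.intersect(fromOffsets.keySet)   // Set(t-0, t-1): t1 is not dropped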


val offsetRanges = partitionsToRead.toSeq.map { tp =>
KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None)
}.filter(_.size > 0)

// If minPartitions is not set or there are enough partitions to satisfy minPartitions
if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
// Assign preferred executor locations to each range such that the same topic-partition is
// preferentially read from the same executor and the KafkaConsumer can be reused.
offsetRanges.map { range =>
range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations))
}
} else {

// Split offset ranges with relatively large amounts of data into smaller ones.
val totalSize = offsetRanges.map(_.size).sum
val idealRangeSize = totalSize.toDouble / minPartitions.get

offsetRanges.flatMap { range =>
// Split the current range into subranges as close to the ideal range size as possible
val numSplitsInRange = math.round(range.size.toDouble / idealRangeSize).toInt

(0 until numSplitsInRange).map { i =>
val splitStart = range.fromOffset + range.size * (i.toDouble / numSplitsInRange)
val splitEnd = range.fromOffset + range.size * ((i.toDouble + 1) / numSplitsInRange)
KafkaOffsetRange(
range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None)
}
}
}
}

private def getLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String] = {
def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b

val numExecutors = executorLocations.length
if (numExecutors > 0) {
// This allows cached KafkaConsumers in the executors to be re-used to read the same
// partition in every batch.
Some(executorLocations(floorMod(tp.hashCode, numExecutors)))
} else None
}
}

private[kafka010] object KafkaOffsetRangeCalculator {

def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator = {
val optionalValue = Option(options.get("minPartitions").orElse(null)).map(_.toInt)
Contributor:

nit: .orNull instead of .orElse(null). Why don't you actually do:

options.get("minPartitions").map(_.toInt)

Contributor Author:

Because it returns java Optional and not scala Option.
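
For context, a tiny sketch (values made up) of the conversion being discussed: DataSourceOptions.get returns a java.util.Optional, which is first wrapped into a Scala Option before mapping.

import java.util.Optional

val javaOpt: Optional[String] = Optional.ofNullable("3")                    // what options.get(...) yields
val minPartitionsOpt: Option[Int] = Option(javaOpt.orElse(null)).map(_.toInt)  // Some(3)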

new KafkaOffsetRangeCalculator(optionalValue)
}
}

private[kafka010] case class KafkaOffsetRange(
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long,
preferredLoc: Option[String]) {
lazy val size: Long = untilOffset - fromOffset
}
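
A rough usage sketch of the calculator with made-up offsets, assuming it runs inside the org.apache.spark.sql.kafka010 package (where the class and the PartitionOffsetMap alias are visible):

import org.apache.kafka.common.TopicPartition

val calc = new KafkaOffsetRangeCalculator(minPartitions = Some(3))
val tpA = new TopicPartition("t", 0)   // 100 new records this batch
val tpB = new TopicPartition("t", 1)   // 20 new records this batch
val ranges = calc.getRanges(
  fromOffsets = Map(tpA -> 0L, tpB -> 0L),
  untilOffsets = Map(tpA -> 100L, tpB -> 20L))
// totalSize = 120, idealRangeSize = 40.0
// tpA: round(100 / 40) = 3 subranges -> [0, 33), [33, 66), [66, 100)
// tpB: round(20 / 40)  = 1 subrange  -> [0, 20)
// 4 ranges in total, i.e. approximately minPartitions, off by one due to rounding.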
@@ -348,6 +348,12 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
throw new IllegalArgumentException("Unknown option")
}

// Validate minPartitions value if present
if (caseInsensitiveParams.contains(MIN_PARTITIONS_OPTION_KEY)) {
val p = caseInsensitiveParams(MIN_PARTITIONS_OPTION_KEY).toInt
if (p <= 0) throw new IllegalArgumentException("minPartitions must be positive")
}

// Validate user-specified Kafka options

if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.GROUP_ID_CONFIG}")) {
@@ -455,6 +461,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
private val MIN_PARTITIONS_OPTION_KEY = "minpartitions"

val TOPIC_OPTION_KEY = "topic"

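Putting the new option to use, a hypothetical streaming query configuration (only minPartitions comes from this PR; the other option values are illustrative and spark is assumed to be an existing SparkSession):

// Ask the Kafka source to plan roughly 10 read tasks even if the topic has fewer partitions.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topic")
  .option("minPartitions", "10")
  .load()
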
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import org.apache.kafka.common.TopicPartition

package object kafka010 { // scalastyle:ignore
// ^^ scalastyle:ignore is for ignoring warnings about digits in package name
type PartitionOffsetMap = Map[TopicPartition, Long]
}