[SPARK-23541][SS] Allow Kafka source to read data with greater parallelism than the number of topic-partitions #20698
Changes from 2 commits
@@ -24,7 +24,6 @@ import java.nio.charset.StandardCharsets
 import scala.collection.JavaConverters._

 import org.apache.commons.io.IOUtils
 import org.apache.kafka.common.TopicPartition

 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
@@ -64,8 +63,6 @@ private[kafka010] class KafkaMicroBatchReader(
     failOnDataLoss: Boolean)
   extends MicroBatchReader with SupportsScanUnsafeRow with Logging {

-  type PartitionOffsetMap = Map[TopicPartition, Long]

   private var startPartitionOffsets: PartitionOffsetMap = _
   private var endPartitionOffsets: PartitionOffsetMap = _
@@ -76,6 +73,7 @@ private[kafka010] class KafkaMicroBatchReader(
   private val maxOffsetsPerTrigger =
     Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)

+  private val rangeCalculator = KafkaOffsetRangeCalculator(options)
   /**
    * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    * called in StreamExecutionThread. Otherwise, interrupting a thread while running
@@ -106,15 +104,15 @@ private[kafka010] class KafkaMicroBatchReader(
   override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
     // Find the new partitions, and get their earliest offsets
     val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
-    val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
-    if (newPartitionOffsets.keySet != newPartitions) {
+    val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+    if (newPartitionInitialOffsets.keySet != newPartitions) {
       // We cannot get the from-offsets for some partitions; it means they got deleted.
-      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
+      val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
       reportDataLoss(
         s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
     }
-    logInfo(s"Partitions added: $newPartitionOffsets")
-    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+    logInfo(s"Partitions added: $newPartitionInitialOffsets")
+    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
       reportDataLoss(
         s"Added partition $p starts from $o instead of 0. Some data may have been missed")
     }
@@ -125,46 +123,28 @@ private[kafka010] class KafkaMicroBatchReader(
       reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
     }

-    // Use the until partitions to calculate offset ranges to ignore partitions that have
+    // Use the end partitions to calculate offset ranges to ignore partitions that have
     // been deleted
     val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
       // Ignore partitions whose from-offsets we don't know.
-      newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+      newPartitionInitialOffsets.contains(tp) || startPartitionOffsets.contains(tp)
     }.toSeq
     logDebug("TopicPartitions: " + topicPartitions.mkString(", "))

-    val sortedExecutors = getSortedExecutorList()
-    val numExecutors = sortedExecutors.length
-    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))

     // Calculate offset ranges
-    val factories = topicPartitions.flatMap { tp =>
-      val fromOffset = startPartitionOffsets.get(tp).getOrElse {
-        newPartitionOffsets.getOrElse(
-          tp, {
-            // This should not happen since newPartitionOffsets contains all partitions not in
-            // fromPartitionOffsets
-            throw new IllegalStateException(s"$tp doesn't have a from offset")
-          })
-      }
-      val untilOffset = endPartitionOffsets(tp)

-      if (untilOffset >= fromOffset) {
-        // This allows cached KafkaConsumers in the executors to be re-used to read the same
-        // partition in every batch.
-        val preferredLoc = if (numExecutors > 0) {
-          Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
-        } else None
-        val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
-        Some(
-          new KafkaMicroBatchDataReaderFactory(
-            range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
-      } else {
-        reportDataLoss(
-          s"Partition $tp's offset was changed from " +
-          s"$fromOffset to $untilOffset, some data may have been missed")
-        None
-      }
+    val offsetRanges = rangeCalculator.getRanges(
+      fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
+      untilOffsets = endPartitionOffsets,
+      executorLocations = getSortedExecutorList())

+    // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
+    // that is, concurrent tasks will not read the same TopicPartitions.
+    val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size

+    // Generate factories based on the offset ranges
+    val factories = offsetRanges.map { range =>
+      new KafkaMicroBatchDataReaderFactory(
+        range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
+    }
     factories.map(_.asInstanceOf[DataReaderFactory[UnsafeRow]]).asJava
   }
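Editor's note on the reuse check above: it is simply a uniqueness test on topic-partitions, since two concurrent tasks sharing one cached consumer would interfere with each other's position. A minimal standalone sketch, using a made-up `TaskRange` class rather than the PR's types:

```scala
// Minimal sketch: reuse cached consumers only if no topic-partition appears in two ranges.
case class TaskRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

val ranges = Seq(
  TaskRange("events", 0, 0L, 500L),
  TaskRange("events", 0, 500L, 1000L), // the same topic-partition split across two tasks
  TaskRange("events", 1, 0L, 800L))

val reuseKafkaConsumer = ranges.map(r => (r.topic, r.partition)).toSet.size == ranges.size
// reuseKafkaConsumer == false here, so each task creates its own short-lived consumer
```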
@@ -199,10 +179,10 @@ private[kafka010] class KafkaMicroBatchReader(
     // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
     // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
     // (KAFKA-1894).
-    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+    require(Thread.currentThread().isInstanceOf[UninterruptibleThread])

     // SparkSession is required for getting Hadoop configuration for writing to checkpoints
-    assert(SparkSession.getActiveSession.nonEmpty)
+    require(SparkSession.getActiveSession.nonEmpty)

     val metadataLog =
       new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
@@ -320,28 +300,39 @@ private[kafka010] class KafkaMicroBatchReader(
   }

 /** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] class KafkaMicroBatchDataReaderFactory(
-    range: KafkaOffsetRange,
-    preferredLoc: Option[String],
+private[kafka010] case class KafkaMicroBatchDataReaderFactory(
+    offsetRange: KafkaOffsetRange,
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
+    failOnDataLoss: Boolean,
+    reuseKafkaConsumer: Boolean) extends DataReaderFactory[UnsafeRow] {

-  override def preferredLocations(): Array[String] = preferredLoc.toArray
+  override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray

   override def createDataReader(): DataReader[UnsafeRow] = new KafkaMicroBatchDataReader(
-    range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
+    offsetRange, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
 }

 /** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] class KafkaMicroBatchDataReader(
+private[kafka010] case class KafkaMicroBatchDataReader(
     offsetRange: KafkaOffsetRange,
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends DataReader[UnsafeRow] with Logging {
+    failOnDataLoss: Boolean,
+    reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with Logging {

+  private val consumer = {
+    if (!reuseKafkaConsumer) {
+      // If we can't reuse CachedKafkaConsumers, create a new CachedKafkaConsumer. Since we use
+      // `assign` here, we don't need to worry about "group.id" conflicts.
+      CachedKafkaConsumer.createUncached(
+        offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+    } else {
+      CachedKafkaConsumer.getOrCreate(
+        offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+    }
+  }

-  private val consumer = CachedKafkaConsumer.getOrCreate(
-    offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
   private val rangeToRead = resolveRange(offsetRange)
   private val converter = new KafkaRecordToUnsafeRowConverter
@@ -370,8 +361,14 @@ private[kafka010] class KafkaMicroBatchDataReader(

   override def close(): Unit = {
-    // Indicate that we're no longer using this consumer
-    CachedKafkaConsumer.releaseKafkaConsumer(
-      offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+    if (!reuseKafkaConsumer) {
+      // Don't forget to close non-reused KafkaConsumers; leaking them may take down your cluster!
+      consumer.close()
+    } else {
+      // Indicate that we're no longer using this consumer
+      CachedKafkaConsumer.releaseKafkaConsumer(
+        offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+    }
   }

   private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
@@ -392,12 +389,9 @@ private[kafka010] class KafkaMicroBatchDataReader(
       } else {
         range.untilOffset
       }
-      KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset)
+      KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset, None)
     } else {
       range
     }
   }
 }

-private[kafka010] case class KafkaOffsetRange(
-    topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long)
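For context, the reader factories above are driven by the new `minPartitions` source option, which is read through `DataSourceOptions` in the new file below. A rough usage sketch of that option; the broker address, topic name, sink, and trigger interval are placeholders, not values from the PR:

```scala
// Hypothetical usage of the "minPartitions" option added by this PR.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-minpartitions-sketch").getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "events")
  // Hint that roughly 12 Spark tasks should be planned per batch, even if the topic has
  // fewer partitions; large topic-partitions are then split into several offset ranges.
  .option("minPartitions", "12")
  .load()

df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()
```

When the topic has fewer partitions than `minPartitions`, the `KafkaOffsetRangeCalculator` below splits the larger offset ranges so that approximately that many tasks are planned.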

@@ -0,0 +1,105 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

import org.apache.spark.sql.sources.v2.DataSourceOptions

/**
 * Class to calculate offset ranges to process based on the from and until offsets, and
 * the configured `minPartitions`.
 */
private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) {
  require(minPartitions >= 0)

  import KafkaOffsetRangeCalculator._

  /**
   * Calculate the offset ranges that we are going to process this batch. If `minPartitions`
   * is not set or is set to less than or equal to the number of `topicPartitions` that we're
   * going to consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions.
   * If `minPartitions` is set higher than the number of our `topicPartitions`, then we will
   * split up the read tasks of the skewed partitions into multiple Spark tasks.
   * The number of Spark tasks will be *approximately* `minPartitions`. It can be less or more
   * depending on rounding errors or Kafka partitions that didn't receive any new data.
   */
  def getRanges(
      fromOffsets: PartitionOffsetMap,
      untilOffsets: PartitionOffsetMap,
      executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = {
    val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet)
Review comment (Contributor): was this check here before? What if there are new topic partitions? Are we missing those, because they may not exist in fromOffsets?

    val offsetRanges = partitionsToRead.toSeq.map { tp =>
      KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None)
    }

    // If minPartitions is not set or there are enough partitions to satisfy minPartitions
    if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > minPartitions) {
      // Assign preferred executor locations to each range such that the same topic-partition is
      // always read from the same executor and the KafkaConsumer can be reused.
      offsetRanges.map { range =>
        range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations))
      }
    } else {
      // Split offset ranges with relatively large amounts of data into smaller ones.
      val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum
      offsetRanges.flatMap { offsetRange =>
        val tp = offsetRange.topicPartition
        val size = offsetRange.untilOffset - offsetRange.fromOffset
        // number of partitions to divvy up this topic-partition into
        val parts = math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt
        var remaining = size
        var startOffset = offsetRange.fromOffset
        (0 until parts).map { part =>
          // Fine to do integer division. The last partition will consume all the rounding errors.
          val thisPartition = remaining / (parts - part)
          remaining -= thisPartition
          val endOffset = startOffset + thisPartition
          val offsetRange = KafkaOffsetRange(tp, startOffset, endOffset, preferredLoc = None)
          startOffset = endOffset
          offsetRange
        }
      }
    }
  }

  private def getLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String] = {
    def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b

    val numExecutors = executorLocations.length
    if (numExecutors > 0) {
      // This allows cached KafkaConsumers in the executors to be re-used to read the same
      // partition in every batch.
      Some(executorLocations(floorMod(tp.hashCode, numExecutors)))
    } else None
  }
}

private[kafka010] object KafkaOffsetRangeCalculator {

  private val DEFAULT_MIN_PARTITIONS = 0

  def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator = {
    new KafkaOffsetRangeCalculator(options.getInt("minPartitions", DEFAULT_MIN_PARTITIONS))
  }
}

private[kafka010] case class KafkaOffsetRange(
    topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long, preferredLoc: Option[String])
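To make the proportional split in `getRanges` concrete, here is a small self-contained sketch that mirrors the same arithmetic with plain tuples instead of `KafkaOffsetRange`; the `split` helper and the sample sizes are made up for illustration:

```scala
// Sketch of the proportional division used above. For minPartitions = 4 and two
// topic-partitions with 1000 and 200 new records, the larger partition is divided into
// more pieces; the last piece absorbs any integer-division remainder.
def split(sizes: Map[String, Long], minPartitions: Int): Seq[(String, Long, Long)] = {
  val totalSize = sizes.values.sum
  sizes.toSeq.flatMap { case (tp, size) =>
    val parts = math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt
    var remaining = size
    var startOffset = 0L
    (0 until parts).map { part =>
      val thisPartition = remaining / (parts - part)
      remaining -= thisPartition
      val range = (tp, startOffset, startOffset + thisPartition)
      startOffset += thisPartition
      range
    }
  }
}

// split(Map("t-0" -> 1000L, "t-1" -> 200L), minPartitions = 4)
// => Seq(("t-0",0,333), ("t-0",333,666), ("t-0",666,1000), ("t-1",0,200))
```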

@@ -0,0 +1,24 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql

import org.apache.kafka.common.TopicPartition

package object kafka010 { // scalastyle:ignore
  // ^^ scalastyle:ignore is for ignoring warnings about digits in the package name
  type PartitionOffsetMap = Map[TopicPartition, Long]
}
Review discussion on the assert -> require change above:

"What's the difference between assert and require here?"

"Not much really. assert throws AssertionError and require throws IllegalArgumentException. Just a matter of preference. I can revert this change."

"Assertions can be turned off."
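That last point is the practical reason to prefer require for these checks: Scala's assert is elidable at compile time, while require always runs. A small illustrative sketch, not part of the PR:

```scala
// assert throws java.lang.AssertionError and can be compiled away with -Xdisable-assertions
// (or -Xelide-below), so it should not guard conditions that must hold in production.
// require always runs and throws IllegalArgumentException.
object AssertVsRequire {
  def main(args: Array[String]): Unit = {
    try {
      require(args.nonEmpty, "at least one argument expected")
    } catch {
      case e: IllegalArgumentException => println(s"require failed: ${e.getMessage}")
    }

    try {
      assert(args.nonEmpty, "at least one argument expected")
    } catch {
      case e: AssertionError => println(s"assert failed: ${e.getMessage}")
    }
  }
}
```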