@@ -89,23 +89,32 @@ class DirectKafkaInputDStream[
 
   private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
       "spark.streaming.kafka.maxRatePerPartition", 0)
-  protected def maxMessagesPerPartition: Option[Long] = {
+
+  protected[streaming] def maxMessagesPerPartition(
+      offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
     val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
-    val numPartitions = currentOffsets.keys.size
-
-    val effectiveRateLimitPerPartition = estimatedRateLimit
-      .filter(_ > 0)
-      .map { limit =>
-        if (maxRateLimitPerPartition > 0) {
-          Math.min(maxRateLimitPerPartition, (limit / numPartitions))
-        } else {
-          limit / numPartitions
-        }
-      }.getOrElse(maxRateLimitPerPartition)
+
+    // calculate a per-partition rate limit based on current lag
+    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
+      case Some(rate) =>
+        val lagPerPartition = offsets.map { case (tp, offset) =>
+          tp -> Math.max(offset - currentOffsets(tp), 0)
+        }
+        val totalLag = lagPerPartition.values.sum
+
+        lagPerPartition.map { case (tp, lag) =>
+          val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
+          tp -> (if (maxRateLimitPerPartition > 0) {
+            Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
+        }
+      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+    }
 
-    if (effectiveRateLimitPerPartition > 0) {
+    if (effectiveRateLimitPerPartition.values.sum > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
-      Some((secsPerBatch * effectiveRateLimitPerPartition).toLong)
+      Some(effectiveRateLimitPerPartition.map {
+        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+      })
     } else {
       None
     }
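
Note: since the lag-proportional split is the heart of this change, here is a minimal standalone sketch of the same arithmetic with made-up numbers (names and values are illustrative, not part of the patch). With an estimated rate of 100 messages/sec, a partition lagging by 900 offsets is allotted 90 msg/sec while one lagging by 100 gets 10, before the optional per-partition cap and batch-duration scaling are applied:

object LagProportionalSplitSketch {
  // Split `rate` (msg/sec) across partitions in proportion to each one's lag,
  // optionally clamped to maxPerPartition (0 means "no cap"), mirroring the
  // match expression above.
  def split(rate: Int, lags: Map[Int, Long], maxPerPartition: Int): Map[Int, Long] = {
    val totalLag = lags.values.sum
    lags.map { case (partition, lag) =>
      val backpressureRate = Math.round(lag.toDouble / totalLag * rate)
      partition -> (if (maxPerPartition > 0) Math.min(backpressureRate, maxPerPartition.toLong)
                    else backpressureRate)
    }
  }

  def main(args: Array[String]): Unit = {
    // Partition 0 lags by 900 offsets, partition 1 by 100; estimated rate 100 msg/sec.
    println(split(100, Map(0 -> 900L, 1 -> 100L), maxPerPartition = 0))  // Map(0 -> 90, 1 -> 10)
    println(split(100, Map(0 -> 900L, 1 -> 100L), maxPerPartition = 50)) // Map(0 -> 50, 1 -> 10)
  }
}

The second call shows the interaction with spark.streaming.kafka.maxRatePerPartition: the cap applies per partition after the proportional split, so a heavily lagging partition can still be throttled below its share.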

@@ -134,9 +143,12 @@ class DirectKafkaInputDStream[
 
   // limits the maximum number of messages per partition
   protected def clamp(
     leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
-    maxMessagesPerPartition.map { mmp =>
-      leaderOffsets.map { case (tp, lo) =>
-        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
+    val offsets = leaderOffsets.mapValues(lo => lo.offset)
+
+    maxMessagesPerPartition(offsets).map { mmp =>
+      mmp.map { case (tp, messages) =>
+        val lo = leaderOffsets(tp)
+        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset))
       }
     }.getOrElse(leaderOffsets)
   }
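
Note: clamp then turns each per-partition allowance into an until-offset for the batch, never reading past the leader. A tiny sketch with hypothetical offsets (not part of the patch):

object ClampSketch {
  def clampOffset(current: Long, leaderOffset: Long, allowed: Long): Long =
    Math.min(current + allowed, leaderOffset)

  def main(args: Array[String]): Unit = {
    println(clampOffset(current = 500L, leaderOffset = 800L, allowed = 100L))  // 600: the rate limit binds
    println(clampOffset(current = 500L, leaderOffset = 800L, allowed = 1000L)) // 800: the leader offset binds
  }
}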

@@ -152,12 +152,15 @@ private[kafka] class KafkaTestUtils extends Logging {
   }
 
   /** Create a Kafka topic and wait until it is propagated to the whole cluster */
-  def createTopic(topic: String): Unit = {
-    AdminUtils.createTopic(zkClient, topic, 1, 1)
+  def createTopic(topic: String, partitions: Int): Unit = {
+    AdminUtils.createTopic(zkClient, topic, partitions, 1)
     // wait until metadata is propagated
-    waitUntilMetadataIsPropagated(topic, 0)
+    (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, p) }
   }
 
+  /** Single-argument version for backwards compatibility */
+  def createTopic(topic: String): Unit = createTopic(topic, 1)
+
   /** Java-friendly function for sending messages to the Kafka broker */
   def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
     sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))

@@ -168,7 +168,7 @@ private static Map<TopicAndPartition, Long> topicOffsetToMap(String topic, Long

   private String[] createTopicAndSendData(String topic) {
     String[] data = { topic + "-1", topic + "-2", topic + "-3"};
-    kafkaTestUtils.createTopic(topic);
+    kafkaTestUtils.createTopic(topic, 1);
     kafkaTestUtils.sendMessages(topic, data);
     return data;
   }

@@ -149,7 +149,7 @@ public String call(MessageAndMetadata<String, String> msgAndMd) {

   private String[] createTopicAndSendData(String topic) {
     String[] data = { topic + "-1", topic + "-2", topic + "-3"};
-    kafkaTestUtils.createTopic(topic);
+    kafkaTestUtils.createTopic(topic, 1);
     kafkaTestUtils.sendMessages(topic, data);
     return data;
   }

@@ -76,7 +76,7 @@ public void testKafkaStream() throws InterruptedException {
sent.put("b", 3);
sent.put("c", 10);

kafkaTestUtils.createTopic(topic);
kafkaTestUtils.createTopic(topic, 1);
kafkaTestUtils.sendMessages(topic, sent);

Map<String, String> kafkaParams = new HashMap<>();

@@ -353,19 +353,47 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
 
+  test("maxMessagesPerPartition with backpressure disabled") {
+    val topic = "maxMessagesPerPartition"
+    val kafkaStream = getDirectKafkaStream(topic, None)
+
+    val input = Map(TopicAndPartition(topic, 0) -> 50L, TopicAndPartition(topic, 1) -> 50L)
+    assert(kafkaStream.maxMessagesPerPartition(input).get ==
+      Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
+  }
+
+  test("maxMessagesPerPartition with no lag") {
+    val topic = "maxMessagesPerPartition"
+    val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 100))
+    val kafkaStream = getDirectKafkaStream(topic, rateController)
+
+    val input = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L)
+    assert(kafkaStream.maxMessagesPerPartition(input).isEmpty)
+  }
+
+  test("maxMessagesPerPartition respects max rate") {
+    val topic = "maxMessagesPerPartition"
+    val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 1000))
+    val kafkaStream = getDirectKafkaStream(topic, rateController)
+
+    val input = Map(TopicAndPartition(topic, 0) -> 1000L, TopicAndPartition(topic, 1) -> 1000L)
+    assert(kafkaStream.maxMessagesPerPartition(input).get ==
+      Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
+  }
+
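Note: the expected value of 10 in the first and third tests falls out of the rate math in maxMessagesPerPartition, given the 100 ms batch interval and spark.streaming.kafka.maxRatePerPartition = 100 configured in getDirectKafkaStream below; a minimal sketch of the arithmetic (not part of the patch):

object ExpectedBatchSizeSketch {
  def main(args: Array[String]): Unit = {
    val secsPerBatch = 0.1 // 100 ms batch interval
    // Backpressure disabled: no rate controller, so each partition falls back to
    // maxRatePerPartition = 100 msg/sec -> 100 * 0.1 = 10 messages per batch.
    println((100 * secsPerBatch).toLong) // 10
    // Max rate respected: the controller publishes 1000 msg/sec; both partitions
    // lag by 1000 of 2000 total, so each gets 1000/2000 * 1000 = 500 msg/sec,
    // which is clamped to maxRatePerPartition = 100 -> 100 * 0.1 = 10 messages.
    println((Math.min(500, 100) * secsPerBatch).toLong) // 10
  }
}
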
test("using rate controller") {
val topic = "backpressure"
val topicPartition = TopicAndPartition(topic, 0)
kafkaTestUtils.createTopic(topic)
val topicPartitions = Set(TopicAndPartition(topic, 0), TopicAndPartition(topic, 1))
kafkaTestUtils.createTopic(topic, 2)
val kafkaParams = Map(
"metadata.broker.list" -> kafkaTestUtils.brokerAddress,
"auto.offset.reset" -> "smallest"
)

val batchIntervalMilliseconds = 100
val estimator = new ConstantEstimator(100)
val messageKeys = (1 to 200).map(_.toString)
val messages = messageKeys.map((_, 1)).toMap
val messages = Map("foo" -> 200)
kafkaTestUtils.sendMessages(topic, messages)

Contributor:
Why was sendMessages moved to here?

Author:
The kafka messages don't actually need to be sent three times; sending them once is sufficient for all tests. When I send "foo" 200 times in one batch, they all go to the same partition. However, when I do this 3 times (once for each of the rates 100, 50, 20), each batch of 200 goes to a random partition. I suspect something in how the test kafka cluster does the partitioning.

I was usually getting 200 on one partition, and 400 on the other 2. I explicitly changed the test case to "all the messages are on one partition", since I can't control the split deterministically.

Contributor:
While I haven't done much work with Kafka, it seems like we could explicitly specify the producer's partitioner in producerConfiguration to be round robin if we wanted to (although that requires some custom code from what I can tell), or rotate the partition key.

Contributor:
Not sure if you're just talking about testing, or the main code. For production use, the complication is that how messages are partitioned into kafka is configurable at the time you're producing the messages, so the configuration of a spark partitioner would have to match.

Contributor:
Oh yeah, only for testing - this was in response to switching the test to all messages on a single partition, which seemed limiting for testing code that changes us to handling each partition's backpressure instead of a single global rate.

Author:
In the original code, as I understand it, since the Kafka test setup wasn't cleared/reinitialized between testing rounds, only the first batch of 200 messages, produced in the first of 3 test rounds, was ever consumed. The other testing rounds produced messages that were never used. My changes aside, I think moving the test message generation outside of the individual test rounds makes the most sense.

This failure scenario depended on imbalanced Kafka partitions; would you prefer to see tests for both a balanced and an imbalanced scenario?
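
Note: on the round-robin idea above, a sketch of what such a custom partitioner could look like against the old Scala producer API that KafkaTestUtils uses (class name and wiring are illustrative assumptions, not part of the patch; Kafka 0.8's kafka.producer.Partitioner is instantiated reflectively with a VerifiableProperties argument):

import java.util.concurrent.atomic.AtomicInteger

import kafka.producer.Partitioner
import kafka.utils.VerifiableProperties

// Hypothetical round-robin partitioner: ignores the key entirely, so repeated
// sends of the same key (e.g. "foo") would spread evenly across partitions.
class RoundRobinPartitioner(props: VerifiableProperties) extends Partitioner {
  private val counter = new AtomicInteger(0)
  override def partition(key: Any, numPartitions: Int): Int =
    (counter.getAndIncrement() & Int.MaxValue) % numPartitions
}

It would be enabled per producer via the partitioner.class property, which is also why, as noted above, a Spark-side assumption about partitioning can't safely be made for production: the producing side owns that configuration.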


     val sparkConf = new SparkConf()
     // Safe, even with streaming, because we're using the direct API.

@@ -380,11 +408,11 @@
     val kafkaStream = withClue("Error creating direct stream") {
       val kc = new KafkaCluster(kafkaParams)
       val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
-      val m = kc.getEarliestLeaderOffsets(Set(topicPartition))
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
         .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
 
       new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
-        ssc, kafkaParams, m, messageHandler) {
+          ssc, kafkaParams, m, messageHandler) {
         override protected[streaming] val rateController =
           Some(new DirectKafkaRateController(id, estimator))
       }

@@ -405,13 +433,12 @@
     ssc.start()
 
     // Try different rate limits.
-    // Send data to Kafka and wait for arrays of data to appear matching the rate.
+    // Wait for arrays of data to appear matching the rate.
     Seq(100, 50, 20).foreach { rate =>
       collectedData.clear()      // Empty this buffer on each pass.
       estimator.updateRate(rate) // Set a new rate.
       // Expect blocks of data equal to "rate", scaled by the interval length in secs.
       val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
-      kafkaTestUtils.sendMessages(topic, messages)
       eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
         // Assert that rate estimator values are used to determine maxMessagesPerPartition.
         // Funky "-" in message makes the complete assertion message read better.

@@ -430,6 +457,25 @@
       rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges
     }.toSeq.sortBy { _._1 }
   }
+
+  private def getDirectKafkaStream(topic: String, mockRateController: Option[RateController]) = {
+    val batchIntervalMilliseconds = 100
+
+    val sparkConf = new SparkConf()
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+    // Setup the streaming context
+    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+
+    val earliestOffsets = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L)
+    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+    new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+      ssc, Map[String, String](), earliestOffsets, messageHandler) {
+      override protected[streaming] val rateController = mockRateController
+    }
+  }
 }
 
 object DirectKafkaStreamSuite {

@@ -468,3 +514,9 @@ private[streaming] class ConstantEstimator(@volatile private var rate: Long)
       processingDelay: Long,
       schedulingDelay: Long): Option[Double] = Some(rate)
 }
+
+private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long)
+  extends RateController(id, estimator) {
+  override def publish(rate: Long): Unit = ()
+  override def getLatestRate(): Long = rate
+}

4 changes: 4 additions & 0 deletions project/MimaExcludes.scala
@@ -288,6 +288,10 @@ object MimaExcludes {
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$SQLConfEntry"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$SQLConfEntry$")
+    ) ++ Seq(
+      // SPARK-12073: backpressure rate controller consumes events preferentially from lagging partitions
+      ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.KafkaTestUtils.createTopic"),
+      ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.DirectKafkaInputDStream.maxMessagesPerPartition")
     )
     case v if v.startsWith("1.6") =>
       Seq(