
Commit efa3035

gaborgsomogyi authored and dongjoon-hyun committed
[SPARK-27687][SS] Rename Kafka consumer cache capacity conf and document caching
## What changes were proposed in this pull request?

Kafka-related Spark parameters have to start with `spark.kafka.` and not with `spark.sql.`. Because of this I've renamed `spark.sql.kafkaConsumerCache.capacity`.

Since Kafka consumer caching is not documented, I've added that as well.

## How was this patch tested?

Existing + added unit tests.

```
cd docs
SKIP_API=1 jekyll build
```

and a manual webpage check.

Closes apache#24590 from gaborgsomogyi/SPARK-27687.

Authored-by: Gabor Somogyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent d14e2d7 commit efa3035

File tree

core/src/main/scala/org/apache/spark/SparkConf.scala
docs/structured-streaming-kafka-integration.md
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSparkConfSuite.scala

4 files changed: +54 −3 lines changed

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 3 additions & 1 deletion
@@ -714,7 +714,9 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0")),
     KERBEROS_FILESYSTEMS_TO_ACCESS.key -> Seq(
       AlternateConfig("spark.yarn.access.namenodes", "2.2"),
-      AlternateConfig("spark.yarn.access.hadoopFileSystems", "3.0"))
+      AlternateConfig("spark.yarn.access.hadoopFileSystems", "3.0")),
+    "spark.kafka.consumer.cache.capacity" -> Seq(
+      AlternateConfig("spark.sql.kafkaConsumerCache.capacity", "3.0"))
   )

   /**
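
The new map entry above registers `spark.sql.kafkaConsumerCache.capacity` as a deprecated alternate of `spark.kafka.consumer.cache.capacity`, so values set under the old name are still picked up when the new key is read. A minimal sketch of that behavior, using only the public `SparkConf` string API (the value 32 is illustrative):

```
import org.apache.spark.SparkConf

val conf = new SparkConf()
// Setting the deprecated key still works; SparkConf logs a deprecation warning.
conf.set("spark.sql.kafkaConsumerCache.capacity", "32")
// Lookups through the new key fall back to the value stored under the old one.
assert(conf.get("spark.kafka.consumer.cache.capacity", "64") == "32")
```

This mirrors the `KafkaSparkConfSuite` test added at the bottom of this commit, which asserts the same translation through the module's `CONSUMER_CACHE_CAPACITY` config entry.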

docs/structured-streaming-kafka-integration.md

Lines changed: 18 additions & 0 deletions
@@ -416,6 +416,24 @@ The following configurations are optional:
 </tr>
 </table>

+### Consumer Caching
+
+It's time-consuming to initialize Kafka consumers, especially in streaming scenarios where processing time is a key factor.
+Because of this, Spark caches Kafka consumers on executors. The caching key is built up from the following information:
+* Topic name
+* Topic partition
+* Group ID
+
+The size of the cache is limited by <code>spark.kafka.consumer.cache.capacity</code> (default: 64).
+If this threshold is reached, Spark tries to remove the least-used entry that is currently not in use.
+If it cannot be removed, the cache will keep growing. In the worst case, the cache grows to
+the maximum number of concurrent tasks that can run in the executor (that is, the number of task slots),
+after which it will never shrink.
+
+If a task fails for any reason, the new task is executed with a newly created Kafka consumer for safety reasons.
+At the same time, the cached Kafka consumer that was used in the failed execution is invalidated. It has to
+be emphasized that it will not be closed if any other task is using it.
+
 ## Writing Data to Kafka

 Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
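
The new documentation above only states the key and its default, so here is a hedged sketch of how an application might tune it. Only `spark.kafka.consumer.cache.capacity` comes from this commit; the application name and the value 128 are made up for illustration:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-consumer-cache-example")               // hypothetical app name
  .config("spark.kafka.consumer.cache.capacity", "128")  // key introduced by this commit
  .getOrCreate()
```

The same key can equally be supplied through `--conf` on `spark-submit`.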

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala

Lines changed: 3 additions & 2 deletions
@@ -33,8 +33,9 @@ package object kafka010 { // scalastyle:ignore
       .createWithDefaultString("10m")

   private[kafka010] val CONSUMER_CACHE_CAPACITY =
-    ConfigBuilder("spark.sql.kafkaConsumerCache.capacity")
-      .doc("The size of consumers cached.")
+    ConfigBuilder("spark.kafka.consumer.cache.capacity")
+      .doc("The maximum number of consumers cached. Please note it's a soft limit" +
+        " (check Structured Streaming Kafka integration guide for further details).")
       .intConf
       .createWithDefault(64)
 }
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSparkConfSuite.scala

Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.util.ResetSystemProperties
+
+class KafkaSparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties {
+  test("deprecated configs") {
+    val conf = new SparkConf()
+
+    conf.set("spark.sql.kafkaConsumerCache.capacity", "32")
+    assert(conf.get(CONSUMER_CACHE_CAPACITY) === 32)
+  }
+}
