
Commit 629f38e

[SPARK-20887][CORE] support alternative keys in ConfigBuilder
## What changes were proposed in this pull request?

`ConfigBuilder` builds a `ConfigEntry` that can read its value through only one key, so there is no easy way to rename a config while still honoring the old name. This PR introduces `ConfigBuilder.withAlternative`, which lets a config entry read its value from alternative keys as well. Using this feature, it also renames `spark.scheduler.listenerbus.eventqueue.size` to `spark.scheduler.listenerbus.eventqueue.capacity`, per #14269 (comment).

## How was this patch tested?

A new test.

Author: Wenchen Fan <[email protected]>

Closes #18110 from cloud-fan/config.
1 parent b6f2017 commit 629f38e
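
For readers skimming the diff, here is a minimal sketch of the API this commit adds. The key names below are hypothetical; with the new resolution logic, the primary key is consulted first, then each alternative in declaration order:

private[spark] val MY_CONF =                     // hypothetical entry
  ConfigBuilder("spark.myModule.newName")        // primary key
    .withAlternative("spark.myModule.oldName")   // legacy key, still honored
    .intConf
    .createWithDefault(42)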

8 files changed, +92 -52 lines changed

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 2 additions & 0 deletions
@@ -592,6 +592,8 @@ private[spark] object SparkConf extends Logging {
    *
    * The alternates are used in the order defined in this map. If deprecated configs are
    * present in the user's configuration, a warning is logged.
+   *
+   * TODO: consolidate it with `ConfigBuilder.withAlternative`.
    */
   private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
     "spark.executor.userClassPathFirst" -> Seq(

core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala

Lines changed: 15 additions & 9 deletions
@@ -126,8 +126,8 @@ private[spark] class TypedConfigBuilder[T](
 
   /** Creates a [[ConfigEntry]] that does not have a default value. */
   def createOptional: OptionalConfigEntry[T] = {
-    val entry = new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc,
-      parent._public)
+    val entry = new OptionalConfigEntry[T](parent.key, parent._alternatives, converter,
+      stringConverter, parent._doc, parent._public)
     parent._onCreate.foreach(_(entry))
     entry
   }
@@ -140,17 +140,17 @@ private[spark] class TypedConfigBuilder[T](
       createWithDefaultString(default.asInstanceOf[String])
     } else {
       val transformedDefault = converter(stringConverter(default))
-      val entry = new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter,
-        stringConverter, parent._doc, parent._public)
+      val entry = new ConfigEntryWithDefault[T](parent.key, parent._alternatives,
+        transformedDefault, converter, stringConverter, parent._doc, parent._public)
       parent._onCreate.foreach(_(entry))
       entry
     }
   }
 
   /** Creates a [[ConfigEntry]] with a function to determine the default value */
   def createWithDefaultFunction(defaultFunc: () => T): ConfigEntry[T] = {
-    val entry = new ConfigEntryWithDefaultFunction[T](parent.key, defaultFunc, converter,
-      stringConverter, parent._doc, parent._public)
+    val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._alternatives, defaultFunc,
+      converter, stringConverter, parent._doc, parent._public)
     parent._onCreate.foreach(_ (entry))
     entry
   }
@@ -160,8 +160,8 @@ private[spark] class TypedConfigBuilder[T](
    * [[String]] and must be a valid value for the entry.
    */
   def createWithDefaultString(default: String): ConfigEntry[T] = {
-    val entry = new ConfigEntryWithDefaultString[T](parent.key, default, converter, stringConverter,
-      parent._doc, parent._public)
+    val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._alternatives, default,
+      converter, stringConverter, parent._doc, parent._public)
     parent._onCreate.foreach(_(entry))
     entry
   }
@@ -180,6 +180,7 @@ private[spark] case class ConfigBuilder(key: String) {
   private[config] var _public = true
   private[config] var _doc = ""
   private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
+  private[config] var _alternatives = List.empty[String]
 
   def internal(): ConfigBuilder = {
     _public = false
@@ -200,6 +201,11 @@ private[spark] case class ConfigBuilder(key: String) {
     this
   }
 
+  def withAlternative(key: String): ConfigBuilder = {
+    _alternatives = _alternatives :+ key
+    this
+  }
+
   def intConf: TypedConfigBuilder[Int] = {
     new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
   }
@@ -229,7 +235,7 @@ private[spark] case class ConfigBuilder(key: String) {
   }
 
   def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
-    new FallbackConfigEntry(key, _doc, _public, fallback)
+    new FallbackConfigEntry(key, _alternatives, _doc, _public, fallback)
   }
 
   def regexConf: TypedConfigBuilder[Regex] = {
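
A short sketch of how the new builder method composes (key names are illustrative): `withAlternative` appends to `_alternatives`, so alternatives are consulted in the order they were declared, after the primary key.

val entry = ConfigBuilder("spark.example.current")
  .withAlternative("spark.example.deprecatedV2")   // checked second
  .withAlternative("spark.example.deprecatedV1")   // checked third
  .intConf
  .createWithDefault(0)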

core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala

Lines changed: 22 additions & 14 deletions
@@ -41,6 +41,7 @@ package org.apache.spark.internal.config
  */
 private[spark] abstract class ConfigEntry[T] (
     val key: String,
+    val alternatives: List[String],
     val valueConverter: String => T,
     val stringConverter: T => String,
     val doc: String,
@@ -52,70 +53,75 @@ private[spark] abstract class ConfigEntry[T] (
 
   def defaultValueString: String
 
+  protected def readString(reader: ConfigReader): Option[String] = {
+    alternatives.foldLeft(reader.get(key))((res, nextKey) => res.orElse(reader.get(nextKey)))
+  }
+
   def readFrom(reader: ConfigReader): T
 
   def defaultValue: Option[T] = None
 
   override def toString: String = {
     s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
   }
-
 }
 
 private class ConfigEntryWithDefault[T] (
     key: String,
+    alternatives: List[String],
     _defaultValue: T,
     valueConverter: String => T,
     stringConverter: T => String,
     doc: String,
     isPublic: Boolean)
-  extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
+  extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {
 
   override def defaultValue: Option[T] = Some(_defaultValue)
 
   override def defaultValueString: String = stringConverter(_defaultValue)
 
   def readFrom(reader: ConfigReader): T = {
-    reader.get(key).map(valueConverter).getOrElse(_defaultValue)
+    readString(reader).map(valueConverter).getOrElse(_defaultValue)
   }
 }
 
 private class ConfigEntryWithDefaultFunction[T] (
     key: String,
+    alternatives: List[String],
     _defaultFunction: () => T,
     valueConverter: String => T,
     stringConverter: T => String,
     doc: String,
     isPublic: Boolean)
-  extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
+  extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {
 
   override def defaultValue: Option[T] = Some(_defaultFunction())
 
   override def defaultValueString: String = stringConverter(_defaultFunction())
 
   def readFrom(reader: ConfigReader): T = {
-    reader.get(key).map(valueConverter).getOrElse(_defaultFunction())
+    readString(reader).map(valueConverter).getOrElse(_defaultFunction())
   }
 }
 
 private class ConfigEntryWithDefaultString[T] (
     key: String,
+    alternatives: List[String],
     _defaultValue: String,
     valueConverter: String => T,
     stringConverter: T => String,
     doc: String,
     isPublic: Boolean)
-  extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
+  extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {
 
   override def defaultValue: Option[T] = Some(valueConverter(_defaultValue))
 
   override def defaultValueString: String = _defaultValue
 
   def readFrom(reader: ConfigReader): T = {
-    val value = reader.get(key).getOrElse(reader.substitute(_defaultValue))
+    val value = readString(reader).getOrElse(reader.substitute(_defaultValue))
     valueConverter(value)
   }
-
 }
 
 
@@ -124,37 +130,39 @@ private class ConfigEntryWithDefaultString[T] (
  */
 private[spark] class OptionalConfigEntry[T](
     key: String,
+    alternatives: List[String],
     val rawValueConverter: String => T,
     val rawStringConverter: T => String,
     doc: String,
     isPublic: Boolean)
-  extends ConfigEntry[Option[T]](key, s => Some(rawValueConverter(s)),
+  extends ConfigEntry[Option[T]](key, alternatives,
+    s => Some(rawValueConverter(s)),
     v => v.map(rawStringConverter).orNull, doc, isPublic) {
 
   override def defaultValueString: String = "<undefined>"
 
   override def readFrom(reader: ConfigReader): Option[T] = {
-    reader.get(key).map(rawValueConverter)
+    readString(reader).map(rawValueConverter)
   }
-
 }
 
 /**
  * A config entry whose default value is defined by another config entry.
  */
 private class FallbackConfigEntry[T] (
     key: String,
+    alternatives: List[String],
     doc: String,
     isPublic: Boolean,
     private[config] val fallback: ConfigEntry[T])
-  extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
+  extends ConfigEntry[T](key, alternatives,
+    fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
 
   override def defaultValueString: String = s"<value of ${fallback.key}>"
 
   override def readFrom(reader: ConfigReader): T = {
-    reader.get(key).map(valueConverter).getOrElse(fallback.readFrom(reader))
+    readString(reader).map(valueConverter).getOrElse(fallback.readFrom(reader))
   }
-
 }
 
 private[spark] object ConfigEntry {
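
The heart of the change is the `readString` fold above. A standalone model of its behavior (plain Scala, illustrative names only):

val settings = Map("spark.new.key" -> "1", "spark.old.key" -> "2")
def get(k: String): Option[String] = settings.get(k)

// Fold the alternatives over the primary lookup; the first defined value wins.
val alternatives = List("spark.old.key")
val resolved = alternatives.foldLeft(get("spark.new.key"))((res, k) => res.orElse(get(k)))
assert(resolved == Some("1")) // the primary key wins when both are set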

core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala

Lines changed: 2 additions & 14 deletions
@@ -47,28 +47,16 @@ private[spark] class MapProvider(conf: JMap[String, String]) extends ConfigProvider {
 }
 
 /**
- * A config provider that only reads Spark config keys, and considers default values for known
- * configs when fetching configuration values.
+ * A config provider that only reads Spark config keys.
  */
 private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends ConfigProvider {
 
-  import ConfigEntry._
-
   override def get(key: String): Option[String] = {
     if (key.startsWith("spark.")) {
-      Option(conf.get(key)).orElse(defaultValueString(key))
+      Option(conf.get(key))
     } else {
       None
     }
   }
 
-  private def defaultValueString(key: String): Option[String] = {
-    findEntry(key) match {
-      case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
-      case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
-      case e: FallbackConfigEntry[_] => get(e.fallback.key)
-      case _ => None
-    }
-  }
-
 }

core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala

Lines changed: 17 additions & 1 deletion
@@ -92,7 +92,7 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
       require(!usedRefs.contains(ref), s"Circular reference in $input: $ref")
 
       val replacement = bindings.get(prefix)
-        .flatMap(_.get(name))
+        .flatMap(getOrDefault(_, name))
         .map { v => substitute(v, usedRefs + ref) }
         .getOrElse(m.matched)
       Regex.quoteReplacement(replacement)
@@ -102,4 +102,20 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
     }
   }
 
+  /**
+   * Gets the value of a config from the given `ConfigProvider`. If no value is found for this
+   * config, and the `ConfigEntry` defines this config has default value, return the default value.
+   */
+  private def getOrDefault(conf: ConfigProvider, key: String): Option[String] = {
+    conf.get(key).orElse {
+      ConfigEntry.findEntry(key) match {
+        case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
+        case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
+        case e: ConfigEntryWithDefaultFunction[_] => Option(e.defaultValueString)
+        case e: FallbackConfigEntry[_] => getOrDefault(conf, e.fallback.key)
+        case _ => None
+      }
+    }
+  }
+
 }
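
A minimal model of what `getOrDefault` adds to substitution (illustrative, not the real classes): the provider is consulted first, and only if the key is unset does the registered default of a known entry fill in. This is what lets a "${...}" reference resolve against a config the user never set.

def getOrDefaultModel(
    provided: Map[String, String],
    registeredDefaults: Map[String, String],
    key: String): Option[String] =
  provided.get(key).orElse(registeredDefaults.get(key))

assert(getOrDefaultModel(Map.empty, Map("spark.a" -> "10"), "spark.a") == Some("10"))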

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 4 additions & 2 deletions
@@ -151,9 +151,11 @@ package object config {
       .createOptional
   // End blacklist confs
 
-  private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
-    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
+  private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY =
+    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity")
+      .withAlternative("spark.scheduler.listenerbus.eventqueue.size")
       .intConf
+      .checkValue(_ > 0, "The capacity of listener bus event queue must not be negative")
       .createWithDefault(10000)
 
   // This property sets the root namespace for metrics reporting
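
A hedged usage sketch of the rename: a job that still sets the legacy key keeps working, because the old name is registered as an alternative of the new entry.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.scheduler.listenerbus.eventqueue.size", "20000") // legacy key
// conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) would return 20000 here: the
// new key is unset, and the legacy key is its registered alternative.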

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 3 additions & 12 deletions
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
 import scala.util.DynamicVariable
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
@@ -34,23 +34,14 @@ import org.apache.spark.util.Utils
  * is stopped when `stop()` is called, and it will drop further events after stopping.
  */
 private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
-
   self =>
 
   import LiveListenerBus._
 
   // Cap the capacity of the event queue so we get an explicit error (rather than
   // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
-  private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
-  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
-
-  private def validateAndGetQueueSize(): Int = {
-    val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
-    if (queueSize <= 0) {
-      throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
-    }
-    queueSize
-  }
+  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
+    sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
 
   // Indicate if `start()` is called
   private val started = new AtomicBoolean(false)
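
The manual validation removed above now lives in the entry's checkValue. A sketch of the resulting behavior, assuming checkValue rejects the value at read time with the message given at the definition site:

val badConf = new SparkConf()
  .set("spark.scheduler.listenerbus.eventqueue.capacity", "-1")
// badConf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) would fail with
// "The capacity of listener bus event queue must not be negative".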

core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala

Lines changed: 27 additions & 0 deletions
@@ -261,4 +261,31 @@ class ConfigEntrySuite extends SparkFunSuite {
     data = 2
     assert(conf.get(iConf) === 2)
   }
+
+  test("conf entry: alternative keys") {
+    val conf = new SparkConf()
+    val iConf = ConfigBuilder(testKey("a"))
+      .withAlternative(testKey("b"))
+      .withAlternative(testKey("c"))
+      .intConf.createWithDefault(0)
+
+    // no key is set, return default value.
+    assert(conf.get(iConf) === 0)
+
+    // the primary key is set, the alternative keys are not set, return the value of primary key.
+    conf.set(testKey("a"), "1")
+    assert(conf.get(iConf) === 1)
+
+    // the primary key and alternative keys are all set, return the value of primary key.
+    conf.set(testKey("b"), "2")
+    conf.set(testKey("c"), "3")
+    assert(conf.get(iConf) === 1)
+
+    // the primary key is not set, (some of) the alternative keys are set, return the value of the
+    // first alternative key that is set.
+    conf.remove(testKey("a"))
+    assert(conf.get(iConf) === 2)
+    conf.remove(testKey("b"))
+    assert(conf.get(iConf) === 3)
+  }
 }
