Commit 73618ce

[SPARK-26491][CORE][TEST] Use ConfigEntry for hardcoded configs for test categories
1 parent 240817b commit 73618ce

35 files changed: 170 additions, 83 deletions
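
What the change looks like in practice: instead of scattering raw string keys and inline defaults through SparkConf calls such as conf.getLong("spark.testing.memory", ...), each test-related config is declared once as a typed ConfigEntry in a new org.apache.spark.internal.config.Tests object and read with conf.get(ENTRY). The snippet below is a minimal, self-contained sketch of that idea in plain Scala; it is not Spark's actual ConfigEntry/ConfigBuilder implementation, and the names ConfigEntrySketch and ConfigEntryDemo are illustrative only.

// Minimal sketch of the "typed config entry" pattern this commit adopts.
// Illustrative only: Spark's real ConfigEntry/ConfigBuilder live in
// org.apache.spark.internal.config and are richer than this.
final case class ConfigEntrySketch[T](key: String, default: Option[T], parse: String => T)

object ConfigEntryDemo {
  // Stand-in for SparkConf: settings held as plain strings.
  def get[T](settings: Map[String, String], entry: ConfigEntrySketch[T]): Option[T] =
    settings.get(entry.key).map(entry.parse).orElse(entry.default)

  // Key, type and default are declared exactly once.
  val TEST_SCHEDULE_INTERVAL: ConfigEntrySketch[Long] =
    ConfigEntrySketch("spark.testing.dynamicAllocation.scheduleInterval", Some(100L), _.toLong)

  def main(args: Array[String]): Unit = {
    val settings = Map("spark.testing.dynamicAllocation.scheduleInterval" -> "10")
    println(get(settings, TEST_SCHEDULE_INTERVAL))  // Some(10): parsed from the stored string
    println(get(Map.empty, TEST_SCHEDULE_INTERVAL)) // Some(100): the declared default
  }
}

Call sites that previously hard-coded both the key and the default (the "-" lines in the diffs below) now reference only the entry, so a typo in the key or a divergent default can no longer slip in per call site.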

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 2 additions & 2 deletions
@@ -27,6 +27,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMaster
@@ -157,7 +158,7 @@ private[spark] class ExecutorAllocationManager(
 
   // Polling loop interval (ms)
   private val intervalMillis: Long = if (Utils.isTesting) {
-      conf.getLong(TESTING_SCHEDULE_INTERVAL_KEY, 100)
+      conf.get(TEST_SCHEDULE_INTERVAL)
   } else {
     100
   }
@@ -899,5 +900,4 @@ private[spark] class ExecutorAllocationManager(
 
 private object ExecutorAllocationManager {
   val NOT_SET = Long.MaxValue
-  val TESTING_SCHEDULE_INTERVAL_KEY = "spark.testing.dynamicAllocation.scheduleInterval"
 }

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 1 deletion
@@ -45,6 +45,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -471,7 +472,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Convert java options to env vars as a work around
    // since we can't set env vars directly in sbt.
-    for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
+    for { (envKey, propKey) <- Seq(("SPARK_TESTING", IS_TESTING.key))
       value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
       executorEnvs(envKey) = value
     }

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 2 additions & 1 deletion
@@ -45,6 +45,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, History}
 import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.Status._
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ReplayListenerBus._
@@ -267,7 +268,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
 
     // Disable the background thread during tests.
-    if (!conf.contains("spark.testing")) {
+    if (!conf.contains(IS_TESTING)) {
       // A task that periodically checks for event log updates on disk.
       logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds")
       pool.scheduleWithFixedDelay(

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 2 additions & 1 deletion
@@ -37,6 +37,7 @@ import org.apache.spark.deploy.ExternalShuffleService
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@@ -103,7 +104,7 @@ private[deploy] class Worker(
   private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
     conf.getBoolean("spark.storage.cleanupFilesAfterExecutorExit", true)
 
-  private val testing: Boolean = sys.props.contains("spark.testing")
+  private val testing: Boolean = sys.props.contains(IS_TESTING.key)
   private var master: Option[RpcEndpointRef] = None
 
   /**

core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala

Lines changed: 2 additions & 1 deletion
@@ -28,6 +28,7 @@ import scala.util.Try
 
 import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.util.Utils
 
 
@@ -43,7 +44,7 @@ private[spark] case class ProcfsMetrics(
 // project.
 private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends Logging {
   private val procfsStatFile = "stat"
-  private val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+  private val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains(IS_TESTING.key)
   private val pageSize = computePageSize()
   private var isAvailable: Boolean = isProcfsAvailable
   private val pid = computePid()

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 2 additions & 1 deletion
@@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.storage.{BlockId, BlockStatus}
 import org.apache.spark.util._
@@ -202,7 +203,7 @@ class TaskMetrics private[spark] () extends Serializable {
   }
 
   // Only used for test
-  private[spark] val testAccum = sys.props.get("spark.testing").map(_ => new LongAccumulator)
+  private[spark] val testAccum = sys.props.get(IS_TESTING.key).map(_ => new LongAccumulator)
 
 
   import InternalAccumulator._
core/src/main/scala/org/apache/spark/internal/config/Tests.scala

Lines changed: 57 additions & 0 deletions

@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+private[spark] object Tests {
+  val TEST_MEMORY = ConfigBuilder("spark.testing.memory")
+    .longConf
+    .createWithDefault(Runtime.getRuntime.maxMemory)
+
+  val TEST_SCHEDULE_INTERVAL =
+    ConfigBuilder("spark.testing.dynamicAllocation.scheduleInterval")
+      .longConf
+      .createWithDefault(100)
+
+  val IS_TESTING = ConfigBuilder("spark.testing")
+    .booleanConf
+    .createOptional
+
+  val TEST_USE_COMPRESSED_OOPS = ConfigBuilder("spark.test.useCompressedOops")
+    .booleanConf
+    .createOptional
+
+  val TEST_NO_STAGE_RETRY = ConfigBuilder("spark.test.noStageRetry")
+    .booleanConf
+    .createWithDefault(false)
+
+  val TEST_RESERVED_MEMORY = ConfigBuilder("spark.testing.reservedMemory")
+    .longConf
+    .createOptional
+
+  val TEST_N_HOSTS = ConfigBuilder("spark.testing.nHosts")
+    .intConf
+    .createWithDefault(5)
+
+  val TEST_N_EXECUTORS_HOST = ConfigBuilder("spark.testing.nExecutorsPerHost")
+    .intConf
+    .createWithDefault(4)
+
+  val TEST_N_CORES_EXECUTOR = ConfigBuilder("spark.testing.nCoresPerExecutor")
+    .intConf
+    .createWithDefault(2)
+}
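
How the entries above are consumed, mirroring the call sites changed elsewhere in this commit: conf.get(ENTRY) returns the declared type with the default applied, optional entries (createOptional) come back as Option[T], and ENTRY.key supplies the raw string where one is still needed (for example sys.props.contains(IS_TESTING.key) in Worker). The snippet below is a hedged sketch, not part of the commit: it assumes the spark-core classpath and must be compiled in a package under org.apache.spark, because Tests and these SparkConf accessors are private[spark]; the package and object names are hypothetical.

package org.apache.spark.sketch  // hypothetical subpackage, needed for private[spark] access

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Tests._

object TestsConfigUsageSketch {
  def main(args: Array[String]): Unit = {
    // Public string-based setter; IS_TESTING.key is just "spark.testing".
    val conf = new SparkConf(loadDefaults = false).set(IS_TESTING.key, "true")

    val mem: Long = conf.get(TEST_MEMORY)                // unset, so the default applies
    val testing: Option[Boolean] = conf.get(IS_TESTING)  // optional entry => Option[Boolean]
    val inTest: Boolean = conf.contains(IS_TESTING)      // as in the FsHistoryProvider change above

    println(s"testing=$testing inTest=$inTest memory=$mem")
  }
}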

core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala

Lines changed: 3 additions & 2 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.memory
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
 import org.apache.spark.storage.BlockId
 
 /**
@@ -112,7 +113,7 @@ private[spark] object StaticMemoryManager {
    * Return the total amount of memory available for the storage region, in bytes.
    */
   private def getMaxStorageMemory(conf: SparkConf): Long = {
-    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+    val systemMaxMemory = conf.get(TEST_MEMORY)
     val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
     val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
     (systemMaxMemory * memoryFraction * safetyFraction).toLong
@@ -122,7 +123,7 @@ private[spark] object StaticMemoryManager {
    * Return the total amount of memory available for the execution region, in bytes.
    */
   private def getMaxExecutionMemory(conf: SparkConf): Long = {
-    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+    val systemMaxMemory = conf.get(TEST_MEMORY)
 
     if (systemMaxMemory < MIN_MEMORY_BYTES) {
       throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +

core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala

Lines changed: 4 additions & 3 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.memory
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.Tests._
 import org.apache.spark.storage.BlockId
 
 /**
@@ -209,9 +210,9 @@ object UnifiedMemoryManager {
    * Return the total amount of memory shared between execution and storage, in bytes.
    */
   private def getMaxMemory(conf: SparkConf): Long = {
-    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
-    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
-      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
+    val systemMemory = conf.get(TEST_MEMORY)
+    val reservedMemory = conf.getLong(TEST_RESERVED_MEMORY.key,
+      if (conf.contains(IS_TESTING)) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
     val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
     if (systemMemory < minSystemMemory) {
       throw new IllegalArgumentException(s"System memory $systemMemory must " +
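
A side note on the IS_TESTING branch in getMaxMemory above: with the non-test reserved amount (RESERVED_SYSTEM_MEMORY_BYTES, which is 300 MiB in UnifiedMemoryManager; that value is an assumption taken from the surrounding source, not from this diff), the check rejects any JVM heap below 1.5 * 300 MiB = 450 MiB, which most small test JVMs would fail. Setting spark.testing drops the reserved amount to 0 unless spark.testing.reservedMemory overrides it. A quick standalone check of that arithmetic, under the 300 MiB assumption:

// Reproduces the minimum-heap arithmetic from getMaxMemory above; the constant
// is assumed to be 300 MiB, matching RESERVED_SYSTEM_MEMORY_BYTES in that file.
object ReservedMemoryCheck extends App {
  val reservedMemory = 300L * 1024 * 1024                  // assumed RESERVED_SYSTEM_MEMORY_BYTES
  val minSystemMemory = (reservedMemory * 1.5).ceil.toLong // same formula as the diff
  println(s"minimum heap without spark.testing: $minSystemMemory bytes") // 471859200 = 450 MiB
}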

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 2 additions & 1 deletion
@@ -38,6 +38,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData}
@@ -186,7 +187,7 @@ private[spark] class DAGScheduler(
   private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
 
   /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
-  private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
+  private val disallowStageRetryForTest = sc.getConf.get(TEST_NO_STAGE_RETRY)
 
   /**
    * Whether to unregister all the outputs on the host in condition that we receive a FetchFailure,
