Commit 594cb56
[SPARK-32100][CORE][TESTS] Add WorkerDecommissionExtendedSuite
### What changes were proposed in this pull request?

This PR aims to add `WorkerDecommissionExtendedSuite` for various worker decommission combinations.

### Why are the changes needed?

This will improve the test coverage.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the Jenkins.

Closes apache#28929 from dongjoon-hyun/SPARK-WD-TEST.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Parent: 1af19a7

1 file changed: 73 additions and 0 deletions.
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import scala.concurrent.duration._

import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, TestUtils}
import org.apache.spark.LocalSparkContext.withSpark
import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED}
import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER}
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend

/** This test suite aims to test worker decommission with various configurations. */
class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkContext {
  private val conf = new org.apache.spark.SparkConf()
    .setAppName(getClass.getName)
    .set(SPARK_MASTER, "local-cluster[20,1,512]")
    .set(EXECUTOR_MEMORY, "512m")
    .set(DYN_ALLOCATION_ENABLED, true)
    .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
    .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 20)
    .set(WORKER_DECOMMISSION_ENABLED, true)

  test("Worker decommission and executor idle timeout") {
    sc = new SparkContext(conf.set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s"))
    withSpark(sc) { sc =>
      TestUtils.waitUntilExecutorsUp(sc, 20, 60000)
      val rdd1 = sc.parallelize(1 to 10, 2)
      val rdd2 = rdd1.map(x => (1, x))
      val rdd3 = rdd2.reduceByKey(_ + _)
      val rdd4 = rdd3.sortByKey()
      assert(rdd4.count() === 1)
      eventually(timeout(20.seconds), interval(1.seconds)) {
        assert(sc.getExecutorIds().length < 5)
      }
    }
  }

  test("Decommission 19 executors from 20 executors in total") {
    sc = new SparkContext(conf)
    withSpark(sc) { sc =>
      TestUtils.waitUntilExecutorsUp(sc, 20, 60000)
      val rdd1 = sc.parallelize(1 to 100000, 200)
      val rdd2 = rdd1.map(x => (x % 100, x))
      val rdd3 = rdd2.reduceByKey(_ + _)
      assert(rdd3.count() === 100)

      val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
      sc.getExecutorIds().tail.foreach { id =>
        sched.decommissionExecutor(id)
        assert(rdd3.sortByKey().collect().length === 100)
      }
    }
  }
}
```
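For reference, the snippet below distills the decommission pattern exercised by the second test into a standalone helper. This is a minimal, hypothetical sketch rather than part of the commit: the object and package names are made up, it assumes an already-running `SparkContext` backed by a standalone master (for example a `local-cluster` master) with `WORKER_DECOMMISSION_ENABLED` set to `true`, and it must live under the `org.apache.spark` namespace because `schedulerBackend` and `getExecutorIds()` are `private[spark]`.

```scala
// Hypothetical package name; it only needs to sit under org.apache.spark
// so that the private[spark] SparkContext members used below are visible.
package org.apache.spark.example

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend

object DecommissionSketch {
  /**
   * Decommission every executor except the first one, then run a small job
   * to check that the surviving executor still does the work. Mirrors the
   * "Decommission 19 executors from 20 executors in total" test above,
   * just at a smaller scale.
   */
  def decommissionAllButOne(sc: SparkContext): Unit = {
    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
    sc.getExecutorIds().tail.foreach(id => sched.decommissionExecutor(id))
    // A trivial job should still complete on the remaining executor.
    assert(sc.parallelize(1 to 100, 10).map(_ * 2).sum() == 10100.0)
  }
}
```

The suite itself does the same thing against 20 executors with dynamic allocation and shuffle tracking enabled, re-running a shuffle job after each `decommissionExecutor` call to confirm results stay correct as executors drain.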
