
Commit 29ae310

sumeetgajjar authored and dongjoon-hyun committed
[SPARK-34949][CORE] Prevent BlockManager reregister when Executor is shutting down
### What changes were proposed in this pull request?

This PR prevents re-registering the BlockManager when an Executor is shutting down. It does so by checking `executorShutdown` before calling `env.blockManager.reregister()`.

### Why are the changes needed?

This change is needed because Spark reports executors as active even after they have been removed. I was testing Dynamic Allocation on K8s with about 300 executors. When executors were torn down due to `spark.dynamicAllocation.executorIdleTimeout`, all the executor pods were removed from K8s, yet under the "Executors" tab in the Spark UI some of those executors were still listed as alive. [spark.sparkContext.statusTracker.getExecutorInfos.length](https://github.com/apache/spark/blob/65da9287bc5112564836a555cd2967fc6b05856f/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala#L105) also returned a value greater than 1.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new test.

## Logs

Following are the logs of the executor (id: 303) that re-registers its `BlockManager`:

```
21/04/02 21:33:28 INFO CoarseGrainedExecutorBackend: Got assigned task 1076
21/04/02 21:33:28 INFO Executor: Running task 4.0 in stage 3.0 (TID 1076)
21/04/02 21:33:28 INFO MapOutputTrackerWorker: Updating epoch to 302 and clearing cache
21/04/02 21:33:28 INFO TorrentBroadcast: Started reading broadcast variable 3
21/04/02 21:33:28 INFO TransportClientFactory: Successfully created connection to /100.100.195.227:33703 after 76 ms (62 ms spent in bootstraps)
21/04/02 21:33:28 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.4 KB, free 168.0 MB)
21/04/02 21:33:28 INFO TorrentBroadcast: Reading broadcast variable 3 took 168 ms
21/04/02 21:33:28 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.9 KB, free 168.0 MB)
21/04/02 21:33:29 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 1, fetching them
21/04/02 21:33:29 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTrackerda-lite-test-4-7a57e478947d206d-driver-svc.dex-app-n5ttnbmg.svc:7078)
21/04/02 21:33:29 INFO MapOutputTrackerWorker: Got the output locations
21/04/02 21:33:29 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks including 1 local blocks and 1 remote blocks
21/04/02 21:33:30 INFO TransportClientFactory: Successfully created connection to /100.100.80.103:40971 after 660 ms (528 ms spent in bootstraps)
21/04/02 21:33:30 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 1042 ms
21/04/02 21:33:31 INFO Executor: Finished task 4.0 in stage 3.0 (TID 1076). 1276 bytes result sent to driver
.
.
.
21/04/02 21:34:16 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
21/04/02 21:34:16 INFO Executor: Told to re-register on heartbeat
21/04/02 21:34:16 INFO BlockManager: BlockManager BlockManagerId(303, 100.100.122.34, 41265, None) re-registering with master
21/04/02 21:34:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(303, 100.100.122.34, 41265, None)
21/04/02 21:34:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(303, 100.100.122.34, 41265, None)
21/04/02 21:34:16 INFO BlockManager: Reporting 0 blocks to the master.
21/04/02 21:34:16 INFO MemoryStore: MemoryStore cleared
21/04/02 21:34:16 INFO BlockManager: BlockManager stopped
21/04/02 21:34:16 INFO FileDataSink: Closing sink with output file = /tmp/safari-events/.des_analysis/safari-events/hdp_spark_monitoring_random-container-037caf27-6c77-433f-820f-03cd9c7d9b6e-spark-8a492407d60b401bbf4309a14ea02ca2_events.tsv
21/04/02 21:34:16 INFO HonestProfilerBasedThreadSnapshotProvider: Stopping agent
21/04/02 21:34:16 INFO HonestProfilerHandler: Stopping honest profiler agent
21/04/02 21:34:17 INFO ShutdownHookManager: Shutdown hook called
21/04/02 21:34:17 INFO ShutdownHookManager: Deleting directory /var/data/spark-d886588c-2a7e-491d-bbcb-4f58b3e31001/spark-4aa337a0-60c0-45da-9562-8c50eaff3cea
```

Closes apache#32043 from sumeetgajjar/SPARK-34949.

Authored-by: Sumeet Gajjar <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit a9ca197)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
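The stale-executor symptom described above can be observed from the driver itself. As a rough, illustrative sketch (the `SparkSession` setup, object name, and app name are assumptions, not part of this patch), polling `SparkStatusTracker.getExecutorInfos` after dynamic allocation has scaled the application down should report only the driver once the executors are truly gone:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: check how many executors the driver still
// considers alive after dynamic allocation has scaled down.
object ActiveExecutorCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("active-executor-check").getOrCreate()

    // getExecutorInfos includes the driver itself, so a fully
    // scaled-down application is expected to report a length of 1.
    val infos = spark.sparkContext.statusTracker.getExecutorInfos
    println(s"Executors known to the driver (including the driver): ${infos.length}")
    infos.foreach(info => println(s"  ${info.host}:${info.port} runningTasks=${info.numRunningTasks}"))

    spark.stop()
  }
}
```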
1 parent b2cc74b commit 29ae310

File tree

2 files changed: +52 −16 lines

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -995,7 +995,7 @@ private[spark] class Executor(
     try {
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
         message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key))
-      if (response.reregisterBlockManager) {
+      if (!executorShutdown.get && response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
       }
```
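The guarded call lives inside `Executor.reportHeartBeat`. The following is only a minimal model of the pattern, not the actual Spark class: the types are simplified stand-ins, and it assumes the shutdown flag behaves like an `AtomicBoolean` that is set when `stop()` begins, so any heartbeat response arriving afterwards is ignored.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Simplified stand-ins for the Spark types involved (illustrative only).
final case class HeartbeatResponse(reregisterBlockManager: Boolean)
trait BlockManagerLike { def reregister(): Unit }

// A minimal sketch of the guarded heartbeat path, assuming the shutdown
// flag is an AtomicBoolean set at the start of stop().
class GuardedHeartbeat(blockManager: BlockManagerLike) {
  private val executorShutdown = new AtomicBoolean(false)

  def stop(): Unit = executorShutdown.set(true)

  def onHeartbeatResponse(response: HeartbeatResponse): Unit = {
    // The fix in this commit: once shutdown has started, ignore the
    // driver's request to re-register, so a dying executor never
    // re-announces its BlockManager to the master.
    if (!executorShutdown.get && response.reregisterBlockManager) {
      blockManager.reregister()
    }
  }
}
```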

core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala

Lines changed: 51 additions & 15 deletions
```diff
@@ -270,29 +270,36 @@ class ExecutorSuite extends SparkFunSuite
     heartbeatZeroAccumulatorUpdateTest(false)
   }
 
+  private def withMockHeartbeatReceiverRef(executor: Executor)
+      (func: RpcEndpointRef => Unit): Unit = {
+    val executorClass = classOf[Executor]
+    val mockReceiverRef = mock[RpcEndpointRef]
+    val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
+    receiverRef.setAccessible(true)
+    receiverRef.set(executor, mockReceiverRef)
+
+    func(mockReceiverRef)
+  }
+
   private def withHeartbeatExecutor(confs: (String, String)*)
       (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = {
     val conf = new SparkConf
     confs.foreach { case (k, v) => conf.set(k, v) }
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     withExecutor("id", "localhost", SparkEnv.get) { executor =>
-      val executorClass = classOf[Executor]
-
-      // Save all heartbeats sent into an ArrayBuffer for verification
-      val heartbeats = ArrayBuffer[Heartbeat]()
-      val mockReceiver = mock[RpcEndpointRef]
-      when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
-        .thenAnswer((invocation: InvocationOnMock) => {
-          val args = invocation.getArguments()
-          heartbeats += args(0).asInstanceOf[Heartbeat]
-          HeartbeatResponse(false)
-        })
-      val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
-      receiverRef.setAccessible(true)
-      receiverRef.set(executor, mockReceiver)
+      withMockHeartbeatReceiverRef(executor) { mockReceiverRef =>
+        // Save all heartbeats sent into an ArrayBuffer for verification
+        val heartbeats = ArrayBuffer[Heartbeat]()
+        when(mockReceiverRef.askSync(any[Heartbeat], any[RpcTimeout])(any))
+          .thenAnswer((invocation: InvocationOnMock) => {
+            val args = invocation.getArguments()
+            heartbeats += args(0).asInstanceOf[Heartbeat]
+            HeartbeatResponse(false)
+          })
 
-      f(executor, heartbeats)
+        f(executor, heartbeats)
+      }
     }
   }
 
@@ -416,6 +423,35 @@ class ExecutorSuite extends SparkFunSuite
     assert(taskMetrics.getMetricValue("JVMHeapMemory") > 0)
   }
 
+  test("SPARK-34949: do not re-register BlockManager when executor is shutting down") {
+    val reregisterInvoked = new AtomicBoolean(false)
+    val mockBlockManager = mock[BlockManager]
+    when(mockBlockManager.reregister()).thenAnswer { (_: InvocationOnMock) =>
+      reregisterInvoked.getAndSet(true)
+    }
+    val conf = new SparkConf(false).setAppName("test").setMaster("local[2]")
+    val mockEnv = createMockEnv(conf, new JavaSerializer(conf))
+    when(mockEnv.blockManager).thenReturn(mockBlockManager)
+
+    withExecutor("id", "localhost", mockEnv) { executor =>
+      withMockHeartbeatReceiverRef(executor) { mockReceiverRef =>
+        when(mockReceiverRef.askSync(any[Heartbeat], any[RpcTimeout])(any)).thenAnswer {
+          (_: InvocationOnMock) => HeartbeatResponse(reregisterBlockManager = true)
+        }
+        val reportHeartbeat = PrivateMethod[Unit](Symbol("reportHeartBeat"))
+        executor.invokePrivate(reportHeartbeat())
+        assert(reregisterInvoked.get(), "BlockManager.reregister should be invoked " +
+          "on HeartbeatResponse(reregisterBlockManager = true) when executor is not shutting down")
+
+        reregisterInvoked.getAndSet(false)
+        executor.stop()
+        executor.invokePrivate(reportHeartbeat())
+        assert(!reregisterInvoked.get(),
+          "BlockManager.reregister should not be invoked when executor is shutting down")
+      }
+    }
+  }
+
   test("SPARK-33587: isFatalError") {
     def errorInThreadPool(e: => Throwable): Throwable = {
       intercept[Throwable] {
```
