Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Do not reregister BlockManager with Executor is shutting down
  • Loading branch information
sumeetgajjar committed Apr 3, 2021
commit 5b224a7a248dfdb1eede40e76b48b7438bab1cc7
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ private[spark] class Executor(
try {
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key))
if (response.reregisterBlockManager) {
if (!executorShutdown.get && response.reregisterBlockManager) {
logInfo("Told to re-register on heartbeat")
env.blockManager.reregister()
}
Expand Down
68 changes: 52 additions & 16 deletions core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -270,29 +270,36 @@ class ExecutorSuite extends SparkFunSuite
heartbeatZeroAccumulatorUpdateTest(false)
}

private def withMockHeartbeatReceiverRef(executor: Executor)
(func: RpcEndpointRef => Unit): Unit = {
val executorClass = classOf[Executor]
val mockReceiverRef = mock[RpcEndpointRef]
val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
receiverRef.setAccessible(true)
receiverRef.set(executor, mockReceiverRef)

func(mockReceiverRef)
}

private def withHeartbeatExecutor(confs: (String, String)*)
(f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = {
(f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = {
val conf = new SparkConf
confs.foreach { case (k, v) => conf.set(k, v) }
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
withExecutor("id", "localhost", SparkEnv.get) { executor =>
val executorClass = classOf[Executor]

// Save all heartbeats sent into an ArrayBuffer for verification
val heartbeats = ArrayBuffer[Heartbeat]()
val mockReceiver = mock[RpcEndpointRef]
when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
.thenAnswer((invocation: InvocationOnMock) => {
val args = invocation.getArguments()
heartbeats += args(0).asInstanceOf[Heartbeat]
HeartbeatResponse(false)
})
val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
receiverRef.setAccessible(true)
receiverRef.set(executor, mockReceiver)
withMockHeartbeatReceiverRef(executor) { mockReceiverRef =>
// Save all heartbeats sent into an ArrayBuffer for verification
val heartbeats = ArrayBuffer[Heartbeat]()
when(mockReceiverRef.askSync(any[Heartbeat], any[RpcTimeout])(any))
.thenAnswer((invocation: InvocationOnMock) => {
val args = invocation.getArguments()
heartbeats += args(0).asInstanceOf[Heartbeat]
HeartbeatResponse(false)
})

f(executor, heartbeats)
f(executor, heartbeats)
}
}
}

Expand Down Expand Up @@ -416,6 +423,35 @@ class ExecutorSuite extends SparkFunSuite
assert(taskMetrics.getMetricValue("JVMHeapMemory") > 0)
}

test("SPARK-34949: do not re-register BlockManager when executor is shutting down") {
val reregisterInvoked = new AtomicBoolean(false)
val mockBlockManager = mock[BlockManager]
when(mockBlockManager.reregister()).thenAnswer { (_: InvocationOnMock) =>
reregisterInvoked.getAndSet(true)
}
val conf = new SparkConf(false).setAppName("test").setMaster("local[2]")
val mockEnv = createMockEnv(conf, new JavaSerializer(conf))
when(mockEnv.blockManager).thenReturn(mockBlockManager)

withExecutor("id", "localhost", mockEnv) { executor =>
withMockHeartbeatReceiverRef(executor) { mockReceiverRef =>
when(mockReceiverRef.askSync(any[Heartbeat], any[RpcTimeout])(any)).thenAnswer {
(_: InvocationOnMock) => HeartbeatResponse(reregisterBlockManager = true)
}
val reportHeartbeat = PrivateMethod[Unit](Symbol("reportHeartBeat"))
executor.invokePrivate(reportHeartbeat())
assert(reregisterInvoked.get(),
"BlockManager.reregister not invoked when reregisterBlockManager was true")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sumeetgajjar . Is this correct, BlockManager.reregister not invoked?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @dongjoon-hyun ,

I am slightly confused here about the question.
if you are pointing to the assertion then yes, I am asserting that when the executor shutdown is not initiated, BlockManager.reregister is invoked successfully by Executor.reportHeartbeat.

However, if you pointing to the assertion clue, then I can see how that can be ambiguous, I can change the message to
"BlockManager.reregister not invoked on Heartbeat response when executor is not shutting down"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am assuming @dongjoon-hyun meant to change the message around - "expected foo when bar but was baz".
As stated currently, it is a bit confusing, and I had to double check the logic as well

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I meant the message (assertion clue).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the clarification guys, have changed the message and updated the PR 😄.


reregisterInvoked.getAndSet(false)
executor.stop()
executor.invokePrivate(reportHeartbeat())
assert(!reregisterInvoked.get(),
"BlockManager.reregister should not be invoked when executor is stopping")
}
}
}

test("SPARK-33587: isFatalError") {
def errorInThreadPool(e: => Throwable): Throwable = {
intercept[Throwable] {
Expand Down