Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class HeartbeatReceiverSuite
with PrivateMethodTester
with LocalSparkContext {

private val executorId1 = "executor-1"
private val executorId2 = "executor-2"
private val executorId1 = "1"
private val executorId2 = "2"

// Shared state that must be reset before and after each test
private var scheduler: TaskSchedulerImpl = null
Expand Down Expand Up @@ -93,12 +93,12 @@ class HeartbeatReceiverSuite

test("task scheduler is set correctly") {
assert(heartbeatReceiver.scheduler === null)
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
assert(heartbeatReceiver.scheduler !== null)
}

test("normal heartbeat") {
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
triggerHeartbeat(executorId1, executorShouldReregister = false)
Expand All @@ -116,14 +116,14 @@ class HeartbeatReceiverSuite
}

test("reregister if heartbeat from unregistered executor") {
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
// Received heartbeat from unknown executor, so we ask it to re-register
triggerHeartbeat(executorId1, executorShouldReregister = true)
assert(getTrackedExecutors.isEmpty)
}

test("reregister if heartbeat from removed executor") {
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
// Remove the second executor but not the first
Expand All @@ -140,7 +140,7 @@ class HeartbeatReceiverSuite

test("expire dead hosts") {
val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
triggerHeartbeat(executorId1, executorShouldReregister = false)
Expand All @@ -149,7 +149,7 @@ class HeartbeatReceiverSuite
heartbeatReceiverClock.advance(executorTimeout / 2)
triggerHeartbeat(executorId1, executorShouldReregister = false)
heartbeatReceiverClock.advance(executorTimeout)
heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts)
heartbeatReceiverRef.askSync[Boolean](ExpireDeadHosts)
// Only the second executor should be expired as a dead host
verify(scheduler).executorLost(Matchers.eq(executorId2), any())
val trackedExecutors = getTrackedExecutors
Expand All @@ -173,11 +173,11 @@ class HeartbeatReceiverSuite
val dummyExecutorEndpoint2 = new FakeExecutorEndpoint(rpcEnv)
val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty))
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
triggerHeartbeat(executorId1, executorShouldReregister = false)
Expand All @@ -195,7 +195,7 @@ class HeartbeatReceiverSuite
// Here we use a timeout of O(seconds), but in practice this whole test takes O(10ms).
val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
heartbeatReceiverClock.advance(executorTimeout * 2)
heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts)
heartbeatReceiverRef.askSync[Boolean](ExpireDeadHosts)
val killThread = heartbeatReceiver.invokePrivate(_killExecutorThread())
killThread.shutdown() // needed for awaitTermination
killThread.awaitTermination(10L, TimeUnit.SECONDS)
Expand All @@ -213,7 +213,7 @@ class HeartbeatReceiverSuite
executorShouldReregister: Boolean): Unit = {
val metrics = TaskMetrics.empty
val blockManagerId = BlockManagerId(executorId, "localhost", 12345)
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
Heartbeat(executorId, Array(1L -> metrics.accumulators()), blockManagerId))
if (executorShouldReregister) {
assert(response.reregisterBlockManager)
Expand Down