From 82740d00ff084f0c50f96c9ef1e4acf5dd26a5ee Mon Sep 17 00:00:00 2001
From: Daoyuan Wang
Date: Thu, 19 Sep 2019 16:42:48 +0800
Subject: [PATCH 1/5] fix zombie tasks after stage abort

---
 .../spark/scheduler/TaskResultGetter.scala    |  4 +++
 .../scheduler/TaskResultGetterSuite.scala     | 34 +++++++++++++++++++
 2 files changed, 38 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 9b7f901c55e0..bbf21e0b803b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -64,6 +64,8 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
           val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
             case directResult: DirectTaskResult[_] =>
               if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
+                scheduler.handleFailedTask(taskSetManager, tid, TaskState.KILLED, TaskKilled(
+                  "Tasks result size has exceeded maxResultSize"))
                 return
               }
               // deserialize "value" without holding any lock so that it won't block other threads.
@@ -75,6 +77,8 @@
               if (!taskSetManager.canFetchMoreResults(size)) {
                 // dropped by executor if size is larger than maxResultSize
                 sparkEnv.blockManager.master.removeBlock(blockId)
+                scheduler.handleFailedTask(taskSetManager, tid, TaskState.KILLED, TaskKilled(
+                  "Tasks result size has exceeded maxResultSize"))
                 return
               }
               logDebug("Fetching indirect task result for TID %s".format(tid))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index ae464352da44..a8f975609134 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -33,6 +33,7 @@ import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
+import org.apache.spark.TaskState.TaskState
 import org.apache.spark.TestUtils.JavaSourceFromString
 import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
 import org.apache.spark.storage.TaskResultBlockId
@@ -78,6 +79,16 @@ private class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: Task
   }
 }
 
+private class DummyTaskSchedulerImpl(sc: SparkContext)
+  extends TaskSchedulerImpl(sc, 1, true) {
+  override def handleFailedTask(
+      taskSetManager: TaskSetManager,
+      tid: Long,
+      taskState: TaskState,
+      reason: TaskFailedReason): Unit = {
+    // do nothing
+  }
+}
 
 /**
  * A [[TaskResultGetter]] that stores the [[DirectTaskResult]]s it receives from executors
@@ -130,6 +141,29 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
       "Expect result to be removed from the block manager.")
   }
 
+  test("handling total size of results larger than maxResultSize") {
+    sc = new SparkContext("local", "test", conf)
+    val scheduler = new DummyTaskSchedulerImpl(sc)
+    val spyScheduler = spy(scheduler)
+    val resultGetter = new TaskResultGetter(sc.env, spyScheduler)
+    spyScheduler.taskResultGetter = resultGetter
+    val myTsm = new TaskSetManager(spyScheduler, FakeTask.createTaskSet(2), 1) {
+      // always returns false
+      override def canFetchMoreResults(size: Long): Boolean = false
+    }
+    val indirectTaskResult = IndirectTaskResult(TaskResultBlockId(0), 0)
+    val directTaskResult = new DirectTaskResult(ByteBuffer.allocate(0), Nil, Array())
+    val ser = sc.env.closureSerializer.newInstance()
+    val serializedIndirect = ser.serialize(indirectTaskResult)
+    val serializedDirect = ser.serialize(directTaskResult)
+    resultGetter.enqueueSuccessfulTask(myTsm, 0, serializedDirect)
+    resultGetter.enqueueSuccessfulTask(myTsm, 1, serializedIndirect)
+    verify(spyScheduler, times(1)).handleFailedTask(
+      myTsm, 0, TaskState.KILLED, TaskKilled("Tasks result size has exceeded maxResultSize"))
+    verify(spyScheduler, times(1)).handleFailedTask(
+      myTsm, 1, TaskState.KILLED, TaskKilled("Tasks result size has exceeded maxResultSize"))
+  }
+
   test("task retried if result missing from block manager") {
     // Set the maximum number of task failures to > 0, so that the task set isn't aborted
     // after the result is missing.

From d1e744eca601d75770011b08d39cc09fa8bcab89 Mon Sep 17 00:00:00 2001
From: Daoyuan Wang
Date: Thu, 19 Sep 2019 20:14:35 +0800
Subject: [PATCH 2/5] fix tests

---
 .../org/apache/spark/scheduler/TaskResultGetterSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index a8f975609134..187bc2ae83e6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -146,7 +146,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
     val scheduler = new DummyTaskSchedulerImpl(sc)
     val spyScheduler = spy(scheduler)
     val resultGetter = new TaskResultGetter(sc.env, spyScheduler)
-    spyScheduler.taskResultGetter = resultGetter
+    scheduler.taskResultGetter = resultGetter
     val myTsm = new TaskSetManager(spyScheduler, FakeTask.createTaskSet(2), 1) {
       // always returns false
       override def canFetchMoreResults(size: Long): Boolean = false

From b9dc92b8500c0c9d2bcce9d45c739c53f54fb209 Mon Sep 17 00:00:00 2001
From: Daoyuan Wang
Date: Fri, 20 Sep 2019 11:19:26 +0800
Subject: [PATCH 3/5] sleep 1s

---
 .../scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 187bc2ae83e6..e54159325cf4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -158,6 +158,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
     val serializedDirect = ser.serialize(directTaskResult)
     resultGetter.enqueueSuccessfulTask(myTsm, 0, serializedDirect)
     resultGetter.enqueueSuccessfulTask(myTsm, 1, serializedIndirect)
+    Thread.sleep(1000L)
     verify(spyScheduler, times(1)).handleFailedTask(
       myTsm, 0, TaskState.KILLED, TaskKilled("Tasks result size has exceeded maxResultSize"))
     verify(spyScheduler, times(1)).handleFailedTask(
       myTsm, 1, TaskState.KILLED, TaskKilled("Tasks result size has exceeded maxResultSize"))

From fe3c674c54ea6203056d24d16c72f4f889b1978e Mon Sep 17 00:00:00 2001
From: Daoyuan Wang
Date: Sat, 21 Sep 2019 20:03:19 +0800
Subject: [PATCH 4/5] refine tests

---
 .../spark/scheduler/TaskResultGetterSuite.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index e54159325cf4..0d9f2b2df53d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -158,11 +158,12 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
     val serializedDirect = ser.serialize(directTaskResult)
     resultGetter.enqueueSuccessfulTask(myTsm, 0, serializedDirect)
    resultGetter.enqueueSuccessfulTask(myTsm, 1, serializedIndirect)
-    Thread.sleep(1000L)
-    verify(spyScheduler, times(1)).handleFailedTask(
-      myTsm, 0, TaskState.KILLED, TaskKilled("Tasks result size has exceeded maxResultSize"))
-    verify(spyScheduler, times(1)).handleFailedTask(
-      myTsm, 1, TaskState.KILLED, TaskKilled("Tasks result size has exceeded maxResultSize"))
+    eventually(timeout(1.second)) {
+      verify(spyScheduler, times(1)).handleFailedTask(
+        myTsm, 0, TaskState.KILLED, TaskKilled("Tasks result size has exceeded maxResultSize"))
+      verify(spyScheduler, times(1)).handleFailedTask(
+        myTsm, 1, TaskState.KILLED, TaskKilled("Tasks result size has exceeded maxResultSize"))
+    }
   }
 
   test("task retried if result missing from block manager") {

From aa4134810f777db452de729bc952793ea0d31b34 Mon Sep 17 00:00:00 2001
From: Daoyuan Wang
Date: Mon, 23 Sep 2019 16:15:52 +0800
Subject: [PATCH 5/5] Update TaskResultGetter.scala

add comments
---
 .../scala/org/apache/spark/scheduler/TaskResultGetter.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index bbf21e0b803b..670742c65602 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -64,6 +64,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
           val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
             case directResult: DirectTaskResult[_] =>
               if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
+                // kill the task so that it will not become zombie task
                 scheduler.handleFailedTask(taskSetManager, tid, TaskState.KILLED, TaskKilled(
                   "Tasks result size has exceeded maxResultSize"))
                 return
@@ -77,6 +78,7 @@
               if (!taskSetManager.canFetchMoreResults(size)) {
                 // dropped by executor if size is larger than maxResultSize
                 sparkEnv.blockManager.master.removeBlock(blockId)
+                // kill the task so that it will not become zombie task
                 scheduler.handleFailedTask(taskSetManager, tid, TaskState.KILLED, TaskKilled(
                   "Tasks result size has exceeded maxResultSize"))
                 return