[SPARK-26989][CORE][TEST] DAGSchedulerSuite: ensure listeners are fully processed before checking recorded values

This patch ensures that recorded values in the listener are only accessed after the listener has fully processed all events. To enforce this, the patch adds a new class that hides the recorded values and exposes them only through accessor methods guaranteeing the condition above. Without this guard, two threads run concurrently, 1) the listener processing thread and 2) the test's main thread, and a race condition can occur.

That is also why we see the very odd situation where the error message suggests the condition is met, yet the test fails:
```
- Barrier task failures from the same stage attempt don't trigger multiple stage retries *** FAILED ***
  ArrayBuffer(0) did not equal List(0) (DAGSchedulerSuite.scala:2656)
```
which means the verification failed while the buffer was still empty, and the listener appended the value just before the failure message was constructed, so the message shows two collections that appear equal.

The guard was already placed correctly in many spots but missing in others. This patch makes it impossible to miss.
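
A minimal sketch of the guard pattern, condensed from the `EventInfoRecordingListener` added in the diff below (the `RecordingListener` name and the single recorded field are simplifications, and it assumes the class sits under an `org.apache.spark` package so the `private[spark]` `listenerBus` is visible, as it is in the real suite):

```scala
package org.apache.spark.scheduler

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkContext

// Sketch: recorded values are private and mutable; every accessor first drains
// the listener bus, so the test thread never reads state that the listener
// thread is still mutating, and then returns an immutable snapshot.
class RecordingListener(sc: SparkContext, waitTimeoutMillis: Long) extends SparkListener {
  private val _failedStages = new ArrayBuffer[Int]

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    if (info.failureReason.isDefined) {
      _failedStages += info.stageId
    }
  }

  def failedStages: List[Int] = {
    sc.listenerBus.waitUntilEmpty(waitTimeoutMillis) // block until all events are processed
    _failedStages.toList                             // immutable copy, safe to compare
  }
}
```

The same pattern is applied to every recorded collection in the actual listener below, so no test can accidentally read the mutable state before the bus is drained.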

The unit test fails intermittently, and this patch addresses the flakiness.

No user-facing change.

Modified existing unit tests.

The flaky tests were also made to fail artificially by applying a 50 ms sleep to each onXXX method, as sketched below.
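
For illustration, a hedged sketch of how such a delay can be injected; the patch itself does not include this code, and the `SlowedListener` name and body are assumptions that only demonstrate the 50 ms delay described above:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}

// Sketch only: slow down each listener callback so the listener thread lags far
// behind the test thread. Without the waitUntilEmpty guard, assertions then read
// stale, partially updated state and the race becomes easy to reproduce.
class SlowedListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    Thread.sleep(50) // artificial 50 ms delay on each onXXX callback
    // ... record stageCompleted.stageInfo as the real listener does ...
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    Thread.sleep(50)
    // ... record taskEnd.taskInfo.taskId ...
  }
}
```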

![Screen Shot 2019-09-07 at 7 44 15 AM](https://user-images.githubusercontent.com/1317309/64465178-1747ad00-d146-11e9-92f6-f4ed4a1f4b08.png)

I found 3 tests failing (marked as X). The tests marked with ! can be ignored: they failed while waiting for the listener within the given timeout, and they do not deal with these recorded values; they use a timeout of 1000 ms rather than the 10000 ms used for this listener, so they were only affected as a side effect.

After applying the same change from this patch, all tests marked as X passed.

Closes #25706 from HeartSaVioR/SPARK-26989.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
HeartSaVioR committed Sep 15, 2019
commit ed5005558d10c19e997be04166639fa9fe45c264
109 changes: 56 additions & 53 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -162,32 +162,67 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi

/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000
val sparkListener = new SparkListener() {
val submittedStageInfos = new HashSet[StageInfo]
val successfulStages = new HashSet[Int]
val failedStages = new ArrayBuffer[Int]
val stageByOrderOfExecution = new ArrayBuffer[Int]
val endedTasks = new HashSet[Long]

/**
* Listeners which records some information to verify in UTs. Getter-kind methods in this class
* ensures the value is returned after ensuring there's no event to process, as well as the
* value is immutable: prevent showing odd result by race condition.
*/
class EventInfoRecordingListener extends SparkListener {
private val _submittedStageInfos = new HashSet[StageInfo]
private val _successfulStages = new HashSet[Int]
private val _failedStages = new ArrayBuffer[Int]
private val _stageByOrderOfExecution = new ArrayBuffer[Int]
private val _endedTasks = new HashSet[Long]

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
submittedStageInfos += stageSubmitted.stageInfo
_submittedStageInfos += stageSubmitted.stageInfo
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
val stageInfo = stageCompleted.stageInfo
stageByOrderOfExecution += stageInfo.stageId
_stageByOrderOfExecution += stageInfo.stageId
if (stageInfo.failureReason.isEmpty) {
successfulStages += stageInfo.stageId
_successfulStages += stageInfo.stageId
} else {
failedStages += stageInfo.stageId
_failedStages += stageInfo.stageId
}
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
endedTasks += taskEnd.taskInfo.taskId
_endedTasks += taskEnd.taskInfo.taskId
}

def submittedStageInfos: Set[StageInfo] = {
waitForListeners()
_submittedStageInfos.toSet
}

def successfulStages: Set[Int] = {
waitForListeners()
_successfulStages.toSet
}

def failedStages: List[Int] = {
waitForListeners()
_failedStages.toList
}

def stageByOrderOfExecution: List[Int] = {
waitForListeners()
_stageByOrderOfExecution.toList
}

def endedTasks: Set[Long] = {
waitForListeners()
_endedTasks.toSet
}

private def waitForListeners(): Unit = sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
}

var sparkListener: EventInfoRecordingListener = null

var mapOutputTracker: MapOutputTrackerMaster = null
var broadcastManager: BroadcastManager = null
var securityMgr: SecurityManager = null
@@ -236,10 +271,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi

private def init(testConf: SparkConf): Unit = {
sc = new SparkContext("local[2]", "DAGSchedulerSuite", testConf)
sparkListener.submittedStageInfos.clear()
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
sparkListener.endedTasks.clear()
sparkListener = new EventInfoRecordingListener
failure = null
sc.addSparkListener(sparkListener)
taskSets.clear()
@@ -361,11 +393,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
}

test("[SPARK-3353] parent stage should have lower stage id") {
sparkListener.stageByOrderOfExecution.clear()
sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.stageByOrderOfExecution.length === 2)
assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
val stageByOrderOfExecution = sparkListener.stageByOrderOfExecution
assert(stageByOrderOfExecution.length === 2)
assert(stageByOrderOfExecution(0) < stageByOrderOfExecution(1))
}

/**
@@ -606,19 +637,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
submit(unserializableRdd, Array(0))
assert(failure.getMessage.startsWith(
"Job aborted due to stage failure: Task not serializable:"))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)
assert(sparkListener.failedStages === Seq(0))
assertDataStructuresEmpty()
}

test("trivial job failure") {
submit(new MyRDD(sc, 1, Nil), Array(0))
failed(taskSets(0), "some failure")
assert(failure.getMessage === "Job aborted due to stage failure: some failure")
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)
assert(sparkListener.failedStages === Seq(0))
assertDataStructuresEmpty()
}

@@ -627,9 +654,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
val jobId = submit(rdd, Array(0))
cancel(jobId)
assert(failure.getMessage === s"Job $jobId cancelled ")
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)
assert(sparkListener.failedStages === Seq(0))
assertDataStructuresEmpty()
}

@@ -683,7 +708,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
assert(results === Map(0 -> 42))
assertDataStructuresEmpty()

sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.isEmpty)
assert(sparkListener.successfulStages.contains(0))
}
@@ -1068,7 +1092,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
null))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.contains(1))

// The second ResultTask fails, with a fetch failure for the output from the second mapper.
@@ -1077,8 +1100,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
null))
// The SparkListener should not receive redundant failure events.
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.size == 1)
assert(sparkListener.failedStages.size === 1)
}

test("Retry all the tasks on a resubmitted attempt of a barrier stage caused by FetchFailure") {
@@ -1183,7 +1205,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
}

// The map stage should have been submitted.
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(countSubmittedMapStageAttempts() === 1)

complete(taskSets(0), Seq(
@@ -1200,12 +1221,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
null))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.contains(1))

// Trigger resubmission of the failed map stage.
runEvent(ResubmitFailedStages)
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

// Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
assert(countSubmittedMapStageAttempts() === 2)
@@ -1222,7 +1241,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
// shouldn't effect anything -- our calling it just makes *SURE* it gets called between the
// desired event and our check.
runEvent(ResubmitFailedStages)
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(countSubmittedMapStageAttempts() === 2)

}
@@ -1247,7 +1265,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
}

// The map stage should have been submitted.
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(countSubmittedMapStageAttempts() === 1)

// Complete the map stage.
@@ -1256,7 +1273,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
(Success, makeMapStatus("hostB", 2))))

// The reduce stage should have been submitted.
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(countSubmittedReduceStageAttempts() === 1)

// The first result task fails, with a fetch failure for the output from the first mapper.
@@ -1271,7 +1287,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi

// Because the map stage finished, another attempt for the reduce stage should have been
// submitted, resulting in 2 total attempts for each the map and the reduce stage.
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(countSubmittedMapStageAttempts() === 2)
assert(countSubmittedReduceStageAttempts() === 2)

@@ -1301,20 +1316,17 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
runEvent(makeCompletionEvent(
taskSets(0).tasks(1), Success, 42,
Seq.empty, createFakeTaskInfoWithId(1)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
// verify stage exists
assert(scheduler.stageIdToStage.contains(0))
assert(sparkListener.endedTasks.size == 2)

assert(sparkListener.endedTasks.size === 2)
// finish other 2 tasks
runEvent(makeCompletionEvent(
taskSets(0).tasks(2), Success, 42,
Seq.empty, createFakeTaskInfoWithId(2)))
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), Success, 42,
Seq.empty, createFakeTaskInfoWithId(3)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 4)
assert(sparkListener.endedTasks.size === 4)

// verify the stage is done
assert(!scheduler.stageIdToStage.contains(0))
@@ -1324,14 +1336,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), Success, 42,
Seq.empty, createFakeTaskInfoWithId(5)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 5)

// make sure non successful tasks also send out event
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), UnknownReason, 42,
Seq.empty, createFakeTaskInfoWithId(6)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 6)
}

@@ -1405,7 +1415,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi

// Listener bus should get told about the map stage failing, but not the reduce stage
// (since the reduce stage hasn't been started yet).
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.toSet === Set(0))

assertDataStructuresEmpty()
@@ -1649,7 +1658,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
assert(cancelledStages.toSet === Set(0, 2))

// Make sure the listeners got told about both failed stages.
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.successfulStages.isEmpty)
assert(sparkListener.failedStages.toSet === Set(0, 2))

@@ -2607,7 +2615,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
}

// The map stage should have been submitted.
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(countSubmittedMapStageAttempts() === 1)

// The first map task fails with TaskKilled.
@@ -2625,7 +2632,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi

// Trigger resubmission of the failed map stage.
runEvent(ResubmitFailedStages)
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

// Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
assert(countSubmittedMapStageAttempts() === 2)
@@ -2644,7 +2650,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
}

// The map stage should have been submitted.
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(countSubmittedMapStageAttempts() === 1)

// The first map task fails with TaskKilled.
@@ -2656,7 +2661,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi

// Trigger resubmission of the failed map stage.
runEvent(ResubmitFailedStages)
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

// Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
assert(countSubmittedMapStageAttempts() === 2)
@@ -2669,7 +2673,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi

// The second map task failure doesn't trigger stage retry.
runEvent(ResubmitFailedStages)
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(countSubmittedMapStageAttempts() === 2)
}
