Skip to content

Commit 9c9e71e

Browse files
committed
Merge pull request alteryx#241 from pwendell/branch-0.8
Fix race condition in JobLoggerSuite [0.8 branch]. I found this when running the tests locally; it's similar to a race condition found when making the 0.8.0 release.
2 parents d0b9fce + 295734f commit 9c9e71e

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import org.apache.spark.rdd.RDD
3131

3232

3333
class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
34+
/** Length of time to wait while draining listener events. */
35+
val WAIT_TIMEOUT_MILLIS = 10000
3436

3537
test("inner method") {
3638
sc = new SparkContext("local", "joblogger")
@@ -91,6 +93,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
9193
sc.addSparkListener(joblogger)
9294
val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
9395
rdd.reduceByKey(_+_).collect()
96+
assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
9497

9598
val user = System.getProperty("user.name", SparkContext.SPARK_UNKNOWN_USER)
9699

@@ -119,8 +122,9 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
119122
}
120123
sc.addSparkListener(joblogger)
121124
val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
122-
rdd.reduceByKey(_+_).collect()
123-
125+
rdd.reduceByKey(_+_).collect()
126+
assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
127+
124128
joblogger.onJobStartCount should be (1)
125129
joblogger.onJobEndCount should be (1)
126130
joblogger.onTaskEndCount should be (8)

0 commit comments

Comments (0)