Add failing integration test.
JoshRosen committed Sep 1, 2015
commit c6220fd14486e89f6e85642f8c5eeee997338c56
OutputCommitCoordinatorIntegrationSuite.scala (new file)
@@ -0,0 +1,68 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.{Span, Seconds}

import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext, SparkFunSuite, TaskContext}
import org.apache.spark.util.Utils

/**
 * Integration tests for the OutputCommitCoordinator.
 *
 * See also: [[OutputCommitCoordinatorSuite]] for unit tests that use mocks.
 */
class OutputCommitCoordinatorIntegrationSuite
  extends SparkFunSuite
  with LocalSparkContext
  with Timeouts {

  override def beforeAll(): Unit = {
    super.beforeAll()
    val conf = new SparkConf()
      .set("spark.speculation", "true")
      .set("spark.hadoop.mapred.output.committer.class",
        classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
    // "local[2, 4]" runs two concurrent tasks and tolerates up to four failures per
    // task. The master is passed directly to the constructor, so no "master" key is
    // needed in the SparkConf ("master" is not a valid conf key; "spark.master" is).
    sc = new SparkContext("local[2, 4]", "test", conf)
  }

  test("exception thrown in OutputCommitter.commitTask()") {
    // Regression test for SPARK-10381
    failAfter(Span(60, Seconds)) {
[Inline review thread on the failAfter timeout]
JoshRosen (contributor, author):
The failure mode caused by this bug was an infinite loop, hence this timeout. The fact that the loop does not break after some maximum number of retries is a distinct bug which should be fixed as part of a separate patch.

Reviewer (contributor):
Any insight on what the other bug is? Is it worth filing another minimal JIRA for it so we don't lose track of it? (I didn't find anything ... sorry if I just missed it.)

I'm slightly concerned (perhaps just irrationally paranoid) that maybe we'll fix that bug, then regress on this bug, but this unit test won't catch it. This test doesn't have any asserts; it just checks that the job completes and doesn't throw exceptions (not that adding assertions would necessarily help us avoid that scenario either). A minimal JIRA would at least let us add a note to take another look at this while fixing that issue.
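(A hedged sketch of how the try block below could assert on the output rather than only on completion; the read-back check is illustrative, not part of the patch:)

      // Sketch only: read the saved output back and assert on its contents, so a
      // regression that silently dropped output would also be caught.
      val outputPath = tempDir.getAbsolutePath + "/out"
      sc.parallelize(1 to 4, 4).map(_.toString).saveAsTextFile(outputPath)
      assert(sc.textFile(outputPath).collect().sorted.toSeq == Seq("1", "2", "3", "4"))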

JoshRosen (contributor, author):
The other bug is that DAGScheduler treats CommitDenied failures separately from other failures: they don't count towards the usual maximum-task-failures limit that can trigger a job failure. The correct fix is to add an upper bound on the number of times a commit can be denied, as a last-ditch safety net against infinite retry loops.
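(A minimal standalone sketch of that kind of safety net. The names CommitDenialTracker, maxCommitDenials, and mayRetry are illustrative, not Spark's API; the real fix would live inside DAGScheduler's failure handling:)

    // Hedged sketch: cap per-task commit denials so that denial-driven retries
    // cannot loop forever. Illustrative only; not the actual DAGScheduler change.
    import scala.collection.mutable

    class CommitDenialTracker(maxCommitDenials: Int) {
      private val denialCounts = mutable.Map.empty[Long, Int].withDefaultValue(0)

      /** Returns true while the task may retry after a denied commit, false once capped. */
      def mayRetry(taskId: Long): Boolean = {
        denialCounts(taskId) += 1
        denialCounts(taskId) <= maxCommitDenials
      }
    }

    val tracker = new CommitDenialTracker(maxCommitDenials = 3)
    val decisions = (1 to 5).map(_ => tracker.mayRetry(42L))
    // decisions == Vector(true, true, true, false, false)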

JoshRosen (contributor, author):
I will file a followup JIRA.

Reviewer (contributor):
Makes sense, thanks Josh!

      val tempDir = Utils.createTempDir()
      try {
        sc.parallelize(1 to 4, 4).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
      } finally {
        Utils.deleteRecursively(tempDir)
      }
    }
  }
}
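(To run just this suite locally, something like build/sbt "core/test-only *OutputCommitCoordinatorIntegrationSuite" should work; the exact sbt invocation is an assumption about this checkout's build setup.)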

private class ThrowExceptionOnFirstAttemptOutputCommitter extends FileOutputCommitter {
  override def commitTask(context: TaskAttemptContext): Unit = {
    val ctx = TaskContext.get()
    if (ctx.attemptNumber < 1) {
      // Fail the first commit attempt so that the task is retried and the second
      // attempt has to ask the OutputCommitCoordinator for permission to commit.
      throw new java.io.FileNotFoundException("Intentional exception")
    }
    super.commitTask(context)
  }
}
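(Context on the wiring: the spark.hadoop. prefix on the key set in beforeAll() forwards mapred.output.committer.class into the Hadoop Configuration, where Hadoop's mapred machinery instantiates the committer reflectively. A small sketch of that last step, using only the standard Hadoop API:)

  // Sketch: JobConf reads "mapred.output.committer.class" and reflectively
  // instantiates the named committer (the default is FileOutputCommitter).
  import org.apache.hadoop.mapred.JobConf

  val jobConf = new JobConf()
  jobConf.set("mapred.output.committer.class",
    classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
  val committer = jobConf.getOutputCommitter()  // returns our test committer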
OutputCommitCoordinatorSuite.scala
@@ -63,6 +63,9 @@ import scala.language.postfixOps
 * was not in SparkHadoopWriter, the tests would still pass because only one of the
 * increments would be captured even though the commit in both tasks was executed
 * erroneously.
 *
 * See also: [[OutputCommitCoordinatorIntegrationSuite]] for integration tests that do
 * not use mocks.
 */
class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
