Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
reset bytecode change
  • Loading branch information
rxin committed Feb 24, 2016
commit 9efd83cc43c74c7742de7e4f3b9dd4e3e365d6fc
12 changes: 2 additions & 10 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,7 @@ abstract class TaskContext extends Serializable {
* This will be called in all situations - success, failure, or cancellation.
* An example use is for HadoopRDD to register a callback to close the input stream.
*/
def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
addTaskCompletionListener(new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = f(context)
})
}
def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext

/**
* Adds a listener to be executed on task failure.
Expand All @@ -130,11 +126,7 @@ abstract class TaskContext extends Serializable {
* Adds a listener to be executed on task failure.
* Operations defined here must be idempotent, as `onTaskFailure` can be called multiple times.
*/
def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext = {
addTaskFailureListener(new TaskFailureListener {
override def onTaskFailure(context: TaskContext, error: Throwable): Unit = f(context, error)
})
}
def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext

/**
* The ID of the stage that this task belong to.
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,23 @@ private[spark] class TaskContextImpl(
this
}

override def addTaskCompletionListener(f: (TaskContext) => Unit): this.type = {
addTaskCompletionListener(new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = f(context)
})
}

override def addTaskFailureListener(listener: TaskFailureListener): this.type = {
onFailureCallbacks += listener
this
}

override def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): this.type = {
addTaskFailureListener(new TaskFailureListener {
override def onTaskFailure(context: TaskContext, error: Throwable): Unit = f(context, error)
})
}

/** Marks the task as completed and triggers the failure listeners. */
private[spark] def markTaskFailed(error: Throwable): Unit = {
val errorMsgs = new ArrayBuffer[String](2)
Expand Down