Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Revert "reset bytecode change"
This reverts commit 9efd83c.
  • Loading branch information
rxin committed Feb 24, 2016
commit e31017ef2f8b959124a50fa4d7fe21fea49ec446
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ abstract class TaskContext extends Serializable {
* This will be called in all situations - success, failure, or cancellation.
* An example use is for HadoopRDD to register a callback to close the input stream.
*/
def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext
def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
addTaskCompletionListener(new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = f(context)
})
}

/**
* Adds a listener to be executed on task failure.
Expand All @@ -126,7 +130,11 @@ abstract class TaskContext extends Serializable {
* Adds a listener to be executed on task failure.
* Operations defined here must be idempotent, as `onTaskFailure` can be called multiple times.
*/
def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext
def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext = {
addTaskFailureListener(new TaskFailureListener {
override def onTaskFailure(context: TaskContext, error: Throwable): Unit = f(context, error)
})
}

/**
* The ID of the stage that this task belong to.
Expand Down
12 changes: 0 additions & 12 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,11 @@ private[spark] class TaskContextImpl(
this
}

override def addTaskCompletionListener(f: (TaskContext) => Unit): this.type = {
addTaskCompletionListener(new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = f(context)
})
}

override def addTaskFailureListener(listener: TaskFailureListener): this.type = {
onFailureCallbacks += listener
this
}

override def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): this.type = {
addTaskFailureListener(new TaskFailureListener {
override def onTaskFailure(context: TaskContext, error: Throwable): Unit = f(context, error)
})
}

/** Marks the task as completed and triggers the failure listeners. */
private[spark] def markTaskFailed(error: Throwable): Unit = {
val errorMsgs = new ArrayBuffer[String](2)
Expand Down