Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2805,9 +2805,9 @@ public void addTask(RemoteTask remoteTask, SpoolingOutputBuffers outputBuffers,
public SpoolingOutputStats.Snapshot taskFinished(TaskId taskId)
{
RemoteTask remoteTask = tasks.get(taskId);
checkState(runningTasks.remove(taskId), "task %s already marked as finished", taskId);
checkArgument(remoteTask != null, "task not found: %s", taskId);
SpoolingOutputStats.Snapshot outputStats = remoteTask.retrieveAndDropSpoolingOutputStats();
runningTasks.remove(taskId);
tasks.values().forEach(RemoteTask::abort);
finished = true;
// task descriptor has been created
Expand All @@ -2819,7 +2819,7 @@ public SpoolingOutputStats.Snapshot taskFinished(TaskId taskId)

public void taskFailed(TaskId taskId)
{
runningTasks.remove(taskId);
checkState(runningTasks.remove(taskId), "task %s already marked as finished", taskId);
failureObserved = true;
remainingAttempts--;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,14 @@ synchronized void updateTaskInfo(TaskInfo newTaskInfo)
newTaskInfo = newTaskInfo.withEstimatedMemory(estimatedMemory.get());
}

boolean missingSpoolingOutputStats = false;
if (newTaskInfo.taskStatus().getState().isDone()) {
boolean wasSet = spoolingOutputStats.compareAndSet(null, newTaskInfo.outputBuffers().getSpoolingOutputStats().orElse(null));
if (retryPolicy == TASK && wasSet && spoolingOutputStats.get() == null) {
log.debug("Task %s was updated to null spoolingOutputStats. Future calls to retrieveAndDropSpoolingOutputStats will fail.", taskId);
if (newTaskInfo.taskStatus().getState() == TaskState.FINISHED && retryPolicy == TASK && wasSet && spoolingOutputStats.get() == null) {
missingSpoolingOutputStats = true;
if (log.isDebugEnabled()) {
log.debug("Task %s was updated to null spoolingOutputStats. Future calls to retrieveAndDropSpoolingOutputStats will fail; taskInfo=%s", taskId, taskInfoCodec.toJson(newTaskInfo));
}
}
newTaskInfo = newTaskInfo.pruneSpoolingOutputStats();
}
Expand All @@ -294,7 +298,10 @@ synchronized void updateTaskInfo(TaskInfo newTaskInfo)

if (updated && newValue.taskStatus().getState().isDone()) {
taskStatusFetcher.updateTaskStatus(newTaskInfo.taskStatus());
finalTaskInfo.compareAndSet(Optional.empty(), Optional.of(newValue));
boolean finalTaskInfoUpdated = finalTaskInfo.compareAndSet(Optional.empty(), Optional.of(newValue));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that we should just refuse to update finalTaskInfo here if missingSpoolingOutputStats is true. Updating it will trigger the listener registered here:

task.addFinalTaskInfoListener(taskInfo -> eventQueue.add(new RemoteTaskCompletedEvent(taskInfo.taskStatus())));

which will eventually trigger this line:
SpoolingOutputStats.Snapshot outputStats = remoteTask.retrieveAndDropSpoolingOutputStats();

and that's forbidden when spoolingOutputStats is missing.

Storing null stats here guarantees that the null check in retrieveAndDropSpoolingOutputStats() will fail.

if (missingSpoolingOutputStats && finalTaskInfoUpdated) {
log.debug("Updated finalTaskInfo for task %s to one with missing spoolingOutputStats", taskId);
}
stop();
}
}
Expand Down