diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java
index bf3dbaa786a7..4dc10a2e46c3 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java
@@ -2805,9 +2805,10 @@ public void addTask(RemoteTask remoteTask, SpoolingOutputBuffers outputBuffers,
         public SpoolingOutputStats.Snapshot taskFinished(TaskId taskId)
         {
             RemoteTask remoteTask = tasks.get(taskId);
             checkArgument(remoteTask != null, "task not found: %s", taskId);
+            // argument check comes first so an unknown task is reported as "task not found", not as a double finish
+            checkState(runningTasks.remove(taskId), "task %s already marked as finished", taskId);
             SpoolingOutputStats.Snapshot outputStats = remoteTask.retrieveAndDropSpoolingOutputStats();
-            runningTasks.remove(taskId);
             tasks.values().forEach(RemoteTask::abort);
             finished = true;
             // task descriptor has been created
@@ -2819,7 +2820,7 @@ public SpoolingOutputStats.Snapshot taskFinished(TaskId taskId)
 
         public void taskFailed(TaskId taskId)
         {
-            runningTasks.remove(taskId);
+            checkState(runningTasks.remove(taskId), "task %s already marked as finished", taskId);
             failureObserved = true;
             remainingAttempts--;
         }
diff --git a/core/trino-main/src/main/java/io/trino/server/remotetask/TaskInfoFetcher.java b/core/trino-main/src/main/java/io/trino/server/remotetask/TaskInfoFetcher.java
index ea3b37a4441f..9ad69e4739c8 100644
--- a/core/trino-main/src/main/java/io/trino/server/remotetask/TaskInfoFetcher.java
+++ b/core/trino-main/src/main/java/io/trino/server/remotetask/TaskInfoFetcher.java
@@ -272,10 +272,14 @@ synchronized void updateTaskInfo(TaskInfo newTaskInfo)
             newTaskInfo = newTaskInfo.withEstimatedMemory(estimatedMemory.get());
         }
 
+        boolean missingSpoolingOutputStats = false;
         if (newTaskInfo.taskStatus().getState().isDone()) {
             boolean wasSet = spoolingOutputStats.compareAndSet(null, newTaskInfo.outputBuffers().getSpoolingOutputStats().orElse(null));
-            if (retryPolicy == TASK && wasSet && spoolingOutputStats.get() == null) {
-                log.debug("Task %s was updated to null spoolingOutputStats. Future calls to retrieveAndDropSpoolingOutputStats will fail.", taskId);
+            if (newTaskInfo.taskStatus().getState() == TaskState.FINISHED && retryPolicy == TASK && wasSet && spoolingOutputStats.get() == null) {
+                missingSpoolingOutputStats = true;
+                if (log.isDebugEnabled()) {
+                    log.debug("Task %s was updated to null spoolingOutputStats. Future calls to retrieveAndDropSpoolingOutputStats will fail; taskInfo=%s", taskId, taskInfoCodec.toJson(newTaskInfo));
+                }
             }
             newTaskInfo = newTaskInfo.pruneSpoolingOutputStats();
         }
@@ -294,7 +298,10 @@ synchronized void updateTaskInfo(TaskInfo newTaskInfo)
 
         if (updated && newValue.taskStatus().getState().isDone()) {
             taskStatusFetcher.updateTaskStatus(newTaskInfo.taskStatus());
-            finalTaskInfo.compareAndSet(Optional.empty(), Optional.of(newValue));
+            boolean finalTaskInfoUpdated = finalTaskInfo.compareAndSet(Optional.empty(), Optional.of(newValue));
+            if (missingSpoolingOutputStats && finalTaskInfoUpdated) {
+                log.debug("Updated finalTaskInfo for task %s to one with missing spoolingOutputStats", taskId);
+            }
             stop();
         }
     }