Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1046,9 +1047,12 @@ private void closeTaskClean(final Task task,
}
}

/**
* @throws StreamsException if fetching committed offsets timed out often enough to exceed task timeout
*/
private void transitRestoredTaskToRunning(final Task task,
final long now,
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) throws StreamsException {
try {
task.completeRestoration(offsetResetter);
tasks.addTask(task);
Expand Down Expand Up @@ -1134,8 +1138,22 @@ public Map<TaskId, RuntimeException> collectExceptionsAndFailedTasksFromStateUpd
private void handleRestoredTasksFromStateUpdater(final long now,
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
final Duration timeout = Duration.ZERO;
for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout)) {
transitRestoredTaskToRunning(task, now, offsetResetter);
// Create a mutable copy to support iterator.remove()
final Set<StreamTask> restoredTasks = new LinkedHashSet<>(stateUpdater.drainRestoredActiveTasks(timeout));
final Iterator<StreamTask> iterator = restoredTasks.iterator();

try {
while (iterator.hasNext()) {
final Task task = iterator.next();
transitRestoredTaskToRunning(task, now, offsetResetter);
iterator.remove(); // Remove successfully transitioned tasks
}
} finally {
// Add back any tasks that we drained but didn't successfully transition
// from the state updater, so that they are closed during shutdown.
for (final Task task : restoredTasks) {
stateUpdater.add(task);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1620,6 +1620,52 @@ public void shouldHandleTimeoutExceptionInTransitRestoredTaskToRunning() {
verifyNoInteractions(consumer);
}

/**
 * Three restored tasks are drained in a fixed order; the second one fails its transition
 * with a {@link StreamsException}. Expects the first task to be running, and both the
 * failing task and the never-processed third task to be handed back to the state updater.
 */
@Test
public void shouldAddFailedRestoredTasksBackToStateUpdaterOnException() {
    final StreamTask succeedingTask = statefulTask(taskId00, taskId00ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId00Partitions)
        .build();
    final StreamTask failingTask = statefulTask(taskId01, taskId01ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId01Partitions)
        .build();
    final StreamTask unprocessedTask = statefulTask(taskId02, taskId02ChangelogPartitions)
        .inState(State.RESTORING)
        .withInputPartitions(taskId02Partitions)
        .build();

    // Insertion-ordered set so the tasks are handled in exactly this sequence
    final Set<StreamTask> drainedTasks = new java.util.LinkedHashSet<>();
    drainedTasks.add(succeedingTask);
    drainedTasks.add(failingTask);
    drainedTasks.add(unprocessedTask);

    final TasksRegistry tasksRegistry = mock(TasksRegistry.class);
    final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(drainedTasks, tasksRegistry);

    // The failing task times out while completing restoration, and its timeout handling
    // escalates that into a StreamsException; the task after it is therefore never reached.
    final TimeoutException restorationTimeout = new TimeoutException();
    doThrow(restorationTimeout).when(failingTask).completeRestoration(noOpResetter);
    final StreamsException timeoutExceeded = new StreamsException("Task timeout exceeded", failingTask.id());
    doThrow(timeoutExceeded).when(failingTask).maybeInitTaskTimeoutOrThrow(anyLong(), eq(restorationTimeout));

    assertThrows(
        StreamsException.class,
        () -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)
    );

    // First task: fully transitioned to running
    verify(tasksRegistry).addTask(succeedingTask);
    verify(consumer).resume(succeedingTask.inputPartitions());
    verify(succeedingTask).clearTaskTimeout();

    // Failing task: handed back to the state updater exactly once, by the finally block
    // (the add in the catch block is skipped because maybeInitTaskTimeoutOrThrow throws)
    verify(stateUpdater).add(failingTask);
    verify(tasksRegistry, never()).addTask(failingTask);
    verify(failingTask, never()).clearTaskTimeout();

    // Never-processed task: also handed back to the state updater by the finally block
    verify(stateUpdater).add(unprocessedTask);
    verify(tasksRegistry, never()).addTask(unprocessedTask);
    verify(unprocessedTask, never()).clearTaskTimeout();
}

private TaskManager setUpTransitionToRunningOfRestoredTask(final Set<StreamTask> statefulTasks,
final TasksRegistry tasks) {
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
Expand Down
Loading