Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
More conservatively, cancel only if we see the same task in this stat…
…us twice in a row
  • Loading branch information
jglick committed Mar 7, 2023
commit 2edabfda859279ca6ff99ea7f77f4f2a5da6502f
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,25 @@ public void stop(@NonNull Throwable cause) throws Exception {
@Extension public static final class AnomalousStatus extends PeriodicWork {

@Override public long getRecurrencePeriod() {
return Duration.ofHours(1).toMillis();
return Duration.ofMinutes(30).toMillis();
}

@Override public long getInitialDelay() {
// Do not run too soon after startup, in case things are still loading, agents are still reattaching, etc.
return Duration.ofMinutes(15).toMillis();
}

/**
* Tasks considered to be in an anomalous status the last time we ran.
*/
private Set<StepContext> anomalous = Set.of();

@Override protected void doRun() throws Exception {
LOGGER.fine("checking");
Set<StepContext> knownTasks = new HashSet<>();
for (Queue.Item item : Queue.getInstance().getItems()) {
if (item.task instanceof PlaceholderTask) {
LOGGER.fine(() -> "pending " + item);
knownTasks.add(((PlaceholderTask) item.task).context);
}
}
Expand All @@ -272,23 +284,37 @@ public void stop(@NonNull Throwable cause) throws Exception {
for (Executor e : c.getExecutors()) {
Queue.Executable exec = e.getCurrentExecutable();
if (exec instanceof PlaceholderTask.PlaceholderExecutable) {
LOGGER.fine(() -> "running " + exec);
knownTasks.add(((PlaceholderTask.PlaceholderExecutable) exec).getParent().context);
}
}
}
}
Set<StepContext> newAnomalous = new HashSet<>();
StepExecution.applyAll(ExecutorStepExecution.class, exec -> {
StepContext ctx = exec.getContext();
if (!knownTasks.contains(ctx)) {
try {
ctx.get(TaskListener.class).error("node block appears to be neither running nor scheduled; cancelling");
} catch (IOException | InterruptedException x) {
LOGGER.log(Level.WARNING, null, x);
LOGGER.warning(() -> "do not know about " + ctx);
if (anomalous.contains(ctx)) {
try {
ctx.get(TaskListener.class).error("node block still appears to be neither running nor scheduled; cancelling");
} catch (IOException | InterruptedException x) {
LOGGER.log(Level.WARNING, null, x);
}
ctx.onFailure(new FlowInterruptedException(Result.ABORTED, false, new QueueTaskCancelled()));
} else {
newAnomalous.add(ctx);
}
ctx.onFailure(new FlowInterruptedException(Result.ABORTED, false, new QueueTaskCancelled()));
} else {
LOGGER.fine(() -> "know about " + ctx);
}
return null;
});
}).get();
for (StepContext ctx : newAnomalous) {
ctx.get(TaskListener.class).error("node block appears to be neither running nor scheduled; will cancel if this condition persists");
}
LOGGER.fine(() -> "done checking: " + anomalous + " → " + newAnomalous);
anomalous = newAnomalous;
}

}
Expand Down