Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import hudson.model.Job;
import hudson.model.Label;
import hudson.model.Node;
import hudson.model.PeriodicWork;
import hudson.model.Queue;
import hudson.model.ResourceList;
import hudson.model.Result;
Expand All @@ -41,12 +42,15 @@
import hudson.slaves.WorkspaceList;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -246,6 +250,75 @@ public void stop(@NonNull Throwable cause) throws Exception {

}

/**
* Looks for executions whose {@link #getStatus} would be neither running nor scheduled, and cancels them.
*/
@Extension public static final class AnomalousStatus extends PeriodicWork {

@Override public long getRecurrencePeriod() {
return Duration.ofMinutes(30).toMillis();
}

@Override public long getInitialDelay() {
// Do not run too soon after startup, in case things are still loading, agents are still reattaching, etc.
return Duration.ofMinutes(15).toMillis();
}

/**
* Tasks considered to be in an anomalous status the last time we ran.
*/
private Set<StepContext> anomalous = Set.of();

@Override protected void doRun() throws Exception {
LOGGER.fine("checking");
Set<StepContext> knownTasks = new HashSet<>();
for (Queue.Item item : Queue.getInstance().getItems()) {
if (item.task instanceof PlaceholderTask) {
LOGGER.fine(() -> "pending " + item);
knownTasks.add(((PlaceholderTask) item.task).context);
}
}
Jenkins j = Jenkins.getInstanceOrNull();
if (j != null) {
for (Computer c : j.getComputers()) {
for (Executor e : c.getExecutors()) {
Queue.Executable exec = e.getCurrentExecutable();
if (exec instanceof PlaceholderTask.PlaceholderExecutable) {
LOGGER.fine(() -> "running " + exec);
knownTasks.add(((PlaceholderTask.PlaceholderExecutable) exec).getParent().context);
}
}
}
}
Set<StepContext> newAnomalous = new HashSet<>();
StepExecution.applyAll(ExecutorStepExecution.class, exec -> {
StepContext ctx = exec.getContext();
if (!knownTasks.contains(ctx)) {
LOGGER.warning(() -> "do not know about " + ctx);
if (anomalous.contains(ctx)) {
try {
ctx.get(TaskListener.class).error("node block still appears to be neither running nor scheduled; cancelling");
} catch (IOException | InterruptedException x) {
LOGGER.log(Level.WARNING, null, x);
}
ctx.onFailure(new FlowInterruptedException(Result.ABORTED, false, new QueueTaskCancelled()));
} else {
newAnomalous.add(ctx);
}
} else {
LOGGER.fine(() -> "know about " + ctx);
}
return null;
}).get();
for (StepContext ctx : newAnomalous) {
ctx.get(TaskListener.class).error("node block appears to be neither running nor scheduled; will cancel if this condition persists");
}
LOGGER.fine(() -> "done checking: " + anomalous + " → " + newAnomalous);
anomalous = newAnomalous;
}

}

public static final class QueueTaskCancelled extends CauseOfInterruption {
@Override public String getShortDescription() {
return Messages.ExecutorStepExecution_queue_task_cancelled();
Expand Down