Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/eleven-apples-yell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@browserbasehq/stagehand": patch
---

Add support for aborting / stopping an agent run & continuing an agent run using messages from prior runs
92 changes: 92 additions & 0 deletions packages/core/lib/v3/agent/utils/validateExperimentalFeatures.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import {
ExperimentalNotConfiguredError,
StagehandInvalidArgumentError,
} from "../../types/public/sdkErrors";
import type { AgentConfig, AgentExecuteOptionsBase } from "../../types/public";

export interface AgentValidationOptions {
/** Whether experimental mode is enabled */
isExperimental: boolean;
/** Agent config options (integrations, tools, stream, cua, etc.) */
agentConfig?: Partial<AgentConfig>;
/** Execute options (callbacks, signal, messages, etc.) */
executeOptions?:
| (Partial<AgentExecuteOptionsBase> & { callbacks?: unknown })
| null;
/** Whether this is streaming mode (can be derived from agentConfig.stream) */
isStreaming?: boolean;
}

/**
* Validates agent configuration and experimental feature usage.
*
* This utility consolidates all validation checks for both CUA and non-CUA agent paths:
* - Invalid argument errors for CUA (streaming, abort signal, message continuation are not supported)
* - Experimental feature checks for integrations and tools (both CUA and non-CUA)
* - Experimental feature checks for non-CUA only (callbacks, signal, messages, streaming)
*
* Throws StagehandInvalidArgumentError for invalid/unsupported configurations.
* Throws ExperimentalNotConfiguredError if experimental features are used without experimental mode.
*/
export function validateExperimentalFeatures(
options: AgentValidationOptions,
): void {
const { isExperimental, agentConfig, executeOptions, isStreaming } = options;

// CUA-specific validation: certain features are not available at all
if (agentConfig?.cua) {
const unsupportedFeatures: string[] = [];

if (agentConfig?.stream) {
unsupportedFeatures.push("streaming");
}
if (executeOptions?.signal) {
unsupportedFeatures.push("abort signal");
}
if (executeOptions?.messages) {
unsupportedFeatures.push("message continuation");
}

if (unsupportedFeatures.length > 0) {
throw new StagehandInvalidArgumentError(
`${unsupportedFeatures.join(", ")} ${unsupportedFeatures.length === 1 ? "is" : "are"} not supported with CUA (Computer Use Agent) mode.`,
);
}
}

// Skip experimental checks if already in experimental mode
if (isExperimental) return;

const features: string[] = [];

// Check agent config features (check array length to avoid false positives for empty arrays)
const hasIntegrations =
agentConfig?.integrations && agentConfig.integrations.length > 0;
const hasTools =
agentConfig?.tools && Object.keys(agentConfig.tools).length > 0;
if (hasIntegrations || hasTools) {
features.push("MCP integrations and custom tools");
}

// Check streaming mode (either explicit or derived from config) - only for non-CUA
if (!agentConfig?.cua && (isStreaming || agentConfig?.stream)) {
features.push("streaming");
}

// Check execute options features - only for non-CUA
if (executeOptions && !agentConfig?.cua) {
if (executeOptions.callbacks) {
features.push("callbacks");
}
if (executeOptions.signal) {
features.push("abort signal");
}
if (executeOptions.messages) {
features.push("message continuation");
}
}

if (features.length > 0) {
throw new ExperimentalNotConfiguredError(`Agent ${features.join(", ")}`);
}
}
142 changes: 101 additions & 41 deletions packages/core/lib/v3/handlers/v3AgentHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,13 @@ import { mapToolResultToActions } from "../agent/utils/actionMapping";
import {
MissingLLMConfigurationError,
StreamingCallbacksInNonStreamingModeError,
AgentAbortError,
} from "../types/public/sdkErrors";

function getErrorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}

export class V3AgentHandler {
private v3: V3;
private logger: (message: LogLine) => void;
Expand Down Expand Up @@ -72,9 +77,11 @@ export class V3AgentHandler {
);
const tools = this.createTools();
const allTools: ToolSet = { ...tools, ...this.mcpTools };
const messages: ModelMessage[] = [
{ role: "user", content: options.instruction },
];

// Use provided messages for continuation, or start fresh with the instruction
const messages: ModelMessage[] = options.messages?.length
? [...options.messages, { role: "user", content: options.instruction }]
: [{ role: "user", content: options.instruction }];

if (!this.llmClient?.getLanguageModel) {
throw new MissingLLMConfigurationError();
Expand Down Expand Up @@ -176,41 +183,52 @@ export class V3AgentHandler {
instructionOrOptions: string | AgentExecuteOptions,
): Promise<AgentResult> {
const startTime = Date.now();
const {
maxSteps,
systemPrompt,
allTools,
messages,
wrappedModel,
initialPageUrl,
} = await this.prepareAgent(instructionOrOptions);

const callbacks = (instructionOrOptions as AgentExecuteOptions).callbacks;

if (callbacks) {
const streamingOnlyCallbacks = [
"onChunk",
"onFinish",
"onError",
"onAbort",
];
const invalidCallbacks = streamingOnlyCallbacks.filter(
(name) => callbacks[name as keyof typeof callbacks] != null,
);
if (invalidCallbacks.length > 0) {
throw new StreamingCallbacksInNonStreamingModeError(invalidCallbacks);
}
}
const signal =
typeof instructionOrOptions === "object"
? instructionOrOptions.signal
: undefined;

const state: AgentState = {
collectedReasoning: [],
actions: [],
finalMessage: "",
completed: false,
currentPageUrl: initialPageUrl,
currentPageUrl: "",
};

let messages: ModelMessage[] = [];

try {
const {
options,
maxSteps,
systemPrompt,
allTools,
messages: preparedMessages,
wrappedModel,
initialPageUrl,
} = await this.prepareAgent(instructionOrOptions);

messages = preparedMessages;
state.currentPageUrl = initialPageUrl;

const callbacks = (instructionOrOptions as AgentExecuteOptions).callbacks;

if (callbacks) {
const streamingOnlyCallbacks = [
"onChunk",
"onFinish",
"onError",
"onAbort",
];
const invalidCallbacks = streamingOnlyCallbacks.filter(
(name) => callbacks[name as keyof typeof callbacks] != null,
);
if (invalidCallbacks.length > 0) {
throw new StreamingCallbacksInNonStreamingModeError(invalidCallbacks);
}
}

const result = await this.llmClient.generateText({
model: wrappedModel,
system: systemPrompt,
Expand All @@ -221,21 +239,41 @@ export class V3AgentHandler {
toolChoice: "auto",
prepareStep: callbacks?.prepareStep,
onStepFinish: this.createStepHandler(state, callbacks?.onStepFinish),
abortSignal: options.signal,
});

return this.consolidateMetricsAndResult(startTime, state, result);
return this.consolidateMetricsAndResult(
startTime,
state,
messages,
result,
);
} catch (error) {
const errorMessage = error?.message ?? String(error);
// Re-throw validation errors that should propagate to the caller
if (error instanceof StreamingCallbacksInNonStreamingModeError) {
throw error;
}

// Re-throw abort errors wrapped in AgentAbortError for consistent error typing
if (signal?.aborted) {
const reason = signal.reason ? String(signal.reason) : "aborted";
throw new AgentAbortError(reason);
}

const errorMessage = getErrorMessage(error);
this.logger({
category: "agent",
message: `Error executing agent task: ${errorMessage}`,
level: 0,
});

// For non-abort errors, return a failure result instead of throwing
return {
success: false,
actions: state.actions,
message: `Failed to execute task: ${errorMessage}`,
completed: false,
messages,
};
}
}
Expand All @@ -244,6 +282,7 @@ export class V3AgentHandler {
instructionOrOptions: string | AgentStreamExecuteOptions,
): Promise<AgentStreamResult> {
const {
options,
maxSteps,
systemPrompt,
allTools,
Expand Down Expand Up @@ -303,17 +342,25 @@ export class V3AgentHandler {
if (callbacks?.onFinish) {
callbacks.onFinish(event);
}
try {
const result = this.consolidateMetricsAndResult(
startTime,
state,
event,
);
resolveResult(result);
} catch (error) {
handleError(error);
const result = this.consolidateMetricsAndResult(
startTime,
state,
messages,
event,
);
resolveResult(result);
},
onAbort: (event) => {
if (callbacks?.onAbort) {
callbacks.onAbort(event);
}
// Reject the result promise with AgentAbortError when stream is aborted
const reason = options.signal?.reason
? String(options.signal.reason)
: "Stream was aborted";
rejectResult(new AgentAbortError(reason));
},
abortSignal: options.signal,
});

const agentStreamResult = streamResult as AgentStreamResult;
Expand All @@ -324,7 +371,12 @@ export class V3AgentHandler {
private consolidateMetricsAndResult(
startTime: number,
state: AgentState,
result: { text?: string; usage?: LanguageModelUsage },
inputMessages: ModelMessage[],
result: {
text?: string;
usage?: LanguageModelUsage;
response?: { messages?: ModelMessage[] };
},
): AgentResult {
if (!state.finalMessage) {
const allReasoning = state.collectedReasoning.join(" ").trim();
Expand All @@ -344,6 +396,13 @@ export class V3AgentHandler {
);
}

// Combine input messages with response messages for full conversation history
const responseMessages = result.response?.messages || [];
const fullMessages: ModelMessage[] = [
...inputMessages,
...responseMessages,
];

return {
success: state.completed,
message: state.finalMessage || "Task execution completed",
Expand All @@ -358,6 +417,7 @@ export class V3AgentHandler {
inference_time_ms: inferenceTimeMs,
}
: undefined,
messages: fullMessages,
};
}

Expand Down
Loading