Skip to content

Commit 21b7d23

Browse files
skynetigorkibanamachineelasticmachine
authored
[One Workflow] refactor: Split WorkflowExecutionRuntimeManager into workflow and step concerns (elastic#237904)
## Summary Closes: elastic/security-team#14137 ## 🔄 **CHANGELOG** ### ✨ **New Features** #### **Step Execution Runtime Extraction** - **Created `StepExecutionRuntime` class** - Extracted step-specific execution logic from `WorkflowExecutionRuntimeManager` - **Added `StepExecutionRuntimeFactory`** - Factory pattern for creating isolated step execution contexts - **Enhanced scope management** - Added guard conditions for proper scope entry/exit validation #### **Improved Workflow Architecture** - **Separated concerns** - Clear separation between workflow-level and step-level execution management - **Better error handling** - Enhanced error propagation with proper step failure tracking ### 🛠 **Breaking Changes** #### **Interface Updates** - **`NodeImplementation` interface** - Now accepts `StepExecutionRuntime` instead of `WorkflowContextManager` - **Node factory pattern** - Updated to work with new step execution runtime pattern - **Workflow execution loop** - Refactored to use factory pattern for step runtime creation ### 🔧 **Refactoring** #### **WorkflowExecutionRuntimeManager Cleanup** - **Removed step-specific methods**: - `getCurrentStepExecutionId()` - `getCurrentStepResult()` / `setCurrentStepResult()` - `getCurrentStepState()` / `setCurrentStepState()` - `startStep()` / `finishStep()` / `failStep()` / `setWaitStep()` - **Simplified navigation** - Navigation methods now use internal `nextNodeId` tracking - **Enhanced scope management** - Added comprehensive documentation and validation guards #### **Step Implementation Updates** - **All step implementations** updated to use `StepExecutionRuntime` instead of `WorkflowContextManager` - **Improved abort handling** - Better support for step cancellation via `AbortController` - **Centralized step logging** - Step-specific logging moved to `StepExecutionRuntime` ### 🧰 **Utilities** #### **New Helper Functions** - **`stringifyStackFrames()`** - Utility for debugging workflow execution paths ### 📋 **Technical Improvements** - **Reduced coupling** - Clear separation between workflow and step execution concerns - **Enhanced maintainability** - Factory pattern improves code organization and testability - **Better error isolation** - Step failures are now properly contained within step execution context - **Improved debugging** - Enhanced logging and tracing capabilities for workflow execution analysis --- **Migration Impact**: This is an internal refactoring that improves code architecture without changing public APIs. No external changes required for workflow definitions or execution. ### Checklist Check the PR satisfies following conditions. Reviewers should verify this PR satisfies this list as well. - [ ] Any text added follows [EUI's writing guidelines](https://elastic.github.io/eui/#/guidelines/writing), uses sentence case text and includes [i18n support](https://github.com/elastic/kibana/blob/main/src/platform/packages/shared/kbn-i18n/README.md) - [ ] [Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html) was added for features that require explanation or tutorials - [ ] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios - [ ] If a plugin configuration key changed, check if it needs to be allowlisted in the cloud and added to the [docker list](https://github.com/elastic/kibana/blob/main/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker) - [ ] This was checked for breaking HTTP API changes, and any breaking changes have been approved by the breaking-change committee. The `release_note:breaking` label should be applied in these situations. - [ ] [Flaky Test Runner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was used on any tests changed - [ ] The PR description includes the appropriate Release Notes section, and the correct `release_note:*` label is applied per the [guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process) - [ ] Review the [backport guidelines](https://docs.google.com/document/d/1VyN5k91e5OVumlc0Gb9RPa3h1ewuPE705nRtioPiTvY/edit?usp=sharing) and apply applicable `backport:*` labels. ### Identify risks Does this PR introduce any risks? For example, consider risks like hard to test bugs, performance regression, potential of data loss. Describe the risk, its severity, and mitigation for each identified risk. Invite stakeholders and evaluate how to proceed before merging. - [ ] [See some risk examples](https://github.com/elastic/kibana/blob/main/RISK_MATRIX.mdx) - [ ] ... --------- Co-authored-by: kibanamachine <[email protected]> Co-authored-by: Elastic Machine <[email protected]>
1 parent 7c0a23e commit 21b7d23

File tree

65 files changed

+2093
-1647
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+2093
-1647
lines changed

src/platform/packages/shared/kbn-workflows/graph/build_execution_graph/build_execution_graph.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,7 @@ function createFallback(
595595
const enterTryBlockNodeId = `enterTryBlock_${stepId}`;
596596
const exitTryBlockNodeId = `exitTryBlock_${stepId}`;
597597
const enterNormalPathNodeId = `enterNormalPath_${stepId}`;
598+
const enterFallbackPathNodeId = `enterFallbackPath_${stepId}`;
598599

599600
const enterTryBlockNode: EnterTryBlockNode = {
600601
id: enterTryBlockNodeId,
@@ -603,6 +604,7 @@ function createFallback(
603604
stepType: 'fallback',
604605
type: 'enter-try-block',
605606
enterNormalPathNodeId,
607+
enterFallbackPathNodeId,
606608
};
607609
graph.setNode(enterTryBlockNodeId, enterTryBlockNode);
608610
const exitTryBlockNode: ExitTryBlockNode = {

src/platform/packages/shared/kbn-workflows/graph/build_execution_graph/tests/on_failure_graph.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ describe('on_failure graph', () => {
255255
stepType: 'fallback',
256256
type: 'enter-try-block',
257257
enterNormalPathNodeId: 'enterNormalPath_testRetryConnectorStep',
258+
enterFallbackPathNodeId: 'enterFallbackPath_testRetryConnectorStep',
258259
});
259260
});
260261

src/platform/packages/shared/kbn-workflows/graph/types/nodes/on_failure_nodes.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export const EnterTryBlockNodeSchema = GraphNodeSchema.extend({
4444
id: z.string(),
4545
type: z.literal('enter-try-block'),
4646
enterNormalPathNodeId: z.string(),
47+
enterFallbackPathNodeId: z.string(),
4748
exitNodeId: z.string(),
4849
});
4950
export type EnterTryBlockNode = z.infer<typeof EnterTryBlockNodeSchema>;

src/platform/plugins/shared/workflows_execution_engine/server/plugin.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import type {
4848
StartWorkflowExecutionParams,
4949
} from './workflow_task_manager/types';
5050
import { WorkflowTaskManager } from './workflow_task_manager/workflow_task_manager';
51+
import { StepExecutionRuntimeFactory } from './workflow_context_manager/step_execution_runtime_factory';
5152

5253
export class WorkflowsExecutionEnginePlugin
5354
implements Plugin<WorkflowsExecutionEnginePluginSetup, WorkflowsExecutionEnginePluginStart>
@@ -88,6 +89,7 @@ export class WorkflowsExecutionEnginePlugin
8889

8990
const {
9091
workflowRuntime,
92+
stepExecutionRuntimeFactory,
9193
workflowExecutionState,
9294
workflowLogger,
9395
nodesFactory,
@@ -111,6 +113,7 @@ export class WorkflowsExecutionEnginePlugin
111113

112114
await workflowExecutionLoop({
113115
workflowRuntime,
116+
stepExecutionRuntimeFactory,
114117
workflowExecutionState,
115118
workflowExecutionRepository,
116119
workflowLogger,
@@ -152,6 +155,7 @@ export class WorkflowsExecutionEnginePlugin
152155

153156
const {
154157
workflowRuntime,
158+
stepExecutionRuntimeFactory,
155159
workflowExecutionState,
156160
workflowLogger,
157161
nodesFactory,
@@ -175,6 +179,7 @@ export class WorkflowsExecutionEnginePlugin
175179

176180
await workflowExecutionLoop({
177181
workflowRuntime,
182+
stepExecutionRuntimeFactory,
178183
workflowExecutionState,
179184
workflowExecutionRepository,
180185
workflowLogger,
@@ -241,6 +246,7 @@ export class WorkflowsExecutionEnginePlugin
241246

242247
const {
243248
workflowRuntime,
249+
stepExecutionRuntimeFactory,
244250
workflowExecutionState,
245251
workflowLogger,
246252
nodesFactory,
@@ -264,6 +270,7 @@ export class WorkflowsExecutionEnginePlugin
264270
await workflowRuntime.start();
265271
await workflowExecutionLoop({
266272
workflowRuntime,
273+
stepExecutionRuntimeFactory,
267274
workflowExecutionState,
268275
workflowExecutionRepository,
269276
workflowLogger,
@@ -487,19 +494,29 @@ async function createContainer(
487494
allowedHosts: config.http.allowedHosts,
488495
});
489496

497+
const stepExecutionRuntimeFactory = new StepExecutionRuntimeFactory({
498+
workflowExecutionGraph,
499+
workflowExecutionState,
500+
workflowLogger,
501+
esClient,
502+
fakeRequest,
503+
coreStart,
504+
});
505+
490506
const nodesFactory = new NodesFactory(
491507
connectorExecutor,
492508
workflowRuntime,
493-
workflowExecutionState,
494509
workflowLogger,
495510
workflowTaskManager,
496511
urlValidator,
497-
workflowExecutionGraph
512+
workflowExecutionGraph,
513+
stepExecutionRuntimeFactory
498514
);
499515

500516
return {
501517
workflowExecutionGraph,
502518
workflowRuntime,
519+
stepExecutionRuntimeFactory,
503520
workflowExecutionState,
504521
connectorExecutor,
505522
workflowLogger,

src/platform/plugins/shared/workflows_execution_engine/server/step/atomic_step/atomic_step_impl.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import type { AtomicGraphNode } from '@kbn/workflows/graph';
1010
import type { NodeImplementation } from '../node_implementation';
1111
import { ConnectorStepImpl } from '../connector_step';
12-
import type { WorkflowContextManager } from '../../workflow_context_manager/workflow_context_manager';
12+
import type { StepExecutionRuntime } from '../../workflow_context_manager/step_execution_runtime';
1313
import type { ConnectorExecutor } from '../../connector_executor';
1414
import type { WorkflowExecutionRuntimeManager } from '../../workflow_context_manager/workflow_execution_runtime_manager';
1515
import type { IWorkflowEventLogger } from '../../workflow_event_logger/workflow_event_logger';
@@ -33,7 +33,7 @@ import type { IWorkflowEventLogger } from '../../workflow_event_logger/workflow_
3333
export class AtomicStepImpl implements NodeImplementation {
3434
constructor(
3535
private node: AtomicGraphNode,
36-
private contextManager: WorkflowContextManager,
36+
private stepExecutionRuntime: StepExecutionRuntime,
3737
private connectorExecutor: ConnectorExecutor,
3838
private workflowState: WorkflowExecutionRuntimeManager,
3939
private workflowLogger: IWorkflowEventLogger
@@ -45,7 +45,7 @@ export class AtomicStepImpl implements NodeImplementation {
4545
// for now it only calls ConnectorStepImpl
4646
await new ConnectorStepImpl(
4747
this.node.configuration,
48-
this.contextManager,
48+
this.stepExecutionRuntime,
4949
this.connectorExecutor,
5050
this.workflowState,
5151
this.workflowLogger

src/platform/plugins/shared/workflows_execution_engine/server/step/connector_step.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88
*/
99

1010
import type { ConnectorExecutor } from '../connector_executor';
11-
import type { WorkflowContextManager } from '../workflow_context_manager/workflow_context_manager';
1211
import type { WorkflowExecutionRuntimeManager } from '../workflow_context_manager/workflow_execution_runtime_manager';
1312
import type { IWorkflowEventLogger } from '../workflow_event_logger/workflow_event_logger';
1413
import type { RunStepResult, BaseStep } from './node_implementation';
1514
import { BaseAtomicNodeImplementation } from './node_implementation';
15+
import type { StepExecutionRuntime } from '../workflow_context_manager/step_execution_runtime';
1616

1717
// Extend BaseStep for connector-specific properties
1818
export interface ConnectorStep extends BaseStep {
@@ -23,17 +23,17 @@ export interface ConnectorStep extends BaseStep {
2323
export class ConnectorStepImpl extends BaseAtomicNodeImplementation<ConnectorStep> {
2424
constructor(
2525
step: ConnectorStep,
26-
contextManager: WorkflowContextManager,
26+
stepExecutionRuntime: StepExecutionRuntime,
2727
connectorExecutor: ConnectorExecutor,
2828
workflowState: WorkflowExecutionRuntimeManager,
2929
private workflowLogger: IWorkflowEventLogger
3030
) {
31-
super(step, contextManager, connectorExecutor, workflowState);
31+
super(step, stepExecutionRuntime, connectorExecutor, workflowState);
3232
}
3333

3434
public getInput() {
3535
// Get current context for templating
36-
const context = this.contextManager.getContext();
36+
const context = this.stepExecutionRuntime.contextManager.getContext();
3737
// Render inputs from 'with'
3838
return Object.entries(this.step.with ?? {}).reduce((acc: Record<string, any>, [key, value]) => {
3939
if (typeof value === 'string') {

src/platform/plugins/shared/workflows_execution_engine/server/step/elasticsearch_action_step.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
*/
99

1010
import { buildRequestFromConnector } from '@kbn/workflows';
11-
import type { WorkflowContextManager } from '../workflow_context_manager/workflow_context_manager';
11+
import type { StepExecutionRuntime } from '../workflow_context_manager/step_execution_runtime';
1212
import type { WorkflowExecutionRuntimeManager } from '../workflow_context_manager/workflow_execution_runtime_manager';
1313
import type { IWorkflowEventLogger } from '../workflow_event_logger/workflow_event_logger';
1414
import type { RunStepResult, BaseStep } from './node_implementation';
@@ -23,7 +23,7 @@ export interface ElasticsearchActionStep extends BaseStep {
2323
export class ElasticsearchActionStepImpl extends BaseAtomicNodeImplementation<ElasticsearchActionStep> {
2424
constructor(
2525
step: ElasticsearchActionStep,
26-
contextManager: WorkflowContextManager,
26+
contextManager: StepExecutionRuntime,
2727
workflowRuntime: WorkflowExecutionRuntimeManager,
2828
private workflowLogger: IWorkflowEventLogger
2929
) {
@@ -32,7 +32,7 @@ export class ElasticsearchActionStepImpl extends BaseAtomicNodeImplementation<El
3232

3333
public getInput() {
3434
// Get current context for templating
35-
const context = this.contextManager.getContext();
35+
const context = this.stepExecutionRuntime.contextManager.getContext();
3636
// Render inputs from 'with' - support both direct step.with and step.configuration.with
3737
const stepWith = this.step.with || (this.step as any).configuration?.with || {};
3838
return this.renderObjectTemplate(stepWith, context);
@@ -78,7 +78,7 @@ export class ElasticsearchActionStepImpl extends BaseAtomicNodeImplementation<El
7878
});
7979

8080
// Get ES client (user-scoped if available, fallback otherwise)
81-
const esClient = this.contextManager.getEsClientAsUser();
81+
const esClient = this.stepExecutionRuntime.contextManager.getEsClientAsUser();
8282

8383
// Generic approach like Dev Console - just forward the request to ES
8484
const result = await this.executeElasticsearchRequest(esClient, stepType, stepWith);

src/platform/plugins/shared/workflows_execution_engine/server/step/foreach_step/enter_foreach_node_impl.ts

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,27 @@ import type { EnterForeachNode } from '@kbn/workflows/graph';
1111
import type { NodeImplementation } from '../node_implementation';
1212
import type { WorkflowExecutionRuntimeManager } from '../../workflow_context_manager/workflow_execution_runtime_manager';
1313
import type { IWorkflowEventLogger } from '../../workflow_event_logger/workflow_event_logger';
14-
import type { WorkflowContextManager } from '../../workflow_context_manager/workflow_context_manager';
14+
import type { StepExecutionRuntime } from '../../workflow_context_manager/step_execution_runtime';
1515

1616
export class EnterForeachNodeImpl implements NodeImplementation {
1717
constructor(
1818
private node: EnterForeachNode,
1919
private wfExecutionRuntimeManager: WorkflowExecutionRuntimeManager,
20-
private contextManager: WorkflowContextManager,
20+
private stepExecutionRuntime: StepExecutionRuntime,
2121
private workflowLogger: IWorkflowEventLogger
2222
) {}
2323

2424
public async run(): Promise<void> {
25-
if (!this.wfExecutionRuntimeManager.getCurrentStepState()) {
25+
if (!this.stepExecutionRuntime.getCurrentStepState()) {
2626
await this.enterForeach();
2727
} else {
2828
await this.advanceIteration();
2929
}
3030
}
3131

3232
private async enterForeach(): Promise<void> {
33-
let foreachState = this.wfExecutionRuntimeManager.getCurrentStepState();
34-
await this.wfExecutionRuntimeManager.startStep();
33+
let foreachState = this.stepExecutionRuntime.getCurrentStepState();
34+
await this.stepExecutionRuntime.startStep();
3535
const evaluatedItems = this.getItems();
3636

3737
if (evaluatedItems.length === 0) {
@@ -41,11 +41,11 @@ export class EnterForeachNodeImpl implements NodeImplementation {
4141
workflow: { step_id: this.node.stepId },
4242
}
4343
);
44-
await this.wfExecutionRuntimeManager.setCurrentStepState({
44+
await this.stepExecutionRuntime.setCurrentStepState({
4545
items: [],
4646
total: 0,
4747
});
48-
await this.wfExecutionRuntimeManager.finishStep();
48+
await this.stepExecutionRuntime.finishStep();
4949
this.wfExecutionRuntimeManager.navigateToNode(this.node.exitNodeId);
5050
return;
5151
}
@@ -65,14 +65,14 @@ export class EnterForeachNodeImpl implements NodeImplementation {
6565
total: evaluatedItems.length,
6666
};
6767

68-
await this.wfExecutionRuntimeManager.setCurrentStepState(foreachState);
68+
await this.stepExecutionRuntime.setCurrentStepState(foreachState);
6969
// Enter a new scope for the first iteration
7070
this.wfExecutionRuntimeManager.enterScope(foreachState.index!.toString());
7171
this.wfExecutionRuntimeManager.navigateToNextNode();
7272
}
7373

7474
private async advanceIteration(): Promise<void> {
75-
let foreachState = this.wfExecutionRuntimeManager.getCurrentStepState()!;
75+
let foreachState = this.stepExecutionRuntime.getCurrentStepState()!;
7676
// Update items and index if they have changed
7777
const items = foreachState.items;
7878
const index = foreachState.index + 1;
@@ -85,7 +85,7 @@ export class EnterForeachNodeImpl implements NodeImplementation {
8585
total,
8686
};
8787
// Enter a new scope for the new iteration
88-
await this.wfExecutionRuntimeManager.setCurrentStepState(foreachState);
88+
await this.stepExecutionRuntime.setCurrentStepState(foreachState);
8989
this.wfExecutionRuntimeManager.enterScope(foreachState.index!.toString());
9090
this.wfExecutionRuntimeManager.navigateToNextNode();
9191
}
@@ -100,7 +100,7 @@ export class EnterForeachNodeImpl implements NodeImplementation {
100100
try {
101101
items = JSON.parse(this.node.configuration.foreach);
102102
} catch (error) {
103-
const { value, pathExists } = this.contextManager.readContextPath(
103+
const { value, pathExists } = this.stepExecutionRuntime.contextManager.readContextPath(
104104
this.node.configuration.foreach
105105
);
106106

src/platform/plugins/shared/workflows_execution_engine/server/step/foreach_step/exit_foreach_node_impl.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,18 @@ import type { ExitForeachNode } from '@kbn/workflows/graph';
1111
import type { NodeImplementation } from '../node_implementation';
1212
import type { WorkflowExecutionRuntimeManager } from '../../workflow_context_manager/workflow_execution_runtime_manager';
1313
import type { IWorkflowEventLogger } from '../../workflow_event_logger/workflow_event_logger';
14+
import type { StepExecutionRuntime } from '../../workflow_context_manager/step_execution_runtime';
1415

1516
export class ExitForeachNodeImpl implements NodeImplementation {
1617
constructor(
1718
private node: ExitForeachNode,
19+
private stepExecutionRuntime: StepExecutionRuntime,
1820
private wfExecutionRuntimeManager: WorkflowExecutionRuntimeManager,
1921
private workflowLogger: IWorkflowEventLogger
2022
) {}
2123

2224
public async run(): Promise<void> {
23-
// Exit the scope of the current iteration
24-
this.wfExecutionRuntimeManager.exitScope();
25-
const foreachState = this.wfExecutionRuntimeManager.getCurrentStepState();
25+
const foreachState = this.stepExecutionRuntime.getCurrentStepState();
2626

2727
if (!foreachState) {
2828
throw new Error(`Foreach state for step ${this.node.stepId} not found`);
@@ -33,7 +33,7 @@ export class ExitForeachNodeImpl implements NodeImplementation {
3333
return;
3434
}
3535

36-
await this.wfExecutionRuntimeManager.finishStep();
36+
await this.stepExecutionRuntime.finishStep();
3737
this.workflowLogger.logDebug(
3838
`Exiting foreach step ${this.node.stepId} after processing all items.`,
3939
{

0 commit comments

Comments
 (0)