Skip to content

Commit 016cc54

Browse files
authored
add workflow context and add history event iterator (cadence-workflow#27)
What changed? moved workflow package into internal added workflow context that will activate on process_decision function added a generator for HistoryEvent which will read pages to get full history. It's a util function Signed-off-by: Shijie Sheng <[email protected]>
1 parent 121c36c commit 016cc54

File tree

8 files changed

+271
-15
lines changed

8 files changed

+271
-15
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from cadence.client import Client
2+
from cadence.workflow import WorkflowContext, WorkflowInfo
3+
4+
5+
class Context(WorkflowContext):
6+
7+
def __init__(self, client: Client, info: WorkflowInfo):
8+
self._client = client
9+
self._info = info
10+
11+
def info(self) -> WorkflowInfo:
12+
return self._info
13+
14+
def client(self) -> Client:
15+
return self._client
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
2+
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
3+
from cadence.api.v1.service_workflow_pb2 import GetWorkflowExecutionHistoryRequest, GetWorkflowExecutionHistoryResponse
4+
from cadence.client import Client
5+
6+
async def iterate_history_events(decision_task: PollForDecisionTaskResponse, client: Client):
7+
PAGE_SIZE = 1000
8+
9+
current_page = decision_task.history.events
10+
next_page_token = decision_task.next_page_token
11+
workflow_execution = decision_task.workflow_execution
12+
13+
while True:
14+
for event in current_page:
15+
yield event
16+
if not next_page_token:
17+
break
18+
response: GetWorkflowExecutionHistoryResponse = await client.workflow_stub.GetWorkflowExecutionHistory(GetWorkflowExecutionHistoryRequest(
19+
domain=client.domain,
20+
workflow_execution=workflow_execution,
21+
next_page_token=next_page_token,
22+
page_size=PAGE_SIZE,
23+
))
24+
current_page = response.history.events
25+
next_page_token = response.next_page_token
Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,21 @@
11
from dataclasses import dataclass
2-
from typing import Callable
32

3+
from cadence._internal.workflow.context import Context
44
from cadence.api.v1.decision_pb2 import Decision
55
from cadence.client import Client
6-
from cadence.data_converter import DataConverter
76
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
7+
from cadence.workflow import WorkflowInfo
88

9-
@dataclass
10-
class WorkflowContext:
11-
domain: str
12-
workflow_id: str
13-
run_id: str
14-
client: Client
15-
workflow_func: Callable
16-
data_converter: DataConverter
179

1810
@dataclass
1911
class DecisionResult:
2012
decisions: list[Decision]
2113

2214
class WorkflowEngine:
23-
def __init__(self, context: WorkflowContext):
24-
self._context = context
15+
def __init__(self, info: WorkflowInfo, client: Client):
16+
self._context = Context(client, info)
2517

2618
# TODO: Implement this
2719
def process_decision(self, decision_task: PollForDecisionTaskResponse) -> DecisionResult:
28-
return DecisionResult(decisions=[])
20+
with self._context._activate():
21+
return DecisionResult(decisions=[])

cadence/client.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from cadence.api.v1.service_domain_pb2_grpc import DomainAPIStub
1111
from cadence.api.v1.service_worker_pb2_grpc import WorkerAPIStub
1212
from grpc.aio import Channel, ClientInterceptor, secure_channel, insecure_channel
13+
from cadence.api.v1.service_workflow_pb2_grpc import WorkflowAPIStub
1314
from cadence.data_converter import DataConverter, DefaultDataConverter
1415

1516

@@ -42,6 +43,7 @@ def __init__(self, **kwargs: Unpack[ClientOptions]) -> None:
4243
self._channel = _create_channel(self._options)
4344
self._worker_stub = WorkerAPIStub(self._channel)
4445
self._domain_stub = DomainAPIStub(self._channel)
46+
self._workflow_stub = WorkflowAPIStub(self._channel)
4547

4648
@property
4749
def data_converter(self) -> DataConverter:
@@ -63,6 +65,10 @@ def domain_stub(self) -> DomainAPIStub:
6365
def worker_stub(self) -> WorkerAPIStub:
6466
return self._worker_stub
6567

68+
@property
69+
def workflow_stub(self) -> WorkflowAPIStub:
70+
return self._workflow_stub
71+
6672
async def ready(self) -> None:
6773
await self._channel.channel_ready()
6874

@@ -99,4 +105,4 @@ def _create_channel(options: ClientOptions) -> Channel:
99105
if options["credentials"]:
100106
return secure_channel(options["target"], options["credentials"], options["channel_arguments"], options["compression"], interceptors)
101107
else:
102-
return insecure_channel(options["target"], options["channel_arguments"], options["compression"], interceptors)
108+
return insecure_channel(options["target"], options["channel_arguments"], options["compression"], interceptors)

cadence/workflow.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from abc import ABC, abstractmethod
2+
from contextlib import contextmanager
3+
from contextvars import ContextVar
4+
from dataclasses import dataclass
5+
from typing import Iterator
6+
7+
from cadence.client import Client
8+
9+
@dataclass
10+
class WorkflowInfo:
11+
workflow_type: str
12+
workflow_domain: str
13+
workflow_id: str
14+
workflow_run_id: str
15+
16+
class WorkflowContext(ABC):
17+
_var: ContextVar['WorkflowContext'] = ContextVar("workflow")
18+
19+
@abstractmethod
20+
def info(self) -> WorkflowInfo:
21+
...
22+
23+
@abstractmethod
24+
def client(self) -> Client:
25+
...
26+
27+
@contextmanager
28+
def _activate(self) -> Iterator[None]:
29+
token = WorkflowContext._var.set(self)
30+
yield None
31+
WorkflowContext._var.reset(token)
32+
33+
@staticmethod
34+
def is_set() -> bool:
35+
return WorkflowContext._var.get(None) is not None
36+
37+
@staticmethod
38+
def get() -> 'WorkflowContext':
39+
return WorkflowContext._var.get()

tests/cadence/workflow/test_deterministic_event_loop.py renamed to tests/cadence/_internal/workflow/test_deterministic_event_loop.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import pytest
22
import asyncio
3-
from cadence.workflow.deterministic_event_loop import DeterministicEventLoop
3+
from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop
44

55

66
async def coro_append(results: list, i: int):
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
import pytest
2+
from unittest.mock import Mock, AsyncMock
3+
4+
from cadence.client import Client
5+
from cadence.api.v1.common_pb2 import WorkflowExecution
6+
from cadence.api.v1.history_pb2 import HistoryEvent, History
7+
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
8+
from cadence.api.v1.service_workflow_pb2 import GetWorkflowExecutionHistoryResponse
9+
from cadence._internal.workflow.history_event_iterator import iterate_history_events
10+
11+
12+
@pytest.fixture
13+
def mock_client():
14+
"""Create a mock client with workflow_stub."""
15+
client = Mock(spec=Client)
16+
client.workflow_stub = AsyncMock()
17+
client.domain = "test-domain"
18+
return client
19+
20+
21+
@pytest.fixture
22+
def mock_workflow_execution():
23+
"""Create a mock workflow execution."""
24+
return WorkflowExecution(
25+
workflow_id="test-workflow-id",
26+
run_id="test-run-id"
27+
)
28+
29+
30+
def create_history_event(event_id: int) -> HistoryEvent:
31+
return HistoryEvent(event_id=event_id)
32+
33+
34+
async def test_iterate_history_events_single_page_no_next_token(mock_client, mock_workflow_execution):
35+
"""Test iterating over a single page of events with no next page token."""
36+
# Create test events
37+
events = [
38+
create_history_event(1),
39+
create_history_event(2),
40+
create_history_event(3)
41+
]
42+
43+
# Create decision task response with events but no next page token
44+
decision_task = PollForDecisionTaskResponse(
45+
history=History(events=events),
46+
next_page_token=b"", # Empty token means no more pages
47+
workflow_execution=mock_workflow_execution
48+
)
49+
50+
# Iterate and collect events
51+
result_events = [e async for e in iterate_history_events(decision_task, mock_client)]
52+
53+
# Verify all events were returned
54+
assert len(result_events) == 3
55+
assert result_events[0].event_id == 1
56+
assert result_events[1].event_id == 2
57+
assert result_events[2].event_id == 3
58+
59+
# Verify no additional API calls were made
60+
mock_client.workflow_stub.GetWorkflowExecutionHistory.assert_not_called()
61+
62+
63+
async def test_iterate_history_events_empty_events(mock_client, mock_workflow_execution):
64+
"""Test iterating over empty events list."""
65+
# Create decision task response with no events
66+
decision_task = PollForDecisionTaskResponse(
67+
history=History(events=[]),
68+
next_page_token=b"",
69+
workflow_execution=mock_workflow_execution
70+
)
71+
72+
# Iterate and collect events
73+
result_events = [e async for e in iterate_history_events(decision_task, mock_client)]
74+
75+
# Verify no events were returned
76+
assert len(result_events) == 0
77+
78+
# Verify no additional API calls were made
79+
mock_client.workflow_stub.GetWorkflowExecutionHistory.assert_not_called()
80+
81+
async def test_iterate_history_events_multiple_pages(mock_client, mock_workflow_execution):
82+
"""Test iterating over multiple pages of events."""
83+
84+
# Create decision task response with first page and next page token
85+
decision_task = PollForDecisionTaskResponse(
86+
history=History(events=[
87+
create_history_event(1),
88+
create_history_event(2)
89+
]),
90+
next_page_token=b"page2_token",
91+
workflow_execution=mock_workflow_execution
92+
)
93+
94+
# Mock the subsequent API calls
95+
second_response = GetWorkflowExecutionHistoryResponse(
96+
history=History(events=[
97+
create_history_event(3),
98+
create_history_event(4)
99+
]),
100+
next_page_token=b"page3_token"
101+
)
102+
103+
third_response = GetWorkflowExecutionHistoryResponse(
104+
history=History(events=[
105+
create_history_event(5)
106+
]),
107+
next_page_token=b"" # No more pages
108+
)
109+
110+
# Configure mock to return responses in sequence
111+
mock_client.workflow_stub.GetWorkflowExecutionHistory.side_effect = [
112+
second_response,
113+
third_response
114+
]
115+
116+
# Iterate and collect events
117+
result_events = [e async for e in iterate_history_events(decision_task, mock_client)]
118+
119+
# Verify all events from all pages were returned
120+
assert len(result_events) == 5
121+
assert result_events[0].event_id == 1
122+
assert result_events[1].event_id == 2
123+
assert result_events[2].event_id == 3
124+
assert result_events[3].event_id == 4
125+
assert result_events[4].event_id == 5
126+
127+
# Verify correct API calls were made
128+
assert mock_client.workflow_stub.GetWorkflowExecutionHistory.call_count == 2
129+
130+
# Verify first API call
131+
first_call = mock_client.workflow_stub.GetWorkflowExecutionHistory.call_args_list[0]
132+
first_request = first_call[0][0]
133+
assert first_request.domain == "test-domain"
134+
assert first_request.workflow_execution == mock_workflow_execution
135+
assert first_request.next_page_token == b"page2_token"
136+
assert first_request.page_size == 1000
137+
138+
# Verify second API call
139+
second_call = mock_client.workflow_stub.GetWorkflowExecutionHistory.call_args_list[1]
140+
second_request = second_call[0][0]
141+
assert second_request.domain == "test-domain"
142+
assert second_request.workflow_execution == mock_workflow_execution
143+
assert second_request.next_page_token == b"page3_token"
144+
assert second_request.page_size == 1000
145+
146+
async def test_iterate_history_events_single_page_with_next_token_then_empty(mock_client, mock_workflow_execution):
147+
"""Test case where first page has next token but second page is empty."""
148+
# Create first page of events
149+
first_page_events = [
150+
create_history_event(1),
151+
create_history_event(2)
152+
]
153+
154+
# Create decision task response with first page and next page token
155+
decision_task = PollForDecisionTaskResponse(
156+
history=History(events=first_page_events),
157+
next_page_token=b"page2_token",
158+
workflow_execution=mock_workflow_execution
159+
)
160+
161+
# Mock the second API call to return empty page
162+
second_response = GetWorkflowExecutionHistoryResponse(
163+
history=History(events=[]),
164+
next_page_token=b"" # No more pages
165+
)
166+
167+
mock_client.workflow_stub.GetWorkflowExecutionHistory.return_value = second_response
168+
169+
# Iterate and collect events
170+
result_events = [e async for e in iterate_history_events(decision_task, mock_client)]
171+
172+
# Verify only first page events were returned
173+
assert len(result_events) == 2
174+
assert result_events[0].event_id == 1
175+
assert result_events[1].event_id == 2
176+
177+
# Verify one API call was made
178+
assert mock_client.workflow_stub.GetWorkflowExecutionHistory.call_count == 1

0 commit comments

Comments
 (0)