Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix(core): defer tool execution cache commit to prevent state polluti…
…on in parallel calls

- Change _record_query to buffer successful queries instead of immediate commit
- Commit buffer only if the entire turn completes without triggering a rollback
- Fixes the 'death spiral' issue where a partial failure in parallel tool calling leaves dirty cache records that cause subsequent retries to incorrectly fail with 'Duplicate query detected'
  • Loading branch information
eviaaaaa committed Mar 3, 2026
commit b11fbd52a0f92a8e0b3b50c8d636722403236f37
22 changes: 16 additions & 6 deletions apps/miroflow-agent/src/core/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ async def run_sub_agent(
tool_calls_data = []
all_tool_results_content_with_id = []
should_rollback_turn = False
successful_queries_buffer = []

for call in tool_calls:
server_name = call["server_name"]
Expand Down Expand Up @@ -531,9 +532,9 @@ async def run_sub_agent(
sub_agent_name
].execute_tool_call(server_name, tool_name, arguments)

# Update query count if successful
# Buffer query if successful (deferred commit)
if "error" not in tool_result:
await self._record_query(cache_name, tool_name, arguments)
successful_queries_buffer.append((cache_name, tool_name, arguments))

# Post-process result
tool_result = self.tool_executor.post_process_tool_call_result(
Expand Down Expand Up @@ -617,6 +618,10 @@ async def run_sub_agent(
if should_rollback_turn:
continue

# Commit buffered successful queries to cache
for cache_name, tool_name, arguments in successful_queries_buffer:
await self._record_query(cache_name, tool_name, arguments)

# Reset consecutive rollbacks on successful execution
if consecutive_rollbacks > 0:
self.task_log.log_step(
Expand Down Expand Up @@ -905,6 +910,7 @@ async def run_main_agent(
tool_calls_data = []
all_tool_results_content_with_id = []
should_rollback_turn = False
successful_queries_buffer = []
main_agent_last_call_tokens = self.llm_client.last_call_tokens

for call in tool_calls:
Expand Down Expand Up @@ -954,8 +960,8 @@ async def run_main_agent(
arguments["subtask"],
)

# Update query count
await self._record_query(cache_name, tool_name, arguments)
# Buffer sub-agent query if successful (deferred commit)
successful_queries_buffer.append((cache_name, tool_name, arguments))

tool_result = {
"server_name": server_name,
Expand Down Expand Up @@ -1002,9 +1008,9 @@ async def run_main_agent(
)
)

# Update query count if successful
# Buffer query if successful (deferred commit)
if "error" not in tool_result:
await self._record_query(cache_name, tool_name, arguments)
successful_queries_buffer.append((cache_name, tool_name, arguments))

# Post-process result
tool_result = self.tool_executor.post_process_tool_call_result(
Expand Down Expand Up @@ -1092,6 +1098,10 @@ async def run_main_agent(
if should_rollback_turn:
continue

# Commit buffered successful queries to cache
for cache_name, tool_name, arguments in successful_queries_buffer:
await self._record_query(cache_name, tool_name, arguments)

# Reset consecutive rollbacks on successful execution
if consecutive_rollbacks > 0:
self.task_log.log_step(
Expand Down