Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Add regression test for async tool streaming deadlock
Co-authored-by: TomeHirata <[email protected]>
  • Loading branch information
Copilot and TomeHirata committed Oct 14, 2025
commit 55935b907e50aaaf1e895db1f728a67be5fd7938
62 changes: 62 additions & 0 deletions tests/streaming/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,68 @@ async def aforward(self, question, **kwargs):
assert timestamps[1] - timestamps[0] >= 1


@pytest.mark.anyio
async def test_async_tool_streaming_no_deadlock():
    """Test that async tools with streaming don't cause deadlock.

    This is a regression test for the issue where sync_send_to_stream would block
    the event loop when async tool callbacks tried to send status messages, causing
    the stream to freeze after "Tool calling finished!" message.

    The test streams a module that first awaits an async tool and then a
    predictor, collects every status message and the final prediction from the
    stream, and checks that:

    * exactly two status messages arrive (tool start, then tool end),
    * the final prediction is delivered, and
    * at least the tool's sleep time has elapsed.
    """
    # Single source of truth for the tool's sleep and the elapsed-time assertion.
    tool_delay_seconds = 0.3

    async def async_tool_with_delay(x: str):
        """An async tool that simulates MCP or ReAct tool calls."""
        await asyncio.sleep(tool_delay_seconds)
        return f"Processed: {x}"

    class ProgramWithAsyncTools(dspy.Module):
        def __init__(self):
            super().__init__()
            self.async_tool = dspy.Tool(async_tool_with_delay, name="async_processor")
            self.predict = dspy.Predict("input->output")

        async def aforward(self, input_text: str, **kwargs):
            # Call async tool - this should trigger on_tool_start and on_tool_end callbacks
            result = await self.async_tool.acall(x=input_text)
            # Then call predict
            return await self.predict.acall(input=result, **kwargs)

    lm = dspy.utils.DummyLM([{"output": "final result"}])

    with dspy.context(lm=lm):
        program = dspy.streamify(
            ProgramWithAsyncTools(),
            status_message_provider=StatusMessageProvider(),
            is_async_program=True,
        )

        output = program(input_text="test")

        status_messages = []
        final_prediction = None
        # Use a monotonic clock: wall-clock time (time.time()) may jump backwards
        # or forwards while the test runs, which would make the elapsed-time
        # assertion below fail (or pass) for reasons unrelated to the code under test.
        start_time = time.monotonic()

        # This should complete without deadlock.
        # NOTE(review): there is no explicit timeout here, so a regression would
        # hang rather than fail unless the test suite enforces one - confirm.
        async for value in output:
            if isinstance(value, dspy.streaming.StatusMessage):
                status_messages.append(value.message)
            elif isinstance(value, dspy.Prediction):
                final_prediction = value

        duration = time.monotonic() - start_time

        # Verify we got both tool start and end messages
        assert len(status_messages) == 2, f"Expected 2 status messages, got {len(status_messages)}: {status_messages}"
        assert "async_processor" in status_messages[0], f"First message should mention tool: {status_messages[0]}"
        assert "finished" in status_messages[1].lower(), f"Second message should indicate completion: {status_messages[1]}"

        # Verify we got the final prediction (stream didn't freeze)
        assert final_prediction is not None, "Should receive final prediction without deadlock"
        assert final_prediction.output == "final result"

        # Verify the async delay worked (tool actually executed)
        assert duration >= tool_delay_seconds, f"Duration {duration} suggests tool didn't execute"


@pytest.mark.anyio
async def test_stream_listener_allow_reuse():
class MyProgram(dspy.Module):
Expand Down