Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions dspy/streaming/messages.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import concurrent.futures
from dataclasses import dataclass
from typing import Any

Expand Down Expand Up @@ -33,18 +32,10 @@ async def _send():
try:
asyncio.get_running_loop()

# If we're in an event loop, offload to a new thread with its own event loop
def run_in_new_loop():
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
try:
return new_loop.run_until_complete(_send())
finally:
new_loop.close()

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(run_in_new_loop)
return future.result()
# If we're in an event loop, use send_nowait to avoid blocking
# The anyio MemoryObjectSendStream has a synchronous send_nowait method
stream.send_nowait(message)
return None
except RuntimeError:
# Not in an event loop, safe to use a new event loop in this thread
return syncify(_send)()
Expand Down
62 changes: 62 additions & 0 deletions tests/streaming/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,68 @@ async def aforward(self, question, **kwargs):
assert timestamps[1] - timestamps[0] >= 1


@pytest.mark.anyio
async def test_async_tool_streaming_no_deadlock():
    """Test that async tools with streaming don't cause deadlock.

    This is a regression test for the issue where sync_send_to_stream would block
    the event loop when async tool callbacks tried to send status messages, causing
    the stream to freeze after "Tool calling finished!" message.
    """

    async def async_tool_with_delay(x: str):
        """An async tool that simulates MCP or ReAct tool calls."""
        await asyncio.sleep(0.3)
        return f"Processed: {x}"

    class ProgramWithAsyncTools(dspy.Module):
        def __init__(self):
            super().__init__()
            self.async_tool = dspy.Tool(async_tool_with_delay, name="async_processor")
            self.predict = dspy.Predict("input->output")

        async def aforward(self, input_text: str, **kwargs):
            # Call async tool first - this should trigger on_tool_start and
            # on_tool_end callbacks - then run the predictor on its result.
            result = await self.async_tool.acall(x=input_text)
            return await self.predict.acall(input=result, **kwargs)

    lm = dspy.utils.DummyLM([{"output": "final result"}])

    with dspy.context(lm=lm):
        program = dspy.streamify(
            ProgramWithAsyncTools(),
            status_message_provider=StatusMessageProvider(),
            is_async_program=True,
        )

        output = program(input_text="test")

        status_messages = []
        final_prediction = None
        # Use a monotonic clock for interval measurement: time.time() is the
        # wall clock and can jump (NTP/DST adjustments), which would make the
        # `duration >= 0.3` assertion below flaky.
        start_time = time.monotonic()

        # This should complete without deadlock
        async for value in output:
            if isinstance(value, dspy.streaming.StatusMessage):
                status_messages.append(value.message)
            elif isinstance(value, dspy.Prediction):
                final_prediction = value

        duration = time.monotonic() - start_time

        # Verify we got both tool start and end messages
        assert len(status_messages) == 2, f"Expected 2 status messages, got {len(status_messages)}: {status_messages}"
        assert "async_processor" in status_messages[0], f"First message should mention tool: {status_messages[0]}"
        assert "finished" in status_messages[1].lower(), f"Second message should indicate completion: {status_messages[1]}"

        # Verify we got the final prediction (stream didn't freeze)
        assert final_prediction is not None, "Should receive final prediction without deadlock"
        assert final_prediction.output == "final result"

        # Verify the async delay worked (tool actually executed)
        assert duration >= 0.3, f"Duration {duration} suggests tool didn't execute"


@pytest.mark.anyio
async def test_stream_listener_allow_reuse():
class MyProgram(dspy.Module):
Expand Down