Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
debugging stream converter
  • Loading branch information
lusu-msft committed Jan 5, 2026
commit e453cc88e8ea58412f609c382e5e685729d3c8cf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
AgentFrameworkOutputNonStreamingConverter,
)
from .models.agent_framework_output_streaming_converter import AgentFrameworkOutputStreamingConverter
from .models.human_in_the_loop_helper import HumanInTheLoopHelper
from .models.constants import Constants
from .tool_client import ToolClient

Expand Down Expand Up @@ -223,6 +224,7 @@ async def agent_run( # pylint: disable=too-many-statements

logger.info(f"Starting agent_run with stream={context.stream}")
request_input = context.request.get("input")
hitl_helper = HumanInTheLoopHelper()

input_converter = AgentFrameworkInputConverter()
message = input_converter.transform_input(request_input)
Expand Down Expand Up @@ -255,7 +257,7 @@ async def stream_updates():

# Non-streaming path
logger.info("Running agent in non-streaming mode")
non_streaming_converter = AgentFrameworkOutputNonStreamingConverter(context)
non_streaming_converter = AgentFrameworkOutputNonStreamingConverter(context, hitl_helper=hitl_helper)
result = await agent.run(message)
logger.debug(f"Agent run completed, result type: {type(result)}")
transformed_result = non_streaming_converter.transform_output_for_response(result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,19 @@

from .agent_id_generator import AgentIdGenerator
from .constants import Constants
from .human_in_the_loop_helper import HumanInTheLoopHelper

logger = get_logger()


class AgentFrameworkOutputNonStreamingConverter: # pylint: disable=name-too-long
"""Non-streaming converter: AgentRunResponse -> OpenAIResponse."""

def __init__(self, context: AgentRunContext):
def __init__(self, context: AgentRunContext, *, hitl_helper: HumanInTheLoopHelper):
self._context = context
self._response_id = None
self._response_created_at = None
self.hitl_helper = None
self._hitl_helper = hitl_helper

def _ensure_response_started(self) -> None:
if not self._response_id:
Expand Down Expand Up @@ -218,7 +219,7 @@ def _append_function_result_content(self, content: FunctionResultContent, sink:

def _append_user_input_request_contents(self, content: UserInputRequestContents, sink: List[dict], author_name: str) -> None:
item_id = self._context.id_generator.generate_message_id()
content = self.hitl_helper.convert_user_input_request_content(content)
content = self._hitl_helper.convert_user_input_request_content(content)
if not content:
logger.warning("UserInputRequestContents conversion returned empty content, skipping.")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
ErrorContent,
FunctionCallContent,
TextContent,
UserInputRequestContents,
)

from azure.ai.agentserver.core import AgentRunContext
from azure.ai.agentserver.core.logger import get_logger
from azure.ai.agentserver.core.models import (
Response as OpenAIResponse,
ResponseStreamEvent,
Expand All @@ -43,8 +45,10 @@
)

from .agent_id_generator import AgentIdGenerator
from .human_in_the_loop_helper import HumanInTheLoopHelper
from .utils.async_iter import chunk_on_change, peek

logger = get_logger()

class _BaseStreamingState:
"""Base interface for streaming state handlers."""
Expand Down Expand Up @@ -198,6 +202,86 @@ async def convert_contents(
self._parent.add_completed_output_item(item) # pylint: disable=protected-access


class _UserInputRequestState(_BaseStreamingState):
"""State handler for function_call content during streaming."""

def __init__(self,
parent: AgentFrameworkOutputStreamingConverter,
hitl_helper: HumanInTheLoopHelper):
self._parent = parent
self._hitl_helper = hitl_helper

async def convert_contents(
self, contents: AsyncIterable[UserInputRequestContents], author_name: str
) -> AsyncIterable[ResponseStreamEvent]:
content_by_call_id = {}
ids_by_call_id = {}

async for content in contents:
content = self._hitl_helper.convert_user_input_request_content(content)
if not content:
logger.warning("UserInputRequestContents conversion returned empty content, skipping.")
return

if content["call_id"] not in content_by_call_id:
item_id = self._parent.context.id_generator.generate_function_call_id()
output_index = self._parent.next_output_index()

content_by_call_id[content["call_id"]] = content
ids_by_call_id[content["call_id"]] = (item_id, output_index)

yield ResponseOutputItemAddedEvent(
sequence_number=self._parent.next_sequence(),
output_index=output_index,
item=FunctionToolCallItemResource(
id=item_id,
status="in_progress",
call_id=content["call_id"],
name=content["name"],
arguments="",
created_by=self._parent._build_created_by(author_name),
),
)
else:
prev_content = content_by_call_id[content["call_id"]]
prev_content["arguments"] = prev_content["arguments"] + content["arguments"]
item_id, output_index = ids_by_call_id[content["call_id"]]

args_delta = content["arguments"] if isinstance(content["arguments"], str) else ""
yield ResponseFunctionCallArgumentsDeltaEvent(
sequence_number=self._parent.next_sequence(),
item_id=item_id,
output_index=output_index,
delta=args_delta,
)

for call_id, content in content_by_call_id.items():
item_id, output_index = ids_by_call_id[call_id]
args = content["arguments"]
yield ResponseFunctionCallArgumentsDoneEvent(
sequence_number=self._parent.next_sequence(),
item_id=item_id,
output_index=output_index,
arguments=args,
)

item = FunctionToolCallItemResource(
id=item_id,
status="completed",
call_id=call_id,
name=content["name"],
arguments=args,
created_by=self._parent._build_created_by(author_name),
)
yield ResponseOutputItemDoneEvent(
sequence_number=self._parent.next_sequence(),
output_index=output_index,
item=item,
)

self._parent.add_completed_output_item(item) # pylint: disable=protected-access


class _FunctionCallOutputStreamingState(_BaseStreamingState):
"""Handles function_call_output items streaming (non-chunked simple output)."""

Expand Down Expand Up @@ -255,14 +339,15 @@ def _to_output(cls, result: Any) -> str:
class AgentFrameworkOutputStreamingConverter:
"""Streaming converter using content-type-specific state handlers."""

def __init__(self, context: AgentRunContext) -> None:
def __init__(self, context: AgentRunContext, *, hitl_helper: HumanInTheLoopHelper=None) -> None:
self._context = context
# sequence numbers must start at 0 for first emitted event
self._sequence = -1
self._next_output_index = -1
self._response_id = self._context.response_id
self._response_created_at = None
self._completed_output_items: List[ItemResource] = []
self._hitl_helper = hitl_helper

def next_sequence(self) -> int:
self._sequence += 1
Expand Down Expand Up @@ -294,7 +379,10 @@ async def convert(self, updates: AsyncIterable[AgentRunResponseUpdate]) -> Async
)

is_changed = (
lambda a, b: a is not None and b is not None and a.message_id != b.message_id # pylint: disable=unnecessary-lambda-assignment
lambda a, b: a is not None \
and b is not None \
and (a.message_id != b.message_id \
or type(a) != type(b)) # pylint: disable=unnecessary-lambda-assignment
)
async for group in chunk_on_change(updates, is_changed):
has_value, first_tuple, contents_with_author = await peek(self._read_updates(group))
Expand All @@ -304,12 +392,16 @@ async def convert(self, updates: AsyncIterable[AgentRunResponseUpdate]) -> Async
first, author_name = first_tuple # Extract content and author_name from tuple

state = None
logger.info(f"First content type in group: {type(first).__name__}")
logger.info(f"First content type in group: {first.to_dict()}")
if isinstance(first, TextContent):
state = _TextContentStreamingState(self)
elif isinstance(first, (FunctionCallContent, FunctionApprovalRequestContent)):
elif isinstance(first, FunctionCallContent):
state = _FunctionCallStreamingState(self)
elif isinstance(first, FunctionResultContent):
state = _FunctionCallOutputStreamingState(self)
elif isinstance(first, UserInputRequestContents):
state = _UserInputRequestState(self, self._hitl_helper)
elif isinstance(first, ErrorContent):
raise ValueError(f"ErrorContent received: code={first.error_code}, message={first.message}")
if not state:
Expand Down Expand Up @@ -355,6 +447,7 @@ async def _read_updates(self, updates: AsyncIterable[AgentRunResponseUpdate]) ->
ErrorContent)
for content in update.contents:
if isinstance(content, accepted_types):
logger.info(f"Yield update {type(content)}: {content.to_dict()}")
yield (content, author_name)

def _ensure_response_started(self) -> None:
Expand Down