From efd71ff44e75fa3bc68c1e86558e00c6f9563eaa Mon Sep 17 00:00:00 2001
From: Lu Sun
Date: Tue, 30 Dec 2025 17:10:16 -0800
Subject: [PATCH 01/11] add hitl sample

---
 .../.envtemplate                              |   3 +
 .../human_in_the_loop_guessing_game/README.md |  45 +++
 .../human_in_the_loop_guessing_game/main.py   | 262 ++++++++++++++++++
 .../requirements.txt                          |   5 +
 4 files changed, 315 insertions(+)
 create mode 100644 sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/.envtemplate
 create mode 100644 sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/README.md
 create mode 100644 sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/main.py
 create mode 100644 sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/requirements.txt

diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/.envtemplate b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/.envtemplate
new file mode 100644
index 000000000000..bd646f163bb7
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/.envtemplate
@@ -0,0 +1,3 @@
+AZURE_OPENAI_ENDPOINT=https://<your-resource-name>.cognitiveservices.azure.com/
+OPENAI_API_VERSION=2025-03-01-preview
+AZURE_OPENAI_CHAT_DEPLOYMENT_NAME=<your-deployment-name>
\ No newline at end of file
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/README.md b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/README.md
new file mode 100644
index 000000000000..64f19cefcbcb
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/README.md
@@ -0,0 +1,45 @@
+# Agent Framework Sample
+
+This sample demonstrates how to host a Microsoft Agent Framework human-in-the-loop guessing-game workflow with the agents hosting adapter.
+
+## Prerequisites
+
+> **Azure sign-in:** Run `az login` before starting the sample so `AzureCliCredential` can acquire a CLI token.
+
+### Environment Variables
+
+Copy `.envtemplate` to `.env` and supply:
+
+```
+AZURE_OPENAI_ENDPOINT=https://<your-resource-name>.cognitiveservices.azure.com/
+OPENAI_API_VERSION=2025-03-01-preview
+AZURE_OPENAI_CHAT_DEPLOYMENT_NAME=<your-deployment-name>
+```
+
+## Running the Sample
+
+Follow these steps from this folder:
+
+1) Start the agent server (defaults to 0.0.0.0:8088):
+
+```bash
+python main.py
+```
+
+2) Send a non-streaming request (returns a single JSON response):
+
+```bash
+curl -sS \
+  -H "Content-Type: application/json" \
+  -X POST http://localhost:8088/responses \
+  -d "{\"input\":\"Start the guessing game.\",\"stream\":false}"
+```
+
+3) Send a streaming request (server-sent events). Use -N to disable curl buffering:
+
+```bash
+curl -N \
+  -H "Content-Type: application/json" \
+  -X POST http://localhost:8088/responses \
+  -d "{\"input\":\"Start the guessing game.\",\"stream\":true}"
+```
\ No newline at end of file
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/main.py b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/main.py
new file mode 100644
index 000000000000..07aaf86c1bbe
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/main.py
@@ -0,0 +1,262 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+import asyncio
+from dataclasses import dataclass
+
+from agent_framework import (
+    AgentExecutorRequest,  # Message bundle sent to an AgentExecutor
+    AgentExecutorResponse,  # Result returned by an AgentExecutor
+    ChatAgent,  # Agent backed by a chat client
+    ChatMessage,  # Chat message structure
+    Executor,  # Base class for workflow executors
+    RequestInfoEvent,  # Event emitted when human input is requested
+    Role,  # Enum of chat roles (user, assistant, system)
+    WorkflowBuilder,  # Fluent builder for assembling the graph
+    WorkflowContext,  # Per-run context and event bus
+    WorkflowOutputEvent,  # Event emitted when workflow yields output
+    WorkflowRunState,  # Enum of workflow run states
+    WorkflowStatusEvent,  # Event emitted on run state changes
+    handler,  # Decorator to expose an Executor method as a step
+    response_handler,  # Decorator for methods that receive request_info responses
+)
+from agent_framework.azure import AzureOpenAIChatClient
+from azure.identity import AzureCliCredential
+from pydantic import BaseModel
+
+from azure.ai.agentserver.agentframework import from_agent_framework
+
+"""
+Sample: Human in the loop guessing game
+
+An agent guesses a number, then a human guides it with higher, lower, or
+correct. The loop continues until the human confirms correct, at which point
+the workflow completes when idle with no pending work.
+
+Purpose:
+Show how to integrate a human step in the middle of an LLM workflow by using
+`request_info` and `send_responses_streaming`.
+
+Demonstrate:
+- Alternating turns between an AgentExecutor and a human, driven by events.
+- Using Pydantic response_format to enforce structured JSON output from the agent instead of regex parsing.
+- Driving the loop in application code with run_stream and send_responses_streaming.
+
+Prerequisites:
+- Azure OpenAI configured for AzureOpenAIChatClient with required environment variables.
+- Authentication via azure-identity. Use AzureCliCredential and run az login before executing the sample.
+- Basic familiarity with WorkflowBuilder, executors, edges, events, and streaming runs.
+"""
+
+# How human-in-the-loop is achieved via `request_info` and `send_responses_streaming`:
+# - An executor (TurnManager) calls `ctx.request_info` with a payload (HumanFeedbackRequest).
+# - The workflow run pauses and emits a RequestInfoEvent with the payload and the request_id.
+# - The application captures the event, prompts the user, and collects replies.
+# - The application calls `send_responses_streaming` with a map of request_ids to replies.
+# - The workflow resumes, and the response is delivered to the executor method decorated with @response_handler.
+# - The executor can then continue the workflow, e.g., by sending a new message to the agent.
+
+
+@dataclass
+class HumanFeedbackRequest:
+    """Request sent to the human for feedback on the agent's guess."""
+
+    prompt: str
+
+
+class GuessOutput(BaseModel):
+    """Structured output from the agent. Enforced via response_format for reliable parsing."""
+
+    guess: int
+
+
+class TurnManager(Executor):
+    """Coordinates turns between the agent and the human.
+
+    Responsibilities:
+    - Kick off the first agent turn.
+    - After each agent reply, request human feedback with a HumanFeedbackRequest.
+    - After each human reply, either finish the game or prompt the agent again with feedback.
+    """
+
+    def __init__(self, id: str | None = None):
+        super().__init__(id=id or "turn_manager")
+
+    @handler
+    async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
+        """Start the game by asking the agent for an initial guess.
+
+        Contract:
+        - Input is a simple starter token (ignored here).
+        - Output is an AgentExecutorRequest that triggers the agent to produce a guess.
+        """
+        user = ChatMessage(Role.USER, text="Start by making your first guess.")
+        await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))
+
+    @handler
+    async def on_agent_response(
+        self,
+        result: AgentExecutorResponse,
+        ctx: WorkflowContext,
+    ) -> None:
+        """Handle the agent's guess and request human guidance.
+
+        Steps:
+        1) Parse the agent's JSON into GuessOutput for robustness.
+        2) Request info with a HumanFeedbackRequest as the payload.
+        """
+        # Parse structured model output
+        text = result.agent_run_response.text
+        last_guess = GuessOutput.model_validate_json(text).guess
+
+        # Craft a precise human prompt that defines higher and lower relative to the agent's guess.
+        prompt = (
+            f"The agent guessed: {last_guess}. "
+            "Type one of: higher (your number is higher than this guess), "
+            "lower (your number is lower than this guess), correct, or exit."
+        )
+        # Send a request with a prompt as the payload and expect a string reply.
+        await ctx.request_info(
+            request_data=HumanFeedbackRequest(prompt=prompt),
+            response_type=str,
+        )
+
+    @response_handler
+    async def on_human_feedback(
+        self,
+        original_request: HumanFeedbackRequest,
+        feedback: str,
+        ctx: WorkflowContext[AgentExecutorRequest, str],
+    ) -> None:
+        """Continue the game or finish based on human feedback."""
+        print(f"Feedback for prompt '{original_request.prompt}' received: {feedback}")
+
+        reply = feedback.strip().lower()
+
+        if reply == "correct":
+            await ctx.yield_output("Guessed correctly!")
+            return
+
+        # Provide feedback to the agent to try again.
+        # We keep the agent's output strictly JSON to ensure stable parsing on the next turn.
+        user_msg = ChatMessage(
+            Role.USER,
+            text=(f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <number>}}.'),
+        )
+        await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))
+
+
+def create_guessing_agent() -> ChatAgent:
+    """Create the guessing agent with instructions to guess a number between 1 and 10."""
+    return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
+        name="GuessingAgent",
+        instructions=(
+            "You guess a number between 1 and 10. "
+            "If the user says 'higher' or 'lower', adjust your next guess. "
+            'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <number>}. '
+            "No explanations or additional text."
+        ),
+        # response_format enforces that the model produces JSON compatible with GuessOutput.
+        response_format=GuessOutput,
+    )
+
+def build_agent():
+    return (
+        WorkflowBuilder()
+        .register_agent(create_guessing_agent, name="guessing_agent")
+        .register_executor(lambda: TurnManager(id="turn_manager"), name="turn_manager")
+        .set_start_executor("turn_manager")
+        .add_edge("turn_manager", "guessing_agent")  # Ask agent to make/adjust a guess
+        .add_edge("guessing_agent", "turn_manager")  # Agent's response comes back to coordinator
+    ).build()
+
+async def main() -> None:
+    """Run the human-in-the-loop guessing game workflow."""
+
+    # Build a simple loop: TurnManager <-> AgentExecutor.
+    workflow = build_agent()
+    await from_agent_framework(workflow).run_async()
+
+    # # Human in the loop run: alternate between invoking the workflow and supplying collected responses.
+ # pending_responses: dict[str, str] | None = None + # workflow_output: str | None = None + + # # User guidance printing: + # # If you want to instruct users up front, print a short banner before the loop. + # # Example: + # # print( + # # "Interactive mode. When prompted, type one of: higher, lower, correct, or exit. " + # # "The agent will keep guessing until you reply correct.", + # # flush=True, + # # ) + + # while workflow_output is None: + # # First iteration uses run_stream("start"). + # # Subsequent iterations use send_responses_streaming with pending_responses from the console. + # stream = ( + # workflow.send_responses_streaming(pending_responses) if pending_responses else workflow.run_stream("start") + # ) + # # Collect events for this turn. Among these you may see WorkflowStatusEvent + # # with state IDLE_WITH_PENDING_REQUESTS when the workflow pauses for + # # human input, preceded by IN_PROGRESS_PENDING_REQUESTS as requests are + # # emitted. + # events = [event async for event in stream] + # pending_responses = None + + # # Collect human requests, workflow outputs, and check for completion. + # requests: list[tuple[str, str]] = [] # (request_id, prompt) + # for event in events: + # if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest): + # # RequestInfoEvent for our HumanFeedbackRequest. + # requests.append((event.request_id, event.data.prompt)) + # elif isinstance(event, WorkflowOutputEvent): + # # Capture workflow output as they're yielded + # workflow_output = str(event.data) + + # # Detect run state transitions for a better developer experience. + # pending_status = any( + # isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS + # for e in events + # ) + # idle_with_requests = any( + # isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS + # for e in events + # ) + # if pending_status: + # print("State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)") + # if idle_with_requests: + # print("State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)") + + # # If we have any human requests, prompt the user and prepare responses. + # if requests: + # responses: dict[str, str] = {} + # for req_id, prompt in requests: + # # Simple console prompt for the sample. + # print(f"HITL> {prompt}") + # # Instructional print already appears above. The input line below is the user entry point. + # # If desired, you can add more guidance here, but keep it concise. + # answer = input("Enter higher/lower/correct/exit: ").lower() # noqa: ASYNC250 + # if answer == "exit": + # print("Exiting...") + # return + # responses[req_id] = answer + # pending_responses = responses + + # # Show final result from workflow output captured during streaming. + # print(f"Workflow output: {workflow_output}") + # """ + # Sample Output: + + # HITL> The agent guessed: 5. Type one of: higher (your number is higher than this guess), lower (your number is lower than this guess), correct, or exit. + # Enter higher/lower/correct/exit: higher + # HITL> The agent guessed: 8. Type one of: higher (your number is higher than this guess), lower (your number is lower than this guess), correct, or exit. + # Enter higher/lower/correct/exit: higher + # HITL> The agent guessed: 10. Type one of: higher (your number is higher than this guess), lower (your number is lower than this guess), correct, or exit. + # Enter higher/lower/correct/exit: lower + # HITL> The agent guessed: 9. 
Type one of: higher (your number is higher than this guess), lower (your number is lower than this guess), correct, or exit.
+    # Enter higher/lower/correct/exit: correct
+    # Workflow output: Guessed correctly!
+    # """  # noqa: E501
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
\ No newline at end of file
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/requirements.txt b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/requirements.txt
new file mode 100644
index 000000000000..c044abf99eb1
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/requirements.txt
@@ -0,0 +1,5 @@
+python-dotenv>=1.0.0
+azure-identity
+agent-framework-azure-ai
+azure-ai-agentserver-core
+azure-ai-agentserver-agentframework

From c6b6d071660b2ca010180e01dbc6bec2129455bd Mon Sep 17 00:00:00 2001
From: Lu Sun
Date: Sun, 4 Jan 2026 21:28:30 -0800
Subject: [PATCH 02/11] updated af hitl sample. implementing non stream resp converter

---
 ...ramework_output_non_streaming_converter.py |  31 ++-
 .../models/human_in_the_loop_helper.py        |  25 ++
 .../.envtemplate                              |   0
 .../README.md                                 |   0
 .../samples/human_in_the_loop/main.py         | 196 +++++++++++++
 .../requirements.txt                          |   0
 .../workflow_as_agent_reflection_pattern.py   | 232 ++++++++++++++++
 .../human_in_the_loop_guessing_game/main.py   | 262 ------------------
 .../core/server/common/constants.py           |   6 +
 9 files changed, 489 insertions(+), 263 deletions(-)
 create mode 100644 sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py
 rename sdk/agentserver/azure-ai-agentserver-agentframework/samples/{human_in_the_loop_guessing_game => human_in_the_loop}/.envtemplate (100%)
 rename sdk/agentserver/azure-ai-agentserver-agentframework/samples/{human_in_the_loop_guessing_game => human_in_the_loop}/README.md (100%)
 create mode 100644 sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py
 rename sdk/agentserver/azure-ai-agentserver-agentframework/samples/{human_in_the_loop_guessing_game => human_in_the_loop}/requirements.txt (100%)
 create mode 100644 sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/workflow_as_agent_reflection_pattern.py
 delete mode 100644 sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/main.py
 create mode 100644 sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/server/common/constants.py

diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py
index fbece993305a..0eba87db1e03 100644
--- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py
@@ -7,7 +7,14 @@
 import json
 from typing import Any, List
 
-from agent_framework import AgentRunResponse, FunctionCallContent, FunctionResultContent, ErrorContent, TextContent
+from agent_framework import (
+    AgentRunResponse,
+    FunctionCallContent,
+    FunctionResultContent,
+    ErrorContent,
+    TextContent,
+)
+from agent_framework._types import UserInputRequestContents
 
 from azure.ai.agentserver.core import AgentRunContext
 from azure.ai.agentserver.core.logger import get_logger
@@ -32,6 +39,7 @@ def __init__(self, context: AgentRunContext):
         self._context = context
         self._response_id = None
         self._response_created_at = None
+        self.hitl_helper = None
 
     def _ensure_response_started(self) -> None:
         if not self._response_id:
@@ -120,6 +128,8 @@ def _append_content_item(self, content: Any, sink: List[dict], author_name: str)
             self._append_function_call_content(content, sink, author_name)
         elif isinstance(content, FunctionResultContent):
             self._append_function_result_content(content, sink, author_name)
+        elif isinstance(content, UserInputRequestContents):
+            self._append_user_input_request_contents(content, sink, author_name)
         elif isinstance(content, ErrorContent):
             raise ValueError(f"ErrorContent received: code={content.error_code}, message={content.message}")
         else:
@@ -205,6 +215,25 @@ def _append_function_result_content(self, content: FunctionResultContent, sink:
             call_id,
             len(result),
         )
+
+    def _append_user_input_request_contents(self, content: UserInputRequestContents, sink: List[dict], author_name: str) -> None:
+        item_id = self._context.id_generator.generate_message_id()
+        content = self.hitl_helper.convert_user_input_request_content(content)
+        if not content:
+            logger.warning("UserInputRequestContents conversion returned empty content, skipping.")
+            return
+        sink.append(
+            {
+                "id": item_id,
+                "type": "function_call",
+                "status": "in_progress",
+                "call_id": content["call_id"],
+                "name": content["name"],
+                "arguments": content["arguments"],
+                "created_by": self._build_created_by(author_name),
+            }
+        )
+        logger.debug("    added user_input_request item id=%s call_id=%s", item_id, content["call_id"])
 
     # ------------- simple normalization helper -------------------------
     def _coerce_result_text(self, value: Any) -> str | dict:
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py
new file mode 100644
index 000000000000..b00b9675b682
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py
@@ -0,0 +1,25 @@
+from typing import Any
+import json
+
+from agent_framework._types import UserInputRequestContents
+
+from azure.ai.agentserver.core.server.common.constants import HUMAN_IN_THE_LOOP_FUNCTION_NAME
+
+
+class HumanInTheLoopHelper:
+    def convert_user_input_request_content(self, content: UserInputRequestContents) -> dict:
+        call_id = getattr(content, "id")
+        arguments = self.convert_request_arguments(getattr(content, "arguments", ""))
+        return {
+            "call_id": call_id,
+            "name": HUMAN_IN_THE_LOOP_FUNCTION_NAME,
+            "arguments": arguments or "",
+        }
+
+    def convert_request_arguments(self, arguments: Any) -> str:
+        if not isinstance(arguments, str):
+            try:
+                arguments = json.dumps(arguments)
+            except Exception:  # pragma: no cover - fallback  # pylint: disable=broad-exception-caught
+                arguments = str(arguments)
+        return arguments
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/.envtemplate b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/.envtemplate
similarity index 100%
rename from sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/.envtemplate
rename to sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/.envtemplate
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/README.md b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/README.md
similarity index 100%
rename from sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/README.md
rename to sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/README.md
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py
new file mode 100644
index 000000000000..47c3fea8ad47
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py
@@ -0,0 +1,196 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+import asyncio
+import sys
+from collections.abc import Mapping
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+
+from agent_framework.azure import AzureOpenAIChatClient
+from azure.identity import AzureCliCredential
+
+# # Ensure local getting_started package can be imported when running as a script.
+# _SAMPLES_ROOT = Path(__file__).resolve().parents[3]
+# if str(_SAMPLES_ROOT) not in sys.path:
+#     sys.path.insert(0, str(_SAMPLES_ROOT))
+
+from agent_framework import (  # noqa: E402
+    ChatMessage,
+    Executor,
+    FunctionCallContent,
+    FunctionResultContent,
+    Role,
+    WorkflowAgent,
+    WorkflowBuilder,
+    WorkflowContext,
+    handler,
+    response_handler,
+)
+from agent_framework._types import UserInputRequestContents
+from workflow_as_agent_reflection_pattern import (  # noqa: E402
+    ReviewRequest,
+    ReviewResponse,
+    Worker,
+)
+
+from azure.ai.agentserver.agentframework import from_agent_framework
+
+"""
+Sample: Workflow Agent with Human-in-the-Loop
+
+Purpose:
+This sample demonstrates how to build a workflow agent that escalates uncertain
+decisions to a human manager. A Worker generates results, while a Reviewer
+evaluates them. When the Reviewer is not confident, it escalates the decision
+to a human, receives the human response, and then forwards that response back
+to the Worker. The workflow completes when idle.
+
+Prerequisites:
+- Azure OpenAI configured for AzureOpenAIChatClient; run az login so AzureCliCredential can authenticate.
+- Familiarity with WorkflowBuilder, Executor, and WorkflowContext from agent_framework.
+- Understanding of request-response message handling in executors.
+- (Optional) Review of reflection and escalation patterns, such as those in
+  workflow_as_agent_reflection_pattern.py.
+"""
+
+
+@dataclass
+class HumanReviewRequest:
+    """A request message type for escalation to a human reviewer."""
+
+    agent_request: ReviewRequest | None = None
+
+
+class ReviewerWithHumanInTheLoop(Executor):
+    """Executor that always escalates reviews to a human manager."""
+
+    def __init__(self, worker_id: str, reviewer_id: str | None = None) -> None:
+        unique_id = reviewer_id or f"{worker_id}-reviewer"
+        super().__init__(id=unique_id)
+        self._worker_id = worker_id
+
+    @handler
+    async def review(self, request: ReviewRequest, ctx: WorkflowContext) -> None:
+        # In this simplified example, we always escalate to a human manager.
+        # See workflow_as_agent_reflection_pattern.py for an implementation
+        # using an automated agent to make the review decision.
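+        # A minimal sketch of the HITL round trip as main() below drives it
+        # (names taken from this file; this is one possible flow, not the only one):
+        #   call = next(c for m in response.messages for c in m.contents
+        #               if isinstance(c, FunctionCallContent)
+        #               and c.name == WorkflowAgent.REQUEST_INFO_FUNCTION_NAME)
+        #   result = FunctionResultContent(call_id=call.call_id,
+        #                                  result=ReviewResponse(request_id=..., feedback="...", approved=True))
+        #   await agent.run(ChatMessage(role=Role.TOOL, contents=[result]))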
+        print(f"Reviewer: Evaluating response for request {request.request_id[:8]}...")
+        print("Reviewer: Escalating to human manager...")
+
+        # Forward the request to a human manager by sending a HumanReviewRequest.
+        await ctx.request_info(request_data=HumanReviewRequest(agent_request=request), response_type=ReviewResponse)
+
+    @response_handler
+    async def accept_human_review(
+        self,
+        original_request: HumanReviewRequest,
+        response: ReviewResponse,
+        ctx: WorkflowContext[ReviewResponse],
+    ) -> None:
+        # Accept the human review response and forward it back to the Worker.
+        print(f"Reviewer: Accepting human review for request {response.request_id[:8]}...")
+        print(f"Reviewer: Human feedback: {response.feedback}")
+        print(f"Reviewer: Human approved: {response.approved}")
+        print("Reviewer: Forwarding human review back to worker...")
+        await ctx.send_message(response, target_id=self._worker_id)
+
+
+def build_agent():
+    # Build a workflow with bidirectional communication between Worker and Reviewer,
+    # and escalation paths for human review.
+    agent = (
+        WorkflowBuilder()
+        .register_executor(
+            lambda: Worker(
+                id="sub-worker",
+                chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()),
+            ),
+            name="worker",
+        )
+        .register_executor(
+            lambda: ReviewerWithHumanInTheLoop(worker_id="sub-worker"),
+            name="reviewer",
+        )
+        .add_edge("worker", "reviewer")  # Worker sends requests to Reviewer
+        .add_edge("reviewer", "worker")  # Reviewer sends feedback to Worker
+        .set_start_executor("worker")
+        .build()
+        .as_agent()  # Convert workflow into an agent interface
+    )
+    return agent
+
+async def main() -> None:
+    print("Starting Workflow Agent with Human-in-the-Loop Demo")
+    print("=" * 50)
+
+    print("Building workflow with Worker-Reviewer cycle...")
+    agent = build_agent()
+
+    print("Running workflow agent with user query...")
+    print("Query: 'Write code for parallel reading 1 million files on disk and write to a sorted output file.'")
+    print("-" * 50)
+
+    # Run the agent with an initial query.
+    response = await agent.run(
+        "Write code for parallel reading 1 million files on disk and write to a sorted output file."
+    )
+
+    # Locate the human review function call in the response messages.
+    human_review_function_call: FunctionCallContent | None = None
+    for message in response.messages:
+        print(f"Message {type(message)}: {message.to_json()}")
+        for content in message.contents:
+            print(f"content {type(content)}: {content.to_json()}")
+            if isinstance(content, UserInputRequestContents):
+                print(f"User input requested: {content}")
+            if isinstance(content, FunctionCallContent) and content.name == WorkflowAgent.REQUEST_INFO_FUNCTION_NAME:
+                human_review_function_call = content
+
+    # Handle the human review if required.
+    if human_review_function_call:
+        # Parse the human review request arguments.
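+        # Depending on the transport, the arguments may arrive as a JSON string
+        # or as an already-parsed mapping; both shapes are handled below.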
+ human_request_args = human_review_function_call.arguments + if isinstance(human_request_args, str): + request: WorkflowAgent.RequestInfoFunctionArgs = WorkflowAgent.RequestInfoFunctionArgs.from_json( + human_request_args + ) + elif isinstance(human_request_args, Mapping): + request = WorkflowAgent.RequestInfoFunctionArgs.from_dict(dict(human_request_args)) + else: + raise TypeError("Unexpected argument type for human review function call.") + + request_payload: Any = request.data + if not isinstance(request_payload, HumanReviewRequest): + raise ValueError("Human review request payload must be a HumanReviewRequest.") + + agent_request = request_payload.agent_request + if agent_request is None: + raise ValueError("Human review request must include agent_request.") + + request_id = agent_request.request_id + # Mock a human response approval for demonstration purposes. + human_response = ReviewResponse(request_id=request_id, feedback="Approved", approved=True) + + # Create the function call result object to send back to the agent. + human_review_function_result = FunctionResultContent( + call_id=human_review_function_call.call_id, + result=human_response, + ) + # Send the human review result back to the agent. + response = await agent.run(ChatMessage(role=Role.TOOL, contents=[human_review_function_result])) + print(f"Agent Response: {response.to_dict()}") + print(f"📤 Agent Response: {response.messages[-1].text}") + + print("=" * 50) + print("Workflow completed!") + + +async def run_agent() -> None: + agent = build_agent() + await from_agent_framework(agent).run_async() + +if __name__ == "__main__": + print("Initializing Workflow as Agent Sample...") + asyncio.run(main()) + #asyncio.run(run_agent()) \ No newline at end of file diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/requirements.txt b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/requirements.txt similarity index 100% rename from sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/requirements.txt rename to sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/requirements.txt diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/workflow_as_agent_reflection_pattern.py b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/workflow_as_agent_reflection_pattern.py new file mode 100644 index 000000000000..53947c7a4060 --- /dev/null +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/workflow_as_agent_reflection_pattern.py @@ -0,0 +1,232 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +from dataclasses import dataclass +from uuid import uuid4 + +from agent_framework import ( + AgentRunResponseUpdate, + AgentRunUpdateEvent, + ChatClientProtocol, + ChatMessage, + Contents, + Executor, + Role, + WorkflowBuilder, + WorkflowContext, + handler, +) +from agent_framework.openai import OpenAIChatClient +from pydantic import BaseModel + +""" +Sample: Workflow as Agent with Reflection and Retry Pattern + +Purpose: +This sample demonstrates how to wrap a workflow as an agent using WorkflowAgent. +It uses a reflection pattern where a Worker executor generates responses and a +Reviewer executor evaluates them. If the response is not approved, the Worker +regenerates the output based on feedback until the Reviewer approves it. Only +approved responses are emitted to the external consumer. 
The workflow completes when idle. + +Key Concepts Demonstrated: +- WorkflowAgent: Wraps a workflow to behave like a regular agent. +- Cyclic workflow design (Worker ↔ Reviewer) for iterative improvement. +- AgentRunUpdateEvent: Mechanism for emitting approved responses externally. +- Structured output parsing for review feedback using Pydantic. +- State management for pending requests and retry logic. + +Prerequisites: +- OpenAI account configured and accessible for OpenAIChatClient. +- Familiarity with WorkflowBuilder, Executor, WorkflowContext, and event handling. +- Understanding of how agent messages are generated, reviewed, and re-submitted. +""" + + +@dataclass +class ReviewRequest: + """Structured request passed from Worker to Reviewer for evaluation.""" + + request_id: str + user_messages: list[ChatMessage] + agent_messages: list[ChatMessage] + + +@dataclass +class ReviewResponse: + """Structured response from Reviewer back to Worker.""" + + request_id: str + feedback: str + approved: bool + + +class Reviewer(Executor): + """Executor that reviews agent responses and provides structured feedback.""" + + def __init__(self, id: str, chat_client: ChatClientProtocol) -> None: + super().__init__(id=id) + self._chat_client = chat_client + + @handler + async def review(self, request: ReviewRequest, ctx: WorkflowContext[ReviewResponse]) -> None: + print(f"Reviewer: Evaluating response for request {request.request_id[:8]}...") + + # Define structured schema for the LLM to return. + class _Response(BaseModel): + feedback: str + approved: bool + + # Construct review instructions and context. + messages = [ + ChatMessage( + role=Role.SYSTEM, + text=( + "You are a reviewer for an AI agent. Provide feedback on the " + "exchange between a user and the agent. Indicate approval only if:\n" + "- Relevance: response addresses the query\n" + "- Accuracy: information is correct\n" + "- Clarity: response is easy to understand\n" + "- Completeness: response covers all aspects\n" + "Do not approve until all criteria are satisfied." + ), + ) + ] + # Add conversation history. + messages.extend(request.user_messages) + messages.extend(request.agent_messages) + + # Add explicit review instruction. + messages.append(ChatMessage(role=Role.USER, text="Please review the agent's responses.")) + + print("Reviewer: Sending review request to LLM...") + response = await self._chat_client.get_response(messages=messages, response_format=_Response) + + parsed = _Response.model_validate_json(response.messages[-1].text) + + print(f"Reviewer: Review complete - Approved: {parsed.approved}") + print(f"Reviewer: Feedback: {parsed.feedback}") + + # Send structured review result to Worker. + await ctx.send_message( + ReviewResponse(request_id=request.request_id, feedback=parsed.feedback, approved=parsed.approved) + ) + + +class Worker(Executor): + """Executor that generates responses and incorporates feedback when necessary.""" + + def __init__(self, id: str, chat_client: ChatClientProtocol) -> None: + super().__init__(id=id) + self._chat_client = chat_client + self._pending_requests: dict[str, tuple[ReviewRequest, list[ChatMessage]]] = {} + + @handler + async def handle_user_messages(self, user_messages: list[ChatMessage], ctx: WorkflowContext[ReviewRequest]) -> None: + print("Worker: Received user messages, generating response...") + + # Initialize chat with system prompt. 
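+        # (Note: on a rejected review, handle_review_response below appends the
+        # reviewer's feedback as additional system messages before regenerating.)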
+ messages = [ChatMessage(role=Role.SYSTEM, text="You are a helpful assistant.")] + messages.extend(user_messages) + + print("Worker: Calling LLM to generate response...") + response = await self._chat_client.get_response(messages=messages) + print(f"Worker: Response generated: {response.messages[-1].text}") + + # Add agent messages to context. + messages.extend(response.messages) + + # Create review request and send to Reviewer. + request = ReviewRequest(request_id=str(uuid4()), user_messages=user_messages, agent_messages=response.messages) + print(f"Worker: Sending response for review (ID: {request.request_id[:8]})") + await ctx.send_message(request) + + # Track request for possible retry. + self._pending_requests[request.request_id] = (request, messages) + + @handler + async def handle_review_response(self, review: ReviewResponse, ctx: WorkflowContext[ReviewRequest]) -> None: + print(f"Worker: Received review for request {review.request_id[:8]} - Approved: {review.approved}") + + if review.request_id not in self._pending_requests: + raise ValueError(f"Unknown request ID in review: {review.request_id}") + + request, messages = self._pending_requests.pop(review.request_id) + + if review.approved: + print("Worker: Response approved. Emitting to external consumer...") + contents: list[Contents] = [] + for message in request.agent_messages: + contents.extend(message.contents) + + # Emit approved result to external consumer via AgentRunUpdateEvent. + await ctx.add_event( + AgentRunUpdateEvent(self.id, data=AgentRunResponseUpdate(contents=contents, role=Role.ASSISTANT)) + ) + return + + print(f"Worker: Response not approved. Feedback: {review.feedback}") + print("Worker: Regenerating response with feedback...") + + # Incorporate review feedback. + messages.append(ChatMessage(role=Role.SYSTEM, text=review.feedback)) + messages.append( + ChatMessage(role=Role.SYSTEM, text="Please incorporate the feedback and regenerate the response.") + ) + messages.extend(request.user_messages) + + # Retry with updated prompt. + response = await self._chat_client.get_response(messages=messages) + print(f"Worker: New response generated: {response.messages[-1].text}") + + messages.extend(response.messages) + + # Send updated request for re-review. + new_request = ReviewRequest( + request_id=review.request_id, user_messages=request.user_messages, agent_messages=response.messages + ) + await ctx.send_message(new_request) + + # Track new request for further evaluation. + self._pending_requests[new_request.request_id] = (new_request, messages) + + +async def main() -> None: + print("Starting Workflow Agent Demo") + print("=" * 50) + + print("Building workflow with Worker ↔ Reviewer cycle...") + agent = ( + WorkflowBuilder() + .register_executor( + lambda: Worker(id="worker", chat_client=OpenAIChatClient(model_id="gpt-4.1-nano")), + name="worker", + ) + .register_executor( + lambda: Reviewer(id="reviewer", chat_client=OpenAIChatClient(model_id="gpt-4.1")), + name="reviewer", + ) + .add_edge("worker", "reviewer") # Worker sends responses to Reviewer + .add_edge("reviewer", "worker") # Reviewer provides feedback to Worker + .set_start_executor("worker") + .build() + .as_agent() # Wrap workflow as an agent + ) + + print("Running workflow agent with user query...") + print("Query: 'Write code for parallel reading 1 million files on disk and write to a sorted output file.'") + print("-" * 50) + + # Run agent in streaming mode to observe incremental updates. 
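+    # Each streamed update surfaces only Reviewer-approved content: the Worker
+    # emits it via AgentRunUpdateEvent in handle_review_response above.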
+    async for event in agent.run_stream(
+        "Write code for parallel reading 1 million files on disk and write to a sorted output file."
+    ):
+        print(f"Agent Response: {event}")
+
+    print("=" * 50)
+    print("Workflow completed!")
+
+
+if __name__ == "__main__":
+    print("Initializing Workflow as Agent Sample...")
+    asyncio.run(main())
\ No newline at end of file
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/main.py b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/main.py
deleted file mode 100644
index 07aaf86c1bbe..000000000000
--- a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_guessing_game/main.py
+++ /dev/null
@@ -1,262 +0,0 @@
-# Copyright (c) Microsoft. All rights reserved.
-
-import asyncio
-from dataclasses import dataclass
-
-from agent_framework import (
-    AgentExecutorRequest,  # Message bundle sent to an AgentExecutor
-    AgentExecutorResponse,  # Result returned by an AgentExecutor
-    ChatAgent,  # Agent backed by a chat client
-    ChatMessage,  # Chat message structure
-    Executor,  # Base class for workflow executors
-    RequestInfoEvent,  # Event emitted when human input is requested
-    Role,  # Enum of chat roles (user, assistant, system)
-    WorkflowBuilder,  # Fluent builder for assembling the graph
-    WorkflowContext,  # Per-run context and event bus
-    WorkflowOutputEvent,  # Event emitted when workflow yields output
-    WorkflowRunState,  # Enum of workflow run states
-    WorkflowStatusEvent,  # Event emitted on run state changes
-    handler,  # Decorator to expose an Executor method as a step
-    response_handler,  # Decorator for methods that receive request_info responses
-)
-from agent_framework.azure import AzureOpenAIChatClient
-from azure.identity import AzureCliCredential
-from pydantic import BaseModel
-
-from azure.ai.agentserver.agentframework import from_agent_framework
-
-"""
-Sample: Human in the loop guessing game
-
-An agent guesses a number, then a human guides it with higher, lower, or
-correct. The loop continues until the human confirms correct, at which point
-the workflow completes when idle with no pending work.
-
-Purpose:
-Show how to integrate a human step in the middle of an LLM workflow by using
-`request_info` and `send_responses_streaming`.
-
-Demonstrate:
-- Alternating turns between an AgentExecutor and a human, driven by events.
-- Using Pydantic response_format to enforce structured JSON output from the agent instead of regex parsing.
-- Driving the loop in application code with run_stream and send_responses_streaming.
-
-Prerequisites:
-- Azure OpenAI configured for AzureOpenAIChatClient with required environment variables.
-- Authentication via azure-identity. Use AzureCliCredential and run az login before executing the sample.
-- Basic familiarity with WorkflowBuilder, executors, edges, events, and streaming runs.
-"""
-
-# How human-in-the-loop is achieved via `request_info` and `send_responses_streaming`:
-# - An executor (TurnManager) calls `ctx.request_info` with a payload (HumanFeedbackRequest).
-# - The workflow run pauses and emits a RequestInfoEvent with the payload and the request_id.
-# - The application captures the event, prompts the user, and collects replies.
-# - The application calls `send_responses_streaming` with a map of request_ids to replies.
-# - The workflow resumes, and the response is delivered to the executor method decorated with @response_handler.
-# - The executor can then continue the workflow, e.g., by sending a new message to the agent.
-
-
-@dataclass
-class HumanFeedbackRequest:
-    """Request sent to the human for feedback on the agent's guess."""
-
-    prompt: str
-
-
-class GuessOutput(BaseModel):
-    """Structured output from the agent. Enforced via response_format for reliable parsing."""
-
-    guess: int
-
-
-class TurnManager(Executor):
-    """Coordinates turns between the agent and the human.
-
-    Responsibilities:
-    - Kick off the first agent turn.
-    - After each agent reply, request human feedback with a HumanFeedbackRequest.
-    - After each human reply, either finish the game or prompt the agent again with feedback.
-    """
-
-    def __init__(self, id: str | None = None):
-        super().__init__(id=id or "turn_manager")
-
-    @handler
-    async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
-        """Start the game by asking the agent for an initial guess.
-
-        Contract:
-        - Input is a simple starter token (ignored here).
-        - Output is an AgentExecutorRequest that triggers the agent to produce a guess.
-        """
-        user = ChatMessage(Role.USER, text="Start by making your first guess.")
-        await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))
-
-    @handler
-    async def on_agent_response(
-        self,
-        result: AgentExecutorResponse,
-        ctx: WorkflowContext,
-    ) -> None:
-        """Handle the agent's guess and request human guidance.
-
-        Steps:
-        1) Parse the agent's JSON into GuessOutput for robustness.
-        2) Request info with a HumanFeedbackRequest as the payload.
-        """
-        # Parse structured model output
-        text = result.agent_run_response.text
-        last_guess = GuessOutput.model_validate_json(text).guess
-
-        # Craft a precise human prompt that defines higher and lower relative to the agent's guess.
-        prompt = (
-            f"The agent guessed: {last_guess}. "
-            "Type one of: higher (your number is higher than this guess), "
-            "lower (your number is lower than this guess), correct, or exit."
-        )
-        # Send a request with a prompt as the payload and expect a string reply.
-        await ctx.request_info(
-            request_data=HumanFeedbackRequest(prompt=prompt),
-            response_type=str,
-        )
-
-    @response_handler
-    async def on_human_feedback(
-        self,
-        original_request: HumanFeedbackRequest,
-        feedback: str,
-        ctx: WorkflowContext[AgentExecutorRequest, str],
-    ) -> None:
-        """Continue the game or finish based on human feedback."""
-        print(f"Feedback for prompt '{original_request.prompt}' received: {feedback}")
-
-        reply = feedback.strip().lower()
-
-        if reply == "correct":
-            await ctx.yield_output("Guessed correctly!")
-            return
-
-        # Provide feedback to the agent to try again.
-        # We keep the agent's output strictly JSON to ensure stable parsing on the next turn.
-        user_msg = ChatMessage(
-            Role.USER,
-            text=(f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <number>}}.'),
-        )
-        await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))
-
-
-def create_guessing_agent() -> ChatAgent:
-    """Create the guessing agent with instructions to guess a number between 1 and 10."""
-    return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
-        name="GuessingAgent",
-        instructions=(
-            "You guess a number between 1 and 10. "
-            "If the user says 'higher' or 'lower', adjust your next guess. "
-            'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <number>}. '
-            "No explanations or additional text."
-        ),
-        # response_format enforces that the model produces JSON compatible with GuessOutput.
- response_format=GuessOutput, - ) - -def build_agent(): - return ( - WorkflowBuilder() - .register_agent(create_guessing_agent, name="guessing_agent") - .register_executor(lambda: TurnManager(id="turn_manager"), name="turn_manager") - .set_start_executor("turn_manager") - .add_edge("turn_manager", "guessing_agent") # Ask agent to make/adjust a guess - .add_edge("guessing_agent", "turn_manager") # Agent's response comes back to coordinator - ).build() - -async def main() -> None: - """Run the human-in-the-loop guessing game workflow.""" - - # Build a simple loop: TurnManager <-> AgentExecutor. - workflow = build_agent() - await from_agent_framework(workflow).run_async() - - # # Human in the loop run: alternate between invoking the workflow and supplying collected responses. - # pending_responses: dict[str, str] | None = None - # workflow_output: str | None = None - - # # User guidance printing: - # # If you want to instruct users up front, print a short banner before the loop. - # # Example: - # # print( - # # "Interactive mode. When prompted, type one of: higher, lower, correct, or exit. " - # # "The agent will keep guessing until you reply correct.", - # # flush=True, - # # ) - - # while workflow_output is None: - # # First iteration uses run_stream("start"). - # # Subsequent iterations use send_responses_streaming with pending_responses from the console. - # stream = ( - # workflow.send_responses_streaming(pending_responses) if pending_responses else workflow.run_stream("start") - # ) - # # Collect events for this turn. Among these you may see WorkflowStatusEvent - # # with state IDLE_WITH_PENDING_REQUESTS when the workflow pauses for - # # human input, preceded by IN_PROGRESS_PENDING_REQUESTS as requests are - # # emitted. - # events = [event async for event in stream] - # pending_responses = None - - # # Collect human requests, workflow outputs, and check for completion. - # requests: list[tuple[str, str]] = [] # (request_id, prompt) - # for event in events: - # if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest): - # # RequestInfoEvent for our HumanFeedbackRequest. - # requests.append((event.request_id, event.data.prompt)) - # elif isinstance(event, WorkflowOutputEvent): - # # Capture workflow output as they're yielded - # workflow_output = str(event.data) - - # # Detect run state transitions for a better developer experience. - # pending_status = any( - # isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS - # for e in events - # ) - # idle_with_requests = any( - # isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS - # for e in events - # ) - # if pending_status: - # print("State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)") - # if idle_with_requests: - # print("State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)") - - # # If we have any human requests, prompt the user and prepare responses. - # if requests: - # responses: dict[str, str] = {} - # for req_id, prompt in requests: - # # Simple console prompt for the sample. - # print(f"HITL> {prompt}") - # # Instructional print already appears above. The input line below is the user entry point. - # # If desired, you can add more guidance here, but keep it concise. 
-    #         answer = input("Enter higher/lower/correct/exit: ").lower()  # noqa: ASYNC250
-    #         if answer == "exit":
-    #             print("Exiting...")
-    #             return
-    #         responses[req_id] = answer
-    #     pending_responses = responses
-
-    # # Show final result from workflow output captured during streaming.
-    # print(f"Workflow output: {workflow_output}")
-    # """
-    # Sample Output:

-    # HITL> The agent guessed: 5. Type one of: higher (your number is higher than this guess), lower (your number is lower than this guess), correct, or exit.
-    # Enter higher/lower/correct/exit: higher
-    # HITL> The agent guessed: 8. Type one of: higher (your number is higher than this guess), lower (your number is lower than this guess), correct, or exit.
-    # Enter higher/lower/correct/exit: higher
-    # HITL> The agent guessed: 10. Type one of: higher (your number is higher than this guess), lower (your number is lower than this guess), correct, or exit.
-    # Enter higher/lower/correct/exit: lower
-    # HITL> The agent guessed: 9. Type one of: higher (your number is higher than this guess), lower (your number is lower than this guess), correct, or exit.
-    # Enter higher/lower/correct/exit: correct
-    # Workflow output: Guessed correctly!
-    # """  # noqa: E501
-
-
-if __name__ == "__main__":
-    asyncio.run(main())
\ No newline at end of file
diff --git a/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/server/common/constants.py b/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/server/common/constants.py
new file mode 100644
index 000000000000..7d21ee7a31ff
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/server/common/constants.py
@@ -0,0 +1,6 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# Reserved function name for HITL.
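+# It is emitted as the `name` of synthesized function_call items for user-input
+# requests (see human_in_the_loop_helper.py), so clients can detect HITL items.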
+HUMAN_IN_THE_LOOP_FUNCTION_NAME = "__hosted_agent_adapter_hitl__" \ No newline at end of file From e453cc88e8ea58412f609c382e5e685729d3c8cf Mon Sep 17 00:00:00 2001 From: Lu Sun Date: Mon, 5 Jan 2026 10:34:38 -0800 Subject: [PATCH 03/11] debugging stream converter --- .../agentframework/agent_framework.py | 4 +- ...ramework_output_non_streaming_converter.py | 7 +- ...nt_framework_output_streaming_converter.py | 99 ++++++++++++++++++- 3 files changed, 103 insertions(+), 7 deletions(-) diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py index 233436ac84ea..9fee5dcabdd9 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py @@ -27,6 +27,7 @@ AgentFrameworkOutputNonStreamingConverter, ) from .models.agent_framework_output_streaming_converter import AgentFrameworkOutputStreamingConverter +from .models.human_in_the_loop_helper import HumanInTheLoopHelper from .models.constants import Constants from .tool_client import ToolClient @@ -223,6 +224,7 @@ async def agent_run( # pylint: disable=too-many-statements logger.info(f"Starting agent_run with stream={context.stream}") request_input = context.request.get("input") + hitl_helper = HumanInTheLoopHelper() input_converter = AgentFrameworkInputConverter() message = input_converter.transform_input(request_input) @@ -255,7 +257,7 @@ async def stream_updates(): # Non-streaming path logger.info("Running agent in non-streaming mode") - non_streaming_converter = AgentFrameworkOutputNonStreamingConverter(context) + non_streaming_converter = AgentFrameworkOutputNonStreamingConverter(context, hitl_helper=hitl_helper) result = await agent.run(message) logger.debug(f"Agent run completed, result type: {type(result)}") transformed_result = non_streaming_converter.transform_output_for_response(result) diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py index 0eba87db1e03..1ca0477e8e92 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py @@ -28,6 +28,7 @@ from .agent_id_generator import AgentIdGenerator from .constants import Constants +from .human_in_the_loop_helper import HumanInTheLoopHelper logger = get_logger() @@ -35,11 +36,11 @@ class AgentFrameworkOutputNonStreamingConverter: # pylint: disable=name-too-long """Non-streaming converter: AgentRunResponse -> OpenAIResponse.""" - def __init__(self, context: AgentRunContext): + def __init__(self, context: AgentRunContext, *, hitl_helper: HumanInTheLoopHelper): self._context = context self._response_id = None self._response_created_at = None - self.hitl_helper = None + self._hitl_helper = hitl_helper def _ensure_response_started(self) -> None: if not self._response_id: @@ -218,7 +219,7 @@ def _append_function_result_content(self, content: FunctionResultContent, sink: 
     def _append_user_input_request_contents(self, content: UserInputRequestContents, sink: List[dict], author_name: str) -> None:
         item_id = self._context.id_generator.generate_message_id()
-        content = self.hitl_helper.convert_user_input_request_content(content)
+        content = self._hitl_helper.convert_user_input_request_content(content)
         if not content:
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_streaming_converter.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_streaming_converter.py
index 92f1cb983e08..2ca46e33acaf 100644
--- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_streaming_converter.py
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_streaming_converter.py
@@ -14,9 +14,11 @@
     ErrorContent,
     FunctionCallContent,
     TextContent,
+    UserInputRequestContents,
 )
 
 from azure.ai.agentserver.core import AgentRunContext
+from azure.ai.agentserver.core.logger import get_logger
 from azure.ai.agentserver.core.models import (
     Response as OpenAIResponse,
     ResponseStreamEvent,
@@ -43,8 +45,10 @@
 )
 
 from .agent_id_generator import AgentIdGenerator
+from .human_in_the_loop_helper import HumanInTheLoopHelper
 from .utils.async_iter import chunk_on_change, peek
 
+logger = get_logger()
 
 class _BaseStreamingState:
     """Base interface for streaming state handlers."""
@@ -198,6 +202,86 @@ async def convert_contents(
             self._parent.add_completed_output_item(item)  # pylint: disable=protected-access
 
 
+class _UserInputRequestState(_BaseStreamingState):
+    """State handler for user input request content during streaming."""
+
+    def __init__(self,
+                 parent: AgentFrameworkOutputStreamingConverter,
+                 hitl_helper: HumanInTheLoopHelper):
+        self._parent = parent
+        self._hitl_helper = hitl_helper
+
+    async def convert_contents(
+        self, contents: AsyncIterable[UserInputRequestContents], author_name: str
+    ) -> AsyncIterable[ResponseStreamEvent]:
+        content_by_call_id = {}
+        ids_by_call_id = {}
+
+        async for content in contents:
+            content = self._hitl_helper.convert_user_input_request_content(content)
+            if not content:
+                logger.warning("UserInputRequestContents conversion returned empty content, skipping.")
+                continue
+
+            if content["call_id"] not in content_by_call_id:
+                item_id = self._parent.context.id_generator.generate_function_call_id()
+                output_index = self._parent.next_output_index()
+
+                content_by_call_id[content["call_id"]] = content
+                ids_by_call_id[content["call_id"]] = (item_id, output_index)
+
+                yield ResponseOutputItemAddedEvent(
+                    sequence_number=self._parent.next_sequence(),
+                    output_index=output_index,
+                    item=FunctionToolCallItemResource(
+                        id=item_id,
+                        status="in_progress",
+                        call_id=content["call_id"],
+                        name=content["name"],
+                        arguments="",
+                        created_by=self._parent._build_created_by(author_name),
+                    ),
+                )
+            else:
+                prev_content = content_by_call_id[content["call_id"]]
+                prev_content["arguments"] = prev_content["arguments"] + content["arguments"]
+                item_id, output_index = ids_by_call_id[content["call_id"]]
+
+            args_delta = content["arguments"] if isinstance(content["arguments"], str) else ""
+            yield ResponseFunctionCallArgumentsDeltaEvent(
+                sequence_number=self._parent.next_sequence(),
+                item_id=item_id,
+                output_index=output_index,
+                delta=args_delta,
+            )
+
+        for call_id, content in content_by_call_id.items():
+            item_id, output_index = ids_by_call_id[call_id]
+            args = content["arguments"]
+            yield ResponseFunctionCallArgumentsDoneEvent(
+                sequence_number=self._parent.next_sequence(),
+                item_id=item_id,
+                output_index=output_index,
+                arguments=args,
+            )
+
+            item = FunctionToolCallItemResource(
+                id=item_id,
+                status="completed",
+                call_id=call_id,
+                name=content["name"],
+                arguments=args,
+                created_by=self._parent._build_created_by(author_name),
+            )
+            yield ResponseOutputItemDoneEvent(
+                sequence_number=self._parent.next_sequence(),
+                output_index=output_index,
+                item=item,
+            )
+
+            self._parent.add_completed_output_item(item)  # pylint: disable=protected-access
+
+
 class _FunctionCallOutputStreamingState(_BaseStreamingState):
     """Handles function_call_output items streaming (non-chunked simple output)."""
@@ -255,7 +339,7 @@ def _to_output(cls, result: Any) -> str:
 class AgentFrameworkOutputStreamingConverter:
     """Streaming converter using content-type-specific state handlers."""
 
-    def __init__(self, context: AgentRunContext) -> None:
+    def __init__(self, context: AgentRunContext, *, hitl_helper: HumanInTheLoopHelper | None = None) -> None:
         self._context = context
         # sequence numbers must start at 0 for first emitted event
         self._sequence = -1
@@ -263,6 +347,7 @@
         self._response_id = self._context.response_id
         self._response_created_at = None
         self._completed_output_items: List[ItemResource] = []
+        self._hitl_helper = hitl_helper
 
     def next_sequence(self) -> int:
         self._sequence += 1
@@ -294,7 +379,10 @@ async def convert(self, updates: AsyncIterable[AgentRunResponseUpdate]) -> Async
         )
 
         is_changed = (
-            lambda a, b: a is not None and b is not None and a.message_id != b.message_id  # pylint: disable=unnecessary-lambda-assignment
+            lambda a, b: a is not None
+            and b is not None
+            and (a.message_id != b.message_id
+            or type(a) is not type(b))  # pylint: disable=unnecessary-lambda-assignment
         )
         async for group in chunk_on_change(updates, is_changed):
             has_value, first_tuple, contents_with_author = await peek(self._read_updates(group))
@@ -304,12 +392,16 @@
             first, author_name = first_tuple  # Extract content and author_name from tuple
             state = None
+            logger.info(f"First content type in group: {type(first).__name__}")
+            logger.info(f"First content in group: {first.to_dict()}")
             if isinstance(first, TextContent):
                 state = _TextContentStreamingState(self)
-            elif isinstance(first, (FunctionCallContent, FunctionApprovalRequestContent)):
+            elif isinstance(first, FunctionCallContent):
                 state = _FunctionCallStreamingState(self)
             elif isinstance(first, FunctionResultContent):
                 state = _FunctionCallOutputStreamingState(self)
+            elif isinstance(first, UserInputRequestContents):
+                state = _UserInputRequestState(self, self._hitl_helper)
             elif isinstance(first, ErrorContent):
                 raise ValueError(f"ErrorContent received: code={first.error_code}, message={first.message}")
             if not state:
@@ -355,6 +447,7 @@ async def _read_updates(self, updates: AsyncIterable[AgentRunResponseUpdate]) ->
                                ErrorContent)
         for content in update.contents:
             if isinstance(content, accepted_types):
+                logger.info(f"Yield update {type(content)}: {content.to_dict()}")
                 yield (content, author_name)
 
     def _ensure_response_started(self) -> None:

From 87f9fff24f9e9ce6ddda041e6d1da352c766a6ba Mon Sep 17 00:00:00 2001
From: Lu Sun
Date: Mon, 5 Jan
2026 23:00:35 -0800 Subject: [PATCH 04/11] request converter --- .../agentframework/agent_framework.py | 10 +- .../agent_framework_input_converters.py | 43 +++- ...ramework_output_non_streaming_converter.py | 8 +- ...nt_framework_output_streaming_converter.py | 196 +++++++++--------- .../models/human_in_the_loop_helper.py | 29 ++- .../agentframework/models/utils/async_iter.py | 5 +- .../samples/human_in_the_loop/main.py | 26 ++- .../workflow_as_agent_reflection_pattern.py | 11 + 8 files changed, 209 insertions(+), 119 deletions(-) diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py index 9fee5dcabdd9..18f0d20fb613 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py @@ -74,6 +74,8 @@ class AgentFrameworkCBAgent(FoundryCBAgent): def __init__(self, agent: Union[AgentProtocol, AgentFactory], credentials: "Optional[AsyncTokenCredential]" = None, + *, + hitl_helper: Optional[HumanInTheLoopHelper] = None, **kwargs: Any): """Initialize the AgentFrameworkCBAgent with an AgentProtocol or a factory function. @@ -86,6 +88,7 @@ def __init__(self, agent: Union[AgentProtocol, AgentFactory], super().__init__(credentials=credentials, **kwargs) # pylint: disable=unexpected-keyword-arg self._agent_or_factory: Union[AgentProtocol, AgentFactory] = agent self._resolved_agent: "Optional[AgentProtocol]" = None + self._hitl_helper = hitl_helper # If agent is already instantiated, use it directly if isinstance(agent, AgentProtocol): self._resolved_agent = agent @@ -224,16 +227,15 @@ async def agent_run( # pylint: disable=too-many-statements logger.info(f"Starting agent_run with stream={context.stream}") request_input = context.request.get("input") - hitl_helper = HumanInTheLoopHelper() - input_converter = AgentFrameworkInputConverter() + input_converter = AgentFrameworkInputConverter(agent=agent, hitl_helper=self._hitl_helper) message = input_converter.transform_input(request_input) logger.debug(f"Transformed input message type: {type(message)}") # Use split converters if context.stream: logger.info("Running agent in streaming mode") - streaming_converter = AgentFrameworkOutputStreamingConverter(context) + streaming_converter = AgentFrameworkOutputStreamingConverter(context, hitl_helper=self._hitl_helper) async def stream_updates(): try: @@ -257,7 +259,7 @@ async def stream_updates(): # Non-streaming path logger.info("Running agent in non-streaming mode") - non_streaming_converter = AgentFrameworkOutputNonStreamingConverter(context, hitl_helper=hitl_helper) + non_streaming_converter = AgentFrameworkOutputNonStreamingConverter(context, hitl_helper=self._hitl_helper) result = await agent.run(message) logger.debug(f"Agent run completed, result type: {type(result)}") transformed_result = non_streaming_converter.transform_output_for_response(result) diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py index 993be43e85c8..052341d881ef 100644 --- 
a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py @@ -7,7 +7,7 @@ from typing import Dict, List -from agent_framework import ChatMessage, Role as ChatRole +from agent_framework import ChatMessage, RequestInfoEvent, Role as ChatRole from agent_framework._types import TextContent from azure.ai.agentserver.core.logger import get_logger @@ -21,6 +21,9 @@ class AgentFrameworkInputConverter: Accepts: str | List | None Returns: None | str | ChatMessage | list[str] | list[ChatMessage] """ + def __init__(self, *, agent, hitl_helper=None): + self._agent = agent + self._hitl_helper = hitl_helper def transform_input( self, @@ -33,7 +36,17 @@ def transform_input( if isinstance(input, str): return input + + pending_requests = getattr(self._agent, 'pending_requests', {}) + if self._hitl_helper and pending_requests: + return self._validate_hitl_response(pending_requests, input) + return self._transform_input_internal(input) + def _transform_input_internal( + self, + input: str | List[Dict] | None, + ) -> str | ChatMessage | list[str] | list[ChatMessage] | None: + logger.debug("Transforming input of type: %s", type(input)) try: if isinstance(input, list): messages: list[str | ChatMessage] = [] @@ -118,3 +131,31 @@ def _extract_input_text(self, content_item: Dict) -> str: if isinstance(text_content, str): return text_content return None # type: ignore + + def _validate_hitl_response( + self, + pending_request: Dict, + input: List[Dict], + ) -> List[ChatMessage]: + if not self._hitl_helper: + logger.warning("HitL helper not provided; cannot validate HitL response.") + return [] + if isinstance(input, str): + logger.warning("Expected list input for HitL response validation, got str.") + return [] + if not isinstance(input, list) or len(input) != 1: + logger.warning("Expected single-item list input for HitL response validation.") + return [] + item = input[0] + if item.get("type") != "function_call_output": + logger.warning("Expected function_call_output type for HitL response validation.") + return [] + call_id = item.get("call_id", None) + if not call_id or call_id not in pending_request: + logger.warning("Function call output missing valid call_id for HitL response validation.") + return [] + request_info = pending_request[call_id] + if not request_info or not isinstance(request_info, RequestInfoEvent): + logger.warning("No valid pending request info found for call_id: %s", call_id) + return [] + return self._hitl_helper.convert_response(request_info, item) diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py index 1ca0477e8e92..e4256e73b531 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py @@ -219,15 +219,15 @@ def _append_function_result_content(self, content: FunctionResultContent, sink: def _append_user_input_request_contents(self, content: UserInputRequestContents, sink: List[dict], 
author_name: str) -> None: item_id = self._context.id_generator.generate_message_id() - content = self._hitl_helper.convert_user_input_request_content(content) - if not content: - logger.warning("UserInputRequestContents conversion returned empty content, skipping.") + if not self._hitl_helper: + logger.warning("No HITL helper configured; skipping UserInputRequestContents item.") return + content = self._hitl_helper.convert_user_input_request_content(content) sink.append( { "id": item_id, "type": "function_call", - "status": "inprogress", + "status": "in_progress", "call_id": content["call_id"], "name": content["name"], "arguments": content["arguments"], diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_streaming_converter.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_streaming_converter.py index 2ca46e33acaf..77714b8b51af 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_streaming_converter.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_streaming_converter.py @@ -5,9 +5,10 @@ # mypy: disable-error-code="call-overload,assignment,arg-type,override" from __future__ import annotations +from ast import arguments import datetime import json -from typing import Any, AsyncIterable, List +from typing import Any, AsyncIterable, List, Union from agent_framework import AgentRunResponseUpdate, BaseContent, FunctionApprovalRequestContent, FunctionResultContent from agent_framework._types import ( @@ -134,50 +135,60 @@ async def convert_contents(self, contents: AsyncIterable[TextContent], author_na class _FunctionCallStreamingState(_BaseStreamingState): """State handler for function_call content during streaming.""" - def __init__(self, parent: AgentFrameworkOutputStreamingConverter): + def __init__(self, + parent: AgentFrameworkOutputStreamingConverter, + hitl_helper: HumanInTheLoopHelper): self._parent = parent + self._hitl_helper = hitl_helper async def convert_contents( - self, contents: AsyncIterable[FunctionCallContent], author_name: str + self, contents: AsyncIterable[Union[FunctionCallContent, UserInputRequestContents]], author_name: str ) -> AsyncIterable[ResponseStreamEvent]: content_by_call_id = {} ids_by_call_id = {} + hitl_contents = [] async for content in contents: - if content.call_id not in content_by_call_id: - item_id = self._parent.context.id_generator.generate_function_call_id() - output_index = self._parent.next_output_index() - - content_by_call_id[content.call_id] = content - ids_by_call_id[content.call_id] = (item_id, output_index) - - yield ResponseOutputItemAddedEvent( - sequence_number=self._parent.next_sequence(), - output_index=output_index, - item=FunctionToolCallItemResource( - id=item_id, - status="in_progress", - call_id=content.call_id, - name=content.name, - arguments="", - created_by=self._parent._build_created_by(author_name), - ), - ) - else: - content_by_call_id[content.call_id] = content_by_call_id[content.call_id] + content - item_id, output_index = ids_by_call_id[content.call_id] - - args_delta = content.arguments if isinstance(content.arguments, str) else "" - yield ResponseFunctionCallArgumentsDeltaEvent( - sequence_number=self._parent.next_sequence(), - item_id=item_id, - output_index=output_index, - delta=args_delta, - ) + if 
isinstance(content, FunctionCallContent): + if content.call_id not in content_by_call_id: + item_id = self._parent.context.id_generator.generate_function_call_id() + output_index = self._parent.next_output_index() + + content_by_call_id[content.call_id] = content + ids_by_call_id[content.call_id] = (item_id, output_index) + + yield ResponseOutputItemAddedEvent( + sequence_number=self._parent.next_sequence(), + output_index=output_index, + item=FunctionToolCallItemResource( + id=item_id, + status="in_progress", + call_id=content.call_id, + name=content.name, + arguments="", + created_by=self._parent._build_created_by(author_name), + ), + ) + else: + content_by_call_id[content.call_id] = content_by_call_id[content.call_id] + content + item_id, output_index = ids_by_call_id[content.call_id] + + args_delta = content.arguments if isinstance(content.arguments, str) else "" + yield ResponseFunctionCallArgumentsDeltaEvent( + sequence_number=self._parent.next_sequence(), + item_id=item_id, + output_index=output_index, + delta=args_delta, + ) + + elif isinstance(content, UserInputRequestContents): + converted_hitl = self._hitl_helper.convert_user_input_request_content(content) + if converted_hitl: + hitl_contents.append(converted_hitl) for call_id, content in content_by_call_id.items(): item_id, output_index = ids_by_call_id[call_id] - args = content.arguments if isinstance(content.arguments, str) else json.dumps(content.arguments) + args = self._serialize_arguments(content.arguments) yield ResponseFunctionCallArgumentsDoneEvent( sequence_number=self._parent.next_sequence(), item_id=item_id, @@ -200,77 +211,43 @@ async def convert_contents( ) self._parent.add_completed_output_item(item) # pylint: disable=protected-access + + # process HITL contents after function calls + for content in hitl_contents: + item_id = self._parent.context.id_generator.generate_function_call_id() + output_index = self._parent.next_output_index() + yield ResponseOutputItemAddedEvent( + sequence_number=self._parent.next_sequence(), + output_index=output_index, + item=FunctionToolCallItemResource( + id=item_id, + status="in_progress", + call_id=content["call_id"], + name=content["name"], + arguments="", + created_by=self._parent._build_created_by(author_name), + ), + ) + yield ResponseFunctionCallArgumentsDeltaEvent( + sequence_number=self._parent.next_sequence(), + item_id=item_id, + output_index=output_index, + delta=content["arguments"], + ) -class _UserInputRequestState(_BaseStreamingState): - """State handler for function_call content during streaming.""" - - def __init__(self, - parent: AgentFrameworkOutputStreamingConverter, - hitl_helper: HumanInTheLoopHelper): - self._parent = parent - self._hitl_helper = hitl_helper - - async def convert_contents( - self, contents: AsyncIterable[UserInputRequestContents], author_name: str - ) -> AsyncIterable[ResponseStreamEvent]: - content_by_call_id = {} - ids_by_call_id = {} - - async for content in contents: - content = self._hitl_helper.convert_user_input_request_content(content) - if not content: - logger.warning("UserInputRequestContents conversion returned empty content, skipping.") - return - - if content["call_id"] not in content_by_call_id: - item_id = self._parent.context.id_generator.generate_function_call_id() - output_index = self._parent.next_output_index() - - content_by_call_id[content["call_id"]] = content - ids_by_call_id[content["call_id"]] = (item_id, output_index) - - yield ResponseOutputItemAddedEvent( - sequence_number=self._parent.next_sequence(), - 
output_index=output_index, - item=FunctionToolCallItemResource( - id=item_id, - status="in_progress", - call_id=content["call_id"], - name=content["name"], - arguments="", - created_by=self._parent._build_created_by(author_name), - ), - ) - else: - prev_content = content_by_call_id[content["call_id"]] - prev_content["arguments"] = prev_content["arguments"] + content["arguments"] - item_id, output_index = ids_by_call_id[content["call_id"]] - - args_delta = content["arguments"] if isinstance(content["arguments"], str) else "" - yield ResponseFunctionCallArgumentsDeltaEvent( - sequence_number=self._parent.next_sequence(), - item_id=item_id, - output_index=output_index, - delta=args_delta, - ) - - for call_id, content in content_by_call_id.items(): - item_id, output_index = ids_by_call_id[call_id] - args = content["arguments"] yield ResponseFunctionCallArgumentsDoneEvent( sequence_number=self._parent.next_sequence(), item_id=item_id, output_index=output_index, - arguments=args, + arguments=content["arguments"], ) - item = FunctionToolCallItemResource( id=item_id, - status="completed", - call_id=call_id, + status="in_progress", + call_id=content["call_id"], name=content["name"], - arguments=args, + arguments=content["arguments"], created_by=self._parent._build_created_by(author_name), ) yield ResponseOutputItemDoneEvent( @@ -278,8 +255,21 @@ async def convert_contents( output_index=output_index, item=item, ) - - self._parent.add_completed_output_item(item) # pylint: disable=protected-access + self._parent.add_completed_output_item(item) + + + def _serialize_arguments(self, arguments: Any) -> str: + if isinstance(arguments, str): + return arguments + if hasattr(arguments, "to_dict"): + arguments = arguments.to_dict() + if isinstance(arguments, dict): + for key, value in arguments.items(): + logger.info(f"Argument key: {key}, value type: {type(value)}, {value}") + try: + return json.dumps(arguments) + except Exception: # pragma: no cover - fallback # pylint: disable=broad-exception-caught + return str(arguments) class _FunctionCallOutputStreamingState(_BaseStreamingState): @@ -382,9 +372,10 @@ async def convert(self, updates: AsyncIterable[AgentRunResponseUpdate]) -> Async lambda a, b: a is not None \ and b is not None \ and (a.message_id != b.message_id \ - or type(a) != type(b)) # pylint: disable=unnecessary-lambda-assignment + or type(a.content[0]) != type(b.content[0])) # pylint: disable=unnecessary-lambda-assignment ) - async for group in chunk_on_change(updates, is_changed): + + async for group in chunk_on_change(updates, is_changed, logger=logger): has_value, first_tuple, contents_with_author = await peek(self._read_updates(group)) if not has_value or first_tuple is None: continue @@ -396,12 +387,10 @@ async def convert(self, updates: AsyncIterable[AgentRunResponseUpdate]) -> Async logger.info(f"First content type in group: {first.to_dict()}") if isinstance(first, TextContent): state = _TextContentStreamingState(self) - elif isinstance(first, FunctionCallContent): - state = _FunctionCallStreamingState(self) + elif isinstance(first, (FunctionCallContent, UserInputRequestContents)): + state = _FunctionCallStreamingState(self, self._hitl_helper) elif isinstance(first, FunctionResultContent): state = _FunctionCallOutputStreamingState(self) - elif isinstance(first, UserInputRequestContents): - state = _UserInputRequestState(self, self._hitl_helper) elif isinstance(first, ErrorContent): raise ValueError(f"ErrorContent received: code={first.error_code}, message={first.message}") if not state: @@ 
-410,6 +399,7 @@ async def convert(self, updates: AsyncIterable[AgentRunResponseUpdate]) -> Async # Extract just the content from (content, author_name) tuples using async generator async def extract_contents(): async for content, _ in contents_with_author: + logger.info(f"Processing content: {type(content).__name__}: {content.to_dict()}") yield content async for content in state.convert_contents(extract_contents(), author_name): diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py index b00b9675b682..987e165f0343 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py @@ -1,15 +1,20 @@ -from typing import Any +from typing import Any, List, Dict import json +from agent_framework import ChatMessage, FunctionResultContent, RequestInfoEvent from agent_framework._types import UserInputRequestContents +from azure.ai.agentserver.core.logger import get_logger from azure.ai.agentserver.core.server.common.constants import HUMAN_IN_THE_LOOP_FUNCTION_NAME +logger = get_logger() class HumanInTheLoopHelper: + def convert_user_input_request_content(self, content: UserInputRequestContents) -> dict: - call_id = getattr(content, "id") - arguments = self.convert_request_arguments(getattr(content, "arguments", "")) + function_call = content.function_call + call_id = getattr(function_call, "call_id", "") + arguments = self.convert_request_arguments(getattr(function_call, "arguments", "")) return { "call_id": call_id, "name": HUMAN_IN_THE_LOOP_FUNCTION_NAME, @@ -17,9 +22,27 @@ def convert_user_input_request_content(self, content: UserInputRequestContents) } def convert_request_arguments(self, arguments: Any) -> str: + # convert data to payload if possible + if isinstance(arguments, dict): + data = arguments.get("data") + if data and hasattr(data, "convert_to_payload"): + return data.convert_to_payload() + if not isinstance(arguments, str): + if hasattr(arguments, "to_dict"): + arguments = arguments.to_dict() try: arguments = json.dumps(arguments) except Exception: # pragma: no cover - fallback # pylint: disable=broad-exception-caught arguments = str(arguments) return arguments + + def convert_response(self, hitl_request: RequestInfoEvent, input: Dict) -> List[ChatMessage]: + response_type = hitl_request.response_type + if response_type and hasattr(response_type, "convert_from_payload"): + response_result = response_type.convert_from_payload(input.get("output", "")) + response_content = FunctionResultContent( + call_id=hitl_request.request_id, + result=response_result, + ) + return [ChatMessage(role="tool", contents=[response_content])] \ No newline at end of file diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/utils/async_iter.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/utils/async_iter.py index fdf3b2fbb2a3..42a51cb59dd1 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/utils/async_iter.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/utils/async_iter.py @@ -14,6 +14,7 @@ 
async def chunk_on_change( source: AsyncIterable[TSource], is_changed: Optional[Callable[[Optional[TSource], Optional[TSource]], bool]] = None, + logger=None, ) -> AsyncIterator[AsyncIterable[TSource]]: """ Chunks an async iterable into groups based on when consecutive elements change. @@ -36,7 +37,7 @@ async def chunk_on_change( def key_equal(a: TSource, b: TSource) -> bool: return not is_changed(a, b) - async for group in chunk_by_key(source, lambda x: x, key_equal=key_equal): + async for group in chunk_by_key(source, lambda x: x, key_equal=key_equal, logger=logger): yield group @@ -44,6 +45,7 @@ async def chunk_by_key( source: AsyncIterable[TSource], key_selector: Callable[[TSource], TKey], key_equal: Optional[Callable[[TKey, TKey], bool]] = None, + logger=None, ) -> AsyncIterator[AsyncIterable[TSource]]: """ Chunks the async iterable into groups based on a key selector. @@ -92,6 +94,7 @@ async def inner() -> AsyncIterator[TSource]: return k = key_selector(item) + logger.info(f"Considering item with key: {k.to_dict()}, current_key: {current_key.to_dict()}", ) if not key_equal(k, current_key): # Hand first item of next group back to outer loop pending = item diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py index 47c3fea8ad47..10807433e82d 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py @@ -1,6 +1,7 @@ # Copyright (c) Microsoft. All rights reserved. import asyncio +import json import sys from collections.abc import Mapping from dataclasses import dataclass @@ -35,6 +36,7 @@ ) from azure.ai.agentserver.agentframework import from_agent_framework +from azure.ai.agentserver.agentframework.models.human_in_the_loop_helper import HumanInTheLoopHelper """ Sample: Workflow Agent with Human-in-the-Loop @@ -61,6 +63,23 @@ class HumanReviewRequest: agent_request: ReviewRequest | None = None + def convert_to_payload(self) -> str: + """Convert the HumanReviewRequest to a JSON payload string.""" + user_messages = [msg.to_dict() for msg in self.agent_request.user_messages + ] if self.agent_request else [] + agent_messages = [msg.to_dict() for msg in self.agent_request.agent_messages + ] if self.agent_request else [] + payload = { + "agent_request": { + "request_id": self.agent_request.request_id, + "user_messages": user_messages, + "agent_messages": agent_messages, + } + if self.agent_request + else None + } + return json.dumps(payload, indent=2) + class ReviewerWithHumanInTheLoop(Executor): """Executor that always escalates reviews to a human manager.""" @@ -188,9 +207,10 @@ async def main() -> None: async def run_agent() -> None: agent = build_agent() - await from_agent_framework(agent).run_async() + hitl_helper = HumanInTheLoopHelper() + await from_agent_framework(agent, hitl_helper=hitl_helper).run_async() if __name__ == "__main__": print("Initializing Workflow as Agent Sample...") - asyncio.run(main()) - #asyncio.run(run_agent()) \ No newline at end of file + # asyncio.run(main()) + asyncio.run(run_agent()) \ No newline at end of file diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/workflow_as_agent_reflection_pattern.py b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/workflow_as_agent_reflection_pattern.py index 53947c7a4060..ef74766420a4 100644 --- 
a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/workflow_as_agent_reflection_pattern.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/workflow_as_agent_reflection_pattern.py @@ -2,6 +2,7 @@ import asyncio from dataclasses import dataclass +import json from uuid import uuid4 from agent_framework import ( @@ -60,6 +61,16 @@ class ReviewResponse: feedback: str approved: bool + @staticmethod + def convert_from_payload(payload: str) -> "ReviewResponse": + """Convert a JSON payload string to a ReviewResponse instance.""" + data = json.loads(payload) + return ReviewResponse( + request_id=data["request_id"], + feedback=data["feedback"], + approved=data["approved"], + ) + class Reviewer(Executor): """Executor that reviews agent responses and provides structured feedback.""" From de6a24743da3fb72eb5490c8dae58322dc0236fb Mon Sep 17 00:00:00 2001 From: Lu Sun Date: Tue, 6 Jan 2026 14:23:48 -0800 Subject: [PATCH 05/11] remove unused code --- .../agentframework/agent_framework.py | 4 +-- .../agent_framework_input_converters.py | 25 ++++++++++-------- ...ramework_output_non_streaming_converter.py | 5 +--- ...nt_framework_output_streaming_converter.py | 26 +++---------------- .../models/human_in_the_loop_helper.py | 4 +-- .../agentframework/models/utils/async_iter.py | 5 +--- 6 files changed, 21 insertions(+), 48 deletions(-) diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py index 18f0d20fb613..0ef15435f090 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py @@ -74,8 +74,6 @@ class AgentFrameworkCBAgent(FoundryCBAgent): def __init__(self, agent: Union[AgentProtocol, AgentFactory], credentials: "Optional[AsyncTokenCredential]" = None, - *, - hitl_helper: Optional[HumanInTheLoopHelper] = None, **kwargs: Any): """Initialize the AgentFrameworkCBAgent with an AgentProtocol or a factory function. 
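The hunks above pin down the adapter's human-in-the-loop contract: the adapter never imports the sample's types, it only calls `convert_to_payload()` on the request object it is handed and `convert_from_payload()` on the registered response type. A minimal, self-contained sketch of that round trip follows — the `ApprovalRequest`/`ApprovalResponse` names are illustrative, not part of the SDK:

```python
import json
from dataclasses import dataclass


@dataclass
class ApprovalRequest:
    """Illustrative HITL request type; the adapter only relies on convert_to_payload()."""

    question: str

    def convert_to_payload(self) -> str:
        # Serialized into the function_call "arguments" string the client receives.
        return json.dumps({"question": self.question})


@dataclass
class ApprovalResponse:
    """Illustrative HITL response type; rebuilt from the function_call_output payload."""

    approved: bool
    feedback: str

    @staticmethod
    def convert_from_payload(payload: str) -> "ApprovalResponse":
        data = json.loads(payload)
        return ApprovalResponse(approved=data["approved"], feedback=data["feedback"])


# Round trip as the adapter would drive it:
wire = ApprovalRequest(question="Ship it?").convert_to_payload()
reply = ApprovalResponse.convert_from_payload('{"approved": true, "feedback": "LGTM"}')
assert reply.approved and isinstance(wire, str)
```

Because the contract is duck-typed, any serialization scheme works as long as both methods agree on the payload string.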
@@ -88,7 +86,7 @@ def __init__(self, agent: Union[AgentProtocol, AgentFactory], super().__init__(credentials=credentials, **kwargs) # pylint: disable=unexpected-keyword-arg self._agent_or_factory: Union[AgentProtocol, AgentFactory] = agent self._resolved_agent: "Optional[AgentProtocol]" = None - self._hitl_helper = hitl_helper + self._hitl_helper = HumanInTheLoopHelper() # If agent is already instantiated, use it directly if isinstance(agent, AgentProtocol): self._resolved_agent = agent diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py index 052341d881ef..f2e6226f6cb5 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py @@ -5,7 +5,7 @@ # mypy: disable-error-code="no-redef" from __future__ import annotations -from typing import Dict, List +from typing import Dict, List, Optional from agent_framework import ChatMessage, RequestInfoEvent, Role as ChatRole from agent_framework._types import TextContent @@ -39,14 +39,15 @@ def transform_input( pending_requests = getattr(self._agent, 'pending_requests', {}) if self._hitl_helper and pending_requests: - return self._validate_hitl_response(pending_requests, input) + hitl_response = self._validate_hitl_response(pending_requests, input) + if hitl_response: + return hitl_response return self._transform_input_internal(input) def _transform_input_internal( self, input: str | List[Dict] | None, ) -> str | ChatMessage | list[str] | list[ChatMessage] | None: - logger.debug("Transforming input of type: %s", type(input)) try: if isinstance(input, list): messages: list[str | ChatMessage] = [] @@ -132,30 +133,32 @@ def _extract_input_text(self, content_item: Dict) -> str: return text_content return None # type: ignore - def _validate_hitl_response( + def _validate_and_convert_hitl_response( self, pending_request: Dict, input: List[Dict], - ) -> List[ChatMessage]: + ) -> Optional[List[ChatMessage]]: if not self._hitl_helper: logger.warning("HitL helper not provided; cannot validate HitL response.") - return [] + return None if isinstance(input, str): logger.warning("Expected list input for HitL response validation, got str.") - return [] + return None if not isinstance(input, list) or len(input) != 1: logger.warning("Expected single-item list input for HitL response validation.") - return [] + return None + item = input[0] if item.get("type") != "function_call_output": logger.warning("Expected function_call_output type for HitL response validation.") - return [] + return None call_id = item.get("call_id", None) if not call_id or call_id not in pending_request: logger.warning("Function call output missing valid call_id for HitL response validation.") - return [] + return None request_info = pending_request[call_id] if not request_info or not isinstance(request_info, RequestInfoEvent): logger.warning("No valid pending request info found for call_id: %s", call_id) - return [] + return None + return self._hitl_helper.convert_response(request_info, item) diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py 
b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py index e4256e73b531..08db24adfae0 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py @@ -218,10 +218,7 @@ def _append_function_result_content(self, content: FunctionResultContent, sink: ) def _append_user_input_request_contents(self, content: UserInputRequestContents, sink: List[dict], author_name: str) -> None: - item_id = self._context.id_generator.generate_message_id() - if not self._hitl_helper: - logger.warning("No HITL helper configured; skipping UserInputRequestContents item.") - return + item_id = self._context.id_generator.generate_function_call_id() content = self._hitl_helper.convert_user_input_request_content(content) sink.append( { diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_streaming_converter.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_streaming_converter.py index 77714b8b51af..23d8702e38ec 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_streaming_converter.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_streaming_converter.py @@ -19,7 +19,6 @@ ) from azure.ai.agentserver.core import AgentRunContext -from azure.ai.agentserver.core.logger import get_logger from azure.ai.agentserver.core.models import ( Response as OpenAIResponse, ResponseStreamEvent, @@ -49,7 +48,6 @@ from .human_in_the_loop_helper import HumanInTheLoopHelper from .utils.async_iter import chunk_on_change, peek -logger = get_logger() class _BaseStreamingState: """Base interface for streaming state handlers.""" @@ -188,7 +186,7 @@ async def convert_contents( for call_id, content in content_by_call_id.items(): item_id, output_index = ids_by_call_id[call_id] - args = self._serialize_arguments(content.arguments) + args = content.arguments if isinstance(content.arguments, str) else json.dumps(content.arguments) yield ResponseFunctionCallArgumentsDoneEvent( sequence_number=self._parent.next_sequence(), item_id=item_id, @@ -256,20 +254,6 @@ async def convert_contents( item=item, ) self._parent.add_completed_output_item(item) - - - def _serialize_arguments(self, arguments: Any) -> str: - if isinstance(arguments, str): - return arguments - if hasattr(arguments, "to_dict"): - arguments = arguments.to_dict() - if isinstance(arguments, dict): - for key, value in arguments.items(): - logger.info(f"Argument key: {key}, value type: {type(value)}, {value}") - try: - return json.dumps(arguments) - except Exception: # pragma: no cover - fallback # pylint: disable=broad-exception-caught - return str(arguments) class _FunctionCallOutputStreamingState(_BaseStreamingState): @@ -375,7 +359,7 @@ async def convert(self, updates: AsyncIterable[AgentRunResponseUpdate]) -> Async or type(a.content[0]) != type(b.content[0])) # pylint: disable=unnecessary-lambda-assignment ) - async for group in chunk_on_change(updates, is_changed, logger=logger): + async for group in chunk_on_change(updates, is_changed): has_value, 
first_tuple, contents_with_author = await peek(self._read_updates(group)) if not has_value or first_tuple is None: continue @@ -383,8 +367,6 @@ async def convert(self, updates: AsyncIterable[AgentRunResponseUpdate]) -> Async first, author_name = first_tuple # Extract content and author_name from tuple state = None - logger.info(f"First content type in group: {type(first).__name__}") - logger.info(f"First content type in group: {first.to_dict()}") if isinstance(first, TextContent): state = _TextContentStreamingState(self) elif isinstance(first, (FunctionCallContent, UserInputRequestContents)): @@ -399,7 +381,6 @@ async def convert(self, updates: AsyncIterable[AgentRunResponseUpdate]) -> Async # Extract just the content from (content, author_name) tuples using async generator async def extract_contents(): async for content, _ in contents_with_author: - logger.info(f"Processing content: {type(content).__name__}: {content.to_dict()}") yield content async for content in state.convert_contents(extract_contents(), author_name): @@ -432,12 +413,11 @@ async def _read_updates(self, updates: AsyncIterable[AgentRunResponseUpdate]) -> accepted_types = (TextContent, FunctionCallContent, - FunctionApprovalRequestContent, + UserInputRequestContents, FunctionResultContent, ErrorContent) for content in update.contents: if isinstance(content, accepted_types): - logger.info(f"Yield update {type(content)}: {content.to_dict()}") yield (content, author_name) def _ensure_response_started(self) -> None: diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py index 987e165f0343..96737853e49b 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py @@ -4,10 +4,8 @@ from agent_framework import ChatMessage, FunctionResultContent, RequestInfoEvent from agent_framework._types import UserInputRequestContents -from azure.ai.agentserver.core.logger import get_logger from azure.ai.agentserver.core.server.common.constants import HUMAN_IN_THE_LOOP_FUNCTION_NAME -logger = get_logger() class HumanInTheLoopHelper: @@ -29,7 +27,7 @@ def convert_request_arguments(self, arguments: Any) -> str: return data.convert_to_payload() if not isinstance(arguments, str): - if hasattr(arguments, "to_dict"): + if hasattr(arguments, "to_dict"): # agentframework models have to_dict method arguments = arguments.to_dict() try: arguments = json.dumps(arguments) diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/utils/async_iter.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/utils/async_iter.py index 42a51cb59dd1..fdf3b2fbb2a3 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/utils/async_iter.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/utils/async_iter.py @@ -14,7 +14,6 @@ async def chunk_on_change( source: AsyncIterable[TSource], is_changed: Optional[Callable[[Optional[TSource], Optional[TSource]], bool]] = None, - logger=None, ) -> AsyncIterator[AsyncIterable[TSource]]: """ Chunks an async iterable into groups based on 
when consecutive elements change. @@ -37,7 +36,7 @@ async def chunk_on_change( def key_equal(a: TSource, b: TSource) -> bool: return not is_changed(a, b) - async for group in chunk_by_key(source, lambda x: x, key_equal=key_equal, logger=logger): + async for group in chunk_by_key(source, lambda x: x, key_equal=key_equal): yield group @@ -45,7 +44,6 @@ async def chunk_by_key( source: AsyncIterable[TSource], key_selector: Callable[[TSource], TKey], key_equal: Optional[Callable[[TKey, TKey], bool]] = None, - logger=None, ) -> AsyncIterator[AsyncIterable[TSource]]: """ Chunks the async iterable into groups based on a key selector. @@ -94,7 +92,6 @@ async def inner() -> AsyncIterator[TSource]: return k = key_selector(item) - logger.info(f"Considering item with key: {k.to_dict()}, current_key: {current_key.to_dict()}", ) if not key_equal(k, current_key): # Hand first item of next group back to outer loop pending = item From abb3f388e090689af4200eaf8e8142dfaa24f1c4 Mon Sep 17 00:00:00 2001 From: Lu Sun Date: Tue, 6 Jan 2026 23:23:55 -0800 Subject: [PATCH 06/11] updated sample --- .../agent_framework_input_converters.py | 2 +- .../samples/human_in_the_loop/README.md | 120 ++++++++++++---- .../samples/human_in_the_loop/main.py | 132 +++--------------- .../workflow_as_agent_reflection_pattern.py | 122 +--------------- 4 files changed, 114 insertions(+), 262 deletions(-) diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py index f2e6226f6cb5..6507f88802e4 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py @@ -39,7 +39,7 @@ def transform_input( pending_requests = getattr(self._agent, 'pending_requests', {}) if self._hitl_helper and pending_requests: - hitl_response = self._validate_hitl_response(pending_requests, input) + hitl_response = self._validate_and_convert_hitl_response(pending_requests, input) if hitl_response: return hitl_response return self._transform_input_internal(input) diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/README.md b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/README.md index 64f19cefcbcb..19f0335895e3 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/README.md +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/README.md @@ -1,46 +1,112 @@ -pip install -e src/adapter/python -# Agent Framework Sample +# Human-in-the-Loop Agent Framework Sample -This sample demonstrates how to use the agents hosting adapter with Microsoft Agent Framework. +This sample shows how to host a Microsoft Agent Framework workflow inside Azure AI Agent Server while escalating responses to a real human when the reviewer executor decides that manual approval is required. ## Prerequisites -> **Azure sign-in:** Run `az login` before starting the sample so `DefaultAzureCredential` can acquire a CLI token. 
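Before the setup steps below, it may help to see the whole escalate-and-resume loop from the client side. This is a compact sketch of the flow the README walks through, assuming the server defaults used in this sample and the third-party `requests` package; the JSON field names mirror the example responses shown later in the README:

```python
import json

import requests  # assumed installed; any HTTP client works

BASE = "http://localhost:8088/responses"

# 1) Ask the question; the reviewer always escalates, so expect a HITL function call.
first = requests.post(BASE, json={"input": "Plan a 2-day Seattle trip.", "stream": False}).json()
conversation_id = first["conversation"]["id"]
hitl_call = next(
    item
    for item in first["output"]
    if item.get("type") == "function_call" and item.get("name") == "__hosted_agent_adapter_hitl__"
)
request_id = json.loads(hitl_call["arguments"])["agent_request"]["request_id"]

# 2) Send the human decision back as a function_call_output on the same conversation.
decision = {"request_id": request_id, "feedback": "Approved", "approved": True}
final = requests.post(
    BASE,
    json={
        "stream": False,
        "conversation": {"id": conversation_id},
        "input": [
            {
                "type": "function_call_output",
                "call_id": hitl_call["call_id"],
                "output": json.dumps(decision),
            }
        ],
    },
).json()
print(final["output"][-1])
```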
+- Python 3.10+ and `pip` +- Azure CLI logged in with `az login` (used by `AzureCliCredential`) +- An Azure OpenAI chat deployment -### Environment Variables +### Environment configuration -Copy `.envtemplate` to `.env` and supply: +1. Copy `.envtemplate` to `.env` and fill in your Azure OpenAI details: + ``` + AZURE_OPENAI_ENDPOINT=https://.cognitiveservices.azure.com/ + OPENAI_API_VERSION=2025-03-01-preview + AZURE_OPENAI_CHAT_DEPLOYMENT_NAME= + ``` + +2. Create a virtual environment (optional but recommended) and install the sample dependencies: + + ```powershell + python -m venv .venv + . .venv/Scripts/Activate.ps1 + pip install -r requirements.txt + ``` + +`main.py` automatically loads the `.env` file before spinning up the server. + +## Run the workflow-hosted agent + +From this directory, start the adapter host (defaults to `http://0.0.0.0:8088`): + +```powershell +python main.py ``` -AZURE_OPENAI_ENDPOINT=https://.cognitiveservices.azure.com/ -OPENAI_API_VERSION=2025-03-01-preview -AZURE_OPENAI_CHAT_DEPLOYMENT_NAME= + +The worker executor produces answers, the reviewer executor always escalates to a person, and the adapter exposes the whole workflow through the `/responses` endpoint. + +For the human-in-the-loop scenario, `HumanReviewRequest` and `ReviewResponse` are classes provided by the user. These classes should implement conversion functions that let the adapter turn their data into request payloads. + + -## Running the Sample +## Send a user request -Follow these steps from this folder: +Save the following payload to `request.json` (adjust the prompt as needed): -1) Start the agent server (defaults to 0.0.0.0:8088): +```json +{ + "input": "Plan a 2-day Seattle trip that balances food and museums.", + "stream": false +} ``` -2) Send a non-streaming request (returns a single JSON response): +Then call the server (PowerShell example): -```bash -curl -sS \ - -H "Content-Type: application/json" \ - -X POST http://localhost:8088/responses \ - -d "{\"input\":\"What's the weather like in Seattle?\",\"stream\":false}" +```pwsh +$body = Get-Content .\request.json -Raw +Invoke-RestMethod -Uri http://localhost:8088/responses -Method Post -ContentType "application/json" -Body $body ` + | ConvertTo-Json -Depth 8 +``` -3) Send a streaming request (server-sent events). 
Use -N to disable curl buffering: +Update the JSON string in `output` to reject a response: + +```json +{"request_id":"","feedback":"Missing safety disclaimers.","approved":false} +``` -```bash -curl -N \ - -H "Content-Type: application/json" \ - -X POST http://localhost:8088/responses \ - -d "{\"input\":\"What's the weather like in New York?\",\"stream\":true}" -``` \ No newline at end of file +Once the reviewer accepts the human feedback, the worker emits the approved assistant response and the HTTP call returns the final output. diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py index 10807433e82d..433a3161f9b4 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py @@ -2,33 +2,22 @@ import asyncio import json -import sys -from collections.abc import Mapping from dataclasses import dataclass from pathlib import Path from typing import Any from agent_framework.azure import AzureOpenAIChatClient from azure.identity import AzureCliCredential - -# # Ensure local getting_started package can be imported when running as a script. -# _SAMPLES_ROOT = Path(__file__).resolve().parents[3] -# if str(_SAMPLES_ROOT) not in sys.path: -# sys.path.insert(0, str(_SAMPLES_ROOT)) +from dotenv import load_dotenv from agent_framework import ( # noqa: E402 - ChatMessage, Executor, - FunctionCallContent, - FunctionResultContent, - Role, WorkflowAgent, WorkflowBuilder, WorkflowContext, handler, response_handler, ) -from agent_framework._types import UserInputRequestContents from workflow_as_agent_reflection_pattern import ( # noqa: E402 ReviewRequest, ReviewResponse, @@ -36,26 +25,8 @@ ) from azure.ai.agentserver.agentframework import from_agent_framework -from azure.ai.agentserver.agentframework.models.human_in_the_loop_helper import HumanInTheLoopHelper - -""" -Sample: Workflow Agent with Human-in-the-Loop - -Purpose: -This sample demonstrates how to build a workflow agent that escalates uncertain -decisions to a human manager. A Worker generates results, while a Reviewer -evaluates them. When the Reviewer is not confident, it escalates the decision -to a human, receives the human response, and then forwards that response back -to the Worker. The workflow completes when idle. - -Prerequisites: -- OpenAI account configured and accessible for OpenAIChatClient. -- Familiarity with WorkflowBuilder, Executor, and WorkflowContext from agent_framework. -- Understanding of request-response message handling in executors. -- (Optional) Review of reflection and escalation patterns, such as those in - workflow_as_agent_reflection.py. 
-""" +load_dotenv() @dataclass class HumanReviewRequest: @@ -64,20 +35,17 @@ class HumanReviewRequest: agent_request: ReviewRequest | None = None def convert_to_payload(self) -> str: - """Convert the HumanReviewRequest to a JSON payload string.""" - user_messages = [msg.to_dict() for msg in self.agent_request.user_messages - ] if self.agent_request else [] - agent_messages = [msg.to_dict() for msg in self.agent_request.agent_messages - ] if self.agent_request else [] - payload = { - "agent_request": { - "request_id": self.agent_request.request_id, - "user_messages": user_messages, - "agent_messages": agent_messages, + """Convert the HumanReviewRequest to a payload string.""" + request = self.agent_request + payload: dict[str, Any] = {"agent_request": None} + + if request: + payload["agent_request"] = { + "request_id": request.request_id, + "user_messages": [msg.to_dict() for msg in request.user_messages], + "agent_messages": [msg.to_dict() for msg in request.agent_messages], } - if self.agent_request - else None - } + return json.dumps(payload, indent=2) @@ -98,7 +66,10 @@ async def review(self, request: ReviewRequest, ctx: WorkflowContext) -> None: print("Reviewer: Escalating to human manager...") # Forward the request to a human manager by sending a HumanReviewRequest. - await ctx.request_info(request_data=HumanReviewRequest(agent_request=request), response_type=ReviewResponse) + await ctx.request_info( + request_data=HumanReviewRequest(agent_request=request), + response_type=ReviewResponse, + ) @response_handler async def accept_human_review( @@ -139,78 +110,11 @@ def build_agent(): ) return agent -async def main() -> None: - print("Starting Workflow Agent with Human-in-the-Loop Demo") - print("=" * 50) - - print("Building workflow with Worker-Reviewer cycle...") - agent = build_agent() - - print("Running workflow agent with user query...") - print("Query: 'Write code for parallel reading 1 million files on disk and write to a sorted output file.'") - print("-" * 50) - - # Run the agent with an initial query. - response = await agent.run( - "Write code for parallel reading 1 million Files on disk and write to a sorted output file." - ) - - # Locate the human review function call in the response messages. - human_review_function_call: FunctionCallContent | None = None - for message in response.messages: - print(f"Message {type(message)}: {message.to_json()}") - for content in message.contents: - print(f"content {type(content)}: {content.to_json()}") - if isinstance(content, UserInputRequestContents): - print(f"User input requested: {content}") - if isinstance(content, FunctionCallContent) and content.name == WorkflowAgent.REQUEST_INFO_FUNCTION_NAME: - human_review_function_call = content - - # Handle the human review if required. - if human_review_function_call: - # Parse the human review request arguments. 
- human_request_args = human_review_function_call.arguments - if isinstance(human_request_args, str): - request: WorkflowAgent.RequestInfoFunctionArgs = WorkflowAgent.RequestInfoFunctionArgs.from_json( - human_request_args - ) - elif isinstance(human_request_args, Mapping): - request = WorkflowAgent.RequestInfoFunctionArgs.from_dict(dict(human_request_args)) - else: - raise TypeError("Unexpected argument type for human review function call.") - - request_payload: Any = request.data - if not isinstance(request_payload, HumanReviewRequest): - raise ValueError("Human review request payload must be a HumanReviewRequest.") - - agent_request = request_payload.agent_request - if agent_request is None: - raise ValueError("Human review request must include agent_request.") - - request_id = agent_request.request_id - # Mock a human response approval for demonstration purposes. - human_response = ReviewResponse(request_id=request_id, feedback="Approved", approved=True) - - # Create the function call result object to send back to the agent. - human_review_function_result = FunctionResultContent( - call_id=human_review_function_call.call_id, - result=human_response, - ) - # Send the human review result back to the agent. - response = await agent.run(ChatMessage(role=Role.TOOL, contents=[human_review_function_result])) - print(f"Agent Response: {response.to_dict()}") - print(f"📤 Agent Response: {response.messages[-1].text}") - - print("=" * 50) - print("Workflow completed!") - async def run_agent() -> None: + """Run the workflow inside the agent server adapter.""" agent = build_agent() - hitl_helper = HumanInTheLoopHelper() - await from_agent_framework(agent, hitl_helper=hitl_helper).run_async() + await from_agent_framework(agent).run_async() if __name__ == "__main__": - print("Initializing Workflow as Agent Sample...") - # asyncio.run(main()) asyncio.run(run_agent()) \ No newline at end of file diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/workflow_as_agent_reflection_pattern.py b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/workflow_as_agent_reflection_pattern.py index ef74766420a4..168d90cdd93d 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/workflow_as_agent_reflection_pattern.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/workflow_as_agent_reflection_pattern.py @@ -1,6 +1,5 @@ # Copyright (c) Microsoft. All rights reserved. -import asyncio from dataclasses import dataclass import json from uuid import uuid4 @@ -13,36 +12,9 @@ Contents, Executor, Role, - WorkflowBuilder, WorkflowContext, handler, ) -from agent_framework.openai import OpenAIChatClient -from pydantic import BaseModel - -""" -Sample: Workflow as Agent with Reflection and Retry Pattern - -Purpose: -This sample demonstrates how to wrap a workflow as an agent using WorkflowAgent. -It uses a reflection pattern where a Worker executor generates responses and a -Reviewer executor evaluates them. If the response is not approved, the Worker -regenerates the output based on feedback until the Reviewer approves it. Only -approved responses are emitted to the external consumer. The workflow completes when idle. - -Key Concepts Demonstrated: -- WorkflowAgent: Wraps a workflow to behave like a regular agent. -- Cyclic workflow design (Worker ↔ Reviewer) for iterative improvement. -- AgentRunUpdateEvent: Mechanism for emitting approved responses externally. 
-- Structured output parsing for review feedback using Pydantic. -- State management for pending requests and retry logic. - -Prerequisites: -- OpenAI account configured and accessible for OpenAIChatClient. -- Familiarity with WorkflowBuilder, Executor, WorkflowContext, and event handling. -- Understanding of how agent messages are generated, reviewed, and re-submitted. -""" - @dataclass class ReviewRequest: @@ -72,56 +44,7 @@ def convert_from_payload(payload: str) -> "ReviewResponse": ) -class Reviewer(Executor): - """Executor that reviews agent responses and provides structured feedback.""" - - def __init__(self, id: str, chat_client: ChatClientProtocol) -> None: - super().__init__(id=id) - self._chat_client = chat_client - - @handler - async def review(self, request: ReviewRequest, ctx: WorkflowContext[ReviewResponse]) -> None: - print(f"Reviewer: Evaluating response for request {request.request_id[:8]}...") - - # Define structured schema for the LLM to return. - class _Response(BaseModel): - feedback: str - approved: bool - - # Construct review instructions and context. - messages = [ - ChatMessage( - role=Role.SYSTEM, - text=( - "You are a reviewer for an AI agent. Provide feedback on the " - "exchange between a user and the agent. Indicate approval only if:\n" - "- Relevance: response addresses the query\n" - "- Accuracy: information is correct\n" - "- Clarity: response is easy to understand\n" - "- Completeness: response covers all aspects\n" - "Do not approve until all criteria are satisfied." - ), - ) - ] - # Add conversation history. - messages.extend(request.user_messages) - messages.extend(request.agent_messages) - - # Add explicit review instruction. - messages.append(ChatMessage(role=Role.USER, text="Please review the agent's responses.")) - - print("Reviewer: Sending review request to LLM...") - response = await self._chat_client.get_response(messages=messages, response_format=_Response) - - parsed = _Response.model_validate_json(response.messages[-1].text) - - print(f"Reviewer: Review complete - Approved: {parsed.approved}") - print(f"Reviewer: Feedback: {parsed.feedback}") - - # Send structured review result to Worker. - await ctx.send_message( - ReviewResponse(request_id=request.request_id, feedback=parsed.feedback, approved=parsed.approved) - ) +PendingReviewState = tuple[ReviewRequest, list[ChatMessage]] class Worker(Executor): @@ -130,7 +53,7 @@ class Worker(Executor): def __init__(self, id: str, chat_client: ChatClientProtocol) -> None: super().__init__(id=id) self._chat_client = chat_client - self._pending_requests: dict[str, tuple[ReviewRequest, list[ChatMessage]]] = {} + self._pending_requests: dict[str, PendingReviewState] = {} @handler async def handle_user_messages(self, user_messages: list[ChatMessage], ctx: WorkflowContext[ReviewRequest]) -> None: @@ -200,44 +123,3 @@ async def handle_review_response(self, review: ReviewResponse, ctx: WorkflowCont # Track new request for further evaluation. 
self._pending_requests[new_request.request_id] = (new_request, messages) - - -async def main() -> None: - print("Starting Workflow Agent Demo") - print("=" * 50) - - print("Building workflow with Worker ↔ Reviewer cycle...") - agent = ( - WorkflowBuilder() - .register_executor( - lambda: Worker(id="worker", chat_client=OpenAIChatClient(model_id="gpt-4.1-nano")), - name="worker", - ) - .register_executor( - lambda: Reviewer(id="reviewer", chat_client=OpenAIChatClient(model_id="gpt-4.1")), - name="reviewer", - ) - .add_edge("worker", "reviewer") # Worker sends responses to Reviewer - .add_edge("reviewer", "worker") # Reviewer provides feedback to Worker - .set_start_executor("worker") - .build() - .as_agent() # Wrap workflow as an agent - ) - - print("Running workflow agent with user query...") - print("Query: 'Write code for parallel reading 1 million files on disk and write to a sorted output file.'") - print("-" * 50) - - # Run agent in streaming mode to observe incremental updates. - async for event in agent.run_stream( - "Write code for parallel reading 1 million files on disk and write to a sorted output file." - ): - print(f"Agent Response: {event}") - - print("=" * 50) - print("Workflow completed!") - - -if __name__ == "__main__": - print("Initializing Workflow as Agent Sample...") - asyncio.run(main()) \ No newline at end of file From 265bdcdaaa0cbd3ea8e5ca35a0b7b580fe5efa1d Mon Sep 17 00:00:00 2001 From: Lu Sun Date: Wed, 7 Jan 2026 21:19:20 -0800 Subject: [PATCH 07/11] refine human feedback convert --- .../agentframework/models/human_in_the_loop_helper.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py index 96737853e49b..a3d32454efe3 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py @@ -37,10 +37,11 @@ def convert_request_arguments(self, arguments: Any) -> str: def convert_response(self, hitl_request: RequestInfoEvent, input: Dict) -> List[ChatMessage]: response_type = hitl_request.response_type + response_result = input.get("output", "") if response_type and hasattr(response_type, "convert_from_payload"): response_result = response_type.convert_from_payload(input.get("output", "")) - response_content = FunctionResultContent( - call_id=hitl_request.request_id, - result=response_result, - ) - return [ChatMessage(role="tool", contents=[response_content])] \ No newline at end of file + response_content = FunctionResultContent( + call_id=hitl_request.request_id, + result=response_result, + ) + return [ChatMessage(role="tool", contents=[response_content])] \ No newline at end of file From daf2cdce4ef561134ea15d81591192772eaf4d21 Mon Sep 17 00:00:00 2001 From: Lu Sun Date: Fri, 9 Jan 2026 09:30:34 -0800 Subject: [PATCH 08/11] hitl working with thread --- .../agentframework/agent_framework.py | 54 +++++-- .../agent_framework_input_converters.py | 35 +++-- .../models/agent_state_inventory.py | 77 ++++++++++ .../models/human_in_the_loop_helper.py | 80 ++++++++++- .../.envtemplate | 3 + .../human_in_the_loop_ai_function/README.md | 46 ++++++ .../human_in_the_loop_ai_function/main.py | 134 
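Patch 07 makes `convert_response` seed a default result from the raw `"output"` string and only route it through the response type's `convert_from_payload` when one is declared. For orientation, a minimal sketch of that contract (the import path is taken from the diff above; `pending_event` is an illustrative stand-in for the RequestInfoEvent captured when a run pauses):

```python
from azure.ai.agentserver.agentframework.models.human_in_the_loop_helper import HumanInTheLoopHelper

helper = HumanInTheLoopHelper()

# With response_type=None the tool result stays the raw string "approved";
# with response_type=ReviewResponse it becomes a parsed ReviewResponse.
messages = helper.convert_response(pending_event, {"output": "approved"})
# At this point in the series the helper returns [ChatMessage(role="tool", ...)]
# carrying a FunctionResultContent keyed by the pending request's id.
```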
From daf2cdce4ef561134ea15d81591192772eaf4d21 Mon Sep 17 00:00:00 2001
From: Lu Sun
Date: Fri, 9 Jan 2026 09:30:34 -0800
Subject: [PATCH 08/11] hitl working with thread

---
 .../agentframework/agent_framework.py         |  54 +++++--
 .../agent_framework_input_converters.py       |  35 +++++-
 .../models/agent_state_inventory.py           |  77 ++++++++++
 .../models/human_in_the_loop_helper.py        |  80 ++++++++++-
 .../.envtemplate                              |   3 +
 .../human_in_the_loop_ai_function/README.md   |  46 ++++++
 .../human_in_the_loop_ai_function/main.py     | 134 ++++++++++++++++++
 .../requirements.txt                          |   5 +
 8 files changed, 413 insertions(+), 21 deletions(-)
 create mode 100644 sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_state_inventory.py
 create mode 100644 sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/.envtemplate
 create mode 100644 sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/README.md
 create mode 100644 sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/main.py
 create mode 100644 sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/requirements.txt

diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py
index 0ef15435f090..fdbc33ce01c0 100644
--- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py
@@ -8,7 +8,7 @@
 from typing import TYPE_CHECKING, Any, AsyncGenerator, Awaitable, Optional, Protocol, Union, List
 import inspect
 
-from agent_framework import AgentProtocol, AIFunction
+from agent_framework import AgentProtocol, AIFunction, InMemoryCheckpointStorage
 from agent_framework.azure import AzureAIClient  # pylint: disable=no-name-in-module
 from opentelemetry import trace
 
@@ -87,6 +87,8 @@ def __init__(self, agent: Union[AgentProtocol, AgentFactory],
         self._agent_or_factory: Union[AgentProtocol, AgentFactory] = agent
         self._resolved_agent: "Optional[AgentProtocol]" = None
         self._hitl_helper = HumanInTheLoopHelper()
+        self._checkpoint_storage = InMemoryCheckpointStorage()
+        self._agent_thread_in_memory = {}
         # If agent is already instantiated, use it directly
         if isinstance(agent, AgentProtocol):
             self._resolved_agent = agent
@@ -189,9 +191,13 @@ def init_tracing(self):
         self.tracer = trace.get_tracer(__name__)
 
     def setup_tracing_with_azure_ai_client(self, project_endpoint: str):
+        logger.info("Setting up tracing with AzureAIClient")
+        logger.info(f"Setting up tracing for project endpoint: {project_endpoint}")
         async def setup_async():
             async with AzureAIClient(
-                project_endpoint=project_endpoint, async_credential=self.credentials
+                project_endpoint=project_endpoint,
+                async_credential=self.credentials,
+                credential=self.credentials,
             ) as agent_client:
                 await agent_client.setup_azure_ai_observability()
 
@@ -225,9 +231,20 @@ async def agent_run(  # pylint: disable=too-many-statements
 
         logger.info(f"Starting agent_run with stream={context.stream}")
         request_input = context.request.get("input")
-
-        input_converter = AgentFrameworkInputConverter(agent=agent, hitl_helper=self._hitl_helper)
-        message = input_converter.transform_input(request_input)
+        # TODO: load agent thread from storage and deserialize
+        agent_thread = self._agent_thread_in_memory.get(context.conversation_id, agent.get_new_thread())
+
+        last_checkpoint = None
+        if self._checkpoint_storage:
+            checkpoints = await self._checkpoint_storage.list_checkpoints()
+            last_checkpoint = checkpoints[-1] if len(checkpoints) > 0 else None
+            logger.info(f"Last checkpoint data: {last_checkpoint.to_dict() if last_checkpoint else 'None'}")
+
+        input_converter = AgentFrameworkInputConverter(hitl_helper=self._hitl_helper)
+        message = await input_converter.transform_input(
+            request_input,
+            agent_thread=agent_thread,
+            checkpoint=last_checkpoint)
         logger.debug(f"Transformed input message type: {type(message)}")
 
         # Use split converters
@@ -238,13 +255,23 @@ async def agent_run(  # pylint: disable=too-many-statements
             async def stream_updates():
                 try:
                     update_count = 0
-                    updates = agent.run_stream(message)
+                    updates = agent.run_stream(
+                        message,
+                        thread=agent_thread,
+                        checkpoint_storage=self._checkpoint_storage,
+                        checkpoint_id=last_checkpoint.checkpoint_id if last_checkpoint else None,
+                    )
                     async for event in streaming_converter.convert(updates):
                         update_count += 1
                         yield event
-
+
+                    if agent_thread:
+                        self._agent_thread_in_memory[context.conversation_id] = agent_thread
                     logger.info("Streaming completed with %d updates", update_count)
                 finally:
+                    if hasattr(agent, "pending_requests"):
+                        logger.info("Clearing agent pending requests after streaming completed")
+                        agent.pending_requests.clear()
                     # Close tool_client if it was created for this request
                     if tool_client is not None:
                         try:
@@ -258,8 +285,14 @@ async def stream_updates():
         # Non-streaming path
         logger.info("Running agent in non-streaming mode")
         non_streaming_converter = AgentFrameworkOutputNonStreamingConverter(context, hitl_helper=self._hitl_helper)
-        result = await agent.run(message)
-        logger.debug(f"Agent run completed, result type: {type(result)}")
+        result = await agent.run(message,
+                                 thread=agent_thread,
+                                 checkpoint_storage=self._checkpoint_storage,
+                                 checkpoint_id=last_checkpoint.checkpoint_id if last_checkpoint else None,
+                                 )
+        logger.info(f"Agent run completed, result type: {type(result)}")
+        if agent_thread:
+            self._agent_thread_in_memory[context.conversation_id] = agent_thread
         transformed_result = non_streaming_converter.transform_output_for_response(result)
         logger.info("Agent run and transformation completed successfully")
         return transformed_result
@@ -281,3 +314,6 @@ async def oauth_consent_stream(error=e):
                 logger.debug("Closed tool_client after request processing")
             except Exception as ex:  # pylint: disable=broad-exception-caught
                 logger.warning(f"Error closing tool_client: {ex}")
+        if not context.stream and hasattr(agent, "pending_requests"):
+            logger.info("Clearing agent pending requests after streaming completed")
+            # agent.pending_requests.clear()
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py
index 6507f88802e4..28cc76b51c32 100644
--- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py
@@ -7,7 +7,13 @@
 
 from typing import Dict, List, Optional
 
-from agent_framework import ChatMessage, RequestInfoEvent, Role as ChatRole
+from agent_framework import (
+    AgentThread,
+    ChatMessage,
+    RequestInfoEvent,
+    Role as ChatRole,
+    WorkflowCheckpoint,
+)
 from agent_framework._types import TextContent
 
 from azure.ai.agentserver.core.logger import get_logger
@@ -21,13 +27,14 @@ class AgentFrameworkInputConverter:
     Accepts: str | List | None
     Returns: None | str | ChatMessage | list[str] | list[ChatMessage]
     """
-    def __init__(self, *, agent, hitl_helper=None):
-        self._agent = agent
+    def __init__(self, *, hitl_helper=None) -> None:
         self._hitl_helper = hitl_helper
 
-    def transform_input(
+    async def transform_input(
         self,
         input: str | List[Dict] | None,
+        agent_thread: Optional[AgentThread] = None,
+        checkpoint: Optional[WorkflowCheckpoint] = None,
     ) -> str | ChatMessage | list[str] | list[ChatMessage] | None:
         logger.debug("Transforming input of type: %s", type(input))
 
@@ -37,11 +44,21 @@ def transform_input(
         if isinstance(input, str):
             return input
 
-        pending_requests = getattr(self._agent, 'pending_requests', {})
-        if self._hitl_helper and pending_requests:
-            hitl_response = self._validate_and_convert_hitl_response(pending_requests, input)
+        if self._hitl_helper:
+            # load pending requests from checkpoint and thread messages if available
+            thread_messages = []
+            if agent_thread:
+                thread_messages = await agent_thread.message_store.list_messages()
+                logger.info(f"Thread messages count: {len(thread_messages)}")
+            pending_hitl_requests = self._hitl_helper.get_pending_hitl_request(thread_messages, checkpoint)
+            logger.info(f"Pending HitL requests: {list(pending_hitl_requests.keys())}")
+            hitl_response = self._hitl_helper.validate_and_convert_hitl_response(
+                input,
+                pending_requests=pending_hitl_requests)
+            logger.info("HitL response validation result: %s",
+                        [m.to_dict() for m in hitl_response] if hitl_response else None)
             if hitl_response:
                 return hitl_response
+
         return self._transform_input_internal(input)
 
     def _transform_input_internal(
@@ -157,7 +174,9 @@ def _validate_and_convert_hitl_response(
             logger.warning("Function call output missing valid call_id for HitL response validation.")
             return None
         request_info = pending_request[call_id]
-        if not request_info or not isinstance(request_info, RequestInfoEvent):
+        if isinstance(request_info, dict):
+            request_info = RequestInfoEvent.from_dict(request_info)
+        if not isinstance(request_info, RequestInfoEvent):
             logger.warning("No valid pending request info found for call_id: %s", call_id)
             return None
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_state_inventory.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_state_inventory.py
new file mode 100644
index 000000000000..4195f384dca1
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_state_inventory.py
@@ -0,0 +1,77 @@
+from typing import Any, Optional
+
+from agent_framework import AgentThread, BaseAgent
+
+
+class AgentStateInventory:
+    """Inventory for managing saved agent thread and workflow checkpoint states."""
+
+    async def get(self, conversation_id: str) -> Optional[Any]:
+        """Retrieve the saved state for a given conversation ID.
+
+        Args:
+            conversation_id (str): The conversation ID.
+        """
+        pass
+
+    async def set(self, conversation_id: str, state: Any) -> None:
+        """Save the state for a given conversation ID.
+
+        Args:
+            conversation_id (str): The conversation ID.
+            state (Any): The state to save.
+        """
+        pass
+
+
+class InMemoryThreadAgentStateInventory(AgentStateInventory):
+    """In-memory implementation of AgentStateInventory."""
+    def __init__(self, agent: BaseAgent) -> None:
+        self._agent = agent
+        self._inventory: dict[str, Any] = {}  # conversation_id -> serialized thread state
+
+    async def get(self, conversation_id: str) -> Optional[AgentThread]:
+        """Retrieve the saved state for a given conversation ID.
+
+        Args:
+            conversation_id (str): The conversation ID.
+        """
+        if conversation_id in self._inventory:
+            serialized_thread = self._inventory[conversation_id]
+            return await self._agent.deserialize_thread(serialized_thread)
+        return None
+
+    async def set(self, conversation_id: str, state: AgentThread) -> None:
+        """Save the state for a given conversation ID.
+
+        Args:
+            conversation_id (str): The conversation ID.
+            state (AgentThread): The state to save.
+        """
+        if conversation_id and state:
+            serialized_thread = await state.serialize()
+            self._inventory[conversation_id] = serialized_thread
+
+
+class InMemoryCheckpointAgentStateInventory(AgentStateInventory):
+    """In-memory implementation of AgentStateInventory for workflow checkpoints."""
+    def __init__(self) -> None:
+        self._inventory: dict[str, Any] = {}
+
+    async def get(self, conversation_id: str) -> Optional[Any]:
+        """Retrieve the saved state for a given conversation ID.
+
+        Args:
+            conversation_id (str): The conversation ID.
+        """
+        return self._inventory.get(conversation_id, None)
+
+    async def set(self, conversation_id: str, state: Any) -> None:
+        """Save the state for a given conversation ID.
+
+        Args:
+            conversation_id (str): The conversation ID.
+            state (Any): The state to save.
+        """
+        if conversation_id and state:
+            self._inventory[conversation_id] = state
\ No newline at end of file
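The thread inventory above stores serialized thread state and rehydrates it on demand. A minimal usage sketch follows; the module path is taken from the diff (whether it is re-exported publicly is not shown here), and the per-turn flow is an assumption about how a host would wire it up:

```python
from agent_framework import ChatAgent

from azure.ai.agentserver.agentframework.models.agent_state_inventory import (
    InMemoryThreadAgentStateInventory,
)


async def run_turn(agent: ChatAgent, inventory: InMemoryThreadAgentStateInventory,
                   conversation_id: str, text: str):
    # Rehydrate the prior thread for this conversation, or start a new one.
    thread = await inventory.get(conversation_id) or agent.get_new_thread()
    result = await agent.run(text, thread=thread)
    # Serialize and store the updated thread so the next turn can resume it.
    await inventory.set(conversation_id, thread)
    return result
```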
+ """ + if conversation_id in self._inventory: + serialized_thread = self._inventory[conversation_id] + return await self._agent.deserialize_thread(serialized_thread) + return None + + async def set(self, conversation_id: str, state: AgentThread) -> None: + """Save the state for a given conversation ID. + + Args: + conversation_id (str): The conversation ID. + state (AgentThread): The state to save. + """ + if conversation_id and state: + serialized_thread = await state.serialize() + self._inventory[conversation_id] = serialized_thread + + +class InMemoryCheckpointAgentStateInventory(AgentStateInventory): + """In-memory implementation of AgentStateInventory for workflow checkpoints.""" + def __init__(self) -> None: + self._inventory: dict[str, Any] = {} + + async def get(self, conversation_id: str) -> Optional[Any]: + """Retrieve the saved state for a given conversation ID. + + Args: + conversation_id (str): The conversation ID. + """ + return self._inventory.get(conversation_id, None) + + async def set(self, conversation_id: str, state: Any) -> None: + """Save the state for a given conversation ID. + + Args: + conversation_id (str): The conversation ID. + state (Any): The state to save. + """ + if conversation_id and state: + self._inventory[conversation_id] = state \ No newline at end of file diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py index a3d32454efe3..30bb3aa8d9c5 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py @@ -1,14 +1,65 @@ -from typing import Any, List, Dict +from typing import Any, List, Dict, Optional, Union import json -from agent_framework import ChatMessage, FunctionResultContent, RequestInfoEvent +from agent_framework import ( + ChatMessage, + FunctionResultContent, + FunctionApprovalResponseContent, + RequestInfoEvent, + WorkflowCheckpoint, +) from agent_framework._types import UserInputRequestContents +from azure.ai.agentserver.core.logger import get_logger from azure.ai.agentserver.core.server.common.constants import HUMAN_IN_THE_LOOP_FUNCTION_NAME +logger = get_logger() class HumanInTheLoopHelper: + def get_pending_hitl_request(self, + thread_messages: List[ChatMessage] = None, + checkpoint: Optional[WorkflowCheckpoint] = None, + ) -> dict[str, Union[RequestInfoEvent, Any]]: + res = {} + # if has checkpoint (WorkflowAgent), find pending request info from checkpoint + if checkpoint and checkpoint.pending_request_info_events: + for call_id, request in checkpoint.pending_request_info_events.items(): + # find if the request is already responded in the thread messages + if isinstance(request, dict): + request_obj = RequestInfoEvent.from_dict(request) + res[call_id] = request_obj + return res + + if not thread_messages: + return res + + # if no checkpoint (Agent), find user input request and pair the feedbacks + for message in thread_messages: + for content in message.contents: + print(f" Content {type(content)}: {content.to_dict()}") + if isinstance(content, UserInputRequestContents): + # is a human input request + function_call = content.function_call + call_id = getattr(function_call, "call_id", "") + if call_id: + res[call_id] = 
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/.envtemplate b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/.envtemplate
new file mode 100644
index 000000000000..bd646f163bb7
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/.envtemplate
@@ -0,0 +1,3 @@
+AZURE_OPENAI_ENDPOINT=https://<your-resource-name>.cognitiveservices.azure.com/
+OPENAI_API_VERSION=2025-03-01-preview
+AZURE_OPENAI_CHAT_DEPLOYMENT_NAME=<your-deployment-name>
\ No newline at end of file
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/README.md b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/README.md
new file mode 100644
index 000000000000..64f19cefcbcb
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/README.md
@@ -0,0 +1,46 @@
+# Agent Framework Sample
+
+This sample demonstrates how to use the agents hosting adapter with Microsoft Agent Framework.
+
+## Prerequisites
+
+> **Azure sign-in:** Run `az login` before starting the sample so `DefaultAzureCredential` can acquire a CLI token.
+
+### Environment Variables
+
+Copy `.envtemplate` to `.env` and supply:
+
+```
+AZURE_OPENAI_ENDPOINT=https://<your-resource-name>.cognitiveservices.azure.com/
+OPENAI_API_VERSION=2025-03-01-preview
+AZURE_OPENAI_CHAT_DEPLOYMENT_NAME=<your-deployment-name>
+```
+
+## Running the Sample
+
+Follow these steps from this folder:
+
+1) Start the agent server (defaults to 0.0.0.0:8088):
+
+```bash
+python main.py
+```
+
+2) Send a non-streaming request (returns a single JSON response):
+
+```bash
+curl -sS \
+  -H "Content-Type: application/json" \
+  -X POST http://localhost:8088/responses \
+  -d "{\"input\":\"What's the weather like in Seattle?\",\"stream\":false}"
+```
+
+3) Send a streaming request (server-sent events). Use -N to disable curl buffering:

+```bash
+curl -N \
+  -H "Content-Type: application/json" \
+  -X POST http://localhost:8088/responses \
+  -d "{\"input\":\"What's the weather like in New York?\",\"stream\":true}"
+```
\ No newline at end of file
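Beyond the single curl calls in the README above, the approval sample is a two-turn exchange. A hedged sketch of that round trip (using the requests package for brevity; the exact output item shapes depend on the adapter's response format, and whether both calls share a conversation, and thus the cached thread, depends on how the server assigns conversation ids):

```python
import requests

URL = "http://localhost:8088/responses"

# Turn 1: ask for something that triggers the approval-gated tool; the run
# pauses and the response reports a function call requesting human input.
first = requests.post(
    URL, json={"input": "Add a dentist appointment on March 15th", "stream": False}
).json()

# Locate the reported request (shape assumed; adjust to the actual payload).
call = next(item for item in first.get("output", []) if item.get("type") == "function_call")

# Turn 2: answer it with a function_call_output echoing the call_id, which is
# the shape validate_and_convert_hitl_response pairs against pending requests.
second = requests.post(
    URL,
    json={
        "input": [{"type": "function_call_output", "call_id": call["call_id"], "output": "approved"}],
        "stream": False,
    },
).json()
print(second)
```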
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/main.py b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/main.py
new file mode 100644
index 000000000000..483919a436cb
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/main.py
@@ -0,0 +1,134 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+import asyncio
+from typing import Annotated, Any, Collection
+from dotenv import load_dotenv
+
+load_dotenv()
+
+from agent_framework import ChatAgent, ChatMessage, ChatMessageStoreProtocol, FunctionResultContent, ai_function
+from agent_framework._threads import ChatMessageStoreState
+from agent_framework._types import UserInputRequestContents
+from agent_framework.azure import AzureOpenAIChatClient
+
+from azure.ai.agentserver.agentframework import from_agent_framework
+
+"""
+Tool Approvals with Threads
+
+This sample demonstrates using tool approvals with threads.
+With threads, you don't need to manually pass previous messages -
+the thread stores and retrieves them automatically.
+"""
+
+
+class CustomChatMessageStore(ChatMessageStoreProtocol):
+    """Implementation of a custom chat message store.
+
+    In real applications, this could be backed by a relational database or a vector store."""
+
+    def __init__(self, messages: Collection[ChatMessage] | None = None) -> None:
+        self._messages: list[ChatMessage] = []
+        if messages:
+            self._messages.extend(messages)
+
+    async def add_messages(self, messages: Collection[ChatMessage]) -> None:
+        self._messages.extend(messages)
+
+    async def list_messages(self) -> list[ChatMessage]:
+        return self._messages
+
+    @classmethod
+    async def deserialize(cls, serialized_store_state: Any, **kwargs: Any) -> "CustomChatMessageStore":
+        """Create a new instance from serialized state."""
+        store = cls()
+        await store.update_from_state(serialized_store_state, **kwargs)
+        return store
+
+    async def update_from_state(self, serialized_store_state: Any, **kwargs: Any) -> None:
+        """Update this instance from serialized state."""
+        if serialized_store_state:
+            state = ChatMessageStoreState.from_dict(serialized_store_state, **kwargs)
+            if state.messages:
+                self._messages.extend(state.messages)
+
+    async def serialize(self, **kwargs: Any) -> Any:
+        """Serialize this store's state."""
+        state = ChatMessageStoreState(messages=self._messages)
+        return state.to_dict(**kwargs)
+
+
+@ai_function(approval_mode="always_require")
+def add_to_calendar(
+    event_name: Annotated[str, "Name of the event"], date: Annotated[str, "Date of the event"]
+) -> str:
+    """Add an event to the calendar (requires approval)."""
+    print(f">>> EXECUTING: add_to_calendar(event_name='{event_name}', date='{date}')")
+    return f"Added '{event_name}' to calendar on {date}"
+
+
+def build_agent():
+    return ChatAgent(
+        chat_client=AzureOpenAIChatClient(),
+        name="CalendarAgent",
+        instructions="You are a helpful calendar assistant.",
+        tools=[add_to_calendar],
+        chat_message_store_factory=CustomChatMessageStore,
+    )
+
+async def run_agent() -> None:
+    """Example showing approval with threads."""
+    print("=== Tool Approval with Thread ===\n")
+
+    agent = build_agent()
+
+    thread = agent.get_new_thread()
+    thread_id = thread.service_thread_id
+    # Step 1: Agent requests to call the tool
+    query = "Add a dentist appointment on March 15th"
+    print(f"User: {query}")
+    result = await agent.run(query, thread=thread)
+    serialized_thread = await thread.serialize()
+    print(f"Agent: {result.to_dict()}")
+    print(f"Thread: {serialized_thread}\n\n")
+
+    resume_thread = await agent.deserialize_thread(serialized_thread)
+    res = await resume_thread.message_store.list_messages()
+    print(f"Resumed thread messages: {res}")
+    for message in res:
+        print(f"  Thread message {type(message)}: {message.to_dict()}")
+        for content in message.contents:
+            print(f"    Content {type(content)}: {content.to_dict()}")
+
+    # Check for approval requests
+    if result.user_input_requests:
+        for request in result.user_input_requests:
+            print("\nApproval needed:")
+            print(f"  Function: {request.function_call.name}")
+            print(f"  Arguments: {request.function_call.arguments}")
+            print(f"  type: {type(request.function_call)}")
+            print(f"  function arg type: {type(request.function_call.arguments)}")
+
+            # User approves (in real app, this would be user input)
+            approved = True  # Change to False to see rejection
+            print(f"  Decision: {'Approved' if approved else 'Rejected'}")
+
+            # Step 2: Send approval response
+            # approval_response = request.create_response(approved=approved)
+            # response_message = ChatMessage(role="user", contents=[approval_response])
+            approval_response = FunctionResultContent(
+                call_id=request.function_call.call_id,
+                result="approved" if approved else "denied",
+            )
+            response_message = ChatMessage(role="tool", contents=[approval_response])
+            result = await agent.run(response_message, thread=resume_thread)
+
+            print(f"Agent: {result}\n")
+
+
+async def main() -> None:
+    agent = build_agent()
+    await from_agent_framework(agent).run_async()
+
+if __name__ == "__main__":
+    asyncio.run(main())
+    # asyncio.run(run_agent())
\ No newline at end of file
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/requirements.txt b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/requirements.txt
new file mode 100644
index 000000000000..c044abf99eb1
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/requirements.txt
@@ -0,0 +1,5 @@
+python-dotenv>=1.0.0
+azure-identity
+agent-framework-azure-ai
+azure-ai-agentserver-core
+azure-ai-agentserver-agentframework
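The sample above replies with a raw FunctionResultContent; the framework's approval-content path, visible in the commented-out lines, is the higher-level alternative. A sketch continuing from the sample's own variables, assuming create_response exists on the user-input request object as those comments imply:

```python
# Inside the same loop over result.user_input_requests:
approval_response = request.create_response(approved=True)
response_message = ChatMessage(role="user", contents=[approval_response])
result = await agent.run(response_message, thread=resume_thread)
```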
result="denied", + ) + response_message = ChatMessage(role="tool", contents=[approval_response]) + result = await agent.run(response_message, thread=resume_thread) + + print(f"Agent: {result}\n") + + +async def main() -> None: + agent = build_agent() + await from_agent_framework(agent).run_async() + +if __name__ == "__main__": + asyncio.run(main()) + # asyncio.run(run_agent()) \ No newline at end of file diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/requirements.txt b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/requirements.txt new file mode 100644 index 000000000000..c044abf99eb1 --- /dev/null +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/requirements.txt @@ -0,0 +1,5 @@ +python-dotenv>=1.0.0 +azure-identity +agent-framework-azure-ai +azure-ai-agentserver-core +azure-ai-agentserver-agentframework From 5818e488316d5abcd5426356a07b7b9ed006b4f3 Mon Sep 17 00:00:00 2001 From: Lu Sun Date: Fri, 9 Jan 2026 10:07:35 -0800 Subject: [PATCH 09/11] check checkpoint status --- .../azure/ai/agentserver/agentframework/agent_framework.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py index fdbc33ce01c0..9d66e6596083 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py @@ -10,6 +10,7 @@ from agent_framework import AgentProtocol, AIFunction, InMemoryCheckpointStorage from agent_framework.azure import AzureAIClient # pylint: disable=no-name-in-module +from agent_framework._workflows import get_checkpoint_summary from opentelemetry import trace from azure.ai.agentserver.core.client.tools import OAuthConsentRequiredError @@ -239,6 +240,11 @@ async def agent_run( # pylint: disable=too-many-statements checkpoints = await self._checkpoint_storage.list_checkpoints() last_checkpoint = checkpoints[-1] if len(checkpoints) > 0 else None logger.info(f"Last checkpoint data: {last_checkpoint.to_dict() if last_checkpoint else 'None'}") + if last_checkpoint: + summary = get_checkpoint_summary(last_checkpoint) + logger.info(f"Last checkpoint summary status: {summary.status}") + if summary.status == "completed": + last_checkpoint = None # Do not resume from completed checkpoints input_converter = AgentFrameworkInputConverter(hitl_helper=self._hitl_helper) message = await input_converter.transform_input( From e8b9b422b49186dcab9cb184613fc9a116593dc3 Mon Sep 17 00:00:00 2001 From: Lu Sun Date: Fri, 9 Jan 2026 14:12:10 -0800 Subject: [PATCH 10/11] remove unused code --- .../azure/ai/agentserver/agentframework/agent_framework.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py index 9d66e6596083..e24558d6def9 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py @@ 
From e8b9b422b49186dcab9cb184613fc9a116593dc3 Mon Sep 17 00:00:00 2001
From: Lu Sun
Date: Fri, 9 Jan 2026 14:12:10 -0800
Subject: [PATCH 10/11] remove unused code

---
 .../azure/ai/agentserver/agentframework/agent_framework.py | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py
index 9d66e6596083..e24558d6def9 100644
--- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py
@@ -275,9 +275,6 @@ async def stream_updates():
                     self._agent_thread_in_memory[context.conversation_id] = agent_thread
                     logger.info("Streaming completed with %d updates", update_count)
                 finally:
-                    if hasattr(agent, "pending_requests"):
-                        logger.info("Clearing agent pending requests after streaming completed")
-                        agent.pending_requests.clear()
                     # Close tool_client if it was created for this request
                     if tool_client is not None:
                         try:
@@ -320,6 +317,4 @@
                 logger.debug("Closed tool_client after request processing")
             except Exception as ex:  # pylint: disable=broad-exception-caught
                 logger.warning(f"Error closing tool_client: {ex}")
-        if not context.stream and hasattr(agent, "pending_requests"):
-            logger.info("Clearing agent pending requests after streaming completed")
-            # agent.pending_requests.clear()
+

From 8511e4fb963a4d72b459cbd759797e946606bd37 Mon Sep 17 00:00:00 2001
From: Lu Sun
Date: Fri, 9 Jan 2026 14:12:53 -0800
Subject: [PATCH 11/11] add checkpoint storage

---
 .../samples/human_in_the_loop/main.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py
index 433a3161f9b4..abc603f9be53 100644
--- a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py
@@ -12,6 +12,7 @@
 
 from agent_framework import (  # noqa: E402
     Executor,
+    InMemoryCheckpointStorage,
     WorkflowAgent,
     WorkflowBuilder,
     WorkflowContext,
@@ -114,6 +115,7 @@ def build_agent():
 
 async def run_agent() -> None:
     """Run the workflow inside the agent server adapter."""
     agent = build_agent()
+    checkpoint_storage = InMemoryCheckpointStorage()
     await from_agent_framework(agent).run_async()
 
 if __name__ == "__main__":
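With patches 08 through 11 combined, a paused WorkflowAgent run resumes from its last checkpoint rather than from the thread: pending requests are read from checkpoint.pending_request_info_events. A closing sketch of that path, assuming a last stored checkpoint is in hand (the import path comes from the diffs above, and the "approved" output is illustrative; typed responses would carry JSON parsed by the response type's convert_from_payload):

```python
from azure.ai.agentserver.agentframework.models.human_in_the_loop_helper import HumanInTheLoopHelper

helper = HumanInTheLoopHelper()

# `last_checkpoint` would be the newest WorkflowCheckpoint from the adapter's
# storage; its pending events may be serialized dicts, which the helper
# rehydrates into RequestInfoEvent objects keyed by call id.
pending = helper.get_pending_hitl_request(thread_messages=None, checkpoint=last_checkpoint)
for call_id, event in pending.items():
    # Wrap the human decision as the tool message that resumes the workflow.
    tool_message = helper.convert_response(event, {"output": "approved"})
```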