通过一个小玩具来了解 Agent
实现最基础 API 调用
通过 chat_history 记录对话历史
chat_history = [
# system : global instructions
# user : user input
# assistant: model output
{"role": "system", "content": SYSTEM_PROMPT},
]对每个用户输入或每轮思考,将所有 chat_history 重新输入给大模型,实现连续对话
组装 prompt 时,需要考虑不同提示词的顺序,否则大概率失忆
如果输入一个巨大的文本,因为选择的 LLM 支持巨大上下文,所以可以硬读,但是 Prompt 会瞬间爆炸
- 要求 Agent 在读入巨大文本时简化
- 将读入内容存入 WorkSpace,回复 "已读取并存入暂存 ID 114514",后续读取共享工作区内容,保持 Prompt 干净
提前编码可以让大模型调用的工具,如文件读写等。在 SYSTEM_PROMPT 中严格定义输出格式,根据大模型输出结果调用相应工具
对于函数,参数等,注意使用 JSON 格式化
在 SYSTEM_PROMPT 中加入 Thought-Action-Observation 循环
通过 chromadb 实现检索增强生成 (RAG: Retrieval-Augmented Generation)
- 提前在
knowledge目录下放入可以被检索的文本文件 - 将文本 (分段) 转换为高维向量 (Embedding),建立向量数据库
- 将用户输入转换为向量,匹配数据库中距离最近的片段
需要提前运行 rag_loader.py 加载文档,后续每次运行 agent.py 都不用重新加载
将工具封装在独立的 MCP Server 中,Agent 链接 Server 后自动发现并使用工具
- MCP Host: 与 LLM 交互,管理与 MCP Server 的链接
- MCP Server: 独立进程,暴露数据,函数,模板
- Transport: 用 JSON 进行 Host 和 Server 间通信
对过去代码的重构:
prompt.txt变为模板,启动 agent 时动态生成- 将 RAG 功能封装,与其他工具一起放入 MCP 工具,用
@mcp.tool()修饰
用状态机的思维管理 Agent 的逻辑,构建有状态,多角色,复杂循环
可以在关键节点设置断点,等待人工审核
- Node: 一个 python 函数,如调用 LLM,执行工具等
- Edge: 条件 / 无条件,定义下一步逻辑
- State: 一个共享数据结构,记录当前工作流状态
对过去代码的重构:
- 引入 LangGraph 抽象层,不需要 openai
- prompt 不再包含工具清单,只包含身份,原则,策略
实现模板参考 LangGraph 教程
# 1. get MCP tools
mcp_tools = (await self.mcp_session.list_tools()).tools
# 2. render tools for LLM
self.tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema,
}
} for tool in mcp_tools]
# 3. bind tools to the model
self.model_executor = self.model.bind_tools(self.tools)workflow = StateGraph(PlanExecuteState)
workflow.add_node("planner", self.planner)
workflow.add_node("executor", self.executor)
workflow.add_node("replanner", self.replanner)
workflow.set_entry_point("planner")
workflow.add_edge("planner", "executor")
workflow.add_edge("executor", "replanner")
def should_continue(state: PlanExecuteState):
if state.get("response"):
return "end"
return "continue"
workflow.add_conditional_edges(
"replanner",
should_continue,
{
"continue": "executor",
"end": END
}
)
return workflow.compile()# ---------- Node: planner ----------
async def planner(self, state: PlanExecuteState):
prompt = "..."
response = await self.model_planner.ainvoke([SystemMessage(content=prompt)])
return {"plan": response.steps}
# ---------- Node: replanner ----------
async def replanner(self, state: PlanExecuteState):
prompt = "..."
decision = await self.model_replanner.ainvoke([SystemMessage(content=prompt)])
if isinstance(decision.action, Response):
return {"response": decision.action.response}
else:
return {"plan": decision.action.steps}调用 MCP 工具可用以下代码
for tool_call in response.tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call["args"]
print(f" - Tool Call: {tool_name}({tool_args})")
tool_result = await self.mcp_session.call_tool(tool_name, arguments=tool_args)
observation = tool_result.content[0].text- 将 executor 设计为一个 sub-agent,否则会在 replanner 和 executor 中无限循环,无法理解当前 step 进度
- 善用 asyncio 防止 executor 执行时间过长
- 不再手动维护 state 的 history,使用 LangGraph 提供的持久化机制
- 为每个 session 分配一个 thread_id,自动保存每一轮 state
- 下次用相同 thread_id 调用图时,自动加载 history 和新 user_input
# build_graph
memory = MemorySaver()
return workflow.compile(checkpointer=memory)# main
config = {
"configurable": {
"thread_id": "114514"
},
"recursion_limit": 50
}
async for event in graph.astream(state, config=config):
for _, output in event.items():
if "response" in output and output["response"]:
await graph.aupdate_state(
config,
{
"global_history": [AIMessage(content=output["response"])]
}
)执行 Planner 前,搜索是否有相关技能,若有,直接输出预设步骤
执行 Replanner 后,增加 Learner 节点,泛化该次任务流程并保存