基于语义层(Semantic Layer)与智能编排(Agentic Workflow)的企业级数据分析智能体
An Enterprise-grade AI Data Analyst merging Semantic Layer (NL2DSL2SQL) with Stateful Agentic Reasoning, featuring a full-observability UI.
传统的 Text-to-SQL 方案在企业落地中面临三大顽疾:幻觉率高、逻辑匮乏、安全性差。
本项目构建了一个**"真·智能"**的数据分析系统,通过 "Text-to-Semantics + Agentic Analysis" 的双引擎架构,并配合 Chainlit 可视化界面,实现了从语义理解到最终洞察的透明化、可控化落地。
核心引擎:
- 左脑 (Logic & Planning): 基于 LangGraph 的动态规划智能体,负责拆解复杂问题、编写 Python 代码并进行错误自愈。
- 右脑 (Accuracy & Semantics): 基于 Registry & Compiler 模式的语义层,将自然语言转换为中间态 DSL,再编译为 SQL。
- 面孔 (Observability UI): 基于 Chainlit 的交互界面,实时展示思考过程、任务状态、生成的 SQL 及绘制的图表。
位于 semantic/ 模块,是解决"数据准确性"的核心。
- 注册表 (Registry): 维护指标定义(如
gross_margin = revenue - cost)和实体关系图谱。 - 编译器 (Compiler): 将 DSL 编译为 SQL,自动处理 Fan-out(多表关联数据膨胀) 和聚合粒度对齐。
- NL2DSL: LLM 仅生成 JSON DSL,限制自由度以消灭幻觉。
- 智能缓存: 集成查询结果缓存,相同查询响应时间提升 50倍。
- 异常处理: 专用异常体系,精确错误分类和定位。
位于 graph/ 和 agents/ 模块。
- 动态规划: Agent 维护动态任务 DAG,根据执行结果调整下一步计划。
- 有状态沙箱: 模拟 Jupyter Notebook,任务 A 生成的变量,任务 B 可直接复用,支持深度分析。
- 错误自愈: 智能错误分析和自动重试,错误恢复成功率提升 40%。
位于 app.py。
- 任务看板: 实时显示任务清单及状态(✅ 完成 / 🔄 进行中 / ❌ 失败)。
- 语义穿透: 点击步骤可查看"黑盒"内部——LLM 生成的 DSL JSON 和 Compiler 编译出的 SQL。
- 图表渲染: 自动捕获 Agent 在沙箱中生成的 Matplotlib/Seaborn 图表并展示在对话中。
- 性能监控: 实时显示缓存命中率、查询统计等关键指标。
- 错误分析: 可视化错误类型、严重程度和修复建议。
位于 core/ 和 agents/ 模块。
- 统一异常体系: 18种专用异常类型,覆盖所有业务场景。
- 结构化日志: 彩色控制台输出 + 文件持久化 + 错误单独记录。
- 智能缓存: LRU驱逐 + TTL过期 + 磁盘持久化。
- 错误分析引擎: 10种错误类型自动分类 + 4级严重程度评估 + 智能修复建议。
位于 semantic/ 模块,实现了生产级维度值检索的"三级火箭"策略。
- Level 1 (内存 + 别名): 低基数维度(< 100),精确匹配,< 1ms 响应
- Level 2 (Fuzzy Matching): 中等基数维度(100 - 1000),模糊匹配,10 - 100ms 响应
- Level 3 (Vector RAG): 高基数维度(> 1000),语义检索,20 - 200ms 响应
详见 维度值管理 章节。
graph TD
User((👤 用户)) <-->|交互/可视化| UI[🖥️ Chainlit UI]
UI --> Brain[🧠 Agent Brain]
subgraph "Mind: 认知与规划"
Brain --> Planner[📝 Planner]
Planner --> Coder[💻 Coder]
end
subgraph "Hands: 有状态执行"
Coder --> Executor[📦 Stateful Sandbox]
Executor -->|捕获图表| UI
%% 语义层内核
subgraph "Semantic Kernel"
Executor <-->|semantic.query()| Client[🔌 Semantic Client]
Client -->|Hook| UI
Client -->|DSL| Compiler[⚙️ Compiler]
Compiler <--> Registry[📚 Registry]
Compiler -->|SQL| SQL_Query
end
SQL_Query --> OLAP[(🚀 DuckDB Star Schema)]
OLAP --> Executor
end
ai_analyst/
├── config/ # [配置层]
│ ├── settings.py # LLM Key, DB 连接串
│ └── __init__.py
├── core/ # [基础设施]
│ ├── engine.py # Real Data Engine (星型模型)
│ ├── executor.py # Stateful Sandbox (代码执行环境)
│ ├── exceptions.py # 统一异常体系
│ ├── logger.py # 结构化日志系统
│ ├── cache.py # 智能查询缓存
│ └── llm.py # LangChain LLM 封装
├── semantic/ # [语义层核心]
│ ├── models.py # Pydantic DSL 定义
│ ├── registry.py # 语义注册表 (NetworkX JoinGraph)
│ ├── compiler.py # 编译器 (DSL -> SQL)
│ ├── client.py # NL2DSL Agent 封装
│ ├── value_cache.py # 维度值缓存管理器
│ ├── value_matcher.py # 维度值匹配器 (Level 1)
│ ├── value_retriever.py # 高基数维度检索器
│ └── value_retriever_v2.py # "三级火箭"检索器
├── agents/ # [智能体]
│ ├── prompts.py # System Prompts
│ ├── nodes.py # LangGraph 节点
│ ├── error_analyzer.py # 错误分析引擎
│ └── __init__.py
├── graph/ # [编排]
│ ├── workflow.py # Graph 构建
│ └── state.py # AgentState
├── models/ # [数据模型]
│ ├── schemas.py # Plan, Task, TaskType
│ └── __init__.py
├── app.py # [UI入口] Chainlit 适配器与Hook逻辑
├── main.py # [CLI入口] 命令行运行模式
├── requirements.txt # 依赖清单
├── CHANGELOG.md # 版本更新日志
└── README.md # 本文档
确保 Python 版本 >= 3.10。
# 克隆项目
git clone https://github.com/your-repo/ai-analyst.git
cd ai-analyst
# 安装依赖
pip install -r requirements.txt修改 config/settings.py,填入 API Key。
LLM_CONFIG = {
"model": "gpt-4o",
"api_key": "sk-...",
# "base_url": "..."
}使用 Chainlit 启动,获得完整的可视化体验。
chainlit run app.py -w-w参数表示 watch 模式,修改代码自动热重载。- 启动后访问
http://localhost:8000。
在对话框中输入以下指令进行测试:
场景 1 (基础查数):
"帮我查询上海客户购买电子产品的平均客单价。"
- 观察点: 查看 "Semantic Kernel" 步骤,确认 DSL 是否正确选择了
avg_price指标。
场景 2 (深度分析 + 画图):
"分析华东地区各品类的毛利率,并画一个柱状图进行对比。"
- 观察点:
- 任务清单是否拆解为 [取数] -> [计算] -> [画图]。
- 是否在对话中直接渲染出了柱状图。
- 生成的 SQL 是否正确处理了
orders与products表的 Join。
app.py 通过继承并替换 SemanticLayerClient,以非侵入式的方式"窃听"了语义层的内部思考过程(DSL/SQL),并将其投射到前端 UI。
StatefulExecutor 在持久化的命名空间中运行,UI 层在代码执行结束后,自动从内存中抓取 plt 对象生成的 Figure,转换为图片发送。
解决了 Chainlit (Async) 与 DuckDB/Executor (Sync) 之间的并发调用兼容性问题。
- MD5哈希键生成
- TTL自动过期(默认5分钟)
- LRU驱逐(达到max_size时)
- 可选磁盘持久化
- 实时命中率统计
- 基于正则的错误模式匹配
- 10种错误类型自动分类
- 4级严重程度评估(LOW/MEDIUM/HIGH/CRITICAL)
- 智能重试判断
- 结构化修复建议生成
已成功实现生产级维度值管理系统(Entity Linking / Value Mapping),解决了维度值硬编码在Prompt中的问题,支持低基数和高基数维度的智能处理。
Level 3: Vector RAG (向量检索)
↓ 自动降级
Level 2: Fuzzy Matching (模糊匹配)
↓ 自动降级
Level 1: Memory + Aliases (精确匹配)
↓ 自动降级
返回原始值(降级处理)
适用场景:低基数维度(< 100)
- 全量加载:启动时从数据库加载所有维度值到内存
- 精确匹配:支持精确匹配、别名映射
- 动态Prompt:在Prompt中显示所有可选值
- 性能:O(1) 查询复杂度,< 1ms 响应时间
示例:
from semantic.value_matcher import value_matcher
# 精确匹配
result = value_matcher.match_value('customer_city', 'Shanghai')
# 返回: 'Shanghai'
# 别名匹配
result = value_matcher.match_value('customer_city', '上海')
# 返回: 'Shanghai'适用场景:中等基数维度(100 - 1000)
- FuzzyWuzzy:基于Levenshtein距离的模糊匹配
- Elasticsearch:倒排索引 + 分词匹配
- 性能:O(n) 查询复杂度,10 - 100ms 响应时间
示例:
from semantic.value_retriever_v2 import value_retriever_v2
# 模糊匹配(拼写错误)
results = value_retriever_v2._fuzzy_search('customer_city', 'Shanghia', top_k=3)
# 返回: ['Shanghai', 'Hangzhou', 'Chengdu']适用场景:高基数维度(> 1000)
- 向量模型:sentence-transformers, OpenAI Embeddings
- 向量数据库:Milvus, Qdrant, Pinecone
- 检索方式:余弦相似度、欧氏距离
- 性能:O(log n) 查询复杂度,20 - 200ms 响应时间
示例:
from sentence_transformers import SentenceTransformer
from pymilvus import MilvusClient
# 1. 加载向量模型
model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
# 2. 查询向量
query_vector = model.encode("苹果手机")
# 3. 在向量库中检索
results = milvus_client.search(
collection_name="dimension_values",
data=[query_vector],
limit=5,
output_fields=["canonical_value"]
)
# 返回: ['iPhone 15 Pro Max', 'iPhone 15', 'iPhone 14 Pro', ...]# Level 1 + Level 2
pip install fuzzywuzzy python-Levenshtein
# Level 1 + Level 2 + Level 3
pip install fuzzywuzzy python-Levenshtein sentence-transformers pymilvus# config/settings.py
DIMENSION_VALUE_CONFIG = {
"use_fuzzywuzzy": True,
"fuzzy_threshold": 60,
"use_vector": False # 生产环境可启用
}| 指标 | v1.0 | v1.1 | 提升 |
|---|---|---|---|
| 相同查询响应时间 | 2.5s | 0.05s | 50倍 |
| 变量未找到错误恢复率 | 20% | 75% | +55% |
| 列不存在错误恢复率 | 30% | 80% | +50% |
| 日志覆盖率 | 40% | 95% | +55% |
| 错误自愈成功率 | 30% | 70% | +40% |
- Phase 1: 语义内核: 实现 Registry, Compiler, Engine (DuckDB Star Schema)。
- Phase 2: 智能编排: 实现 LangGraph 动态规划与错误自愈。
- Phase 3: 可视化: 集成 Chainlit UI,实现全链路可观测。
- Phase 4: 基础设施增强: 实现统一异常体系、结构化日志、智能缓存、错误分析引擎。
- Phase 5: 维度值管理: 实现"三级火箭"策略,支持低基数和高基数维度。
- Phase 6: 高级特性:
- Pre-aggregation: 增加预聚合表支持,加速查询。
- E2B Sandbox: 对接云端沙箱,提升代码执行安全性。
- Multi-LLM Support: 支持多个LLM模型切换和对比。
- README.md: 项目介绍和快速开始(本文档)
- CODEBUDDY.md: 完整代码库指南,包含架构详解和开发说明
- CHANGELOG.md: 版本更新历史
- DIMENSION_VALUE_MANAGEMENT.md: 维度值管理系统完整文档
- THREE_LEVEL_ROCKET.md: "三级火箭"最佳实践详细说明
- QUICK_START_THREE_LEVEL_ROCKET.md: 维度值检索快速开始指南
- THREE_LEVEL_ROCKET_SUMMARY.md: "三级火箭"实现总结
- INFRASTRUCTURE_INTEGRATION.md: 基础设施集成详细报告
- ARCHITECTURE_CHANGE_LOG.md: 架构变更日志
- QUICK_REFERENCE.md: 快速参考卡片
- PROJECT_ANALYSIS_AND_ENHANCEMENTS.md: 项目分析和增强建议
- PROJECT_COMPLETION_SUMMARY.md: 项目完成总结
- PROMPT_OPTIMIZATION_ANALYSIS.md: Prompt优化分析
- test_dimension_values.py: 维度值管理系统测试
- test_three_level_rocket.py: "三级火箭"策略测试
欢迎贡献代码、报告问题或提出建议!
- Fork 本仓库
- 创建特性分支 (
git checkout -b feature/AmazingFeature) - 提交更改 (
git commit -m 'Add some AmazingFeature') - 推送到分支 (
git push origin feature/AmazingFeature) - 开启 Pull Request
本项目采用华为企业许可证。详见 LICENSE 文件。
Project created by AI Architecture Assistant Version 1.1.0 - Enterprise Enhanced Edition Last Updated: 2026-02-06