-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlocal_llm_service_restful.py
More file actions
788 lines (634 loc) · 28.7 KB
/
Copy pathlocal_llm_service_restful.py
File metadata and controls
788 lines (634 loc) · 28.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
"""
Local LLM Service Layer with RESTful API
Calls the local model service through a RESTful API, following a message-queue architecture.
"""
import json
import logging
import asyncio
import aiohttp
import requests
from typing import Dict, Any, Optional, List
from datetime import datetime
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from message_queue_client import Message, create_message_queue_client
logger = logging.getLogger(__name__)
class ServiceRequest(BaseModel):
    """Request body accepted by the HTTP endpoints defined in this module."""

    # Identifier of the session the request belongs to; forwarded to the service.
    session_id: str
    # Free-form context string. The coder/debugger endpoints forward it as-is;
    # the tool endpoint parses it as JSON.
    context: str
    # Endpoint-specific items: chat messages for the coder endpoint, the code
    # solution at index 0 for the checker endpoint, and original code / errors
    # at indexes 0 / 1 for the debugger endpoint.
    messages: list
    # Name of the calling agent (not read by the endpoints visible in this file).
    agent_name: str
    # Retry budget supplied by the caller (not read by the endpoints visible in this file).
    max_retries: int = 3
class ServiceResponse(BaseModel):
    """Response envelope returned by the HTTP endpoints defined in this module."""

    # True when the endpoint handled the request without raising.
    success: bool
    # Result payload on success; None otherwise.
    data: Optional[Dict[str, Any]] = None
    # Stringified exception on failure; None otherwise.
    error: Optional[str] = None
    # ISO-8601 creation time taken from datetime.utcnow(), i.e. a naive value
    # without a UTC offset suffix.
    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; moving to
    # an aware datetime would change the emitted string - confirm with consumers.
    timestamp: str = Field(default_factory=lambda: datetime.utcnow().isoformat())
class LocalLLMRestfulClient:
    """Synchronous REST client for a local LLM server.

    The blocking ``requests`` library is used (instead of an async client) so
    the client can be called from plain synchronous code such as message-queue
    callbacks.
    """

    # Seconds to wait for the health endpoint before reporting "unhealthy".
    HEALTH_CHECK_TIMEOUT = 5

    def __init__(self, base_url: str = "http://localhost:8000", timeout: int = 30):
        """Store the connection settings.

        Args:
            base_url: Root URL of the LLM server; trailing slashes are removed.
            timeout: Timeout in seconds applied to chat completion requests.
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout

    def chat_completion(self, messages: List[Dict[str, str]], model: str,
                        max_tokens: int = 1024, temperature: float = 0.1) -> Dict[str, Any]:
        """POST a chat completion request and return the decoded JSON body.

        Args:
            messages: Chat messages as ``{"role": ..., "content": ...}`` dicts.
            model: Model name placed in the request payload.
            max_tokens: Value sent as ``max_tokens``.
            temperature: Value sent as ``temperature``.

        Returns:
            The JSON-decoded response body.

        Raises:
            RuntimeError: The server answered with a status other than 200.
            Exception: Any transport or decoding error raised by ``requests``
                is logged and re-raised unchanged.
        """
        url = f"{self.base_url}/v1/chat/completions"
        payload = {
            "model": model,
            "messages": messages,
            "max_tokens": max_tokens,
            "temperature": temperature
        }
        try:
            response = requests.post(url, json=payload, timeout=self.timeout)
            if response.status_code != 200:
                raise RuntimeError(f"LLM API error {response.status_code}: {response.text}")
            return response.json()
        except Exception as e:
            logger.error(f"Failed to call LLM API: {e}")
            raise

    def health_check(self) -> bool:
        """Return True when ``GET {base_url}/health`` answers with status 200.

        Any error while performing the request is treated as "unhealthy".
        Only ``Exception`` subclasses are absorbed, so KeyboardInterrupt and
        SystemExit still propagate.
        """
        url = f"{self.base_url}/health"
        try:
            response = requests.get(url, timeout=self.HEALTH_CHECK_TIMEOUT)
        except Exception as e:
            logger.debug(f"LLM health check failed: {e}")
            return False
        return response.status_code == 200
class LocalLLMRestfulService:
    """Base class for queue-driven services backed by a local LLM REST API.

    A service subscribes to ``<service_name>.requests``, passes each message
    payload to :meth:`process_request` (implemented by subclasses) and
    publishes the outcome to ``<service_name>.responses`` or, on failure, to
    ``<service_name>.errors``.
    """

    def __init__(self, service_name: str, llm_client: "LocalLLMRestfulClient",
                 queue_type: str = "redis", queue_config: Optional[Dict[str, Any]] = None):
        """Create the service and its message-queue client.

        Args:
            service_name: Prefix used for the request/response/error topics.
            llm_client: Client used by subclasses to call the LLM.
            queue_type: Queue backend name handed to
                ``create_message_queue_client``.
            queue_config: Keyword arguments for the queue client; ``None``
                means no extra arguments.
        """
        self.service_name = service_name
        self.llm_client = llm_client
        self.queue_client = create_message_queue_client(queue_type, **(queue_config or {}))

    @staticmethod
    def _trace(text: str) -> None:
        """Write one diagnostic line to stderr and flush it immediately."""
        import sys
        sys.stderr.write(text + "\n")
        sys.stderr.flush()

    def start(self):
        """Subscribe to the request topic and start consuming messages."""
        logger.info(f"Starting {self.service_name} service with RESTful LLM API")
        self._trace(f"[LocalLLMRestfulService] Starting {self.service_name} service")

        # Each service listens on its own request topic.
        topic = f"{self.service_name}.requests"
        self._trace(f"[LocalLLMRestfulService] Subscribing to topic: {topic}")
        self.queue_client.subscribe(topic, self._handle_request)
        self._trace(f"[LocalLLMRestfulService] Subscribed to {topic}")

        # Not every queue client exposes start_consuming; warn when it is absent.
        if hasattr(self.queue_client, 'start_consuming'):
            self._trace("[LocalLLMRestfulService] Starting message consumption")
            self.queue_client.start_consuming()
            self._trace("[LocalLLMRestfulService] Message consumption started")
        else:
            self._trace("[LocalLLMRestfulService] WARNING: No start_consuming method!")

    def _handle_request(self, message: "Message"):
        """Queue callback: process one request and publish the result.

        Failures in processing are reported on the ``.errors`` topic. A
        failure while publishing that error report is logged instead of being
        raised, so a broken report cannot escape the callback.
        """
        try:
            self._trace(f"[{self.service_name}] 🔔 Received message: {message.message_id}")
            logger.info(f"Processing request {message.message_id} for session {message.session_id}")

            result = self.process_request(message.payload)

            response_message = Message(
                agent_name=self.service_name,
                session_id=message.session_id,
                payload={
                    "success": True,
                    "data": result,
                    "session_id": message.session_id,
                    "request_id": message.message_id
                }
            )
            self.queue_client.publish(f"{self.service_name}.responses", response_message)
            logger.info(f"Published response for request {message.message_id}")
        except Exception as e:
            logger.error(f"Error processing request {message.message_id}: {e}")
            try:
                error_message = Message(
                    agent_name=self.service_name,
                    session_id=message.session_id,
                    payload={
                        "success": False,
                        "error": str(e),
                        "session_id": message.session_id,
                        "request_id": message.message_id
                    }
                )
                self.queue_client.publish(f"{self.service_name}.errors", error_message)
            except Exception as publish_error:
                logger.error(
                    f"Failed to publish error response for request {message.message_id}: {publish_error}"
                )

    def process_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Handle one request payload; must be implemented by subclasses."""
        raise NotImplementedError

    def _parse_llm_response(self, llm_response: Dict[str, Any]) -> str:
        """Return the text of the first choice of a chat completion response.

        Always returns a string: a missing, null or non-string ``content``
        and any malformed response all yield ``""``.
        """
        try:
            choices = llm_response.get("choices") or []
            if not choices:
                return ""
            message = choices[0].get("message") or {}
            content = message.get("content")
        except Exception as e:
            logger.error(f"Failed to parse LLM response: {e}")
            return ""
        # "content" may be present but null, which .get()'s default does not cover.
        return content if isinstance(content, str) else ""
class LocalCoderRestfulService(LocalLLMRestfulService):
    """Coder agent service: generates code by calling the local model over REST."""

    def __init__(self, llm_client: LocalLLMRestfulClient, **kwargs):
        """Register the service under the name ``coder``."""
        super().__init__("coder", llm_client, **kwargs)

    def process_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a code solution for the conversation in ``request_data``.

        Args:
            request_data: Dict with ``context`` (documentation text inserted
                into the system prompt) and ``messages`` (conversation items).
                Each item may be a ``(role, content)`` pair given as a tuple
                or a list, a dict with ``role``/``content`` keys, or anything
                else, which is sent as a user message via ``str()``.

        Returns:
            A dict with ``prefix``, ``imports`` and ``code``: the model's own
            JSON object when it supplies those keys, otherwise a structure
            extracted heuristically from the reply text.

        Raises:
            Exception: Errors from the LLM call are logged and re-raised.
        """
        try:
            context = request_data.get("context", "")
            messages = request_data.get("messages") or []

            system_prompt = f"""You are a coding assistant with expertise in LCEL, LangChain expression language.
Here is a full set of LCEL documentation:
-------
{context}
-------
Answer the user question based on the above provided documentation. Ensure any code you provide can be executed
with all required imports and variables defined. Structure your answer with a description of the code solution.
Then list the imports. And finally list the functioning code block."""

            api_messages = [{"role": "system", "content": system_prompt}]
            for msg in messages:
                # Lists are accepted as well as tuples: a tuple that travelled
                # through JSON comes back as a list.
                if isinstance(msg, (tuple, list)) and len(msg) == 2:
                    role, content = msg
                    api_messages.append({"role": role, "content": content})
                elif isinstance(msg, dict) and "role" in msg and "content" in msg:
                    api_messages.append(msg)
                else:
                    # Unknown shape: fall back to treating it as user text.
                    api_messages.append({"role": "user", "content": str(msg)})

            llm_response = self.llm_client.chat_completion(
                messages=api_messages,
                model="coder-model",
                max_tokens=1024,
                temperature=0.1
            )
            content = self._parse_llm_response(llm_response)

            # Prefer a structured JSON answer when the model produced one.
            try:
                code_data = json.loads(content)
            except (TypeError, ValueError):
                code_data = None
            if isinstance(code_data, dict) and all(k in code_data for k in ("prefix", "imports", "code")):
                return code_data

            # Otherwise extract the structure from the free-form text.
            return self._extract_code_structure(content)
        except Exception as e:
            logger.error(f"Error generating code with RESTful LLM: {e}")
            raise

    def _extract_code_structure(self, text: str) -> Dict[str, str]:
        """Split a free-form reply into description, imports and code.

        Non-empty lines before the first import statement form the
        description. Import statements are collected separately and all other
        later lines are kept as code with their indentation intact, since
        stripping it would break any indented Python block. Markdown fence
        lines are dropped, and indented imports that follow the first import
        stay in the code because they belong to an enclosing block.

        Args:
            text: Reply text; ``None`` or ``""`` yields the placeholder values.

        Returns:
            Dict with ``prefix``, ``imports`` and ``code`` strings.
        """
        prefix = []
        imports = []
        code = []
        in_prefix = True

        for raw_line in (text or "").split('\n'):
            stripped = raw_line.strip()
            if not stripped or stripped.startswith("```"):
                continue

            is_import = stripped.startswith("import ") or (
                stripped.startswith("from ") and " import " in stripped
            )
            indented = raw_line[0].isspace()

            if is_import and (in_prefix or not indented):
                in_prefix = False
                imports.append(stripped)
            elif in_prefix:
                prefix.append(stripped)
            else:
                code.append(raw_line.rstrip())

        return {
            "prefix": " ".join(prefix).strip() or "Generated code solution",
            "imports": "\n".join(imports).strip(),
            "code": "\n".join(code).strip() or "# Code generation failed"
        }
class LocalDebuggerRestfulService(LocalLLMRestfulService):
    """Debugger agent service: asks the local model to repair failing code."""

    def __init__(self, llm_client: LocalLLMRestfulClient, **kwargs):
        """Register the service under the name ``debugger``."""
        super().__init__("debugger", llm_client, **kwargs)

    def process_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Ask the model for a corrected version of the supplied code.

        Args:
            request_data: Dict with ``original_code``, ``errors`` and
                ``context``; the first two are JSON-serialised into the prompt.

        Returns:
            A dict with ``prefix``, ``imports`` and ``code``: the model's JSON
            object when it supplies those keys, otherwise a structure
            extracted heuristically from the reply text.

        Raises:
            Exception: Serialisation or LLM errors are logged and re-raised.
        """
        try:
            original_code = request_data.get("original_code", {})
            errors = request_data.get("errors", [])
            context = request_data.get("context", "")

            original_code_str = json.dumps(original_code, indent=2)
            errors_str = json.dumps(errors, indent=2)
            system_prompt = f"""You are a debugging assistant. Given the following code that has errors,
analyze the errors and provide a corrected version of the code.
Original code:
{original_code_str}
Errors encountered:
{errors_str}
Context:
{context}
Please provide:
1. Analysis of what went wrong
2. The corrected imports
3. The corrected code block
Format your response as JSON with keys: prefix, imports, code"""

            api_messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": "Please fix the errors in this code."}
            ]
            llm_response = self.llm_client.chat_completion(
                messages=api_messages,
                model="debugger-model",
                max_tokens=1024,
                temperature=0.2
            )
            content = self._parse_llm_response(llm_response)

            try:
                debug_data = json.loads(content)
            except (TypeError, ValueError):
                debug_data = None
            if isinstance(debug_data, dict) and all(k in debug_data for k in ("prefix", "imports", "code")):
                return debug_data

            return self._extract_code_structure(content or "")
        except Exception as e:
            logger.error(f"Error debugging code with RESTful LLM: {e}")
            raise

    def _extract_code_structure(self, text: str) -> Dict[str, str]:
        """Extract prefix/imports/code from free-form text.

        Neither this class nor its base defines the extraction, so the
        fallback in ``process_request`` used to fail with AttributeError.
        Delegate to the coder service's implementation so both agents parse
        replies the same way.
        """
        return LocalCoderRestfulService._extract_code_structure(self, text)
class LocalCheckerRestfulService(LocalLLMRestfulService):
    """Checker agent service: asks the LLM whether code meets the user's request."""

    def __init__(self, llm_client: LocalLLMRestfulClient, **kwargs):
        """Register the service under the name ``checker``."""
        super().__init__("checker", llm_client, **kwargs)

    def process_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate a code solution against the first user message.

        Args:
            request_data: Dict with ``code_solution`` (dict with ``prefix``,
                ``imports``, ``code``) and ``messages``. User messages are
                recognised as ``[role, content]`` / ``(role, content)`` pairs
                or dicts with ``role == "user"``.

        Returns:
            The model's JSON verdict when it contains ``is_valid``; otherwise
            a verdict derived from keywords in the reply text.

        Raises:
            Exception: Errors from the LLM call are logged and re-raised.
        """
        import sys

        def emit(text: str) -> None:
            # Diagnostic trace on stderr, flushed so it shows up immediately.
            sys.stderr.write(text + "\n")
            sys.stderr.flush()

        try:
            code_solution = request_data.get("code_solution") or {}
            messages = request_data.get("messages") or []
            emit(f"[Checker] 📨 收到消息数量: {len(messages)}")

            # Collect the user's requirements from the conversation.
            user_requirements = []
            for i, msg in enumerate(messages):
                emit(f"[Checker] 消息 {i}: type={type(msg)}, content={str(msg)[:100]}")
                if isinstance(msg, (tuple, list)) and len(msg) == 2:
                    role, content = msg
                    # str() guards the preview against non-string content.
                    emit(f"[Checker] List/Tuple消息 - role: {role}, content: {str(content)[:50]}...")
                    if role == "user":
                        user_requirements.append(content)
                elif isinstance(msg, dict) and msg.get("role") == "user":
                    user_requirements.append(msg.get("content", ""))
            emit(f"[Checker] 提取到的用户需求数量: {len(user_requirements)}")

            original_request = user_requirements[0] if user_requirements else "No user request found"
            emit(f"[Checker] 原始请求: {str(original_request)[:100]}...")

            code_prefix = code_solution.get("prefix", "")
            code_imports = code_solution.get("imports", "")
            code_block = code_solution.get("code", "")
            full_code = f"""
Prefix/Description: {code_prefix}
Imports:
{code_imports}
Code:
{code_block}
"""

            system_prompt = f"""You are a code validation expert. Your job is to verify whether the generated code
solution accurately addresses the user's original requirements without deviating from the intended functionality.
This is important because during debugging iterations, the code might be modified to fix errors, but those
modifications could inadvertently change the original logic or miss the user's requirements.
You need to:
1. Compare the code against the original user requirements
2. Check if the code logic matches what the user asked for
3. Verify that the code hasn't been altered in ways that deviate from the original intent
4. Ensure all user requirements are met
Provide your response as JSON with the following structure:
{{
"is_valid": true/false,
"message": "Brief validation summary",
"errors": ["list of issues if any"],
"requirements_met": ["list of requirements that are satisfied"],
"requirements_missing": ["list of requirements that are not satisfied"]
}}"""
            user_prompt = f"""
Original User Request:
{original_request}
Generated Code Solution:
{full_code}
Please validate whether this code solution accurately addresses the user's requirements.
Check for:
- Does it solve the user's problem?
- Does it implement what was requested?
- Has the logic been preserved (not altered during debugging)?
- Are all requirements satisfied?
"""

            api_messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ]
            llm_response = self.llm_client.chat_completion(
                messages=api_messages,
                model="checker-model",
                max_tokens=1024,
                temperature=0.2
            )
            # "or ''" keeps the text handling below safe if the parser yields None.
            content = self._parse_llm_response(llm_response) or ""

            try:
                validation_result = json.loads(content)
            except (TypeError, ValueError):
                validation_result = None
            if isinstance(validation_result, dict) and "is_valid" in validation_result:
                return validation_result

            # No usable JSON: derive a verdict from keywords in the reply.
            lowered = content.lower()
            return {
                # "invalid" contains "valid", so it must be excluded explicitly.
                "is_valid": (
                    "requirements are met" in lowered
                    and "valid" in lowered
                    and "invalid" not in lowered
                ),
                "message": content[:200],  # first 200 characters as the summary
                "errors": [] if "no issues" in lowered else ["Unable to parse validation details"],
                "requirements_met": [],
                "requirements_missing": []
            }
        except Exception as e:
            logger.error(f"Error validating code with LLM: {e}")
            raise
class LocalToolRestfulService(LocalLLMRestfulService):
    """Tool agent service: executes generated code and reports the outcome.

    The LLM client is accepted for interface parity with the other services;
    ``process_request`` does not call it.
    """

    def __init__(self, llm_client: LocalLLMRestfulClient, **kwargs):
        """Register the service under the name ``tool``."""
        super().__init__("tool", llm_client, **kwargs)

    def process_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the supplied imports and code and return the result.

        Args:
            request_data: Dict with ``code``, ``imports`` and ``prefix``
                strings; missing or null values are treated as empty.

        Returns:
            Dict with ``success`` (bool), ``output`` (status text listing the
            names the code defined) and ``error`` (message or ``None``).
            Errors raised by the executed code are reported in the result and
            not raised.
        """
        try:
            code = request_data.get("code") or ""
            imports = request_data.get("imports") or ""
            prefix = request_data.get("prefix") or ""
            logger.info(f"Tool executing code (prefix: {prefix})")
            logger.debug(f"Imports: {imports}")
            logger.debug(f"Code: {code[:100]}...")

            execution_result = {
                "success": False,
                "output": "",
                "error": None
            }

            try:
                # Imports and code run as a single program.
                full_code = ""
                if imports:
                    full_code += imports + "\n"
                if code:
                    full_code += code

                if full_code.strip():
                    # One namespace serves as both globals and locals. With two
                    # separate dicts, top-level names land in the locals dict and
                    # are invisible to functions defined by the code, so e.g. a
                    # function using an imported module fails with NameError.
                    namespace: Dict[str, Any] = {}
                    # SECURITY: exec() runs arbitrary code inside this process
                    # with its full privileges. Only feed it trusted input or run
                    # the service in a sandboxed environment.
                    exec(full_code, namespace)
                    defined = [name for name in namespace if name != "__builtins__"]
                    execution_result["success"] = True
                    execution_result["output"] = f"Code executed successfully. Defined: {defined}"
                    logger.info(f"✅ Code execution succeeded")
                else:
                    execution_result["error"] = "No code to execute"
                    logger.warning("⚠️ No code provided for execution")
            except Exception as e:
                execution_result["success"] = False
                execution_result["error"] = str(e)
                execution_result["output"] = f"Execution failed: {str(e)}"
                logger.error(f"❌ Code execution failed: {e}")

            return execution_result
        except Exception as e:
            logger.error(f"Error in process_request: {e}")
            raise
# FastAPI application factory functions
def create_restful_coder_app(llm_base_url: str, queue_config: Optional[Dict[str, Any]] = None) -> FastAPI:
    """Build the FastAPI application that fronts the Coder service.

    Args:
        llm_base_url: Base URL of the local LLM server.
        queue_config: Keyword arguments for the message-queue client.

    Returns:
        An app exposing ``POST /generate`` and ``GET /health``. The service
        is started on application startup and its queue client is closed on
        shutdown.
    """
    app = FastAPI(title="RESTful Coder Service", version="1.0.0")
    llm_client = LocalLLMRestfulClient(llm_base_url)
    service = LocalCoderRestfulService(llm_client, queue_config=queue_config)

    @app.on_event("startup")
    async def startup_event():
        service.start()

    @app.on_event("shutdown")
    async def shutdown_event():
        # The synchronous LLM client holds nothing that needs closing.
        service.queue_client.close()

    # Plain "def" endpoints: process_request is synchronous, so awaiting its
    # dict result raised TypeError on every call. FastAPI runs non-async
    # endpoints in a worker thread, which also keeps the blocking HTTP call to
    # the LLM off the event loop.
    @app.post("/generate", response_model=ServiceResponse)
    def generate_code(request: ServiceRequest):
        try:
            result = service.process_request({
                "context": request.context,
                "messages": request.messages,
                "session_id": request.session_id
            })
            return ServiceResponse(success=True, data=result)
        except Exception as e:
            return ServiceResponse(success=False, error=str(e))

    @app.get("/health")
    def health_check():
        is_healthy = service.llm_client.health_check()
        return {
            "status": "healthy" if is_healthy else "unhealthy",
            "service": "restful_coder",
            "llm_service": is_healthy
        }

    return app
def create_restful_checker_app(llm_base_url: str, queue_config: Optional[Dict[str, Any]] = None) -> FastAPI:
    """Build the FastAPI application that fronts the Checker service.

    Args:
        llm_base_url: Base URL of the local LLM server.
        queue_config: Keyword arguments for the message-queue client.

    Returns:
        An app exposing ``POST /validate`` and ``GET /health``. The service
        is started on application startup and its queue client is closed on
        shutdown.
    """
    app = FastAPI(title="RESTful Checker Service", version="1.0.0")
    llm_client = LocalLLMRestfulClient(llm_base_url)
    service = LocalCheckerRestfulService(llm_client, queue_config=queue_config)

    @app.on_event("startup")
    async def startup_event():
        service.start()

    @app.on_event("shutdown")
    async def shutdown_event():
        # The synchronous LLM client holds nothing that needs closing.
        service.queue_client.close()

    # Plain "def" endpoints: process_request is synchronous, so awaiting its
    # dict result raised TypeError on every call. FastAPI runs non-async
    # endpoints in a worker thread, keeping the blocking LLM call off the
    # event loop.
    @app.post("/validate", response_model=ServiceResponse)
    def validate_code(request: ServiceRequest):
        try:
            # NOTE(review): only the code solution is forwarded; the checker
            # also reads "messages" to find the user's request, so over HTTP it
            # always sees "No user request found" - confirm the intended schema.
            result = service.process_request({
                "code_solution": request.messages[0] if request.messages else {},
                "session_id": request.session_id
            })
            return ServiceResponse(success=True, data=result)
        except Exception as e:
            return ServiceResponse(success=False, error=str(e))

    @app.get("/health")
    def health_check():
        is_healthy = service.llm_client.health_check()
        return {
            "status": "healthy" if is_healthy else "unhealthy",
            "service": "restful_checker",
            "llm_service": is_healthy
        }

    return app
def create_restful_debugger_app(llm_base_url: str, queue_config: Optional[Dict[str, Any]] = None) -> FastAPI:
    """Build the FastAPI application that fronts the Debugger service.

    Args:
        llm_base_url: Base URL of the local LLM server.
        queue_config: Keyword arguments for the message-queue client.

    Returns:
        An app exposing ``POST /debug`` and ``GET /health``. The service is
        started on application startup and its queue client is closed on
        shutdown.
    """
    app = FastAPI(title="RESTful Debugger Service", version="1.0.0")
    llm_client = LocalLLMRestfulClient(llm_base_url)
    service = LocalDebuggerRestfulService(llm_client, queue_config=queue_config)

    @app.on_event("startup")
    async def startup_event():
        service.start()

    @app.on_event("shutdown")
    async def shutdown_event():
        # The synchronous LLM client holds nothing that needs closing.
        service.queue_client.close()

    # Plain "def" endpoints: process_request is synchronous, so awaiting its
    # dict result raised TypeError on every call. FastAPI runs non-async
    # endpoints in a worker thread, keeping the blocking LLM call off the
    # event loop.
    @app.post("/debug", response_model=ServiceResponse)
    def debug_code(request: ServiceRequest):
        try:
            # messages[0] carries the original code, messages[1] the errors.
            result = service.process_request({
                "original_code": request.messages[0] if request.messages else {},
                "errors": request.messages[1] if len(request.messages) > 1 else [],
                "context": request.context,
                "session_id": request.session_id
            })
            return ServiceResponse(success=True, data=result)
        except Exception as e:
            return ServiceResponse(success=False, error=str(e))

    @app.get("/health")
    def health_check():
        is_healthy = service.llm_client.health_check()
        return {
            "status": "healthy" if is_healthy else "unhealthy",
            "service": "restful_debugger",
            "llm_service": is_healthy
        }

    return app
# Service launcher functions
def start_restful_coder_service(llm_base_url: str = "http://localhost:8000",
                                port: int = 8001,
                                queue_config: Optional[Dict[str, Any]] = None,
                                host: str = "0.0.0.0"):
    """Create the Coder app and serve it with uvicorn (blocks until shutdown).

    Args:
        llm_base_url: Base URL of the local LLM server.
        port: TCP port to listen on.
        queue_config: Keyword arguments for the message-queue client.
        host: Interface to bind. The default ``0.0.0.0`` listens on all
            interfaces; pass ``127.0.0.1`` to restrict access to the local
            machine.
    """
    import uvicorn

    app = create_restful_coder_app(llm_base_url, queue_config)
    uvicorn.run(app, host=host, port=port)
def start_restful_checker_service(llm_base_url: str = "http://localhost:8000",
                                  port: int = 8002,
                                  queue_config: Optional[Dict[str, Any]] = None,
                                  host: str = "0.0.0.0"):
    """Create the Checker app and serve it with uvicorn (blocks until shutdown).

    Args:
        llm_base_url: Base URL of the local LLM server.
        port: TCP port to listen on.
        queue_config: Keyword arguments for the message-queue client.
        host: Interface to bind. The default ``0.0.0.0`` listens on all
            interfaces; pass ``127.0.0.1`` to restrict access to the local
            machine.
    """
    import uvicorn

    app = create_restful_checker_app(llm_base_url, queue_config)
    uvicorn.run(app, host=host, port=port)
def start_restful_debugger_service(llm_base_url: str = "http://localhost:8000",
                                   port: int = 8003,
                                   queue_config: Optional[Dict[str, Any]] = None,
                                   host: str = "0.0.0.0"):
    """Create the Debugger app and serve it with uvicorn (blocks until shutdown).

    Args:
        llm_base_url: Base URL of the local LLM server.
        port: TCP port to listen on.
        queue_config: Keyword arguments for the message-queue client.
        host: Interface to bind. The default ``0.0.0.0`` listens on all
            interfaces; pass ``127.0.0.1`` to restrict access to the local
            machine.
    """
    import uvicorn

    app = create_restful_debugger_app(llm_base_url, queue_config)
    uvicorn.run(app, host=host, port=port)
def create_restful_tool_app(llm_base_url: str, queue_config: Optional[Dict[str, Any]] = None) -> FastAPI:
    """Build the FastAPI application that fronts the Tool service.

    Args:
        llm_base_url: Base URL of the local LLM server. The tool service does
            not call the LLM; the client is created only to keep the same
            constructor interface as the other services.
        queue_config: Keyword arguments for the message-queue client.

    Returns:
        An app exposing ``POST /execute`` and ``GET /health``. The service is
        started on application startup and its queue client is closed on
        shutdown.
    """
    app = FastAPI(title="RESTful Tool Service", version="1.0.0")
    llm_client = LocalLLMRestfulClient(llm_base_url)
    service = LocalToolRestfulService(llm_client, queue_config=queue_config)

    @app.on_event("startup")
    async def startup_event():
        print("🚀 Tool Service: Calling service.start()...")
        logger.info("Tool Service: Starting and subscribing to message queue...")
        service.start()
        print("✅ Tool Service: service.start() completed")

    @app.on_event("shutdown")
    async def shutdown_event():
        service.queue_client.close()

    # Plain "def" endpoint: FastAPI runs it in a worker thread, so executing
    # the submitted code does not block the event loop.
    @app.post("/execute", response_model=ServiceResponse)
    def execute_code(request: ServiceRequest):
        try:
            # The context field carries a JSON object with code/imports/prefix.
            context_data = json.loads(request.context) if isinstance(request.context, str) else request.context
            if not isinstance(context_data, dict):
                raise ValueError("context must be a JSON object with 'code', 'imports' and 'prefix' keys")
            result = service.process_request({
                "code": context_data.get("code", ""),
                "imports": context_data.get("imports", ""),
                "prefix": context_data.get("prefix", ""),
                "session_id": request.session_id
            })
            return ServiceResponse(success=True, data=result)
        except Exception as e:
            return ServiceResponse(success=False, error=str(e))

    @app.get("/health")
    async def health_check():
        return {
            "status": "healthy",
            "service": "restful_tool"
        }

    return app
def start_restful_tool_service(llm_base_url: str = "http://localhost:8000",
                               port: int = 8004,
                               queue_config: Optional[Dict[str, Any]] = None,
                               host: str = "0.0.0.0"):
    """Create the Tool app, start a tool service manually, then serve with uvicorn.

    A ``LocalToolRestfulService`` is created and started here directly rather
    than relying on the app's startup hook, and it is stored on
    ``app.state.tool_service``.

    Args:
        llm_base_url: Base URL of the local LLM server.
        port: TCP port to listen on.
        queue_config: Keyword arguments for the message-queue client.
        host: Interface to bind. The default ``0.0.0.0`` listens on all
            interfaces; pass ``127.0.0.1`` to restrict access to the local
            machine.
    """
    import uvicorn
    import sys

    def announce(text: str) -> None:
        # Progress line on stderr, flushed so it is visible immediately.
        sys.stderr.write(text + "\n")
        sys.stderr.flush()

    announce("🚀 Creating Tool service app...")
    print(f"🚀 Creating Tool service app...", flush=True)
    app = create_restful_tool_app(llm_base_url, queue_config)

    # Start the service by hand, independent of the FastAPI on_event hook.
    # NOTE(review): the app built above also starts its own service instance on
    # startup, so two instances may end up subscribed to "tool.requests" -
    # confirm this is intended for the queue backend in use.
    announce("🚀 Manually starting tool service...")
    llm_client = LocalLLMRestfulClient(llm_base_url)
    announce("📦 Created LLM client")
    tool_service = LocalToolRestfulService(llm_client, queue_config=queue_config)
    announce("📦 Created Tool service instance")
    announce("🚀 Starting tool service and subscribing to message queue...")
    tool_service.start()
    announce("✅ Tool service started and listening for messages")

    # Keep the instance reachable from the app for later use.
    app.state.tool_service = tool_service
    uvicorn.run(app, host=host, port=port)
if __name__ == "__main__":
    # Command-line entry point: pick a service and launch it with uvicorn.
    import argparse

    parser = argparse.ArgumentParser(description="Start Local LLM Service")
    parser.add_argument("--service", choices=["coder", "checker", "debugger", "tool"], required=True)
    parser.add_argument("--llm-url", default="http://localhost:8000", help="本地LLM服务器URL")
    parser.add_argument("--port", type=int, help="服务端口")
    parser.add_argument("--queue-type", default="redis", choices=["redis", "rabbitmq"])
    parser.add_argument("--redis-host", default="localhost")
    parser.add_argument("--redis-port", type=int, default=6379)
    args = parser.parse_args()

    # The launchers forward only queue_config, so the services always use
    # their default queue type. Say so instead of silently ignoring the flag.
    if args.queue_type != "redis":
        logger.warning(
            "--queue-type %s was requested, but the service launchers do not "
            "forward a queue type; the default ('redis') will be used",
            args.queue_type,
        )

    queue_config = {
        "host": args.redis_host,
        "port": args.redis_port
    }

    # Service name -> (launcher, default port).
    launchers = {
        "coder": (start_restful_coder_service, 8001),
        "checker": (start_restful_checker_service, 8002),
        "debugger": (start_restful_debugger_service, 8003),
        "tool": (start_restful_tool_service, 8004),
    }
    launch, default_port = launchers[args.service]
    # "is None" rather than "or" so an explicit --port 0 is passed through.
    port = default_port if args.port is None else args.port
    launch(args.llm_url, port, queue_config)