forked from FoundationAgents/OpenManus
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpython_execute.py
More file actions
75 lines (65 loc) · 2.44 KB
/
python_execute.py
File metadata and controls
75 lines (65 loc) · 2.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import multiprocessing
import sys
from io import StringIO
from typing import Dict
from app.tool.base import BaseTool
class PythonExecute(BaseTool):
    """A tool for executing Python code with timeout and safety restrictions.

    The submitted code is run with ``exec`` in a separate process so that it
    can be stopped once the timeout elapses. Only text written to
    ``sys.stdout`` is captured; return values are not.

    NOTE(security): the executed code is handed the complete set of builtins
    (including ``open`` and ``__import__``), so the only restriction actually
    enforced by this class is the timeout. Do not feed it untrusted code
    without an additional sandbox around the process.
    """

    name: str = "python_execute"
    description: str = "Executes Python code string. Note: Only print outputs are visible, function return values are not captured. Use print statements to see results."
    parameters: dict = {
        "type": "object",
        "properties": {
            "code": {
                "type": "string",
                "description": "The Python code to execute.",
            },
        },
        "required": ["code"],
    }

    def _run_code(self, code: str, result_dict: dict, safe_globals: dict) -> None:
        """
        Runs ``code`` with stdout redirected and stores the outcome in ``result_dict``.

        Intended to be the target of the child process started by ``execute``.

        Args:
            code (str): The Python code to execute.
            result_dict (dict): Mapping that receives 'observation' (captured
                stdout on success, the exception message on failure) and
                'success' (bool).
            safe_globals (dict): Namespace used as both globals and locals
                for the executed code.
        """
        original_stdout = sys.stdout
        try:
            output_buffer = StringIO()
            sys.stdout = output_buffer
            # Same dict for globals and locals, so top-level definitions in
            # ``code`` are visible to functions defined in ``code``.
            exec(code, safe_globals, safe_globals)
            result_dict["observation"] = output_buffer.getvalue()
            result_dict["success"] = True
        except Exception as e:
            # Only the exception message is reported; anything printed before
            # the failure is discarded along with the buffer.
            result_dict["observation"] = str(e)
            result_dict["success"] = False
        finally:
            # Always restore the real stdout, even when the code raised.
            sys.stdout = original_stdout

    async def execute(
        self,
        code: str,
        timeout: int = 5,
    ) -> Dict:
        """
        Executes the provided Python code with a timeout.

        Args:
            code (str): The Python code to execute.
            timeout (int): Execution timeout in seconds.

        Returns:
            Dict: Contains 'observation' with execution output or error message and 'success' status.
        """
        with multiprocessing.Manager() as manager:
            # Shared dict through which the child process reports its result.
            # The defaults survive if the child exits without writing to it.
            result = manager.dict({"observation": "", "success": False})

            # ``__builtins__`` is either the builtins module or its dict,
            # depending on how this module was loaded; handle both.
            if isinstance(__builtins__, dict):
                safe_globals = {"__builtins__": __builtins__}
            else:
                safe_globals = {"__builtins__": __builtins__.__dict__.copy()}

            proc = multiprocessing.Process(
                target=self._run_code, args=(code, result, safe_globals)
            )
            proc.start()
            # NOTE: this is a blocking wait inside a coroutine; the event loop
            # does not run other tasks for up to ``timeout`` seconds.
            proc.join(timeout)

            # timeout process
            if proc.is_alive():
                proc.terminate()
                proc.join(1)
                if proc.is_alive():
                    # The executed code may have ignored or handled SIGTERM;
                    # force the child down so it is not left running.
                    proc.kill()
                    proc.join(1)
                return {
                    "observation": f"Execution timeout after {timeout} seconds",
                    "success": False,
                }
            # Copy out of the manager proxy before the manager shuts down.
            return dict(result)