forked from FoundationAgents/OpenManus
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbash.py
More file actions
158 lines (127 loc) · 5.62 KB
/
bash.py
File metadata and controls
158 lines (127 loc) · 5.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import asyncio
import os
from typing import Optional
from app.exceptions import ToolError
from app.tool.base import BaseTool, CLIResult
_BASH_DESCRIPTION = """Execute a bash command in the terminal.
* Long running commands: For commands that may run indefinitely, it should be run in the background and the output should be redirected to a file, e.g. command = `python3 app.py > server.log 2>&1 &`.
* Interactive: If a bash command returns exit code `-1`, this means the process is not yet finished. The assistant must then send a second call to terminal with an empty `command` (which will retrieve any additional logs), or it can send additional text (set `command` to the text) to STDIN of the running process, or it can send command=`ctrl+c` to interrupt the process.
* Timeout: If a command execution result says "Command timed out. Sending SIGINT to the process", the assistant should retry running the command in the background.
"""
class _BashSession:
"""A session of a bash shell."""
_started: bool
_process: asyncio.subprocess.Process
command: str = "/bin/bash"
_output_delay: float = 0.2 # seconds
_timeout: float = 120.0 # seconds
_sentinel: str = "<<exit>>"
def __init__(self):
self._started = False
self._timed_out = False
async def start(self):
if self._started:
return
self._process = await asyncio.create_subprocess_shell(
self.command,
preexec_fn=os.setsid,
shell=True,
bufsize=0,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
self._started = True
def stop(self):
"""Terminate the bash shell."""
if not self._started:
raise ToolError("Session has not started.")
if self._process.returncode is not None:
return
self._process.terminate()
async def run(self, command: str):
"""Execute a command in the bash shell."""
if not self._started:
raise ToolError("Session has not started.")
if self._process.returncode is not None:
return CLIResult(
system="tool must be restarted",
error=f"bash has exited with returncode {self._process.returncode}",
)
if self._timed_out:
raise ToolError(
f"timed out: bash has not returned in {self._timeout} seconds and must be restarted",
)
# we know these are not None because we created the process with PIPEs
assert self._process.stdin
assert self._process.stdout
assert self._process.stderr
# send command to the process
self._process.stdin.write(
command.encode() + f"; echo '{self._sentinel}'\n".encode()
)
await self._process.stdin.drain()
# read output from the process, until the sentinel is found
try:
async with asyncio.timeout(self._timeout):
while True:
await asyncio.sleep(self._output_delay)
# if we read directly from stdout/stderr, it will wait forever for
# EOF. use the StreamReader buffer directly instead.
output = (
self._process.stdout._buffer.decode()
) # pyright: ignore[reportAttributeAccessIssue]
if self._sentinel in output:
# strip the sentinel and break
output = output[: output.index(self._sentinel)]
break
except asyncio.TimeoutError:
self._timed_out = True
raise ToolError(
f"timed out: bash has not returned in {self._timeout} seconds and must be restarted",
) from None
if output.endswith("\n"):
output = output[:-1]
error = (
self._process.stderr._buffer.decode()
) # pyright: ignore[reportAttributeAccessIssue]
if error.endswith("\n"):
error = error[:-1]
# clear the buffers so that the next output can be read correctly
self._process.stdout._buffer.clear() # pyright: ignore[reportAttributeAccessIssue]
self._process.stderr._buffer.clear() # pyright: ignore[reportAttributeAccessIssue]
return CLIResult(output=output, error=error)
class Bash(BaseTool):
"""A tool for executing bash commands"""
name: str = "bash"
description: str = _BASH_DESCRIPTION
parameters: dict = {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The bash command to execute. Can be empty to view additional logs when previous exit code is `-1`. Can be `ctrl+c` to interrupt the currently running process.",
},
},
"required": ["command"],
}
_session: Optional[_BashSession] = None
async def execute(
self, command: str | None = None, restart: bool = False, **kwargs
) -> CLIResult:
if restart:
if self._session:
self._session.stop()
self._session = _BashSession()
await self._session.start()
return CLIResult(system="tool has been restarted.")
if self._session is None:
self._session = _BashSession()
await self._session.start()
if command is not None:
return await self._session.run(command)
raise ToolError("no command provided.")
if __name__ == "__main__":
bash = Bash()
rst = asyncio.run(bash.execute("ls -l"))
print(rst)