forked from localstack/localstack
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlocalstack-supervisor
More file actions
executable file
·198 lines (157 loc) · 6.24 KB
/
Copy pathlocalstack-supervisor
File metadata and controls
executable file
·198 lines (157 loc) · 6.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
#!/usr/bin/env python3
"""Supervisor script for managing localstack processes, acting like a mini init system tailored to
localstack. This can be used on the host or in the docker-entrypoint.sh.
The supervisor behaves as follows:
* SIGUSR1 to supervisor will terminate the localstack instance and then start a new process
* SIGTERM to supervisor will terminate the localstack instance and then return
* if the localstack instance exits, then the supervisor exits with the same exit code.
The methods ``waitpid_reap_other_children`` and ``stop_child_process`` were adapted from baseimage-docker
licensed under MIT: https://github.com/phusion/baseimage-docker/blob/rel-0.9.16/image/bin/my_init"""
import errno
import os
import signal
import subprocess
import sys
import threading
from typing import Optional
DEBUG = os.getenv("DEBUG", "").strip().lower() in ["1", "true"]
# configurable process shutdown timeout, to allow for longer shutdown procedures
DEFAULT_SHUTDOWN_TIMEOUT = int(os.getenv("SHUTDOWN_TIMEOUT", "").strip() or 5)
class AlarmException(Exception):
"""Special exception raise if SIGALRM is received."""
pass
def get_localstack_command() -> list[str]:
"""
Allow modification of the command to start LocalStack
:return: Command to start LocalStack
"""
import shlex
command = os.environ.get("LOCALSTACK_SUPERVISOR_COMMAND")
if not command:
return [sys.executable, "-m", "localstack.runtime.main"]
return shlex.split(command)
def log(message: str):
"""Prints the given message to stdout with a logging prefix."""
if not DEBUG:
return
print(f"LocalStack supervisor: {message}")
_terminated_child_processes = {}
def waitpid_reap_other_children(pid: int) -> Optional[int]:
"""
Waits for the child process with the given PID, while at the same time reaping any other child
processes that have exited (e.g. adopted child processes that have terminated).
:param pid: the pid of the process
:returns: the status of the process
"""
global _terminated_child_processes
status = _terminated_child_processes.get(pid)
if status:
# A previous call to waitpid_reap_other_children(),
# with an argument not equal to the current argument,
# already waited for this process. Return the status
# that was obtained back then.
del _terminated_child_processes[pid]
return status
done = False
status = None
while not done:
try:
this_pid, status = os.waitpid(-1, 0)
if this_pid == pid:
done = True
else:
# Save status for later.
_terminated_child_processes[this_pid] = status
except OSError as e:
if e.errno == errno.ECHILD or e.errno == errno.ESRCH:
return None
else:
raise
return status
def stop_child_process(name: str, pid: int, sig: int = signal.SIGTERM, timeout: int | None = None):
"""
Sends a signal to the given process and then waits for all child processes to avoid zombie processes.
:param name: readable process name to log
:param pid: the pid to terminate
:param sig: the signal to send to the process
:param timeout: the wait timeout
:return:
"""
log(f"Shutting down {name} (PID {pid})...")
try:
os.kill(pid, sig)
except OSError:
pass
timeout = timeout or DEFAULT_SHUTDOWN_TIMEOUT
signal.alarm(timeout)
try:
waitpid_reap_other_children(pid)
except OSError:
pass
except AlarmException:
log(f"{name} (PID {pid}) did not shut down in time. Forcing it to exit.")
try:
os.kill(pid, signal.SIGKILL)
except OSError:
pass
try:
waitpid_reap_other_children(pid)
except OSError:
pass
finally:
signal.alarm(0)
def main():
# the localstack process
process: Optional[subprocess.Popen] = None
# signal handlers set these events which further determine which actions should be taken in the main loop
should_restart = threading.Event()
# signal handlers
def _raise_alarm_exception(signum, frame):
raise AlarmException()
def _terminate_localstack(signum, frame):
if not process:
return
stop_child_process("localstack", process.pid, signal.SIGTERM)
def _restart_localstack(signum, frame):
# this handler terminates localstack but leaves the supervisor in a state to restart it
if not process:
return
should_restart.set()
stop_child_process("localstack", process.pid, signal.SIGTERM)
signal.signal(signal.SIGALRM, _raise_alarm_exception)
signal.signal(signal.SIGTERM, _terminate_localstack)
# TODO investigate: when we tried to forward SIGINT to LS, for some reason SIGINT was raised twice in LS
# yet setting this to a no-op also worked. since we couldn't really figure out what was going on, we just
# translate SIGINT to SIGTERM for the localstack process.
signal.signal(signal.SIGINT, _terminate_localstack)
signal.signal(signal.SIGUSR1, _restart_localstack)
# sets the supervisor PID so localstack can signal to it more easily
os.environ["SUPERVISOR_PID"] = str(os.getpid())
exit_code = 0
try:
log("starting")
while True:
# clear force event indicators
should_restart.clear()
# start a new localstack process
process = subprocess.Popen(
get_localstack_command(),
stdout=sys.stdout,
stderr=subprocess.STDOUT,
)
log(f"localstack process (PID {process.pid}) starting")
# wait for the localstack process to return
exit_code = process.wait()
log(f"localstack process (PID {process.pid}) returned with exit code {exit_code}")
# make sure that, if the localstack process terminates on its own accord, that we still reap all
# child processes
waitpid_reap_other_children(process.pid)
if should_restart.is_set():
continue
else:
break
finally:
log("exiting")
sys.exit(exit_code)
if __name__ == "__main__":
main()