-
-
Notifications
You must be signed in to change notification settings - Fork 34.5k
Expand file tree
/
Copy pathtest_preload.py
More file actions
230 lines (191 loc) · 9.43 KB
/
test_preload.py
File metadata and controls
230 lines (191 loc) · 9.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
"""Tests for forkserver preload functionality."""
import contextlib
import multiprocessing
import os
import shutil
import sys
import tempfile
import unittest
from multiprocessing import forkserver, spawn
class TestForkserverPreload(unittest.TestCase):
"""Tests for forkserver preload functionality."""
def setUp(self):
self._saved_warnoptions = sys.warnoptions.copy()
# Remove warning options that would convert ImportWarning to errors:
# - 'error' converts all warnings to errors
# - 'error::ImportWarning' specifically converts ImportWarning
# Keep other specific options like 'error::BytesWarning' that
# subprocess's _args_from_interpreter_flags() expects to remove
sys.warnoptions[:] = [
opt for opt in sys.warnoptions
if opt not in ('error', 'error::ImportWarning')
]
self.ctx = multiprocessing.get_context('forkserver')
forkserver._forkserver._stop()
def tearDown(self):
sys.warnoptions[:] = self._saved_warnoptions
forkserver._forkserver._stop()
@staticmethod
def _send_value(conn, value):
"""Send value through connection. Static method to be picklable as Process target."""
conn.send(value)
@contextlib.contextmanager
def capture_forkserver_stderr(self):
"""Capture stderr from forkserver by preloading a module that redirects it.
Yields (module_name, capture_file_path). The capture file can be read
after the forkserver has processed preloads. This works because
forkserver.main() calls util._flush_std_streams() after preloading,
ensuring captured output is written before we read it.
"""
tmpdir = tempfile.mkdtemp()
capture_module = os.path.join(tmpdir, '_capture_stderr.py')
capture_file = os.path.join(tmpdir, 'stderr.txt')
try:
with open(capture_module, 'w') as f:
# Use line buffering (buffering=1) to ensure warnings are written.
# Enable ImportWarning since it's ignored by default.
f.write(
f'import sys, warnings; '
f'sys.stderr = open({capture_file!r}, "w", buffering=1); '
f'warnings.filterwarnings("always", category=ImportWarning)\n'
)
sys.path.insert(0, tmpdir)
yield '_capture_stderr', capture_file
finally:
sys.path.remove(tmpdir)
shutil.rmtree(tmpdir, ignore_errors=True)
def test_preload_on_error_ignore_default(self):
"""Test that invalid modules are silently ignored by default."""
self.ctx.set_forkserver_preload(['nonexistent_module_xyz'])
r, w = self.ctx.Pipe(duplex=False)
p = self.ctx.Process(target=self._send_value, args=(w, 42))
p.start()
w.close()
result = r.recv()
r.close()
p.join()
self.assertEqual(result, 42)
self.assertEqual(p.exitcode, 0)
def test_preload_on_error_ignore_explicit(self):
"""Test that invalid modules are silently ignored with on_error='ignore'."""
self.ctx.set_forkserver_preload(['nonexistent_module_xyz'], on_error='ignore')
r, w = self.ctx.Pipe(duplex=False)
p = self.ctx.Process(target=self._send_value, args=(w, 99))
p.start()
w.close()
result = r.recv()
r.close()
p.join()
self.assertEqual(result, 99)
self.assertEqual(p.exitcode, 0)
def test_preload_on_error_warn(self):
"""Test that invalid modules emit warnings with on_error='warn'."""
with self.capture_forkserver_stderr() as (capture_mod, stderr_file):
self.ctx.set_forkserver_preload(
[capture_mod, 'nonexistent_module_xyz'], on_error='warn')
r, w = self.ctx.Pipe(duplex=False)
p = self.ctx.Process(target=self._send_value, args=(w, 123))
p.start()
w.close()
result = r.recv()
r.close()
p.join()
self.assertEqual(result, 123)
self.assertEqual(p.exitcode, 0)
with open(stderr_file) as f:
stderr_output = f.read()
self.assertIn('nonexistent_module_xyz', stderr_output)
self.assertIn('ImportWarning', stderr_output)
def test_preload_on_error_fail_breaks_context(self):
"""Test that invalid modules with on_error='fail' breaks the forkserver."""
with self.capture_forkserver_stderr() as (capture_mod, stderr_file):
self.ctx.set_forkserver_preload(
[capture_mod, 'nonexistent_module_xyz'], on_error='fail')
r, w = self.ctx.Pipe(duplex=False)
try:
p = self.ctx.Process(target=self._send_value, args=(w, 42))
with self.assertRaises((EOFError, ConnectionError, BrokenPipeError)) as cm:
p.start()
notes = getattr(cm.exception, '__notes__', [])
self.assertTrue(notes, "Expected exception to have __notes__")
self.assertIn('Forkserver process may have crashed', notes[0])
with open(stderr_file) as f:
stderr_output = f.read()
self.assertIn('nonexistent_module_xyz', stderr_output)
self.assertIn('ModuleNotFoundError', stderr_output)
finally:
w.close()
r.close()
def test_preload_valid_modules_with_on_error_fail(self):
"""Test that valid modules work fine with on_error='fail'."""
self.ctx.set_forkserver_preload(['os', 'sys'], on_error='fail')
r, w = self.ctx.Pipe(duplex=False)
p = self.ctx.Process(target=self._send_value, args=(w, 'success'))
p.start()
w.close()
result = r.recv()
r.close()
p.join()
self.assertEqual(result, 'success')
self.assertEqual(p.exitcode, 0)
def test_preload_invalid_on_error_value(self):
"""Test that invalid on_error values raise ValueError."""
with self.assertRaises(ValueError) as cm:
self.ctx.set_forkserver_preload(['os'], on_error='invalid')
self.assertIn("on_error must be 'ignore', 'warn', or 'fail'", str(cm.exception))
class TestHandlePreload(unittest.TestCase):
"""Unit tests for _handle_preload() function."""
def setUp(self):
self._saved_main = sys.modules['__main__']
def tearDown(self):
spawn.old_main_modules.clear()
sys.modules['__main__'] = self._saved_main
def test_handle_preload_main_on_error_fail(self):
"""Test that __main__ import failures raise with on_error='fail'."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py') as f:
f.write('raise RuntimeError("test error in __main__")\n')
f.flush()
with self.assertRaises(RuntimeError) as cm:
forkserver._handle_preload(['__main__'], main_path=f.name, on_error='fail')
self.assertIn("test error in __main__", str(cm.exception))
def test_handle_preload_main_on_error_warn(self):
"""Test that __main__ import failures warn with on_error='warn'."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py') as f:
f.write('raise ImportError("test import error")\n')
f.flush()
with self.assertWarns(ImportWarning) as cm:
forkserver._handle_preload(['__main__'], main_path=f.name, on_error='warn')
self.assertIn("Failed to preload __main__", str(cm.warning))
self.assertIn("test import error", str(cm.warning))
def test_handle_preload_main_on_error_ignore(self):
"""Test that __main__ import failures are ignored with on_error='ignore'."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py') as f:
f.write('raise ImportError("test import error")\n')
f.flush()
forkserver._handle_preload(['__main__'], main_path=f.name, on_error='ignore')
def test_handle_preload_main_valid(self):
"""Test that valid __main__ preload works."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py') as f:
f.write('test_var = 42\n')
f.flush()
forkserver._handle_preload(['__main__'], main_path=f.name, on_error='fail')
def test_handle_preload_module_on_error_fail(self):
"""Test that module import failures raise with on_error='fail'."""
with self.assertRaises(ModuleNotFoundError):
forkserver._handle_preload(['nonexistent_test_module_xyz'], on_error='fail')
def test_handle_preload_module_on_error_warn(self):
"""Test that module import failures warn with on_error='warn'."""
with self.assertWarns(ImportWarning) as cm:
forkserver._handle_preload(['nonexistent_test_module_xyz'], on_error='warn')
self.assertIn("Failed to preload module", str(cm.warning))
def test_handle_preload_module_on_error_ignore(self):
"""Test that module import failures are ignored with on_error='ignore'."""
forkserver._handle_preload(['nonexistent_test_module_xyz'], on_error='ignore')
def test_handle_preload_combined(self):
"""Test preloading both __main__ and modules."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py') as f:
f.write('import sys\n')
f.flush()
forkserver._handle_preload(['__main__', 'os', 'sys'], main_path=f.name, on_error='fail')
# Run the tests in this file when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()