forked from pypa/virtualenv
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_util.py
More file actions
36 lines (27 loc) · 975 Bytes
/
test_util.py
File metadata and controls
36 lines (27 loc) · 975 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from __future__ import annotations
import concurrent.futures
import traceback
import pytest
from virtualenv.util.lock import ReentrantFileLock
from virtualenv.util.subprocess import run_cmd
def test_run_fail(tmp_path):
    """Check that ``run_cmd`` reports failure when given ``tmp_path`` as the command.

    The command line consists solely of the stringified ``tmp_path`` fixture
    (pytest's per-test temporary directory), which is not a runnable program.
    The call is expected to yield a truthy exit code, some error output and
    no standard output.
    """
    exit_code, stdout, stderr = run_cmd([str(tmp_path)])

    # Same three checks, in the same order, as the failure signature we expect.
    assert stderr
    assert not stdout
    assert exit_code
def test_reentrant_file_lock_is_thread_safe(tmp_path):
    """Exercise one ``ReentrantFileLock`` key from several threads at once.

    Four jobs are submitted to a thread pool; each one takes the lock for the
    key ``"target"`` and, while holding it, deletes and re-creates the same
    file. If any job raised, the test fails and reports that job's traceback.
    """
    lock = ReentrantFileLock(tmp_path)
    target_file = tmp_path / "target"
    target_file.touch()

    def recreate_target_file():
        # Unlink followed by touch: an unguarded interleaving of two workers
        # would make one of the unlink calls hit a missing file and raise.
        with lock.lock_for_key("target"):
            target_file.unlink()
            target_file.touch()

    with concurrent.futures.ThreadPoolExecutor() as pool:
        futures = []
        for _ in range(4):
            futures.append(pool.submit(recreate_target_file))
        concurrent.futures.wait(futures)

    # Every future is finished by now; result() re-raises whatever the worker
    # raised, and the first such error is turned into a test failure.
    try:
        for future in futures:
            future.result()
    except Exception:  # noqa: BLE001
        pytest.fail(traceback.format_exc())