-
-
Notifications
You must be signed in to change notification settings - Fork 34.5k
Expand file tree
/
Copy pathtest_object_proxy.py
More file actions
121 lines (97 loc) · 3.94 KB
/
test_object_proxy.py
File metadata and controls
121 lines (97 loc) · 3.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import unittest
from test.support import import_helper
from test.support import threading_helper
# Raise SkipTest if subinterpreters not supported.
import_helper.import_module('_interpreters')
from concurrent.interpreters import share, SharedObjectProxy
from test.test_interpreters.utils import TestBase
from threading import Barrier, Thread, Lock
from concurrent import interpreters
class SharedObjectProxyTests(TestBase):
    """Tests for share() and SharedObjectProxy from concurrent.interpreters."""

    def run_concurrently(self, func, num_threads=4):
        """Call ``func(interp)`` from *num_threads* threads at once.

        Every worker thread creates its own interpreter, waits on a common
        barrier so that all workers enter *func* together, and then calls
        ``func(interp)``.  If an exception escaping a worker thread was
        captured, it is re-raised in the calling thread.
        """
        barrier = Barrier(num_threads)

        def thread():
            interp = interpreters.create()
            # Line the workers up so that func() runs in all of them at
            # roughly the same time.
            barrier.wait()
            func(interp)

        workers = [Thread(target=thread) for _ in range(num_threads)]
        with threading_helper.catch_threading_exception() as cm:
            # start_threads() starts the workers on entry and joins them
            # on exit; there is nothing to do in between.
            with threading_helper.start_threads(workers):
                pass

            # Must be checked while still inside the catch_threading_exception
            # block: this is where the captured exception is available.
            if cm.exc_value is not None:
                raise cm.exc_value

    def test_create(self):
        """share() wraps unshareable objects and returns shareable ones as-is."""
        proxy = share(object())
        self.assertIsInstance(proxy, SharedObjectProxy)

        # Shareable objects should pass through
        shareables = (
            None, True, False, 100, 10000, "hello", b"world",
            memoryview(b"test"),
        )
        for shareable in shareables:
            # Keep every assertion inside the subTest so that a failure
            # names the offending value and the other values still run.
            with self.subTest(shareable=shareable):
                self.assertTrue(interpreters.is_shareable(shareable))
                not_a_proxy = share(shareable)
                self.assertNotIsInstance(not_a_proxy, SharedObjectProxy)
                self.assertIs(not_a_proxy, shareable)

    @threading_helper.requires_working_threading()
    def test_create_concurrently(self):
        """Create proxies repeatedly in several interpreters at once."""
        def thread(interp):
            for iteration in range(100):
                with self.subTest(iteration=iteration):
                    # The leading "if True:" lets the snippet body be
                    # indented inside this string literal.
                    interp.exec("""if True:
                        from concurrent.interpreters import share
                        share(object())""")

        self.run_concurrently(thread)

    def test_access_proxy(self):
        """Attribute and method access through a proxy in another interpreter."""
        class Test:
            def silly(self):
                return "silly"

        interp = interpreters.create()
        obj = Test()
        proxy = share(obj)
        # Set on the original object *after* share(); the assertions below
        # read it back through the proxy.
        obj.test = "silly"
        interp.prepare_main(proxy=proxy)
        interp.exec("assert proxy.test == 'silly'")
        interp.exec("assert isinstance(proxy.test, str)")
        # The bound method obtained through the proxy is expected to be a
        # proxy itself, while its (str) result is a plain str.
        interp.exec("""if True:
            from concurrent.interpreters import SharedObjectProxy
            method = proxy.silly
            assert isinstance(method, SharedObjectProxy)
            assert method() == 'silly'
            assert isinstance(method(), str)
            """)
        # A missing attribute makes the snippet fail inside the interpreter.
        with self.assertRaises(interpreters.ExecutionFailed):
            interp.exec("proxy.noexist")

    @threading_helper.requires_working_threading()
    def test_access_proxy_concurrently(self):
        """Call a method on one shared proxy from several interpreters."""
        # The expected total is derived from these two values rather than
        # hard-coded, so it cannot drift from the loop bounds below.
        num_threads = 4
        iterations = 100

        class Test:
            def __init__(self):
                self.lock = Lock()
                self.value = 0

            def increment(self):
                with self.lock:
                    self.value += 1

        test = Test()
        proxy = share(test)

        def thread(interp):
            interp.prepare_main(proxy=proxy)
            for _ in range(iterations):
                interp.exec("proxy.increment()")
                interp.exec("assert isinstance(proxy.value, int)")

        self.run_concurrently(thread, num_threads=num_threads)
        self.assertEqual(test.value, num_threads * iterations)

    def test_proxy_call(self):
        """Call a proxied function here and from another interpreter."""
        constant = 67  # Hilarious

        def my_function(arg=1, /, *, arg2=2):
            # We need the constant here to make this function unshareable.
            return constant + arg + arg2

        proxy = share(my_function)
        self.assertIsInstance(proxy, SharedObjectProxy)
        # Defaults, positional-only and keyword-only arguments.
        self.assertEqual(proxy(), 70)
        self.assertEqual(proxy(0, arg2=1), 68)
        self.assertEqual(proxy(2), 71)

        # The same calls, made from a different interpreter.
        interp = interpreters.create()
        interp.prepare_main(proxy=proxy)
        interp.exec("""if True:
            assert isinstance(proxy(), int)
            assert proxy() == 70
            assert proxy(0, arg2=1) == 68
            assert proxy(2) == 71""")
if __name__ == '__main__':
    # Run the tests when this file is executed directly as a script.
    unittest.main()