forked from MagicStack/uvloop
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_tasks.py
More file actions
386 lines (302 loc) · 10.8 KB
/
test_tasks.py
File metadata and controls
386 lines (302 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
# LICENSE: PSF.
import asyncio
from uvloop import _testbase as tb
class Dummy:
def __repr__(self):
return '<Dummy>'
def __call__(self, *args):
pass
def format_coroutine(qualname, state, src, source_traceback, generator=False):
if generator:
state = '%s' % state
else:
state = '%s, defined' % state
if source_traceback is not None:
frame = source_traceback[-1]
return ('coro=<%s() %s at %s> created at %s:%s'
% (qualname, state, src, frame[0], frame[1]))
else:
return 'coro=<%s() %s at %s>' % (qualname, state, src)
try:
all_tasks = asyncio.all_tasks
except AttributeError:
all_tasks = asyncio.Task.all_tasks
try:
current_task = asyncio.current_task
except AttributeError:
current_task = asyncio.Task.current_task
class _TestTasks:
def test_task_basics(self):
@asyncio.coroutine
def outer():
a = yield from inner1()
b = yield from inner2()
return a + b
@asyncio.coroutine
def inner1():
return 42
@asyncio.coroutine
def inner2():
return 1000
t = outer()
self.assertEqual(self.loop.run_until_complete(t), 1042)
def test_task_cancel_yield(self):
@asyncio.coroutine
def task():
while True:
yield
return 12
t = self.create_task(task())
tb.run_briefly(self.loop) # start coro
t.cancel()
self.assertRaises(
asyncio.CancelledError, self.loop.run_until_complete, t)
self.assertTrue(t.done())
self.assertTrue(t.cancelled())
self.assertFalse(t.cancel())
def test_task_cancel_inner_future(self):
f = self.create_future()
@asyncio.coroutine
def task():
yield from f
return 12
t = self.create_task(task())
tb.run_briefly(self.loop) # start task
f.cancel()
with self.assertRaises(asyncio.CancelledError):
self.loop.run_until_complete(t)
self.assertTrue(f.cancelled())
self.assertTrue(t.cancelled())
def test_task_cancel_both_task_and_inner_future(self):
f = self.create_future()
@asyncio.coroutine
def task():
yield from f
return 12
t = self.create_task(task())
self.assertEqual(all_tasks(loop=self.loop), {t})
tb.run_briefly(self.loop)
f.cancel()
t.cancel()
with self.assertRaises(asyncio.CancelledError):
self.loop.run_until_complete(t)
self.assertTrue(t.done())
self.assertTrue(f.cancelled())
self.assertTrue(t.cancelled())
def test_task_cancel_task_catching(self):
fut1 = self.create_future()
fut2 = self.create_future()
@asyncio.coroutine
def task():
yield from fut1
try:
yield from fut2
except asyncio.CancelledError:
return 42
t = self.create_task(task())
tb.run_briefly(self.loop)
self.assertIs(t._fut_waiter, fut1) # White-box test.
fut1.set_result(None)
tb.run_briefly(self.loop)
self.assertIs(t._fut_waiter, fut2) # White-box test.
t.cancel()
self.assertTrue(fut2.cancelled())
res = self.loop.run_until_complete(t)
self.assertEqual(res, 42)
self.assertFalse(t.cancelled())
def test_task_cancel_task_ignoring(self):
fut1 = self.create_future()
fut2 = self.create_future()
fut3 = self.create_future()
@asyncio.coroutine
def task():
yield from fut1
try:
yield from fut2
except asyncio.CancelledError:
pass
res = yield from fut3
return res
t = self.create_task(task())
tb.run_briefly(self.loop)
self.assertIs(t._fut_waiter, fut1) # White-box test.
fut1.set_result(None)
tb.run_briefly(self.loop)
self.assertIs(t._fut_waiter, fut2) # White-box test.
t.cancel()
self.assertTrue(fut2.cancelled())
tb.run_briefly(self.loop)
self.assertIs(t._fut_waiter, fut3) # White-box test.
fut3.set_result(42)
res = self.loop.run_until_complete(t)
self.assertEqual(res, 42)
self.assertFalse(fut3.cancelled())
self.assertFalse(t.cancelled())
def test_task_cancel_current_task(self):
@asyncio.coroutine
def task():
t.cancel()
self.assertTrue(t._must_cancel) # White-box test.
# The sleep should be canceled immediately.
yield from asyncio.sleep(100, loop=self.loop)
return 12
t = self.create_task(task())
self.assertRaises(
asyncio.CancelledError, self.loop.run_until_complete, t)
self.assertTrue(t.done())
self.assertFalse(t._must_cancel) # White-box test.
self.assertFalse(t.cancel())
def test_task_step_with_baseexception(self):
@asyncio.coroutine
def notmutch():
raise BaseException()
task = self.create_task(notmutch())
with self.assertRaises(BaseException):
tb.run_briefly(self.loop)
self.assertTrue(task.done())
self.assertIsInstance(task.exception(), BaseException)
def test_task_step_result_future(self):
# If coroutine returns future, task waits on this future.
class Fut(asyncio.Future):
def __init__(self, *args, **kwds):
self.cb_added = False
super().__init__(*args, **kwds)
def add_done_callback(self, fn, context=None):
self.cb_added = True
super().add_done_callback(fn)
fut = Fut(loop=self.loop)
result = None
@asyncio.coroutine
def wait_for_future():
nonlocal result
result = yield from fut
t = self.create_task(wait_for_future())
tb.run_briefly(self.loop)
self.assertTrue(fut.cb_added)
res = object()
fut.set_result(res)
tb.run_briefly(self.loop)
self.assertIs(res, result)
self.assertTrue(t.done())
self.assertIsNone(t.result())
def test_task_step_result(self):
@asyncio.coroutine
def notmuch():
yield None
yield 1
return 'ko'
self.assertRaises(
RuntimeError, self.loop.run_until_complete, notmuch())
def test_task_yield_vs_yield_from(self):
fut = asyncio.Future(loop=self.loop)
@asyncio.coroutine
def wait_for_future():
yield fut
task = wait_for_future()
with self.assertRaises(RuntimeError):
self.loop.run_until_complete(task)
self.assertFalse(fut.done())
def test_task_current_task(self):
self.assertIsNone(current_task(loop=self.loop))
@asyncio.coroutine
def coro(loop):
self.assertTrue(current_task(loop=loop) is task)
task = self.create_task(coro(self.loop))
self.loop.run_until_complete(task)
self.assertIsNone(current_task(loop=self.loop))
def test_task_current_task_with_interleaving_tasks(self):
self.assertIsNone(current_task(loop=self.loop))
fut1 = self.create_future()
fut2 = self.create_future()
@asyncio.coroutine
def coro1(loop):
self.assertTrue(current_task(loop=loop) is task1)
yield from fut1
self.assertTrue(current_task(loop=loop) is task1)
fut2.set_result(True)
@asyncio.coroutine
def coro2(loop):
self.assertTrue(current_task(loop=loop) is task2)
fut1.set_result(True)
yield from fut2
self.assertTrue(current_task(loop=loop) is task2)
task1 = self.create_task(coro1(self.loop))
task2 = self.create_task(coro2(self.loop))
self.loop.run_until_complete(asyncio.wait((task1, task2),
loop=self.loop))
self.assertIsNone(current_task(loop=self.loop))
def test_task_yield_future_passes_cancel(self):
# Canceling outer() cancels inner() cancels waiter.
proof = 0
waiter = self.create_future()
@asyncio.coroutine
def inner():
nonlocal proof
try:
yield from waiter
except asyncio.CancelledError:
proof += 1
raise
else:
self.fail('got past sleep() in inner()')
@asyncio.coroutine
def outer():
nonlocal proof
try:
yield from inner()
except asyncio.CancelledError:
proof += 100 # Expect this path.
else:
proof += 10
f = asyncio.ensure_future(outer(), loop=self.loop)
tb.run_briefly(self.loop)
f.cancel()
self.loop.run_until_complete(f)
self.assertEqual(proof, 101)
self.assertTrue(waiter.cancelled())
def test_task_yield_wait_does_not_shield_cancel(self):
# Canceling outer() makes wait() return early, leaves inner()
# running.
proof = 0
waiter = self.create_future()
@asyncio.coroutine
def inner():
nonlocal proof
yield from waiter
proof += 1
@asyncio.coroutine
def outer():
nonlocal proof
d, p = yield from asyncio.wait([inner()], loop=self.loop)
proof += 100
f = asyncio.ensure_future(outer(), loop=self.loop)
tb.run_briefly(self.loop)
f.cancel()
self.assertRaises(
asyncio.CancelledError, self.loop.run_until_complete, f)
waiter.set_result(None)
tb.run_briefly(self.loop)
self.assertEqual(proof, 1)
###############################################################################
# Tests Matrix
###############################################################################
class Test_UV_UV_Tasks(_TestTasks, tb.UVTestCase):
def create_future(self):
return self.loop.create_future()
def create_task(self, coro):
return self.loop.create_task(coro)
class Test_UV_UV_Tasks_AIO_Future(_TestTasks, tb.UVTestCase):
def create_future(self):
return asyncio.Future(loop=self.loop)
def create_task(self, coro):
return self.loop.create_task(coro)
class Test_UV_AIO_Tasks(_TestTasks, tb.UVTestCase):
def create_future(self):
return asyncio.Future(loop=self.loop)
def create_task(self, coro):
return asyncio.Task(coro, loop=self.loop)
class Test_AIO_Tasks(_TestTasks, tb.AIOTestCase):
def create_future(self):
return asyncio.Future(loop=self.loop)
def create_task(self, coro):
return asyncio.Task(coro, loop=self.loop)