Skip to content

Commit f16c957

Browse files
Xuye (Chris) Qin and wjsi authored
Upgrade azure-pipelines to Python 3.9 (mars-project#2862)
Co-authored-by: wjsi <[email protected]>
1 parent ba0913f commit f16c957

File tree

13 files changed

+123
-85
lines changed

13 files changed

+123
-85
lines changed

.github/workflows/docker-cd.yml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ jobs:
2626
shell: bash
2727
env:
2828
DOCKER_ORG: ${{ secrets.DOCKERHUB_USERNAME }}
29+
if: ${{ github.repository == 'mars-project/mars' }}
2930
run: |
3031
source ./ci/reload-env.sh
3132

azure-pipelines.yml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ jobs:
2323
mars.test.module: 'tensor'
2424

2525
variables:
26-
PYTHON: '3.8'
26+
PYTHON: '3.9'
2727

2828
steps:
2929
- powershell: |
@@ -120,7 +120,7 @@ jobs:
120120
vmImage: 'ubuntu-latest'
121121

122122
variables:
123-
PYTHON: '3.8'
123+
PYTHON: '3.9'
124124

125125
steps:
126126
- bash: |

mars/core/entity/executable.py

Lines changed: 52 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,9 @@
1313
# limitations under the License.
1414

1515
import asyncio
16+
import atexit
1617
import concurrent.futures
18+
import queue
1719
import threading
1820
from typing import List
1921
from weakref import WeakKeyDictionary, ref
@@ -23,7 +25,55 @@
2325
from ..mode import enter_mode
2426

2527

26-
_decref_pool = concurrent.futures.ThreadPoolExecutor()
28+
class DecrefRunner:
29+
def __init__(self):
30+
self._decref_thread = None
31+
self._queue = queue.Queue()
32+
33+
def start(self):
34+
self._decref_thread = threading.Thread(
35+
target=self._thread_body, name="DecrefThread"
36+
)
37+
self._decref_thread.daemon = True
38+
self._decref_thread.start()
39+
40+
def _thread_body(self):
41+
from ...deploy.oscar.session import SyncSession
42+
43+
while True:
44+
key, session_ref, fut = self._queue.get()
45+
if key is None:
46+
break
47+
48+
session = session_ref()
49+
if session is None:
50+
fut.set_result(None)
51+
continue
52+
try:
53+
s = SyncSession.from_isolated_session(session)
54+
s.decref(key)
55+
fut.set_result(None)
56+
except (RuntimeError, ConnectionError, KeyError):
57+
fut.set_result(None)
58+
except Exception as ex: # pragma: no cover # noqa: E722 # nosec # pylint: disable=bare-except
59+
fut.set_exception(ex)
60+
61+
def stop(self):
62+
if self._decref_thread: # pragma: no branch
63+
self._queue.put_nowait((None, None, None))
64+
self._decref_thread.join(1)
65+
66+
def put(self, key: str, session_ref: ref):
67+
if self._decref_thread is None:
68+
self.start()
69+
70+
fut = concurrent.futures.Future()
71+
self._queue.put_nowait((key, session_ref, fut))
72+
return fut
73+
74+
75+
_decref_runner = DecrefRunner()
76+
atexit.register(_decref_runner.stop)
2777

2878

2979
class _TileableSession:
@@ -38,18 +88,7 @@ def cb(_, sess=ref(session)):
3888
# isolation destroyed, no need to decref
3989
return
4090

41-
def decref():
42-
from ...deploy.oscar.session import SyncSession
43-
44-
s = sess()
45-
if s:
46-
try:
47-
s = SyncSession.from_isolated_session(s)
48-
s.decref(key)
49-
except (RuntimeError, ConnectionError, KeyError):
50-
pass
51-
52-
fut = _decref_pool.submit(decref)
91+
fut = _decref_runner.put(key, sess)
5392
if not decref_in_isolation:
5493
# if decref in isolation, means that this tileable
5594
# is not required for main thread, thus we do not need

mars/core/graph/tests/test_graph.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -105,7 +105,7 @@ def test_to_dot():
105105

106106
dot = str(graph.to_dot(trunc_key=5))
107107
try:
108-
assert all(str(n.op.key)[5] in dot for n in graph) is True
108+
assert all(str(n.key)[5] in dot for n in graph) is True
109109
except AssertionError:
110110
graph_reprs = []
111111
for n in graph:
@@ -117,4 +117,6 @@ def test_to_dot():
117117
dot,
118118
"\n".join(graph_reprs),
119119
)
120+
missing_prefix = next(str(n.key)[5] not in dot for n in graph)
121+
logging.error("Missing prefix %s", missing_prefix)
120122
raise

mars/deploy/oscar/session.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1819,8 +1819,10 @@ def _attach_session(future: asyncio.Future):
18191819
execution_info, progress_bar, progress_update_interval, cancelled
18201820
)
18211821
else:
1822+
exec_task = asyncio.ensure_future(execution_info)
1823+
cancel_task = asyncio.ensure_future(cancelled.wait())
18221824
await asyncio.wait(
1823-
[execution_info, cancelled.wait()], return_when=asyncio.FIRST_COMPLETED
1825+
[exec_task, cancel_task], return_when=asyncio.FIRST_COMPLETED
18241826
)
18251827
if cancelled.is_set():
18261828
execution_info.remove_done_callback(_attach_session)

mars/oscar/backends/mars/tests/test_mars_actor_context.py

Lines changed: 0 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -450,12 +450,6 @@ async def test_mars_destroy_has_actor(actor_pool_context):
450450
assert not await mo.has_actor(ref1)
451451
assert not await mo.has_actor(ref2)
452452

453-
# the lru_cache on _ExtensibleAccessor.__get__ will reference all the
454-
# decorated actors, so we have to clear the cache here.
455-
#
456-
# there will be memory leak if the actor create and destroy multiple times.
457-
DummyActor.__dict__["add"].__get__.__func__.cache_clear()
458-
459453
if isinstance(ref2, LocalActorRef):
460454
assert "weakref" in str(ref2)
461455
assert "dead" in str(ref2)

mars/oscar/batch.py

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,6 @@
1515
# limitations under the License.
1616

1717
import asyncio
18-
import functools
1918
import inspect
2019
import textwrap
2120
from collections import namedtuple
@@ -198,7 +197,6 @@ def batch(self, func: Callable):
198197
self.batch_func = func
199198
return self
200199

201-
@functools.lru_cache(1000)
202200
def __get__(self, instance, owner):
203201
if instance is None:
204202
# calling from class

mars/services/cluster/api/oscar.py

Lines changed: 52 additions & 53 deletions
Original file line number | Diff line number | Diff line change
@@ -317,42 +317,43 @@ async def create(cls: Type[APIType], address: str, **kw) -> APIType:
317317
from ..supervisor.node_info import NodeInfoCollectorActor
318318
from ..uploader import NodeInfoUploaderActor
319319

320+
create_actor_coros = [
321+
mo.create_actor(
322+
SupervisorPeerLocatorActor,
323+
"fixed",
324+
address,
325+
uid=SupervisorPeerLocatorActor.default_uid(),
326+
address=address,
327+
),
328+
mo.create_actor(
329+
NodeInfoCollectorActor,
330+
uid=NodeInfoCollectorActor.default_uid(),
331+
address=address,
332+
),
333+
mo.create_actor(
334+
NodeAllocatorActor,
335+
"fixed",
336+
address,
337+
uid=NodeAllocatorActor.default_uid(),
338+
address=address,
339+
),
340+
mo.create_actor(
341+
NodeInfoUploaderActor,
342+
NodeRole.WORKER,
343+
interval=kw.get("upload_interval"),
344+
band_to_resource=kw.get("band_to_resource"),
345+
use_gpu=kw.get("use_gpu", False),
346+
uid=NodeInfoUploaderActor.default_uid(),
347+
address=address,
348+
),
349+
mo.create_actor(
350+
ProcessInfoManagerActor,
351+
uid=ProcessInfoManagerActor.default_uid(),
352+
address=address,
353+
),
354+
]
320355
dones, _ = await asyncio.wait(
321-
[
322-
mo.create_actor(
323-
SupervisorPeerLocatorActor,
324-
"fixed",
325-
address,
326-
uid=SupervisorPeerLocatorActor.default_uid(),
327-
address=address,
328-
),
329-
mo.create_actor(
330-
NodeInfoCollectorActor,
331-
uid=NodeInfoCollectorActor.default_uid(),
332-
address=address,
333-
),
334-
mo.create_actor(
335-
NodeAllocatorActor,
336-
"fixed",
337-
address,
338-
uid=NodeAllocatorActor.default_uid(),
339-
address=address,
340-
),
341-
mo.create_actor(
342-
NodeInfoUploaderActor,
343-
NodeRole.WORKER,
344-
interval=kw.get("upload_interval"),
345-
band_to_resource=kw.get("band_to_resource"),
346-
use_gpu=kw.get("use_gpu", False),
347-
uid=NodeInfoUploaderActor.default_uid(),
348-
address=address,
349-
),
350-
mo.create_actor(
351-
ProcessInfoManagerActor,
352-
uid=ProcessInfoManagerActor.default_uid(),
353-
address=address,
354-
),
355-
]
356+
[asyncio.ensure_future(coro) for coro in create_actor_coros]
356357
)
357358

358359
for task in dones:
@@ -371,22 +372,20 @@ async def cleanup(cls, address: str):
371372
from ..uploader import NodeInfoUploaderActor
372373
from ..supervisor.node_info import NodeInfoCollectorActor
373374

374-
await asyncio.wait(
375-
[
376-
mo.destroy_actor(
377-
mo.create_actor_ref(
378-
uid=SupervisorPeerLocatorActor.default_uid(), address=address
379-
)
380-
),
381-
mo.destroy_actor(
382-
mo.create_actor_ref(
383-
uid=NodeInfoCollectorActor.default_uid(), address=address
384-
)
385-
),
386-
mo.destroy_actor(
387-
mo.create_actor_ref(
388-
uid=NodeInfoUploaderActor.default_uid(), address=address
389-
)
390-
),
391-
]
375+
await asyncio.gather(
376+
mo.destroy_actor(
377+
mo.create_actor_ref(
378+
uid=SupervisorPeerLocatorActor.default_uid(), address=address
379+
)
380+
),
381+
mo.destroy_actor(
382+
mo.create_actor_ref(
383+
uid=NodeInfoCollectorActor.default_uid(), address=address
384+
)
385+
),
386+
mo.destroy_actor(
387+
mo.create_actor_ref(
388+
uid=NodeInfoUploaderActor.default_uid(), address=address
389+
)
390+
),
392391
)

mars/services/scheduling/supervisor/manager.py

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -197,12 +197,13 @@ async def finish_subtasks(
197197
band_tasks[band] += 1
198198
await self._queueing_ref.remove_queued_subtasks(subtask_ids)
199199
if band_tasks:
200-
coros = []
200+
tasks = []
201201
for band, subtask_count in band_tasks.items():
202-
coros.append(
202+
task = asyncio.ensure_future(
203203
self._queueing_ref.submit_subtasks.tell(band, subtask_count)
204204
)
205-
await asyncio.wait(coros)
205+
tasks.append(task)
206+
await asyncio.wait(tasks)
206207

207208
def _get_subtasks_by_ids(self, subtask_ids: List[str]) -> List[Optional[Subtask]]:
208209
subtasks = []

mars/services/scheduling/utils.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -57,5 +57,6 @@ async def redirect_subtask_errors(actor: mo.Actor, subtasks):
5757
)
5858
)
5959
)
60-
await asyncio.wait(coros)
60+
tasks = [asyncio.ensure_future(coro) for coro in coros]
61+
await asyncio.wait(tasks)
6162
raise

0 commit comments

Comments
 (0)