Skip to content

Commit 75e1c88

Browse files
author
Xuye (Chris) Qin
authored
Make sure errors can be raised in Actor.__pre_destroy__ (mars-project#2887)
1 parent f702803 commit 75e1c88

File tree

11 files changed

+68
-7
lines changed

11 files changed

+68
-7
lines changed

mars/core/entity/executable.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ def start(self):
3939

4040
def _thread_body(self):
4141
from ...deploy.oscar.session import SyncSession
42+
from ...oscar.errors import ActorNotExist
4243

4344
while True:
4445
key, session_ref, fut = self._queue.get()
@@ -53,7 +54,7 @@ def _thread_body(self):
5354
s = SyncSession.from_isolated_session(session)
5455
s.decref(key)
5556
fut.set_result(None)
56-
except (RuntimeError, ConnectionError, KeyError):
57+
except (RuntimeError, ConnectionError, KeyError, ActorNotExist):
5758
fut.set_result(None)
5859
except Exception as ex: # pragma: no cover # noqa: E722 # nosec # pylint: disable=bare-except
5960
fut.set_exception(ex)

mars/deploy/oscar/local.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,12 @@ async def _start_service(self):
245245
)
246246

247247
async def stop(self):
248+
from .session import SessionAPI
249+
250+
# delete all sessions
251+
session_api = await SessionAPI.create(self._supervisor_pool.external_address)
252+
await session_api.delete_all_sessions()
253+
248254
for worker_pool in self._worker_pools:
249255
await stop_worker(worker_pool.external_address, self._config)
250256
await stop_supervisor(self._supervisor_pool.external_address, self._config)

mars/oscar/backends/mars/tests/test_mars_actor_context.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,3 +600,21 @@ async def test_promise_chain(actor_pool_context):
600600
call_log = await promise_test_ref.get_call_log()
601601
assert len(call_log) == 2
602602
assert call_log[1][0] - call_log[0][0] < 1
603+
604+
605+
class ActorCannotDestroy(mo.Actor):
606+
async def __pre_destroy__(self):
607+
raise ValueError("Cannot destroy")
608+
609+
610+
@pytest.mark.asyncio
611+
@pytest.mark.parametrize("in_sub_pool", [True, False])
612+
async def test_error_in_pre_destroy(actor_pool_context, in_sub_pool):
613+
pool = actor_pool_context
614+
615+
strategy = None if not in_sub_pool else RandomSubPool()
616+
a = await mo.create_actor(
617+
ActorCannotDestroy, address=pool.external_address, strategy=strategy
618+
)
619+
with pytest.raises(ValueError, match="Cannot destroy"):
620+
await mo.destroy_actor(a)

mars/oscar/backends/pool.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -858,7 +858,9 @@ async def destroy_actor(self, message: DestroyActorMessage) -> ResultMessageType
858858
return result
859859
real_actor_ref = result.result
860860
if real_actor_ref.address == self.external_address:
861-
await super().destroy_actor(message)
861+
result = await super().destroy_actor(message)
862+
if result.message_type == MessageType.error:
863+
return result
862864
del self._allocated_actors[self.external_address][real_actor_ref]
863865
return ResultMessage(
864866
message.message_id, real_actor_ref.uid, protocol=message.protocol

mars/services/session/api/core.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ async def delete_session(self, session_id: str):
5757
Session ID.
5858
"""
5959

60+
@abstractmethod
61+
async def delete_all_sessions(self):
62+
"""
63+
Delete all sessions.
64+
"""
65+
6066
@abstractmethod
6167
async def get_last_idle_time(
6268
self, session_id: Union[str, None] = None

mars/services/session/api/oscar.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ async def has_session(self, session_id: str) -> bool:
6565
async def delete_session(self, session_id: str):
6666
await self._session_manager_ref.delete_session(session_id)
6767

68+
async def delete_all_sessions(self):
69+
await self._session_manager_ref.delete_all_sessions()
70+
6871
@alru_cache(cache_exceptions=False)
6972
async def get_session_address(self, session_id: str) -> str:
7073
"""

mars/services/session/api/web.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ async def delete_session(self, session_id: str):
6767
oscar_api = await self._get_oscar_session_api()
6868
await oscar_api.delete_session(session_id)
6969

70+
@web_api("", method="delete")
71+
async def delete_all_sessions(self):
72+
oscar_api = await self._get_oscar_session_api()
73+
await oscar_api.delete_all_sessions()
74+
7075
@web_api(
7176
"(?P<session_id>[^/]+)", method="get", arg_filter={"action": "check_exist"}
7277
)
@@ -135,6 +140,10 @@ async def delete_session(self, session_id: str):
135140
addr = f"{self._address}/api/session/{session_id}"
136141
await self._request_url(path=addr, method="DELETE")
137142

143+
async def delete_all_sessions(self):
144+
addr = f"{self._address}/api/session"
145+
await self._request_url(path=addr, method="DELETE")
146+
138147
async def has_session(self, session_id: str):
139148
addr = f"{self._address}/api/session/{session_id}"
140149
params = dict(action="check_exist")

mars/services/session/supervisor/core.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ def has_session(self, session_id: str):
102102

103103
async def delete_session(self, session_id):
104104
session_actor_ref = self._session_refs.pop(session_id)
105+
await session_actor_ref.remove()
105106
await mo.destroy_actor(session_actor_ref)
106107

107108
# sync removing to other managers
@@ -113,6 +114,10 @@ async def delete_session(self, session_id):
113114
)
114115
await session_manager_ref.remove_session_ref(session_id)
115116

117+
async def delete_all_sessions(self):
118+
for session_id in list(self._session_refs):
119+
await self.delete_session(session_id)
120+
116121
async def get_last_idle_time(self, session_id=None):
117122
if session_id is not None:
118123
session = self._session_refs[session_id]
@@ -160,10 +165,12 @@ async def __post_create__(self):
160165
uid=CustomLogMetaActor.gen_uid(self._session_id),
161166
)
162167

163-
async def __pre_destroy__(self):
168+
async def remove(self):
164169
await destroy_service_session(
165170
NodeRole.SUPERVISOR, self._service_config, self._session_id, self.address
166171
)
172+
173+
async def __pre_destroy__(self):
167174
await mo.destroy_actor(self._custom_log_meta_ref)
168175

169176
async def create_services(self):

mars/services/session/tests/test_service.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ async def test_session_service(test_web):
6262
await session_api.delete_session(session_id)
6363
assert await session_api.has_session(session_id) is False
6464
assert await session_api.get_sessions() == []
65+
await session_api.delete_all_sessions()
66+
assert await session_api.has_session(session_id) is False
6567

6668
await stop_services(NodeRole.SUPERVISOR, config, address=pool.external_address)
6769

mars/services/subtask/worker/runner.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,16 @@ async def __post_create__(self):
5959
self._cluster_api = await ClusterAPI.create(address=self.address)
6060

6161
async def __pre_destroy__(self):
62-
await asyncio.gather(
63-
*[mo.destroy_actor(ref) for ref in self._session_id_to_processors.values()]
64-
)
62+
try:
63+
await asyncio.gather(
64+
*[
65+
mo.destroy_actor(ref)
66+
for ref in self._session_id_to_processors.values()
67+
]
68+
)
69+
except mo.ActorNotExist: # pragma: no cover
70+
# deleted, ignore
71+
pass
6572

6673
@classmethod
6774
def _get_subtask_processor_cls(cls, subtask_processor_cls):

0 commit comments

Comments
 (0)