Skip to content

Commit 4ad5491

Browse files
authored
Raise ActorNotExist when no supervisors available (mars-project#2859)
1 parent fabe7fa commit 4ad5491

File tree

10 files changed

+97
-17
lines changed

10 files changed

+97
-17
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ test*.ipynb
6868
*.log
6969
*.swp
7070

71+
# nfs temp file
72+
.nfs*
73+
7174
# docs
7275
*.mo
7376
docs/**/generated

mars/deploy/kubernetes/tests/test_kubernetes.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,34 @@ def _build_docker_images(use_test_docker_file=True):
120120

121121

122122
def _remove_docker_image(image_name, raises=True):
    """Force-remove a docker image, but only when running under CI.

    Args:
        image_name: name of the image handed to ``docker rmi -f``.
        raises: when true, a non-zero exit code of ``docker rmi`` is turned
            into a ``SystemError``; when false the failure is ignored.

    Raises:
        SystemError: if ``docker rmi`` exits with a non-zero code and
            ``raises`` is true.
    """
    running_in_ci = "CI" in os.environ
    if not running_in_ci:
        # Outside CI the image is deliberately kept.
        return

    exit_code = subprocess.Popen(["docker", "rmi", "-f", image_name]).wait()
    if raises and exit_code != 0:
        raise SystemError("Executing docker rmi failed.")
126129

127130

131+
def _load_docker_env():
    """Copy the docker environment reported by minikube into ``os.environ``.

    Returns immediately when ``/var/run/docker.sock`` exists or when no
    ``minikube`` executable can be found on ``PATH``. Otherwise runs
    ``minikube docker-env`` and, for every output line containing
    ``export NAME=VALUE`` (text after ``#`` is ignored), sets
    ``os.environ[NAME]`` to ``VALUE`` with surrounding double quotes removed.

    Raises:
        subprocess.TimeoutExpired: if ``minikube docker-env`` does not finish
            within 30 seconds; the child process is killed before re-raising.
    """
    if os.path.exists("/var/run/docker.sock") or not shutil.which("minikube"):
        return

    proc = subprocess.Popen(["minikube", "docker-env"], stdout=subprocess.PIPE)
    try:
        # communicate() drains stdout while waiting; calling wait() on a piped
        # child before reading can deadlock once the OS pipe buffer fills up.
        stdout, _ = proc.communicate(timeout=30)
    except subprocess.TimeoutExpired:
        # Do not leave a stray minikube process (and an open pipe) behind.
        proc.kill()
        proc.communicate()
        raise

    for raw_line in stdout.decode().splitlines():
        # Drop trailing shell comments, then surrounding whitespace.
        line = raw_line.split("#", 1)[0].strip()
        export_pos = line.find("export")
        if export_pos < 0:
            continue
        assignment = line[export_pos + len("export") :].strip()
        var, sep, value = assignment.partition("=")
        if not sep:
            # An ``export`` without ``=`` carries no value to import.
            continue
        os.environ[var] = value.strip('"')
146+
147+
128148
@contextmanager
129149
def _start_kube_cluster(use_test_docker_file=True, **kwargs):
150+
_load_docker_env()
130151
image_name = _build_docker_images(use_test_docker_file=use_test_docker_file)
131152

132153
temp_spill_dir = tempfile.mkdtemp(prefix="test-mars-k8s-")
@@ -208,7 +229,7 @@ def _start_kube_cluster(use_test_docker_file=True, **kwargs):
208229
_remove_docker_image(image_name, False)
209230

210231

211-
@pytest.mark.parametrize("use_test_docker_file", [False, True])
232+
@pytest.mark.parametrize("use_test_docker_file", [True, False])
212233
@pytest.mark.skipif(not kube_available, reason="Cannot run without kubernetes")
213234
def test_run_in_kubernetes(use_test_docker_file):
214235
with _start_kube_cluster(
@@ -240,6 +261,7 @@ def test_run_in_kubernetes(use_test_docker_file):
240261
new=lambda *_, **__: None,
241262
)
242263
def test_create_timeout():
264+
_load_docker_env()
243265
api_client = k8s_config.new_client_from_config()
244266

245267
cluster = None

mars/services/cluster/api/oscar.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,10 @@ async def get_supervisor_refs(self, uids: List[str]) -> List[mo.ActorRef]:
114114
references of the actors
115115
"""
116116
addrs = await self.get_supervisors_by_keys(uids)
117+
if any(addr is None for addr in addrs):
118+
none_uid = next(uid for addr, uid in zip(addrs, uids) if addr is None)
119+
raise mo.ActorNotExist(f"Actor {none_uid} not exist as no supervisors")
120+
117121
return await asyncio.gather(
118122
*[mo.actor_ref(uid, address=addr) for addr, uid in zip(addrs, uids)]
119123
)
@@ -155,7 +159,7 @@ async def get_nodes_info(
155159
detail: bool = False,
156160
statuses: Set[NodeStatus] = None,
157161
exclude_statuses: Set[NodeStatus] = None,
158-
):
162+
) -> Dict[str, Dict]:
159163
statuses = self._calc_statuses(statuses, exclude_statuses)
160164
node_info_ref = await self._get_node_info_ref()
161165
return await node_info_ref.get_nodes_info(

mars/services/cluster/backends/fixed.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from typing import AsyncGenerator, List, Optional
15+
from typing import AsyncGenerator, List, Optional, Union
1616

1717
from ..core import NodeRole
1818
from .base import AbstractClusterBackend, register_cluster_backend
@@ -22,8 +22,10 @@
2222
class FixedClusterBackend(AbstractClusterBackend):
2323
name = "fixed"
2424

25-
def __init__(self, lookup_address: str):
26-
self._supervisors = [n.strip() for n in lookup_address.split(",")]
25+
def __init__(self, lookup_address: Union[List[str], str]):
26+
if isinstance(lookup_address, str):
27+
lookup_address = lookup_address.split(",")
28+
self._supervisors = [n.strip() for n in lookup_address]
2729

2830
@classmethod
2931
async def create(

mars/services/cluster/locator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ async def get_supervisors(self, filter_ready: bool = True):
7979

8080
@mo.extensible
8181
def get_supervisor(self, key: str, size=1):
82-
if self._supervisors is None: # pragma: no cover
82+
if not self._supervisors:
8383
return None
8484
elif size == 1:
8585
return self._hash_ring.get_node(key)

mars/services/cluster/tests/test_api.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from ....utils import get_next_port
2121
from ... import NodeRole
2222
from ...web.supervisor import WebSupervisorService
23-
from ..api import MockClusterAPI, WebClusterAPI
23+
from ..api import ClusterAPI, MockClusterAPI, WebClusterAPI
2424
from ..api.web import web_handlers
2525
from ..core import NodeStatus
2626

@@ -137,3 +137,31 @@ async def test_web_api(actor_pool):
137137
await asyncio.wait_for(wait_async_gen(web_api.watch_all_bands()), timeout=0.1)
138138

139139
await MockClusterAPI.cleanup(pool_addr)
140+
141+
142+
@pytest.mark.asyncio
async def test_no_supervisor(actor_pool):
    """Supervisor refs cannot be resolved when the supervisor list is empty.

    Creates a locator actor with the ``fixed`` backend and an empty address
    list plus a worker-role node info uploader, then expects
    ``ClusterAPI.get_supervisor_refs`` to raise ``mo.ActorNotExist``.
    """
    from ..supervisor.locator import SupervisorPeerLocatorActor
    from ..uploader import NodeInfoUploaderActor

    address = actor_pool.external_address

    # Locator backed by the "fixed" backend with no supervisor addresses.
    locator_uid = SupervisorPeerLocatorActor.default_uid()
    await mo.create_actor(
        SupervisorPeerLocatorActor, "fixed", [], uid=locator_uid, address=address
    )

    uploader_uid = NodeInfoUploaderActor.default_uid()
    await mo.create_actor(
        NodeInfoUploaderActor,
        NodeRole.WORKER,
        interval=1,
        band_to_slots=None,
        use_gpu=False,
        uid=uploader_uid,
        address=address,
    )

    cluster_api = await ClusterAPI.create(address=address)
    with pytest.raises(mo.ActorNotExist):
        await cluster_api.get_supervisor_refs(["KEY"])

mars/services/scheduling/worker/tests/test_execution.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ async def __post_create__(self):
127127
async def __pre_destroy__(self):
128128
pass
129129

130+
@mo.extensible
130131
async def update_subtask_resources(
131132
self, band, session_id: str, subtask_id: str, resources: Resource
132133
):

mars/services/scheduling/worker/workerslot.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,7 @@ async def __post_create__(self):
118118
)
119119
self._fresh_slots.add(slot_id)
120120

121-
self._usage_upload_task = self.ref().upload_slot_usages.tell_delay(
122-
periodical=True, delay=1
123-
)
121+
self._upload_slot_usage_with_delay()
124122

125123
async def __pre_destroy__(self):
126124
self._usage_upload_task.cancel()
@@ -131,9 +129,12 @@ async def _get_global_resource_ref(self):
131129

132130
from ..supervisor import GlobalResourceManagerActor
133131

134-
[self._global_resource_ref] = await self._cluster_api.get_supervisor_refs(
135-
[GlobalResourceManagerActor.default_uid()]
136-
)
132+
try:
133+
[self._global_resource_ref] = await self._cluster_api.get_supervisor_refs(
134+
[GlobalResourceManagerActor.default_uid()]
135+
)
136+
except mo.ActorNotExist:
137+
self._global_resource_ref = None
137138
return self._global_resource_ref
138139

139140
def get_slot_address(self, slot_id: int):
@@ -241,10 +242,21 @@ async def restart_free_slots(self):
241242
self._restarting = False
242243
self._restart_done_event.set()
243244

245+
def _upload_slot_usage_with_delay(self, delay: int = 1):
    """Schedule a periodical ``upload_slot_usages`` call on this actor.

    The handle returned by ``tell_delay`` is kept in
    ``self._usage_upload_task`` so it can be cancelled later.

    Args:
        delay: value passed as ``delay`` to ``tell_delay``
            (presumably seconds; confirm against the actor API).
    """
    upload_method = self.ref().upload_slot_usages
    self._usage_upload_task = upload_method.tell_delay(periodical=True, delay=delay)
249+
244250
async def upload_slot_usages(self, periodical: bool = False):
245251
delays = []
246252
slot_infos = []
247253
global_resource_ref = await self._get_global_resource_ref()
254+
255+
if global_resource_ref is None: # pragma: no cover
256+
if periodical:
257+
self._upload_slot_usage_with_delay()
258+
return
259+
248260
for slot_id, proc in self._slot_to_proc.items():
249261
if slot_id not in self._slot_to_session_stid:
250262
continue
@@ -291,9 +303,7 @@ async def upload_slot_usages(self, periodical: bool = False):
291303
await self._cluster_api.set_band_slot_infos(self._band_name, slot_infos)
292304

293305
if periodical:
294-
self._usage_upload_task = self.ref().upload_slot_usages.tell_delay(
295-
periodical=True, delay=1
296-
)
306+
self._upload_slot_usage_with_delay()
297307

298308
def dump_data(self):
299309
"""

mars/services/session/supervisor/core.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import asyncio
1616
import functools
17+
import time
1718
from typing import Dict, List, Optional
1819

1920
from .... import oscar as mo
@@ -28,9 +29,11 @@ def __init__(self, service_config: Optional[Dict] = None):
2829
self._session_refs: Dict[str, mo.ActorRef] = dict()
2930
self._cluster_api: Optional[ClusterAPI] = None
3031
self._service_config = service_config or dict()
32+
self._stored_last_idle_time = None
3133

3234
async def __post_create__(self):
3335
self._cluster_api = await ClusterAPI.create(self.address)
36+
self._stored_last_idle_time = time.time()
3437

3538
async def __pre_destroy__(self):
3639
await asyncio.gather(
@@ -124,7 +127,10 @@ async def get_last_idle_time(self, session_id=None):
124127
if any(last_idle_time is None for last_idle_time in all_last_idle_time):
125128
raise mo.Return(None)
126129
else:
127-
raise mo.Return(max(all_last_idle_time))
130+
self._stored_last_idle_time = max(
131+
[self._stored_last_idle_time] + all_last_idle_time
132+
)
133+
raise mo.Return(self._stored_last_idle_time)
128134

129135

130136
class SessionActor(mo.Actor):

mars/services/session/tests/test_service.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import threading
16+
import time
1617

1718
import pytest
1819
import numpy as np
@@ -99,7 +100,10 @@ async def test_get_last_idle_time():
99100
NodeRole.WORKER, config, address=worker_pool.external_address
100101
)
101102

103+
start_time = time.time()
102104
session_api = await SessionAPI.create(sv_pool.external_address)
105+
assert await session_api.get_last_idle_time() < start_time
106+
103107
session_id = "test_session"
104108
await session_api.create_session(session_id)
105109
# check last idle time is not None

0 commit comments

Comments
 (0)