
Commit f62a3f8

Accurate resource management for global slot manager (mars-project#2732)
1 parent 5bdd17b commit f62a3f8

22 files changed: +463 -293 lines changed

mars/_resource.pyx

Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cdef class Resource:
    cdef readonly:
        float num_cpus
        float num_gpus
        float num_mem_bytes

    def __init__(self, float num_cpus=0, float num_gpus=0, float num_mem_bytes=0):
        self.num_cpus = num_cpus
        self.num_gpus = num_gpus
        self.num_mem_bytes = num_mem_bytes

    def __eq__(self, Resource other):
        return self.num_mem_bytes == other.num_mem_bytes and \
               self.num_gpus == other.num_gpus and \
               self.num_cpus == other.num_cpus

    def __gt__(self, Resource other):
        return not self.__le__(other)

    def __le__(self, Resource other):
        # memory first, then gpu, cpu last
        return self.num_mem_bytes <= other.num_mem_bytes and \
               self.num_gpus <= other.num_gpus and \
               self.num_cpus <= other.num_cpus

    def __add__(self, Resource other):
        return Resource(num_cpus=self.num_cpus + other.num_cpus,
                        num_gpus=self.num_gpus + other.num_gpus,
                        num_mem_bytes=self.num_mem_bytes + other.num_mem_bytes)
    def __sub__(self, Resource other):
        return Resource(num_cpus=self.num_cpus - other.num_cpus,
                        num_gpus=self.num_gpus - other.num_gpus,
                        num_mem_bytes=self.num_mem_bytes - other.num_mem_bytes)
    def __neg__(self):
        return Resource(num_cpus=-self.num_cpus, num_gpus=-self.num_gpus, num_mem_bytes=-self.num_mem_bytes)

    def __repr__(self):
        return f"Resource(num_cpus={self.num_cpus}, num_gpus={self.num_gpus}, num_mem_bytes={self.num_mem_bytes})"

ZeroResource = Resource(num_cpus=0, num_gpus=0, num_mem_bytes=0)
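
For readers skimming the diff: the Resource value type above supports component-wise arithmetic, and its ordering is satisfied only when memory, GPU and CPU all fit. Below is a minimal illustrative sketch (not part of the commit) of how the class behaves, assuming the compiled extension is importable through the mars.resource re-export introduced in the next file:

# Illustrative usage only; assumes the compiled `mars._resource` extension
# is available through the `mars.resource` re-export shown below.
from mars.resource import Resource, ZeroResource

total = Resource(num_cpus=8, num_gpus=1, num_mem_bytes=16 * 1024**3)
used = Resource(num_cpus=6, num_mem_bytes=4 * 1024**3)
request = Resource(num_cpus=4)

remaining = total - used          # component-wise subtraction
print(remaining)                  # Resource(num_cpus=2.0, num_gpus=1.0, num_mem_bytes=...)

# `<=` holds only if memory, GPU and CPU all fit; `>` is defined as `not <=`,
# so a request that exceeds any single component counts as "too large".
print(used + request <= total)    # False: 6 + 4 CPUs exceed the 8 available
print(used + request > total)     # True, which is what the admission check relies on
print(ZeroResource <= remaining)  # True: nothing is over-committed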

mars/resource.py

Lines changed: 4 additions & 0 deletions
@@ -25,8 +25,12 @@
 import psutil
 
 from .lib import nvutils
+from ._resource import Resource, ZeroResource
 from .utils import get_bool_environ
 
+Resource = Resource
+ZeroResource = ZeroResource
+
 logger = logging.getLogger(__name__)
 
 CGROUP_CPU_STAT_FILE = "/sys/fs/cgroup/cpuacct/cpuacct.usage"

mars/services/scheduling/api/oscar.py

Lines changed: 3 additions & 3 deletions
@@ -135,11 +135,11 @@ async def finish_subtasks(
 class MockSchedulingAPI(SchedulingAPI):
     @classmethod
     async def create(cls: Type[APIType], session_id: str, address: str) -> APIType:
-        from ..supervisor import GlobalSlotManagerActor, AutoscalerActor
+        from ..supervisor import GlobalResourceManagerActor, AutoscalerActor
 
         await mo.create_actor(
-            GlobalSlotManagerActor,
-            uid=GlobalSlotManagerActor.default_uid(),
+            GlobalResourceManagerActor,
+            uid=GlobalResourceManagerActor.default_uid(),
             address=address,
         )
         await mo.create_actor(

mars/services/scheduling/supervisor/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@
 
 from .assigner import AssignerActor
 from .autoscale import AutoscalerActor
-from .globalslot import GlobalSlotManagerActor
+from .globalresource import GlobalResourceManagerActor
 from .manager import SubtaskManagerActor
 from .queueing import SubtaskQueueingActor
 from .service import SchedulingSupervisorService

mars/services/scheduling/supervisor/autoscale.py

Lines changed: 10 additions & 7 deletions
@@ -36,7 +36,7 @@ def __init__(self, autoscale_conf: Dict[str, Any]):
         self._autoscale_conf = autoscale_conf
         self._cluster_api = None
         self.queueing_refs = dict()
-        self.global_slot_ref = None
+        self.global_resource_ref = None
         self._dynamic_workers: Set[str] = set()
 
     async def __post_create__(self):
@@ -46,10 +46,10 @@ async def __post_create__(self):
             strategy_cls = getattr(importlib.import_module(module), name)
         else:
             strategy_cls = PendingTaskBacklogStrategy
-        from ..supervisor import GlobalSlotManagerActor
+        from ..supervisor import GlobalResourceManagerActor
 
-        self.global_slot_ref = await mo.actor_ref(
-            GlobalSlotManagerActor.default_uid(), address=self.address
+        self.global_resource_ref = await mo.actor_ref(
+            GlobalResourceManagerActor.default_uid(), address=self.address
         )
         self._cluster_api = await ClusterAPI.create(self.address)
         self._strategy = await strategy_cls.create(self._autoscale_conf, self)
@@ -114,14 +114,17 @@ async def release_workers(self, addresses: List[str]):
         )
         # Ensure global_slot_manager get latest bands timely, so that we can invoke `wait_band_idle`
         # to ensure there won't be new tasks scheduled to the stopping worker.
-        await self.global_slot_ref.refresh_bands()
+        await self.global_resource_ref.refresh_bands()
         excluded_bands = set(b for bands in workers_bands.values() for b in bands)
 
         async def release_worker(address):
             logger.info("Start to release worker %s.", address)
             worker_bands = workers_bands[address]
             await asyncio.gather(
-                *[self.global_slot_ref.wait_band_idle(band) for band in worker_bands]
+                *[
+                    self.global_resource_ref.wait_band_idle(band)
+                    for band in worker_bands
+                ]
             )
             await self._migrate_data_of_bands(worker_bands, excluded_bands)
             await self._cluster_api.release_worker(address)
@@ -353,7 +356,7 @@ async def _scale_out(self, queueing_refs):
 
     async def _scale_in(self):
         idle_bands = set(
-            await self._autoscaler.global_slot_ref.get_idle_bands(
+            await self._autoscaler.global_resource_ref.get_idle_bands(
                 self._worker_idle_timeout
             )
         )
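
The hunks above switch the autoscaler from slot counting to the resource manager: before a worker is stopped, it refreshes the band list and then blocks until every band of that worker has drained. A condensed, illustrative sketch of that interaction follows; `wait_worker_idle` is a hypothetical helper mirroring `release_workers` above, not part of the commit.

# Illustrative sketch of the scale-in handshake; `wait_worker_idle` is a
# hypothetical helper, not part of this commit.
import asyncio

from mars import oscar as mo
from mars.services.scheduling.supervisor import GlobalResourceManagerActor


async def wait_worker_idle(supervisor_address: str, worker_bands):
    ref = await mo.actor_ref(
        GlobalResourceManagerActor.default_uid(), address=supervisor_address
    )
    # Pull the latest band list so newly joined bands are tracked before waiting.
    await ref.refresh_bands()
    # `wait_band_idle` resolves once a band's used resources drop back to zero.
    await asyncio.gather(*[ref.wait_band_idle(band) for band in worker_bands])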

mars/services/scheduling/supervisor/globalresource.py

Lines changed: 181 additions & 0 deletions
@@ -0,0 +1,181 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
import time
from collections import defaultdict
from typing import List, DefaultDict, Dict, Tuple

from .... import oscar as mo
from ....resource import Resource, ZeroResource
from ....typing import BandType

logger = logging.getLogger(__name__)


class GlobalResourceManagerActor(mo.Actor):
    # {(address, resource_type): {(session_id, subtask_id): Resource(...)}}
    _band_stid_resources: DefaultDict[BandType, Dict[Tuple[str, str], Resource]]
    _band_used_resources: Dict[BandType, Resource]
    _band_total_resources: Dict[BandType, Resource]

    def __init__(self):
        self._band_stid_resources = defaultdict(dict)
        self._band_used_resources = defaultdict(lambda: ZeroResource)
        self._band_idle_start_time = dict()
        self._band_idle_events = dict()
        self._band_total_resources = dict()
        self._cluster_api = None
        self._band_watch_task = None

    async def __post_create__(self):
        from ...cluster.api import ClusterAPI

        self._cluster_api = await ClusterAPI.create(self.address)

        async def watch_bands():
            async for bands in self._cluster_api.watch_all_bands():
                old_bands = set(self._band_total_resources.keys())
                await self._refresh_bands(bands)
                new_bands = set(bands.keys()) - old_bands
                for band in new_bands:
                    self._update_band_usage(band, ZeroResource)

        self._band_watch_task = asyncio.create_task(watch_bands())

    async def __pre_destroy__(self):
        self._band_watch_task.cancel()

    async def refresh_bands(self):
        bands = await self._cluster_api.get_all_bands()
        await self._refresh_bands(bands)

    async def _refresh_bands(self, bands):
        # TODO add `num_mem_bytes` after supported report worker memory
        band_total_resources = {}
        for band, slot in bands.items():
            if band[1].startswith("gpu"):
                band_total_resources[band] = Resource(num_gpus=slot)
            elif band[1].startswith("numa"):
                band_total_resources[band] = Resource(num_cpus=slot)
            else:
                raise NotImplementedError(f"Unsupported band type {band}")
        self._band_total_resources = band_total_resources

    @mo.extensible
    async def apply_subtask_resources(
        self,
        band: BandType,
        session_id: str,
        subtask_ids: List[str],
        subtask_resources: List[Resource],
    ) -> List[str]:
        if (
            not self._band_total_resources or band not in self._band_total_resources
        ):  # pragma: no cover
            await self.refresh_bands()
        idx = 0
        # only ready bands will pass
        if band in self._band_total_resources:
            total_resource = self._band_total_resources[band]
            for stid, subtask_resource in zip(subtask_ids, subtask_resources):
                band_used_resource = self._band_used_resources[band]
                if band_used_resource + subtask_resource > total_resource:
                    break
                self._band_stid_resources[band][(session_id, stid)] = subtask_resource
                self._update_band_usage(band, subtask_resource)
                idx += 1
        if idx == 0:
            logger.debug(
                "No resources available, status: %r, request: %r",
                self._band_used_resources,
                subtask_resources,
            )
        return subtask_ids[:idx]

    @mo.extensible
    def update_subtask_resources(
        self, band: BandType, session_id: str, subtask_id: str, resource: Resource
    ):
        session_subtask_id = (session_id, subtask_id)
        subtask_resources = self._band_stid_resources[band]
        if session_subtask_id not in subtask_resources:
            return

        resource_delta = resource - subtask_resources[session_subtask_id]
        subtask_resources[session_subtask_id] = resource
        self._update_band_usage(band, resource_delta)

    @mo.extensible
    def release_subtask_resource(
        self, band: BandType, session_id: str, subtask_id: str
    ):
        # todo ensure slots released when subtasks ends in all means
        resource_delta = self._band_stid_resources[band].pop(
            (session_id, subtask_id), ZeroResource
        )
        self._update_band_usage(band, -resource_delta)

    def _update_band_usage(self, band: BandType, band_usage_delta: Resource):
        self._band_used_resources[band] += band_usage_delta
        # some code path doesn't call `apply_subtask_resources`
        band_total_resource = self._band_total_resources.get(band)
        if (
            band_total_resource is not None
            and self._band_used_resources[band] > band_total_resource
        ):  # pragma: no cover
            raise Exception(
                f"Resource exceed: band used resource {self._band_used_resources[band]} "
                f"band total resource {self._band_total_resources[band]}"
            )
        if self._band_used_resources[band] <= ZeroResource:
            self._band_used_resources.pop(band)
            self._band_idle_start_time[band] = time.time()
            if band in self._band_idle_events:
                self._band_idle_events.pop(band).set()
        else:
            self._band_idle_start_time[band] = -1

    def get_used_resources(self) -> Dict[BandType, Resource]:
        return self._band_used_resources

    def get_remaining_resources(self) -> Dict[BandType, Resource]:
        resources = {}
        for band, resource in self._band_total_resources.items():
            used_resource = self.get_used_resources()[band]
            resources[band] = resource - used_resource
        return resources

    async def get_idle_bands(self, idle_duration: int):
        """Return a band list which all bands has been idle for at least `idle_duration` seconds."""
        now = time.time()
        idle_bands = []
        for band in self._band_total_resources.keys():
            idle_start_time = self._band_idle_start_time.get(band)
            if idle_start_time is None:  # pragma: no cover
                # skip new requested band for this round scale in.
                self._band_idle_start_time[band] = now
            elif idle_start_time > 0 and now >= idle_start_time + idle_duration:
                idle_bands.append(band)
        return idle_bands

    async def wait_band_idle(self, band: BandType):
        if self._band_idle_start_time[band] <= 0:
            if band in self._band_idle_events:
                event = self._band_idle_events[band]
            else:
                event = asyncio.Event()
                self._band_idle_events[band] = event
            return event.wait()
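
Taken together, the actor above is the admission-control point for subtask scheduling: callers apply resources per band, get back only the subtask ids that fit, and release them when subtasks finish, which in turn drives the idle bookkeeping the autoscaler relies on. Below is a rough, illustrative round-trip, assuming the actor has already been created on the supervisor (as MockSchedulingAPI does above) and that a band such as ("127.0.0.1:12345", "numa-0") has been reported by the cluster service; `run_one_subtask` is a hypothetical helper, not part of the commit.

# Illustrative only; `run_one_subtask` is a hypothetical helper and the
# actual submission step is elided.
from mars import oscar as mo
from mars.resource import Resource
from mars.services.scheduling.supervisor import GlobalResourceManagerActor


async def run_one_subtask(supervisor_address: str, band, session_id, subtask_id):
    ref = await mo.actor_ref(
        GlobalResourceManagerActor.default_uid(), address=supervisor_address
    )
    # Ask for one CPU on the band; only the subtask ids that fit are returned,
    # so an empty result means the subtask has to stay queued.
    granted = await ref.apply_subtask_resources(
        band, session_id, [subtask_id], [Resource(num_cpus=1)]
    )
    if not granted:
        return False
    try:
        ...  # submit the subtask to the worker band here
    finally:
        # Give the resources back; when band usage reaches zero the band is
        # marked idle and any `wait_band_idle` waiters are woken up.
        await ref.release_subtask_resource(band, session_id, subtask_id)
    return True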
