
Commit 0509a44
Expand slot scheduler to resource scheduler (mars-project#2846)
1 parent 9423511


45 files changed: +329 -182 lines (only 8 of the changed files are shown below)

asv_bench/benchmarks/graph_assigner.py
Lines changed: 5 additions & 2 deletions

@@ -15,6 +15,7 @@
 import mars.tensor as mt
 import mars.dataframe as md
 from mars.core.graph import TileableGraph, TileableGraphBuilder, ChunkGraphBuilder
+from mars.resource import Resource
 from mars.services.task.analyzer import GraphAnalyzer
 from mars.services.task.analyzer.assigner import GraphAssigner

@@ -39,8 +40,10 @@ def setup(self):

     def time_assigner(self):
         start_ops = list(GraphAnalyzer._iter_start_ops(self.chunk_graph))
-        band_slots = {(f"worker-{i}", "numa-0"): 16 for i in range(50)}
+        band_resource = {
+            (f"worker-{i}", "numa-0"): Resource(num_cpus=16) for i in range(50)
+        }
         current_assign = {}
-        assigner = GraphAssigner(self.chunk_graph, start_ops, band_slots)
+        assigner = GraphAssigner(self.chunk_graph, start_ops, band_resource)
         assigned_result = assigner.assign(current_assign)
         assert len(assigned_result) == len(start_ops)
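
The benchmark change above captures the core migration pattern of this commit: every place that mapped a band to a bare integer slot count now maps it to a Resource value. A minimal before/after sketch (assuming Mars is installed; the worker names and the count of 16 CPUs are the benchmark's own illustrative values):

from mars.resource import Resource

# Before: a band is worth a plain number of slots.
band_slots = {(f"worker-{i}", "numa-0"): 16 for i in range(50)}

# After: a band is worth a Resource, which can also carry GPU and
# memory capacity alongside the CPU count.
band_resource = {
    (f"worker-{i}", "numa-0"): Resource(num_cpus=16) for i in range(50)
}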

mars/_resource.pyx
Lines changed: 10 additions & 10 deletions

@@ -16,15 +16,15 @@ cdef class Resource:
     cdef readonly:
         float num_cpus
         float num_gpus
-        float num_mem_bytes
+        float mem_bytes

-    def __init__(self, float num_cpus=0, float num_gpus=0, float num_mem_bytes=0):
+    def __init__(self, float num_cpus=0, float num_gpus=0, float mem_bytes=0):
         self.num_cpus = num_cpus
         self.num_gpus = num_gpus
-        self.num_mem_bytes = num_mem_bytes
+        self.mem_bytes = mem_bytes

     def __eq__(self, Resource other):
-        return self.num_mem_bytes == other.num_mem_bytes and \
+        return self.mem_bytes == other.mem_bytes and \
             self.num_gpus == other.num_gpus and \
             self.num_cpus == other.num_cpus

@@ -33,22 +33,22 @@ cdef class Resource:

     def __le__(self, Resource other):
         # memory first, then gpu, cpu last
-        return self.num_mem_bytes <= other.num_mem_bytes and \
+        return self.mem_bytes <= other.mem_bytes and \
             self.num_gpus <= other.num_gpus and \
             self.num_cpus <= other.num_cpus

     def __add__(self, Resource other):
         return Resource(num_cpus=self.num_cpus + other.num_cpus,
                         num_gpus=self.num_gpus + other.num_gpus,
-                        num_mem_bytes=self.num_mem_bytes + other.num_mem_bytes)
+                        mem_bytes=self.mem_bytes + other.mem_bytes)
     def __sub__(self, Resource other):
         return Resource(num_cpus=self.num_cpus - other.num_cpus,
                         num_gpus=self.num_gpus - other.num_gpus,
-                        num_mem_bytes=self.num_mem_bytes - other.num_mem_bytes)
+                        mem_bytes=self.mem_bytes - other.mem_bytes)
     def __neg__(self):
-        return Resource(num_cpus=-self.num_cpus, num_gpus=-self.num_gpus, num_mem_bytes=-self.num_mem_bytes)
+        return Resource(num_cpus=-self.num_cpus, num_gpus=-self.num_gpus, mem_bytes=-self.mem_bytes)

     def __repr__(self):
-        return f"Resource(num_cpus={self.num_cpus}, num_gpus={self.num_gpus}, num_mem_bytes={self.num_mem_bytes})"
+        return f"Resource(num_cpus={self.num_cpus}, num_gpus={self.num_gpus}, mem_bytes={self.mem_bytes})"

-ZeroResource = Resource(num_cpus=0, num_gpus=0, num_mem_bytes=0)
+ZeroResource = Resource(num_cpus=0, num_gpus=0, mem_bytes=0)
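
The operators above give Resource simple value semantics: per the comment, comparison checks memory first, then GPU, then CPU, and a request "fits" only if every dimension fits, while addition and subtraction make capacity bookkeeping a one-liner. A short sketch of what this buys the scheduler (values are illustrative, chosen so the arithmetic is exact in single-precision floats):

from mars.resource import Resource

total = Resource(num_cpus=16, num_gpus=2, mem_bytes=64 * 1024**3)
request = Resource(num_cpus=4, mem_bytes=8 * 1024**3)

# A request fits a band only if memory, GPU and CPU all fit.
assert request <= total

# Remaining capacity after granting the request.
remaining = total - request
assert remaining == Resource(num_cpus=12, num_gpus=2, mem_bytes=56 * 1024**3)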

mars/deploy/kubernetes/worker.py
Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ async def start_services(self):
         await start_worker(
             self.pool.external_address,
             self.args.supervisors,
-            self.band_to_slot,
+            self.band_to_resource,
             list(self.args.load_modules),
             self.config,
             mark_ready=False,

mars/deploy/oscar/local.py
Lines changed: 21 additions & 10 deletions

@@ -24,7 +24,7 @@
 from ... import oscar as mo
 from ...core.entrypoints import init_extension_entrypoints
 from ...lib.aio import get_isolation, stop_isolation
-from ...resource import cpu_count, cuda_count
+from ...resource import cpu_count, cuda_count, mem_total, Resource
 from ...services import NodeRole
 from ...typing import ClusterType, ClientType
 from ..utils import get_third_party_modules_from_config
@@ -51,6 +51,7 @@ async def new_cluster_in_isolation(
     address: str = "0.0.0.0",
     n_worker: int = 1,
     n_cpu: Union[int, str] = "auto",
+    mem_bytes: Union[int, str] = "auto",
     cuda_devices: Union[List[int], str] = "auto",
     subprocess_start_method: str = None,
     backend: str = None,
@@ -65,6 +66,7 @@ async def new_cluster_in_isolation(
         address,
         n_worker,
         n_cpu,
+        mem_bytes,
         cuda_devices,
         subprocess_start_method,
         config,
@@ -79,6 +81,7 @@ async def new_cluster(
     address: str = "0.0.0.0",
     n_worker: int = 1,
     n_cpu: Union[int, str] = "auto",
+    mem_bytes: Union[int, str] = "auto",
     cuda_devices: Union[List[int], str] = "auto",
     subprocess_start_method: str = None,
     config: Union[str, Dict] = None,
@@ -91,6 +94,7 @@ async def new_cluster(
         address,
         n_worker=n_worker,
         n_cpu=n_cpu,
+        mem_bytes=mem_bytes,
         cuda_devices=cuda_devices,
         subprocess_start_method=subprocess_start_method,
         config=config,
@@ -116,6 +120,7 @@ def __init__(
         address: str = "0.0.0.0",
         n_worker: int = 1,
         n_cpu: Union[int, str] = "auto",
+        mem_bytes: Union[int, str] = "auto",
         cuda_devices: Union[List[int], List[List[int]], str] = "auto",
         subprocess_start_method: str = None,
         config: Union[str, Dict] = None,
@@ -132,6 +137,7 @@ def __init__(
         self._subprocess_start_method = subprocess_start_method
         self._config = config
         self._n_cpu = cpu_count() if n_cpu == "auto" else n_cpu
+        self._mem_bytes = mem_total() if mem_bytes == "auto" else mem_bytes
         self._n_supervisor_process = n_supervisor_process
         if cuda_devices == "auto":
             total = cuda_count()
@@ -148,19 +154,22 @@ def __init__(

         self._n_worker = n_worker
         self._web = web
-        self._bands_to_slot = bands_to_slot = []
+        self._bands_to_resource = bands_to_resource = []
         worker_cpus = self._n_cpu // n_worker
         if sum(len(devices) for devices in devices_list) == 0:
             assert worker_cpus > 0, (
                 f"{self._n_cpu} cpus are not enough "
                 f"for {n_worker}, try to decrease workers."
             )
+        mem_bytes = self._mem_bytes // n_worker
         for _, devices in zip(range(n_worker), devices_list):
-            worker_band_to_slot = dict()
-            worker_band_to_slot["numa-0"] = worker_cpus
+            worker_band_to_resource = dict()
+            worker_band_to_resource["numa-0"] = Resource(
+                num_cpus=worker_cpus, mem_bytes=mem_bytes
+            )
             for i in devices:  # pragma: no cover
-                worker_band_to_slot[f"gpu-{i}"] = 1
-            bands_to_slot.append(worker_band_to_slot)
+                worker_band_to_resource[f"gpu-{i}"] = Resource(num_gpus=1)
+            bands_to_resource.append(worker_band_to_resource)
         self._supervisor_pool = None
         self._worker_pools = []

@@ -211,10 +220,10 @@ async def _start_worker_pools(self):
         worker_modules = get_third_party_modules_from_config(
             self._config, NodeRole.WORKER
         )
-        for band_to_slot in self._bands_to_slot:
+        for band_to_resource in self._bands_to_resource:
            worker_pool = await create_worker_actor_pool(
                 self._address,
-                band_to_slot,
+                band_to_resource,
                 modules=worker_modules,
                 subprocess_start_method=self._subprocess_start_method,
                 metrics=self._config.get("metrics", {}),
@@ -225,11 +234,13 @@ async def _start_service(self):
         self._web = await start_supervisor(
             self.supervisor_address, config=self._config, web=self._web
         )
-        for worker_pool, band_to_slot in zip(self._worker_pools, self._bands_to_slot):
+        for worker_pool, band_to_resource in zip(
+            self._worker_pools, self._bands_to_resource
+        ):
             await start_worker(
                 worker_pool.external_address,
                 self.supervisor_address,
-                band_to_slot,
+                band_to_resource,
                 config=self._config,
             )
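
With this change the local cluster divides memory across workers the same way it already divided CPUs. A standalone sketch of the per-worker band construction performed in __init__ above (worker, CPU, memory, and device counts are hypothetical; the real values come from cpu_count(), mem_total(), and cuda_count()):

from mars.resource import Resource

n_worker, n_cpu, total_mem = 2, 8, 16 * 1024**3
devices_list = [[0], [1]]  # one CUDA device per worker, for illustration

worker_cpus = n_cpu // n_worker
worker_mem = total_mem // n_worker
bands_to_resource = []
for _, devices in zip(range(n_worker), devices_list):
    band_to_resource = {
        "numa-0": Resource(num_cpus=worker_cpus, mem_bytes=worker_mem)
    }
    for i in devices:
        band_to_resource[f"gpu-{i}"] = Resource(num_gpus=1)
    bands_to_resource.append(band_to_resource)

# Each worker now advertises numa-0 with 4 CPUs and 8 GiB, plus one gpu band.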

mars/deploy/oscar/pool.py
Lines changed: 10 additions & 6 deletions

@@ -16,7 +16,7 @@
 from typing import Dict, List

 from ... import oscar as mo
-from ...resource import cuda_count
+from ...resource import cuda_count, Resource

 try:
     from IPython import get_ipython
@@ -46,7 +46,7 @@ async def create_supervisor_actor_pool(

 async def create_worker_actor_pool(
     address: str,
-    band_to_slots: Dict[str, int],
+    band_to_resource: Dict[str, Resource],
     n_io_process: int = 1,
     modules: List[str] = None,
     ports: List[int] = None,
@@ -55,7 +55,10 @@ async def create_worker_actor_pool(
     **kwargs,
 ):
     # TODO: support NUMA when ready
-    n_process = sum(slot for slot in band_to_slots.values())
+    n_process = sum(
+        int(resource.num_cpus) or int(resource.num_gpus)
+        for resource in band_to_resource.values()
+    )
     envs = []
     labels = ["main"]

@@ -67,15 +70,16 @@ async def create_worker_actor_pool(
         cuda_devices = [int(i) for i in env_devices.split(",")]

     i_gpu = iter(sorted(cuda_devices))
-    for band, slot in band_to_slots.items():
+    for band, resource in band_to_resource.items():
         if band.startswith("gpu"):  # pragma: no cover
             idx = str(next(i_gpu))
             envs.append({"CUDA_VISIBLE_DEVICES": idx})
             labels.append(f"gpu-{idx}")
         else:
             assert band.startswith("numa")
-            envs.extend([dict() for _ in range(slot)])
-            labels.extend([band] * slot)
+            num_cpus = int(resource.num_cpus)
+            envs.extend([dict() for _ in range(num_cpus)])
+            labels.extend([band] * num_cpus)

     suspend_sigint = get_ipython is not None and get_ipython() is not None
     return await mo.create_actor_pool(
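
The new n_process expression relies on each band's Resource being one-sided: a numa band carries CPUs (and memory) but no GPUs, while a gpu band carries exactly one GPU and no CPUs, so `int(resource.num_cpus) or int(resource.num_gpus)` picks the right process count either way. A sketch with illustrative values:

from mars.resource import Resource

band_to_resource = {
    "numa-0": Resource(num_cpus=8, mem_bytes=32 * 1024**3),
    "gpu-0": Resource(num_gpus=1),
}

# numa-0 contributes 8 processes; gpu-0 has num_cpus == 0, so the `or`
# falls through to its GPU count and contributes 1 process.
n_process = sum(
    int(resource.num_cpus) or int(resource.num_gpus)
    for resource in band_to_resource.values()
)
assert n_process == 9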

mars/deploy/oscar/ray.py
Lines changed: 20 additions & 10 deletions

@@ -30,6 +30,7 @@
 )
 from ...oscar.backends.ray.pool import RayPoolState
 from ...oscar.errors import ReconstructWorkerError
+from ...resource import Resource
 from ...services.cluster.backends.base import (
     register_cluster_backend,
     AbstractClusterBackend,
@@ -130,7 +131,7 @@ def get_cluster_state_ref(self):
 class ClusterStateActor(mo.StatelessActor):
     def __init__(self):
         self._worker_cpu, self._worker_mem, self._config = None, None, None
-        self._pg_name, self._band_to_slot, self._worker_modules = None, None, None
+        self._pg_name, self._band_to_resource, self._worker_modules = None, None, None
         self._pg_counter = itertools.count()
         self._worker_count = 0
         self._workers = {}
@@ -147,7 +148,9 @@ def set_config(self, worker_cpu, worker_mem, config):
             config,
         )
         # TODO(chaokunyang) Support gpu
-        self._band_to_slot = {"numa-0": self._worker_cpu}
+        self._band_to_resource = {
+            "numa-0": Resource(num_cpus=self._worker_cpu, mem_bytes=self._worker_mem)
+        }
         self._worker_modules = get_third_party_modules_from_config(
             self._config, NodeRole.WORKER
         )
@@ -156,11 +159,14 @@ async def request_worker(
         self, worker_cpu: int = None, worker_mem: int = None, timeout: int = None
     ) -> Optional[str]:
         worker_cpu = worker_cpu or self._worker_cpu
+        worker_mem = worker_mem or self._worker_mem
         bundle = {
             "CPU": worker_cpu,
             # "memory": worker_mem or self._worker_mem
         }
-        band_to_slot = {"numa-0": worker_cpu}
+        band_to_resource = {
+            "numa-0": Resource(num_cpus=worker_cpu, mem_bytes=worker_mem)
+        }
         start_time = time.time()
         logger.info("Start to request worker with resource %s.", bundle)
         # TODO rescale ray placement group instead of creating new placement group
@@ -193,7 +199,7 @@ async def request_worker(
         )
         worker_address = process_placement_to_address(pg_name, 0, 0)
         worker_pool = await self.create_worker(worker_address)
-        await self.start_worker(worker_address, band_to_slot=band_to_slot)
+        await self.start_worker(worker_address, band_to_resource=band_to_resource)
         logger.info(
             "Request worker %s succeeds in %.4f seconds",
             worker_address,
@@ -206,7 +212,7 @@ async def create_worker(self, worker_address):
         start_time = time.time()
         worker_pool = await create_worker_actor_pool(
             worker_address,
-            self._band_to_slot,
+            self._band_to_resource,
             modules=self._worker_modules,
             metrics=self._config.get("metrics", {}),
         )
@@ -217,12 +223,12 @@ async def create_worker(self, worker_address):
         )
         return worker_pool

-    async def start_worker(self, worker_address, band_to_slot=None):
+    async def start_worker(self, worker_address, band_to_resource=None):
         self._worker_count += 1
         start_time = time.time()
-        band_to_slot = band_to_slot or self._band_to_slot
+        band_to_resource = band_to_resource or self._band_to_resource
         await start_worker(
-            worker_address, self.address, band_to_slot, config=self._config
+            worker_address, self.address, band_to_resource, config=self._config
         )
         worker_pool = ray.get_actor(worker_address)
         await worker_pool.mark_service_ready.remote()
@@ -290,7 +296,7 @@ async def _reconstruct_worker():

             start_time = time.time()
             await start_worker(
-                address, self.address, self._band_to_slot, config=self._config
+                address, self.address, self._band_to_resource, config=self._config
             )
             await actor.mark_service_ready.remote()
             logger.info(
@@ -514,7 +520,11 @@ async def start(self):
             asyncio.create_task(
                 create_worker_actor_pool(
                     addr,
-                    {"numa-0": self._worker_cpu},
+                    {
+                        "numa-0": Resource(
+                            num_cpus=self._worker_cpu, mem_bytes=self._worker_mem
+                        )
+                    },
                     modules=get_third_party_modules_from_config(
                         self._config, NodeRole.WORKER
                     ),
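
On the Ray backend the same mapping is built in two places: once from the configured defaults in set_config, and once per request_worker call, where an explicit worker_mem now flows into the band alongside the CPU count. A condensed sketch (hypothetical values; the Ray placement-group bundle still requests only CPU, as the commented-out "memory" key above shows):

from mars.resource import Resource

worker_cpu, worker_mem = 4, 8 * 1024**3

# What request_worker asks Ray for, and what the new worker's
# single numa band is declared to hold.
bundle = {"CPU": worker_cpu}
band_to_resource = {
    "numa-0": Resource(num_cpus=worker_cpu, mem_bytes=worker_mem)
}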

mars/deploy/oscar/service.py
Lines changed: 4 additions & 3 deletions

@@ -16,6 +16,7 @@
 import os
 from typing import List, Dict, Union

+from ...resource import Resource
 from ...services import start_services, stop_services, NodeRole
 from ..utils import load_service_config_file

@@ -75,7 +76,7 @@ async def stop_supervisor(address: str, config: Dict = None):
 async def start_worker(
     address: str,
     lookup_address: str,
-    band_to_slots: Dict[str, int],
+    band_to_resource: Dict[str, Resource],
     modules: Union[List, str, None] = None,
     config: Dict = None,
     mark_ready: bool = True,
@@ -87,9 +88,9 @@ async def start_worker(
     if backend == "fixed" and config["cluster"].get("lookup_address") is None:
         config["cluster"]["lookup_address"] = lookup_address
     if config["cluster"].get("resource") is None:
-        config["cluster"]["resource"] = band_to_slots
+        config["cluster"]["resource"] = band_to_resource
     if any(
-        band_name.startswith("gpu-") for band_name in band_to_slots
+        band_name.startswith("gpu-") for band_name in band_to_resource
    ):  # pragma: no cover
         if "cuda" not in config["storage"]["backends"]:
             config["storage"]["backends"].append("cuda")
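
After this change the cluster service config carries Resource objects rather than slot counts, while the cuda storage backend is still enabled from the band names alone. A sketch of the config shape start_worker produces (band values and the initially empty backend list are hypothetical):

from mars.resource import Resource

config = {"cluster": {}, "storage": {"backends": []}}
band_to_resource = {
    "numa-0": Resource(num_cpus=8, mem_bytes=16 * 1024**3),
    "gpu-0": Resource(num_gpus=1),
}

if config["cluster"].get("resource") is None:
    config["cluster"]["resource"] = band_to_resource
if any(band_name.startswith("gpu-") for band_name in band_to_resource):
    if "cuda" not in config["storage"]["backends"]:
        config["storage"]["backends"].append("cuda")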

mars/deploy/oscar/session.py
Lines changed: 2 additions & 2 deletions

@@ -1256,10 +1256,10 @@ async def fetch_tileable_op_logs(
     async def get_total_n_cpu(self):
         all_bands = await self._cluster_api.get_all_bands()
         n_cpu = 0
-        for band, size in all_bands.items():
+        for band, resource in all_bands.items():
             _, band_name = band
             if band_name.startswith("numa-"):
-                n_cpu += size
+                n_cpu += resource.num_cpus
         return n_cpu

     async def get_cluster_versions(self) -> List[str]:
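
get_total_n_cpu now reads the CPU count out of each numa band's Resource instead of treating the mapping value as a slot count. A standalone sketch of the aggregation (band addresses and capacities are hypothetical):

from mars.resource import Resource

# Shape of what the cluster API returns: (address, band_name) -> Resource
all_bands = {
    ("127.0.0.1:11111", "numa-0"): Resource(num_cpus=8, mem_bytes=16 * 1024**3),
    ("127.0.0.1:11111", "gpu-0"): Resource(num_gpus=1),
    ("127.0.0.1:22222", "numa-0"): Resource(num_cpus=8, mem_bytes=16 * 1024**3),
}

n_cpu = 0
for band, resource in all_bands.items():
    _, band_name = band
    if band_name.startswith("numa-"):  # gpu bands do not count toward CPUs
        n_cpu += resource.num_cpus
assert n_cpu == 16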
