Skip to content

Commit 1c4fbf7

Browse files
authored
Ignore Fetch operands when assigning initial nodes (mars-project#2929)
1 parent b51afae commit 1c4fbf7

File tree

11 files changed

+133
-22
lines changed

11 files changed

+133
-22
lines changed

.github/workflows/benchmark-ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ jobs:
5252
asv check -E existing
5353
git remote add upstream https://github.com/mars-project/mars.git
5454
git fetch upstream
55+
git merge upstream/master
5556
asv machine --yes
5657
asv continuous -f 1.1 --strict upstream/master HEAD
5758
if: ${{ steps.build.outcome == 'success' }}

docs/source/reference/tensor/special.rst

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,18 @@ Error function
4545
mars.tensor.special.erf
4646

4747

48+
Ellipsoidal harmonics
49+
---------------------
50+
51+
.. autosummary::
52+
:toctree: generated/
53+
:nosignatures:
54+
55+
mars.tensor.special.ellip_harm
56+
mars.tensor.special.ellip_harm_2
57+
mars.tensor.special.ellip_normal
58+
59+
4860
Gamma and related functions
4961
---------------------------
5062

mars/services/task/analyzer/analyzer.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,14 +281,18 @@ def _gen_logic_key(self, chunks: List[ChunkType]):
281281
)
282282

283283
@enter_mode(build=True)
284-
def gen_subtask_graph(self) -> SubtaskGraph:
284+
def gen_subtask_graph(
285+
self, op_to_bands: Dict[str, BandType] = None
286+
) -> SubtaskGraph:
285287
"""
286288
Analyze chunk graph and generate subtask graph.
287289
288290
Returns
289291
-------
290292
subtask_graph: SubtaskGraph
291293
Subtask graph.
294+
op_to_bands: Dict
295+
Assigned operand's band, usually for fetch operands.
292296
"""
293297
reassign_worker_ops = [
294298
chunk.op for chunk in self._chunk_graph if chunk.op.reassign_worker
@@ -310,6 +314,8 @@ def gen_subtask_graph(self) -> SubtaskGraph:
310314
for op in start_ops
311315
if op.expect_worker is not None
312316
}
317+
if op_to_bands:
318+
cur_assigns.update(op_to_bands)
313319
logger.debug(
314320
"Start to assign %s start chunks for task %s",
315321
len(start_ops),
@@ -360,7 +366,8 @@ def gen_subtask_graph(self) -> SubtaskGraph:
360366
chunk_to_colors[c] = op_to_colors[c.op]
361367
color_to_chunks = defaultdict(list)
362368
for chunk, color in chunk_to_colors.items():
363-
color_to_chunks[color].append(chunk)
369+
if not isinstance(chunk.op, Fetch):
370+
color_to_chunks[color].append(chunk)
364371

365372
# gen subtask graph
366373
subtask_graph = SubtaskGraph()
@@ -370,7 +377,8 @@ def gen_subtask_graph(self) -> SubtaskGraph:
370377
visited = set()
371378
logic_key_to_subtasks = defaultdict(list)
372379
for chunk in self._chunk_graph.topological_iter():
373-
if chunk in visited:
380+
if chunk in visited or isinstance(chunk.op, Fetch):
381+
# skip fetch chunk
374382
continue
375383

376384
color = chunk_to_colors[chunk]
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# Copyright 1999-2021 Alibaba Group Holding Ltd.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import numpy as np
16+
17+
from .....config import Config
18+
from .....core import ChunkGraph
19+
from .....tensor.random import TensorRand
20+
from .....tensor.arithmetic import TensorAdd
21+
from .....tensor.fetch import TensorFetch
22+
from .....resource import Resource
23+
from ...core import Task
24+
from ..analyzer import GraphAnalyzer
25+
from ..assigner import GraphAssigner
26+
27+
28+
def test_assigner_with_fetch_inputs():
29+
band_num = 8
30+
all_bands = [(f"address_{i}", "numa-0") for i in range(band_num)]
31+
inputs = [
32+
TensorFetch(key=str(i), source_key=str(i), dtype=np.dtype(int)).new_chunk([])
33+
for i in range(band_num)
34+
]
35+
no_fetch_inputs = [TensorRand(i).new_chunk([]) for i in range(4)]
36+
results = [TensorAdd(lhs=inp, rhs=1).new_chunk([inp]) for inp in inputs]
37+
cur_assigns = dict(
38+
(fetch_chunk.op.key, band[0][0])
39+
for fetch_chunk, band in zip(reversed(inputs), all_bands)
40+
)
41+
42+
chunk_graph = ChunkGraph()
43+
for fetch_chunk, add_chunk in zip(inputs, results):
44+
chunk_graph.add_node(fetch_chunk)
45+
chunk_graph.add_node(add_chunk)
46+
chunk_graph.add_edge(fetch_chunk, add_chunk)
47+
for inp in no_fetch_inputs:
48+
results.append(inp)
49+
chunk_graph.add_node(inp)
50+
chunk_graph.results = results
51+
52+
band_resource = dict((band, Resource(num_cpus=1)) for band in all_bands)
53+
54+
task = Task("mock_task", "mock_session")
55+
analyzer = GraphAnalyzer(chunk_graph, band_resource, task, Config())
56+
subtask_graph = analyzer.gen_subtask_graph(cur_assigns)
57+
58+
assigner = GraphAssigner(
59+
chunk_graph, list(GraphAnalyzer._iter_start_ops(chunk_graph)), band_resource
60+
)
61+
assigns = assigner.assign(cur_assigns)
62+
key_to_assign = dict((c.key, band) for c, band in assigns.items())
63+
for subtask in subtask_graph:
64+
input_chunks = list(subtask.chunk_graph.iter_indep())
65+
if all(isinstance(inp.op, TensorFetch) for inp in input_chunks):
66+
# all inputs are fetch, expect band should be None
67+
assert subtask.expect_band is None
68+
else:
69+
# if subtask has truly initial chunks, expect band should be
70+
# same as assign results
71+
for inp in input_chunks:
72+
if not isinstance(inp.op, TensorFetch):
73+
assert subtask.expect_band == key_to_assign[inp.key]

mars/services/task/api/web.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -247,11 +247,11 @@ async def get_last_idle_time(self) -> Union[float, None]:
247247

248248
async def wait_task(self, task_id: str, timeout: float = None):
249249
path = f"{self._address}/api/session/{self._session_id}/task/{task_id}"
250-
# client timeout should be longer than server timeout.
251-
server_timeout = "" if timeout is None else str(max(timeout / 2.0, timeout - 1))
252-
params = {"action": "wait", "timeout": server_timeout}
250+
# increase client timeout to handle network overhead during entire request
251+
client_timeout = timeout + 3 if timeout else 0
252+
params = {"action": "wait", "timeout": "" if timeout is None else str(timeout)}
253253
res = await self._request_url(
254-
"GET", path, params=params, request_timeout=timeout or 0
254+
"GET", path, params=params, request_timeout=client_timeout
255255
)
256256
return _json_deserial_task_result(json.loads(res.body.decode()))
257257

mars/services/task/execution/api.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
from dataclasses import dataclass
1717
from typing import List, Dict, Any, Type
1818

19-
from ....core import ChunkGraph
19+
from ....core import ChunkGraph, Chunk
2020
from ....typing import BandType
2121
from ...subtask import SubtaskGraph, SubtaskResult
2222

2323

2424
@dataclass
2525
class ExecutionChunkResult:
26-
key: str # The chunk key for fetching the result.
26+
chunk: Chunk # The chunk key for fetching the result.
2727
meta: Dict # The chunk meta for iterative tiling.
2828
context: Any # The context info, e.g. ray.ObjectRef.
2929

mars/services/task/execution/mars/executor.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from typing import Dict, List, Optional
2020

2121
from ..... import oscar as mo
22-
from .....core import ChunkGraph
22+
from .....core import ChunkGraph, TileableGraph
2323
from .....core.operand import (
2424
Fetch,
2525
MapReduceOperand,
@@ -37,6 +37,7 @@
3737
from ....meta.api import MetaAPI
3838
from ....scheduling import SchedulingAPI
3939
from ....subtask import Subtask, SubtaskResult, SubtaskStatus, SubtaskGraph
40+
from ...core import Task
4041
from ..api import TaskExecutor, register_executor_cls
4142
from .resource import ResourceEvaluator
4243
from .stage import TaskStageProcessor
@@ -62,14 +63,14 @@ class MarsTaskExecutor(TaskExecutor):
6263

6364
def __init__(
6465
self,
65-
config,
66-
task,
67-
tileable_graph,
68-
tile_context,
69-
cluster_api,
70-
lifecycle_api,
71-
scheduling_api,
72-
meta_api,
66+
config: Dict,
67+
task: Task,
68+
tileable_graph: TileableGraph,
69+
tile_context: Dict[TileableType, TileableType],
70+
cluster_api: ClusterAPI,
71+
lifecycle_api: LifecycleAPI,
72+
scheduling_api: SchedulingAPI,
73+
meta_api: MetaAPI,
7374
):
7475
self._config = config
7576
self._task = task

mars/services/task/execution/mars/stage.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ async def _get_stage_result(self):
118118
metas = await self._meta_api.get_chunk_meta.batch(*get_meta)
119119
for chunk, meta in zip(chunks, metas):
120120
execution_chunk_results.append(
121-
ExecutionChunkResult(key=chunk.key, meta=meta, context=None)
121+
ExecutionChunkResult(chunk=chunk, meta=meta, context=None)
122122
)
123123
return execution_chunk_results
124124

mars/services/task/supervisor/preprocessor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,13 +206,14 @@ def analyze(
206206
chunk_graph: ChunkGraph,
207207
available_bands: Dict[BandType, Resource],
208208
stage_id: str = None,
209+
op_to_bands: Dict[str, BandType] = None,
209210
) -> SubtaskGraph:
210211
logger.debug("Start to gen subtask graph for task %s", self._task.task_id)
211212
task = self._task
212213
analyzer = GraphAnalyzer(
213214
chunk_graph, available_bands, task, self._config, stage_id=stage_id
214215
)
215-
graph = analyzer.gen_subtask_graph()
216+
graph = analyzer.gen_subtask_graph(op_to_bands)
216217
logger.debug(
217218
"Generated subtask graph of %s subtasks for task %s",
218219
len(graph),

mars/services/task/supervisor/processor.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,12 +182,26 @@ async def _process_stage_chunk_graph(
182182
chunk_graph: ChunkGraph,
183183
):
184184
available_bands = await self._executor.get_available_band_slots()
185+
meta_api = self._executor._meta_api
186+
get_meta_tasks = []
187+
fetch_op_keys = []
188+
for c in chunk_graph.iter_indep():
189+
if isinstance(c.op, Fetch):
190+
get_meta_tasks.append(
191+
meta_api.get_chunk_meta.delay(c.key, fields=["bands"])
192+
)
193+
fetch_op_keys.append(c.op.key)
194+
key_to_bands = await meta_api.get_chunk_meta.batch(*get_meta_tasks)
195+
fetch_op_to_bands = dict(
196+
(key, meta["bands"][0]) for key, meta in zip(fetch_op_keys, key_to_bands)
197+
)
185198
with Timer() as timer:
186199
subtask_graph = await asyncio.to_thread(
187200
self._preprocessor.analyze,
188201
chunk_graph,
189202
available_bands,
190203
stage_id=stage_id,
204+
op_to_bands=fetch_op_to_bands,
191205
)
192206
stage_profiler.set(f"gen_subtask_graph({len(subtask_graph)})", timer.duration)
193207
logger.info(
@@ -239,8 +253,8 @@ async def _update_meta(
239253
):
240254
result_chunks = [c for c in chunk_graph.results if not isinstance(c.op, Fetch)]
241255
chunk_to_band = {
242-
c: r.meta["bands"][0][0]
243-
for c, r in zip(result_chunks, execution_chunk_results)
256+
result.chunk: result.meta["bands"][0][0]
257+
for result in execution_chunk_results
244258
}
245259
update_meta_chunks = set(result_chunks)
246260
update_meta_tileables = dict()

0 commit comments

Comments
 (0)