diff --git a/.github/workflows/benchmark-ci.yml b/.github/workflows/benchmark-ci.yml index eb3436e517..e1861e1dd0 100644 --- a/.github/workflows/benchmark-ci.yml +++ b/.github/workflows/benchmark-ci.yml @@ -52,6 +52,7 @@ jobs: asv check -E existing git remote add upstream https://github.com/mars-project/mars.git git fetch upstream + git merge upstream/master asv machine --yes asv continuous -f 1.1 --strict upstream/master HEAD if: ${{ steps.build.outcome == 'success' }} diff --git a/docs/source/reference/tensor/special.rst b/docs/source/reference/tensor/special.rst index 314367a2e1..0348af98f4 100644 --- a/docs/source/reference/tensor/special.rst +++ b/docs/source/reference/tensor/special.rst @@ -45,6 +45,18 @@ Error function mars.tensor.special.erf +Ellipsoidal harmonics +--------------------- + +.. autosummary:: + :toctree: generated/ + :nosignatures: + + mars.tensor.special.ellip_harm + mars.tensor.special.ellip_harm_2 + mars.tensor.special.ellip_normal + + Gamma and related functions --------------------------- diff --git a/mars/services/task/analyzer/analyzer.py b/mars/services/task/analyzer/analyzer.py index 1137f9d480..714172fc13 100644 --- a/mars/services/task/analyzer/analyzer.py +++ b/mars/services/task/analyzer/analyzer.py @@ -281,7 +281,9 @@ def _gen_logic_key(self, chunks: List[ChunkType]): ) @enter_mode(build=True) - def gen_subtask_graph(self) -> SubtaskGraph: + def gen_subtask_graph( + self, op_to_bands: Dict[str, BandType] = None + ) -> SubtaskGraph: """ Analyze chunk graph and generate subtask graph. @@ -289,6 +291,8 @@ def gen_subtask_graph(self) -> SubtaskGraph: ------- subtask_graph: SubtaskGraph Subtask graph. + op_to_bands: Dict + Input: operand key -> pre-assigned band, usually for fetch operands. 
""" reassign_worker_ops = [ chunk.op for chunk in self._chunk_graph if chunk.op.reassign_worker @@ -310,6 +314,8 @@ def gen_subtask_graph(self) -> SubtaskGraph: for op in start_ops if op.expect_worker is not None } + if op_to_bands: + cur_assigns.update(op_to_bands) logger.debug( "Start to assign %s start chunks for task %s", len(start_ops), @@ -360,7 +366,8 @@ def gen_subtask_graph(self) -> SubtaskGraph: chunk_to_colors[c] = op_to_colors[c.op] color_to_chunks = defaultdict(list) for chunk, color in chunk_to_colors.items(): - color_to_chunks[color].append(chunk) + if not isinstance(chunk.op, Fetch): + color_to_chunks[color].append(chunk) # gen subtask graph subtask_graph = SubtaskGraph() @@ -370,7 +377,8 @@ def gen_subtask_graph(self) -> SubtaskGraph: visited = set() logic_key_to_subtasks = defaultdict(list) for chunk in self._chunk_graph.topological_iter(): - if chunk in visited: + if chunk in visited or isinstance(chunk.op, Fetch): + # skip fetch chunk continue color = chunk_to_colors[chunk] diff --git a/mars/services/task/analyzer/tests/test_assigner.py b/mars/services/task/analyzer/tests/test_assigner.py new file mode 100644 index 0000000000..57dc64cc26 --- /dev/null +++ b/mars/services/task/analyzer/tests/test_assigner.py @@ -0,0 +1,73 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import numpy as np + +from .....config import Config +from .....core import ChunkGraph +from .....tensor.random import TensorRand +from .....tensor.arithmetic import TensorAdd +from .....tensor.fetch import TensorFetch +from .....resource import Resource +from ...core import Task +from ..analyzer import GraphAnalyzer +from ..assigner import GraphAssigner + + +def test_assigner_with_fetch_inputs(): + band_num = 8 + all_bands = [(f"address_{i}", "numa-0") for i in range(band_num)] + inputs = [ + TensorFetch(key=str(i), source_key=str(i), dtype=np.dtype(int)).new_chunk([]) + for i in range(band_num) + ] + no_fetch_inputs = [TensorRand(i).new_chunk([]) for i in range(4)] + results = [TensorAdd(lhs=inp, rhs=1).new_chunk([inp]) for inp in inputs] + cur_assigns = dict( + (fetch_chunk.op.key, band[0][0]) + for fetch_chunk, band in zip(reversed(inputs), all_bands) + ) + + chunk_graph = ChunkGraph() + for fetch_chunk, add_chunk in zip(inputs, results): + chunk_graph.add_node(fetch_chunk) + chunk_graph.add_node(add_chunk) + chunk_graph.add_edge(fetch_chunk, add_chunk) + for inp in no_fetch_inputs: + results.append(inp) + chunk_graph.add_node(inp) + chunk_graph.results = results + + band_resource = dict((band, Resource(num_cpus=1)) for band in all_bands) + + task = Task("mock_task", "mock_session") + analyzer = GraphAnalyzer(chunk_graph, band_resource, task, Config()) + subtask_graph = analyzer.gen_subtask_graph(cur_assigns) + + assigner = GraphAssigner( + chunk_graph, list(GraphAnalyzer._iter_start_ops(chunk_graph)), band_resource + ) + assigns = assigner.assign(cur_assigns) + key_to_assign = dict((c.key, band) for c, band in assigns.items()) + for subtask in subtask_graph: + input_chunks = list(subtask.chunk_graph.iter_indep()) + if all(isinstance(inp.op, TensorFetch) for inp in input_chunks): + # all inputs are fetch, expect band should be None + assert subtask.expect_band is None + else: + # if subtask has truly initial chunks, expect band should be + # same as assign 
results + for inp in input_chunks: + if not isinstance(inp.op, TensorFetch): + assert subtask.expect_band == key_to_assign[inp.key] diff --git a/mars/services/task/api/web.py b/mars/services/task/api/web.py index 5afc5c4200..f734747f19 100644 --- a/mars/services/task/api/web.py +++ b/mars/services/task/api/web.py @@ -247,11 +247,11 @@ async def get_last_idle_time(self) -> Union[float, None]: async def wait_task(self, task_id: str, timeout: float = None): path = f"{self._address}/api/session/{self._session_id}/task/{task_id}" - # client timeout should be longer than server timeout. - server_timeout = "" if timeout is None else str(max(timeout / 2.0, timeout - 1)) - params = {"action": "wait", "timeout": server_timeout} + # increase client timeout to handle network overhead during entire request + client_timeout = timeout + 3 if timeout else 0 + params = {"action": "wait", "timeout": "" if timeout is None else str(timeout)} res = await self._request_url( - "GET", path, params=params, request_timeout=timeout or 0 + "GET", path, params=params, request_timeout=client_timeout ) return _json_deserial_task_result(json.loads(res.body.decode())) diff --git a/mars/services/task/execution/api.py b/mars/services/task/execution/api.py index d561358c41..973ed66278 100644 --- a/mars/services/task/execution/api.py +++ b/mars/services/task/execution/api.py @@ -16,14 +16,14 @@ from dataclasses import dataclass from typing import List, Dict, Any, Type -from ....core import ChunkGraph +from ....core import ChunkGraph, Chunk from ....typing import BandType from ...subtask import SubtaskGraph, SubtaskResult @dataclass class ExecutionChunkResult: - key: str # The chunk key for fetching the result. + chunk: Chunk # The chunk for fetching the result. meta: Dict # The chunk meta for iterative tiling. context: Any # The context info, e.g. ray.ObjectRef. 
diff --git a/mars/services/task/execution/mars/executor.py b/mars/services/task/execution/mars/executor.py index 8bd334a500..09e5f655bb 100644 --- a/mars/services/task/execution/mars/executor.py +++ b/mars/services/task/execution/mars/executor.py @@ -18,7 +18,7 @@ from typing import Dict, List, Optional from ..... import oscar as mo -from .....core import ChunkGraph +from .....core import ChunkGraph, TileableGraph from .....core.operand import ( Fetch, MapReduceOperand, @@ -36,6 +36,7 @@ from ....meta.api import MetaAPI from ....scheduling import SchedulingAPI from ....subtask import Subtask, SubtaskResult, SubtaskStatus, SubtaskGraph +from ...core import Task from ..api import TaskExecutor, register_executor_cls from .resource import ResourceEvaluator from .stage import TaskStageProcessor @@ -61,14 +62,14 @@ class MarsTaskExecutor(TaskExecutor): def __init__( self, - config, - task, - tileable_graph, - tile_context, - cluster_api, - lifecycle_api, - scheduling_api, - meta_api, + config: Dict, + task: Task, + tileable_graph: TileableGraph, + tile_context: Dict[TileableType, TileableType], + cluster_api: ClusterAPI, + lifecycle_api: LifecycleAPI, + scheduling_api: SchedulingAPI, + meta_api: MetaAPI, ): self._config = config self._task = task diff --git a/mars/services/task/execution/mars/stage.py b/mars/services/task/execution/mars/stage.py index c088c62bdb..5d238acffc 100644 --- a/mars/services/task/execution/mars/stage.py +++ b/mars/services/task/execution/mars/stage.py @@ -118,7 +118,7 @@ async def _get_stage_result(self): metas = await self._meta_api.get_chunk_meta.batch(*get_meta) for chunk, meta in zip(chunks, metas): execution_chunk_results.append( - ExecutionChunkResult(key=chunk.key, meta=meta, context=None) + ExecutionChunkResult(chunk=chunk, meta=meta, context=None) ) return execution_chunk_results diff --git a/mars/services/task/supervisor/preprocessor.py b/mars/services/task/supervisor/preprocessor.py index 1f3c401646..95858b52bb 100644 --- 
a/mars/services/task/supervisor/preprocessor.py +++ b/mars/services/task/supervisor/preprocessor.py @@ -206,13 +206,14 @@ def analyze( chunk_graph: ChunkGraph, available_bands: Dict[BandType, Resource], stage_id: str = None, + op_to_bands: Dict[str, BandType] = None, ) -> SubtaskGraph: logger.debug("Start to gen subtask graph for task %s", self._task.task_id) task = self._task analyzer = GraphAnalyzer( chunk_graph, available_bands, task, self._config, stage_id=stage_id ) - graph = analyzer.gen_subtask_graph() + graph = analyzer.gen_subtask_graph(op_to_bands) logger.debug( "Generated subtask graph of %s subtasks for task %s", len(graph), diff --git a/mars/services/task/supervisor/processor.py b/mars/services/task/supervisor/processor.py index 5d657a5b6f..83b0875325 100644 --- a/mars/services/task/supervisor/processor.py +++ b/mars/services/task/supervisor/processor.py @@ -182,12 +182,26 @@ async def _process_stage_chunk_graph( chunk_graph: ChunkGraph, ): available_bands = await self._executor.get_available_band_slots() + meta_api = self._executor._meta_api + get_meta_tasks = [] + fetch_op_keys = [] + for c in chunk_graph.iter_indep(): + if isinstance(c.op, Fetch): + get_meta_tasks.append( + meta_api.get_chunk_meta.delay(c.key, fields=["bands"]) + ) + fetch_op_keys.append(c.op.key) + key_to_bands = await meta_api.get_chunk_meta.batch(*get_meta_tasks) + fetch_op_to_bands = dict( + (key, meta["bands"][0]) for key, meta in zip(fetch_op_keys, key_to_bands) + ) with Timer() as timer: subtask_graph = await asyncio.to_thread( self._preprocessor.analyze, chunk_graph, available_bands, stage_id=stage_id, + op_to_bands=fetch_op_to_bands, ) stage_profiler.set(f"gen_subtask_graph({len(subtask_graph)})", timer.duration) logger.info( @@ -239,8 +253,8 @@ async def _update_meta( ): result_chunks = [c for c in chunk_graph.results if not isinstance(c.op, Fetch)] chunk_to_band = { - c: r.meta["bands"][0][0] - for c, r in zip(result_chunks, execution_chunk_results) + result.chunk: 
result.meta["bands"][0][0] + for result in execution_chunk_results } update_meta_chunks = set(result_chunks) update_meta_tileables = dict() diff --git a/mars/services/task/supervisor/tests/task_preprocessor.py b/mars/services/task/supervisor/tests/task_preprocessor.py index cb26034972..b9b0a8011e 100644 --- a/mars/services/task/supervisor/tests/task_preprocessor.py +++ b/mars/services/task/supervisor/tests/task_preprocessor.py @@ -142,6 +142,7 @@ def analyze( chunk_graph: ChunkGraph, available_bands: Dict[BandType, Resource], stage_id: str, + op_to_bands: Dict[str, BandType] = None, ) -> SubtaskGraph: # record shapes generated in tile for n in chunk_graph: