Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mars/services/subtask/worker/tests/test_subtask.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ async def actor_pool():
await mo.create_actor(
TaskConfigurationActor,
dict(),
dict(),
uid=TaskConfigurationActor.default_uid(),
address=pool.external_address,
)
Expand Down
15 changes: 15 additions & 0 deletions mars/services/task/execution/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .mars import *
91 changes: 91 additions & 0 deletions mars/services/task/execution/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Dict, Any, Type

from ....core import ChunkGraph
from ....typing import BandType
from ...subtask import SubtaskGraph, SubtaskResult


@dataclass
class ExecutionChunkResult:
key: str # The chunk key for fetching the result.
meta: Dict # The chunk meta for iterative tiling.
context: Any # The context info, e.g. ray.ObjectRef.


class TaskExecutor(ABC):
name = None

@classmethod
@abstractmethod
async def create(
cls, config: Dict, *, session_id: str, address: str, task, **kwargs
) -> "TaskExecutor":
name = config.get("backend", "mars")
backend_config = config.get(name, {})
executor_cls = _name_to_task_executor_cls[name]
if executor_cls.create.__func__ is TaskExecutor.create.__func__:
raise NotImplementedError(
f"The {executor_cls} should implement the abstract classmethod `create`."
)
return await executor_cls.create(
backend_config, session_id=session_id, address=address, task=task, **kwargs
)

async def __aenter__(self):
"""Called when begin to execute the task."""

@abstractmethod
async def execute_subtask_graph(
self,
stage_id: str,
subtask_graph: SubtaskGraph,
chunk_graph: ChunkGraph,
context: Any = None,
) -> List[ExecutionChunkResult]:
"""Execute a subtask graph and returns result."""

async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Called when finish the task."""

@abstractmethod
async def get_available_band_slots(self) -> Dict[BandType, int]:
"""Get available band slots."""

@abstractmethod
async def get_progress(self) -> float:
"""Get the execution progress."""

@abstractmethod
async def cancel(self):
"""Cancel execution."""

# The following APIs are for compatible with mars backend, they
# will be removed as soon as possible.
async def set_subtask_result(self, subtask_result: SubtaskResult):
"""Set the subtask result."""

def get_stage_processors(self):
"""Get stage processors."""


_name_to_task_executor_cls: Dict[str, Type[TaskExecutor]] = {}


def register_executor_cls(executor_cls: Type[TaskExecutor]):
_name_to_task_executor_cls[executor_cls.name] = executor_cls
15 changes: 15 additions & 0 deletions mars/services/task/execution/mars/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .executor import MarsTaskExecutor
Loading