Skip to content

Commit f94b1c2

Browse files
authored
Use cython to accelerate message serialization (mars-project#2932)
1 parent 1c4fbf7 commit f94b1c2

File tree

16 files changed

+836
-468
lines changed

16 files changed

+836
-468
lines changed

asv_bench/benchmarks/serialize.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
from mars.core.operand import MapReduceOperand
2020
from mars.dataframe.operands import DataFrameOperandMixin
21+
from mars.oscar.core import ActorRef
22+
23+
from mars.oscar.backends.message import SendMessage, new_message_id, ActorRefMessage
2124
from mars.serialization import serialize, deserialize
2225
from mars.serialization.serializables import (
2326
Serializable,
@@ -38,7 +41,7 @@
3841
TupleField,
3942
DictField,
4043
)
41-
from mars.services.subtask import Subtask
44+
from mars.services.subtask import Subtask, SubtaskResult, SubtaskStatus
4245
from mars.services.task import new_task_id
4346
from mars.utils import tokenize
4447

@@ -215,3 +218,49 @@ def time_pickle_serialize_fetch_shuffle_chunks(self):
215218
header, buffers = serialize(fetch_chunk)
216219
serialized = cloudpickle.dumps((header, buffers))
217220
deserialize(*cloudpickle.loads(serialized))
221+
222+
223+
class SerializeMessageSuite:
224+
def setup(self):
225+
self.send_messages = []
226+
self.actor_ref_messages = []
227+
for i in range(10000):
228+
ref = ActorRef(
229+
"ray://mars_cluster_1649927648/17/0",
230+
b"F20Wyerq6EiqltB8jAVs7L3N_task_manager",
231+
)
232+
new_result = SubtaskResult(
233+
subtask_id=new_task_id(),
234+
session_id=new_task_id(),
235+
task_id=new_task_id(),
236+
stage_id=new_task_id(),
237+
status=SubtaskStatus.succeeded,
238+
progress=1.0,
239+
data_size=1000000.0,
240+
bands=[("ray://mars_cluster_1649927648/17/0", "numa-0")],
241+
execution_start_time=1646125099.622051,
242+
execution_end_time=1646125104.448726,
243+
)
244+
send_message = SendMessage(
245+
new_message_id(),
246+
ref,
247+
new_result,
248+
protocol=0,
249+
)
250+
self.send_messages.append(send_message)
251+
actor_ref_message = ActorRefMessage(
252+
message_id=new_message_id(),
253+
actor_ref=ref,
254+
protocol=0,
255+
)
256+
self.actor_ref_messages.append(actor_ref_message)
257+
258+
def time_pickle_serialize_deserialize_send_messages(self):
259+
deserialize(
260+
*cloudpickle.loads(cloudpickle.dumps(serialize(self.send_messages)))
261+
)
262+
263+
def time_pickle_serialize_deserialize_actor_ref_messages(self):
264+
deserialize(
265+
*cloudpickle.loads(cloudpickle.dumps(serialize(self.actor_ref_messages)))
266+
)

mars/oscar/backends/context.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@
1313
# limitations under the License.
1414

1515
import asyncio
16+
from dataclasses import dataclass
1617
from typing import Tuple, Union, Type
1718

18-
from ...utils import to_binary
19+
from ...utils import to_binary, dataslots
1920
from ..api import Actor
2021
from ..core import ActorRef, create_local_actor_ref
2122
from ..context import BaseActorContext
@@ -38,11 +39,16 @@
3839
CancelMessage,
3940
ControlMessage,
4041
ControlMessageType,
41-
ProfilingContext,
4242
)
4343
from .router import Router
4444

4545

46+
@dataslots
47+
@dataclass
48+
class ProfilingContext:
49+
task_id: str
50+
51+
4652
class MarsActorContext(BaseActorContext):
4753
__slots__ = "_address", "_caller"
4854

mars/oscar/backends/core.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616
import copy
1717
from typing import Dict, Union
1818

19+
from ...oscar.profiling import ProfilingData
20+
from ...utils import Timer
1921
from ..errors import ServerClosed
2022
from .communication import Client
2123
from .message import _MessageBase, ResultMessage, ErrorMessage, DeserializeMessageFailed
2224
from .router import Router
23-
from ...oscar.profiling import ProfilingData
24-
from ...utils import Timer
2525

2626

2727
ResultMessageType = Union[ResultMessage, ErrorMessage]
@@ -60,10 +60,10 @@ async def _listen(self, client: Client):
6060
) from None
6161
future = self._client_to_message_futures[client].pop(message.message_id)
6262
future.set_result(message)
63-
except DeserializeMessageFailed as e: # pragma: no cover
63+
except DeserializeMessageFailed as e:
6464
message_id = e.message_id
6565
future = self._client_to_message_futures[client].pop(message_id)
66-
future.set_exception(e)
66+
future.set_exception(e.__cause__)
6767
except Exception as e: # noqa: E722 # pylint: disable=bare-except
6868
message_futures = self._client_to_message_futures.get(client)
6969
self._client_to_message_futures[client] = dict()

0 commit comments

Comments
 (0)