Skip to content

Commit 68d30b5

Browse files
authored
Fix store duplicate chunk and meta per subtask (mars-project#2845)
1 parent 4ab8a46 commit 68d30b5

File tree

2 files changed

+42
-11
lines changed

2 files changed

+42
-11
lines changed

mars/services/subtask/worker/processor.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -284,20 +284,20 @@ async def _store_data(self, chunk_graph: ChunkGraph):
284284
]
285285

286286
# store data into storage
287-
puts = []
288-
stored_keys = []
287+
data_key_to_puts = {}
289288
for result_chunk in result_chunks:
290289
data_key = result_chunk.key
291290
if data_key in self._datastore:
292291
# non shuffle op
293-
stored_keys.append(data_key)
294292
result_data = self._datastore[data_key]
295293
# update meta
296294
if not isinstance(result_data, tuple):
297295
result_chunk.params = result_chunk.get_params_from_data(result_data)
298-
296+
# check data_key after update meta
297+
if data_key in data_key_to_puts:
298+
continue
299299
put = self._storage_api.put.delay(data_key, result_data)
300-
puts.append(put)
300+
data_key_to_puts[data_key] = put
301301
else:
302302
assert isinstance(result_chunk.op, MapReduceOperand)
303303
keys = [
@@ -306,12 +306,15 @@ async def _store_data(self, chunk_graph: ChunkGraph):
306306
if isinstance(store_key, tuple) and store_key[0] == data_key
307307
]
308308
for key in keys:
309-
stored_keys.append(key)
309+
if key in data_key_to_puts:
310+
continue
310311
result_data = self._datastore[key]
311312
put = self._storage_api.put.delay(key, result_data)
312-
puts.append(put)
313+
data_key_to_puts[key] = put
314+
stored_keys = list(data_key_to_puts.keys())
315+
puts = data_key_to_puts.values()
313316
logger.debug(
314-
"Start putting data keys: %s, " "subtask id: %s",
317+
"Start putting data keys: %s, subtask id: %s",
315318
stored_keys,
316319
self.subtask.subtask_id,
317320
)
@@ -327,20 +330,20 @@ async def _store_data(self, chunk_graph: ChunkGraph):
327330
data_key_to_memory_size[store_key] = store_info.memory_size
328331
data_key_to_object_id[store_key] = store_info.object_id
329332
logger.debug(
330-
"Finish putting data keys: %s, " "subtask id: %s",
333+
"Finish putting data keys: %s, subtask id: %s",
331334
stored_keys,
332335
self.subtask.subtask_id,
333336
)
334337
except asyncio.CancelledError:
335338
logger.debug(
336-
"Cancelling put data keys: %s, " "subtask id: %s",
339+
"Cancelling put data keys: %s, subtask id: %s",
337340
stored_keys,
338341
self.subtask.subtask_id,
339342
)
340343
put_infos.cancel()
341344

342345
logger.debug(
343-
"Cancelled put data keys: %s, " "subtask id: %s",
346+
"Cancelled put data keys: %s, subtask id: %s",
344347
stored_keys,
345348
self.subtask.subtask_id,
346349
)

mars/services/subtask/worker/tests/subtask_processor.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,33 @@
1919
from ...worker.processor import SubtaskProcessor
2020

2121

22+
class CheckStorageAPI:
23+
def __init__(self, storage_api):
24+
self._storage_api = storage_api
25+
self._put_data_keys = set()
26+
27+
def __getattr__(self, item):
28+
return getattr(self._storage_api, item)
29+
30+
@property
31+
def put(self):
32+
owner = self
33+
put = self._storage_api.put
34+
35+
class _PutWrapper:
36+
def delay(self, data_key: str, obj: object, level=None):
37+
if data_key in owner._put_data_keys:
38+
raise Exception(f"Duplicate data put: {data_key}, obj: {obj}")
39+
else:
40+
owner._put_data_keys.add(data_key)
41+
return put.delay(data_key, obj, level)
42+
43+
def __getattr__(self, item):
44+
return getattr(put, item)
45+
46+
return _PutWrapper()
47+
48+
2249
class CheckedSubtaskProcessor(ObjectCheckMixin, SubtaskProcessor):
2350
def __init__(self, *args, **kwargs):
2451
super().__init__(*args, **kwargs)
@@ -37,6 +64,7 @@ def __init__(self, *args, **kwargs):
3764
check_options[key] = kwargs.get(key, True)
3865
self._check_options = check_options
3966
self._check_keys = kwargs.get("check_keys")
67+
self._storage_api = CheckStorageAPI(self._storage_api)
4068

4169
def _execute_operand(self, ctx: Dict[str, Any], op: OperandType):
4270
super()._execute_operand(ctx, op)

0 commit comments

Comments
 (0)