Skip to content

Commit fabe7fa

Browse files
authored
Fix duplicate node iteration in GraphAssigner (mars-project#2857)
1 parent e12963d commit fabe7fa

File tree

2 files changed

+50
-3
lines changed

2 files changed

+50
-3
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Copyright 1999-2022 Alibaba Group Holding Ltd.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import mars.tensor as mt
16+
import mars.dataframe as md
17+
from mars.core.graph import TileableGraph, TileableGraphBuilder, ChunkGraphBuilder
18+
from mars.services.task.analyzer import GraphAnalyzer
19+
from mars.services.task.analyzer.assigner import GraphAssigner
20+
21+
22+
class ChunkGraphAssignerSuite:
23+
"""
24+
Benchmark that times performance of chunk graph assigner
25+
"""
26+
27+
def setup(self):
28+
num_rows = 10000
29+
df1 = md.DataFrame(
30+
mt.random.rand(num_rows, 4, chunk_size=10), columns=list("abcd")
31+
)
32+
df2 = md.DataFrame(
33+
mt.random.rand(num_rows, 4, chunk_size=10), columns=list("abcd")
34+
)
35+
merged_df = df1.merge(df2, left_on="a", right_on="a")
36+
graph = TileableGraph([merged_df.data])
37+
next(TileableGraphBuilder(graph).build())
38+
self.chunk_graph = next(ChunkGraphBuilder(graph, fuse_enabled=False).build())
39+
40+
def time_assigner(self):
41+
start_ops = list(GraphAnalyzer._iter_start_ops(self.chunk_graph))
42+
band_slots = {(f"worker-{i}", "numa-0"): 16 for i in range(50)}
43+
current_assign = {}
44+
assigner = GraphAssigner(self.chunk_graph, start_ops, band_slots)
45+
assigned_result = assigner.assign(current_assign)
46+
assert len(assigned_result) == len(start_ops)

mars/services/task/analyzer/assigner.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from abc import ABC, abstractmethod
1616
from collections import defaultdict
1717
from operator import itemgetter
18-
from typing import List, Dict, Set
18+
from typing import List, Dict, Set, Union
1919

2020
import numpy as np
2121

@@ -130,7 +130,7 @@ def _assign_by_bfs(
130130
initial_sizes: Dict[BandType, int],
131131
spread_limits: Dict[BandType, float],
132132
key_to_assign: Set[str],
133-
assigned_record: Dict[str, int],
133+
assigned_record: Dict[str, Union[str, BandType]],
134134
):
135135
"""
136136
Assign initial nodes using breath-first search given initial sizes and
@@ -151,9 +151,10 @@ def _assign_by_bfs(
151151
if op_key in assigned_record:
152152
continue
153153
spread_range += 1
154+
# `op_key` may not be in `key_to_assign`, but we need to record it to avoid iterate the node repeatedly.
155+
assigned_record[op_key] = band
154156
if op_key not in key_to_assign:
155157
continue
156-
assigned_record[op_key] = band
157158
assigned += 1
158159
if spread_range >= spread_limits[band] or assigned >= initial_sizes[band]:
159160
break

0 commit comments

Comments
 (0)