Commit 78aba00

Ngone51 authored and dongjoon-hyun committed
[SPARK-53581][CORE] Fix potential thread-safety issue for mapTaskIds.add()
### What changes were proposed in this pull request?

This is a followup for #47037. This PR wraps the invocation of `OpenHashSet.add()` in `IndexShuffleBlockResolver` in a `synchronized` block.

### Why are the changes needed?

`OpenHashSet` is not thread-safe. We should enforce a synchronized lock when invoking the `add` function to ensure thread safety.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52337 from Ngone51/fix-thread-safety.

Authored-by: Yi Wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

1 parent 4aa934e commit 78aba00

1 file changed: +2 -2 lines changed

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala

Lines changed: 2 additions & 2 deletions
@@ -310,13 +310,13 @@ private[spark] class IndexShuffleBlockResolver(
           val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
             shuffleId, _ => new OpenHashSet[Long](8)
           )
-          mapTaskIds.add(mapId)
+          mapTaskIds.synchronized { mapTaskIds.add(mapId) }
 
         case ShuffleDataBlockId(shuffleId, mapId, _) =>
           val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
             shuffleId, _ => new OpenHashSet[Long](8)
           )
-          mapTaskIds.add(mapId)
+          mapTaskIds.synchronized { mapTaskIds.add(mapId) }
 
         case _ => // Unreachable
       }
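
The locking pattern in this diff can be illustrated outside Spark with a minimal sketch. It is not the Spark implementation: `OpenHashSet` is `private[spark]`, so `scala.collection.mutable.HashSet` is used here as a stand-in that is likewise not thread-safe, and the names `SynchronizedAddSketch`, `addMapTaskId`, `sizeOf`, and `run` are hypothetical. The point it shows is that `computeIfAbsent` on a `ConcurrentHashMap` only makes creation of the per-shuffle set safe; mutations of the set itself still need a lock.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import scala.collection.mutable

object SynchronizedAddSketch {
  // Stand-in for taskIdMapsForShuffle: shuffleId -> set of map task ids.
  // Assumption: mutable.HashSet replaces Spark's OpenHashSet; neither is thread-safe.
  private val taskIdMapsForShuffle =
    new ConcurrentHashMap[Int, mutable.HashSet[Long]]()

  // Mirrors the patched code path: look up (or create) the set, then add under its lock.
  def addMapTaskId(shuffleId: Int, mapId: Long): Unit = {
    val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
      shuffleId, _ => new mutable.HashSet[Long]())
    mapTaskIds.synchronized { mapTaskIds.add(mapId) }
  }

  // Reads take the same lock so they never observe a set in the middle of a resize.
  def sizeOf(shuffleId: Int): Int = {
    val mapTaskIds = taskIdMapsForShuffle.get(shuffleId)
    if (mapTaskIds == null) 0 else mapTaskIds.synchronized { mapTaskIds.size }
  }

  // Adds numThreads * idsPerThread distinct ids to shuffle 0 from concurrent threads
  // and returns the resulting set size.
  def run(numThreads: Int, idsPerThread: Int): Int = {
    val pool = Executors.newFixedThreadPool(numThreads)
    (0 until numThreads).foreach { t =>
      pool.execute(() => {
        (0 until idsPerThread).foreach { i =>
          addMapTaskId(0, t.toLong * idsPerThread + i)
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(30, TimeUnit.SECONDS)
    sizeOf(0)
  }
}
```

With the `synchronized` wrapper, `SynchronizedAddSketch.run(4, 1000)` should report every distinct id. Without it, concurrent `add` calls on the same set may lose elements or corrupt its internal state. The lock only helps if every other access to the same set, reads included, synchronizes on the same object.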
