Skip to content

Commit 68db395

Browse files
warrenzhu25 authored and Mridul Muralidharan committed
[SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false
### What changes were proposed in this pull request?

Add a `keepReadLock` parameter to `lockNewBlockForWriting()`. When `keepReadLock` is `false`, skip `lockForReading()` to avoid blocking on the read lock or a potential deadlock.

When two tasks try to compute the same RDD with a replication level of 2 while running on only two executors, a deadlock occurs: the task thread holds the write lock while waiting for replication to the remote executor, and the shuffle server thread handling the block upload request waits on `lockForReading` in [BlockInfoManager.scala](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala#L457C24-L457C24). See [SPARK-45057] for details.

### Why are the changes needed?

This avoids an unnecessary read-lock acquisition and prevents the deadlock issue described above.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a unit test in BlockInfoManagerSuite.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#43067 from warrenzhu25/deadlock.

Authored-by: Warren Zhu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 0d6fda5)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
1 parent 85bf705 commit 68db395

File tree

3 files changed

+22
-9
lines changed

3 files changed

+22
-9
lines changed

core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -383,13 +383,14 @@ private[storage] class BlockInfoManager extends Logging {
383383
* then just go ahead and acquire the write lock. Otherwise, if another thread is already
384384
* writing the block, then we wait for the write to finish before acquiring the read lock.
385385
*
386-
* @return true if the block did not already exist, false otherwise. If this returns false, then
387-
* a read lock on the existing block will be held. If this returns true, a write lock on
388-
* the new block will be held.
386+
* @return true if the block did not already exist, false otherwise.
387+
* If this returns true, a write lock on the new block will be held.
388+
* If this returns false then a read lock will be held iff keepReadLock == true.
389389
*/
390390
def lockNewBlockForWriting(
391391
blockId: BlockId,
392-
newBlockInfo: BlockInfo): Boolean = {
392+
newBlockInfo: BlockInfo,
393+
keepReadLock: Boolean = true): Boolean = {
393394
logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
394395
// Get the lock that will be associated with the to-be written block and lock it for the entire
395396
// duration of this operation. This way we prevent race conditions when two threads try to write
@@ -405,6 +406,8 @@ private[storage] class BlockInfoManager extends Logging {
405406
val result = lockForWriting(blockId, blocking = false)
406407
assert(result.isDefined)
407408
return true
409+
} else if (!keepReadLock) {
410+
return false
408411
} else {
409412
// Block already exists. This could happen if another thread races with us to compute
410413
// the same block. In this case we try to acquire a read lock, if the locking succeeds

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1444,14 +1444,10 @@ private[spark] class BlockManager(
14441444

14451445
val putBlockInfo = {
14461446
val newInfo = new BlockInfo(level, classTag, tellMaster)
1447-
if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
1447+
if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo, keepReadLock)) {
14481448
newInfo
14491449
} else {
14501450
logWarning(s"Block $blockId already exists on this machine; not re-adding it")
1451-
if (!keepReadLock) {
1452-
// lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
1453-
releaseLock(blockId)
1454-
}
14551451
return None
14561452
}
14571453
}

core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,20 @@ class BlockInfoManagerSuite extends SparkFunSuite {
166166
assert(blockInfoManager.get("block").get.readerCount === 1)
167167
}
168168

169+
test("lockNewBlockForWriting should not block when keepReadLock is false") {
170+
withTaskId(0) {
171+
assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
172+
}
173+
val lock1Future = Future {
174+
withTaskId(1) {
175+
blockInfoManager.lockNewBlockForWriting("block", newBlockInfo(), false)
176+
}
177+
}
178+
179+
assert(!ThreadUtils.awaitResult(lock1Future, 1.seconds))
180+
assert(blockInfoManager.get("block").get.readerCount === 0)
181+
}
182+
169183
test("read locks are reentrant") {
170184
withTaskId(1) {
171185
assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))

0 commit comments

Comments
 (0)