[SPARK-48586][SS] Remove lock acquisition in doMaintenance() by making a deep copy of file mappings in RocksDBFileManager in load() #46942
Conversation
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
Force-pushed from 8d5814b to aa56251
Can you fill in the testing description in the PR description?
Can we be more specific and name it "background snapshot upload doesn't acquire rocksdb instance lock"?
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
Are we trying to synchronize updates between the task thread and the maintenance thread here? If so, we need a lock; plain synchronized would not work. This code can only be executed by one thread anyway (the task thread, which holds the acquire lock), so synchronized does not do anything, in my opinion.
What is the race condition you are thinking about? Is it calling close() on a snapshot that is being uploaded? In that case, we can defer the close to the maintenance thread.
Sorry I missed your comment @chaoqin-li1123. The race condition is basically assigning to the latestSnapshot variable. We assign the variable to the newly created snapshot here, and set it to None in the maintenance thread. Both of these need to be synchronized, I think.
It seems to have been fixed in the latest revision.
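The synchronization being discussed can be illustrated with a minimal sketch. This is not the actual RocksDB.scala code; the class and method names here are hypothetical, and it only models the two competing writers of latestSnapshot (the task thread in commit(), and the maintenance thread) going through the same object lock:

```scala
case class Snapshot(version: Long)

class SnapshotHolder {
  // Guarded by `this`: written by both the task thread and the
  // maintenance thread, so every access goes through synchronized.
  private var latestSnapshot: Option[Snapshot] = None

  // Task thread: publish a freshly created snapshot.
  def publish(s: Snapshot): Unit = synchronized {
    latestSnapshot = Some(s)
  }

  // Maintenance thread: atomically take the snapshot and reset the
  // variable to None, so the two assignments can never interleave.
  def take(): Option[Snapshot] = synchronized {
    val s = latestSnapshot
    latestSnapshot = None
    s
  }
}
```

Because both assignments happen under the same monitor, a publish() can never be lost between the read and the reset inside take().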
[nit] Probably better to name it snapshotFileManager.
Force-pushed from bb7bbfd to 9db51f3
Race condition for oldSnapshots here, between maintenance operations and commit's addOne().
Force-pushed from 9db51f3 to c57f958
siying left a comment
Thanks for working on it. I don't have any other concerns.
    fileMappings = RocksDBFileMappings(newVersionToRocksDBFiles, newLocalFilesToDfsFiles)
  }

  def captureFileMapReference(): RocksDBFileMappings = {
I didn't quite get the purpose of this function.
It functions as a getter for the private RocksDBFileMappings var. I named it captureFileMapReference for code readability in RocksDB, where it is used.
I believe Scala has an accessor style guide: https://docs.scala-lang.org/style/naming-conventions.html#accessorsmutators
Even if we follow Java's or most other languages' conventions, the getter would be better named getFileMappings().
Since the PR is already closed, you don't have to fix it; it's not a big deal. I mentioned it just to close the discussion loop.
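The accessor convention in the linked style guide can be shown with a hypothetical simplified manager (not the real RocksDBFileManager API): the private mutable field gets an underscore name, and the accessor drops the "get"/"capture" prefix so it reads like a property.

```scala
class FileManagerSketch {
  // Private backing field; underscore prefix distinguishes it from the accessor.
  private var _fileMappings: Map[String, String] = Map.empty

  // Accessor per the Scala style guide: no "get" prefix, no parentheses.
  def fileMappings: Map[String, String] = _fileMappings

  // Mutation stays behind an explicitly named method.
  def record(local: String, dfs: String): Unit =
    _fileMappings += (local -> dfs)
}
```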
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
  oldSnapshots += latestSnapshot
  latestSnapshot = Some(
-   RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
+   RocksDBSnapshot(checkpointDir,
+     newVersion,
+     numKeysOnWritingVersion,
+     fileManager.captureFileMapReference()))
I think we need to synchronize this part to prevent a race with the maintenance thread when mutating latestSnapshot.
  // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
  // during state store maintenance.
  latestSnapshot.foreach(_.close())
  oldSnapshots += latestSnapshot
Do we need to check if latestSnapshot is None before we append it to the list?
This is a List[Option[Snapshot]], so the check is not needed. But I feel changing it to a List[Snapshot] and doing the None check makes the code much cleaner.
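The suggestion can be sketched as follows; the names are illustrative, not the real RocksDB.scala fields. Keeping oldSnapshots as a List[Snapshot] means the None check happens once, at append time, and later consumers never have to unwrap Options:

```scala
import scala.collection.mutable.ListBuffer

case class Snapshot(version: Long)

// Concrete snapshots only; empty Options are filtered out before queuing.
val oldSnapshots = ListBuffer.empty[Snapshot]

def retire(latestSnapshot: Option[Snapshot]): Unit =
  // foreach on Option appends only when a snapshot is actually present.
  latestSnapshot.foreach(oldSnapshots += _)
```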
Force-pushed from e0bf8b6 to bcb2307
sahnib left a comment
LGTM, thanks for making these changes. It's super helpful to avoid contention between the maintenance and task threads for performance, and to ensure snapshots keep uploading at a regular cadence.
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
  val checkpoint = latestSnapshot
  latestSnapshot = None
[unrelated to this PR, just a note here] I think if we fail to write to DFS (let's say due to some timeout/network error), we lose the snapshot, as we assign the variable to None here. It would probably be better to assign this variable to None after a successful upload.
However, I think it's inherently complex to do what I mentioned in the above paragraph, because the task thread can change latestSnapshot to a new snapshot while the upload is happening. I guess it may be okay to skip a snapshot on a transient error (for simplicity) and upload the next snapshot.
Curious if you have any further thoughts on this @riyaverm-db @chaoqin-li1123.
Snapshot uploading is a best-effort attempt anyway. It should be fine if we skip some uploads due to transient errors.
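The best-effort policy discussed above can be sketched like this. Everything here is a simplified assumption, not Spark's API: take() stands in for grabbing latestSnapshot under the lock, and uploadToDfs for the real DFS write. A transient failure drops the current snapshot instead of retrying, on the assumption that the next commit produces a fresh one:

```scala
case class Snapshot(version: Long)

// Returns true if a snapshot was uploaded, false if there was nothing to
// upload or the upload failed transiently.
def uploadLatest(take: () => Option[Snapshot],
                 uploadToDfs: Snapshot => Unit): Boolean =
  take() match {
    case Some(snapshot) =>
      try {
        uploadToDfs(snapshot)
        true
      } catch {
        // Transient failure: skip this snapshot; the next commit's
        // snapshot will be uploaded on the following maintenance pass.
        case _: java.io.IOException => false
      }
    case None => false
  }
```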
HeartSaVioR left a comment
Given there are three approvals and the PR has been up for 2 weeks, I wouldn't require another round of review to fix nits. Please file a new FOLLOWUP PR to address the comments.
+1
- case class RocksDBSnapshot(checkpointDir: File, version: Long, numKeys: Long) {
+ case class RocksDBSnapshot(
+     checkpointDir: File,
nit: 2 more spaces (4 spaces for params in multi-lines)
  /** Save all the files in given local checkpoint directory as a committed version in DFS */
- def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
+ def saveCheckpointToDfs(
+     checkpointDir: File,
nit: same here, 2 more spaces
   */

  case class RocksDBFileMappings(
      versionToRocksDBFiles: ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
nit: 2 more spaces
Thanks! Merging to master/3.5/3.4.
@riyaverm-db Looks like there is a merge conflict in 3.5 (and probably 3.4 as well). Could you please help craft a PR for 3.5 and 3.4? Thanks in advance!
… changes

### What changes were proposed in this pull request?

This is a follow up PR to #46942 addressing the style changes that were requested.

### Why are the changes needed?

Style changes added.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Not applicable.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47136 from riyaverm-db/rocks-db-style.

Authored-by: Riya Verma <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…making a deep copy of file mappings in RocksDBFileManager in load()

Backports #46942 to 3.5

### What changes were proposed in this pull request?

When change log checkpointing is enabled, the lock of the **RocksDB** state store is acquired when uploading the snapshot inside maintenance tasks, which causes lock contention between query processing tasks and the state maintenance thread. This PR fixes the lock contention issue introduced by #45724. The changes include:

1. Removing lock acquisition in `doMaintenance()`
2. Adding a `copyFileMappings()` method to **RocksDBFileManager**, and using this method to deep copy the file manager state, specifically the file mappings `versionToRocksDBFiles` and `localFilesToDfsFiles`, in `load()`
3. Capture the reference to the file mappings in `commit()`.

### Why are the changes needed?

We want to eliminate lock contention to decrease latency of streaming queries, so lock acquisition inside maintenance tasks should be avoided. This can introduce race conditions between task and maintenance threads. By making a deep copy of `versionToRocksDBFiles` and `localFilesToDfsFiles` in **RocksDBFileManager**, we can ensure that the file manager state is not updated by the task thread when background snapshot uploading tasks attempt to upload a snapshot.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit test cases.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47130 from riyaverm-db/remove-lock-contention-between-maintenance-and-task-3.5.

Lead-authored-by: Riya Verma <[email protected]>
Co-authored-by: Riya Verma <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
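The deep-copy idea described above can be sketched as follows. The field and method names mirror the PR text, but the shapes are simplified assumptions, not the real RocksDBFileManager types. Copying into fresh ConcurrentHashMap instances means later task-thread mutations of the live maps are invisible to an in-flight snapshot upload:

```scala
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-in for the file manager state named in the PR.
case class FileMappings(
    versionToRocksDBFiles: ConcurrentHashMap[Long, Seq[String]],
    localFilesToDfsFiles: ConcurrentHashMap[String, String])

def copyFileMappings(m: FileMappings): FileMappings =
  // ConcurrentHashMap's copy constructor snapshots the entries into new
  // map instances, decoupling the copy from the live maps.
  FileMappings(
    new ConcurrentHashMap(m.versionToRocksDBFiles),
    new ConcurrentHashMap(m.localFilesToDfsFiles))
```

A load()-time copy like this is what lets doMaintenance() run without the instance lock: the upload works off its own immutable-in-practice view of the mappings.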
What changes were proposed in this pull request?
When change log checkpointing is enabled, the lock of the RocksDB state store is acquired when uploading the snapshot inside maintenance tasks, which causes lock contention between query processing tasks and state maintenance thread. This PR fixes lock contention issue introduced by #45724.
The changes include:
1. Removing lock acquisition in doMaintenance()
2. Adding a copyFileMappings() method to RocksDBFileManager, and using this method to deep copy the file manager state, specifically the file mappings versionToRocksDBFiles and localFilesToDfsFiles, in load()
3. Capture the reference to the file mappings in commit().

Why are the changes needed?
We want to eliminate lock contention to decrease latency of streaming queries, so lock acquisition inside maintenance tasks should be avoided. This can introduce race conditions between task and maintenance threads. By making a deep copy of versionToRocksDBFiles and localFilesToDfsFiles in RocksDBFileManager, we can ensure that the file manager state is not updated by the task thread when background snapshot uploading tasks attempt to upload a snapshot.

Does this PR introduce any user-facing change?
No
How was this patch tested?
Added unit test cases.
Was this patch authored or co-authored using generative AI tooling?
No