Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Modify deep copy test case, add comments
  • Loading branch information
riyaverm-db committed Jun 25, 2024
commit 25b9e96742f42dd34a636e9ff3893b2445b64d91
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ class RocksDB(
try {
if (loadedVersion != version) {
closeDB()
// deep copy is needed to avoid race condition
// between maintenance and task threads
fileManager = fileManager.deepCopy()
val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class RocksDBFileManager(

private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]


// used to keep a mapping of the exact Dfs file that was used to create a local SST file.
// The reason this is a separate map because versionToRocksDBFiles can contain multiple similar
// SST files to a particular local file (for example 1.sst can map to 1-UUID1.sst in v1 and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.streaming.state

import java.io._
import java.nio.charset.Charset
import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable
import scala.language.implicitConversions
Expand Down Expand Up @@ -876,45 +875,27 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}

testWithChangelogCheckpointingEnabled("RocksDBFileManager: deepCopy") {
val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
val originalFileManager = new RocksDBFileManager(
dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)

val file1 = RocksDBSstFile("001.sst", "dfs1", 100L)
val file2 = RocksDBSstFile("002.sst", "dfs2", 100L)

// Get access to the private ConcurrentHashMap
val origVersionToRocksDBFilesMap: ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
= getPrivateField(originalFileManager, "versionToRocksDBFiles")

// Add sample files to hash maps
originalFileManager.localFilesToDfsFiles
.put("001.sst", file1)
origVersionToRocksDBFilesMap
.put(1L, Seq(file1))

// Check deepCopy duplicates the original hashmaps
val copiedFileManager = originalFileManager.deepCopy()
val copiedVersionToRocksDBFilesMap: ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
= getPrivateField(copiedFileManager, "versionToRocksDBFiles")

assert(origVersionToRocksDBFilesMap.equals(copiedVersionToRocksDBFilesMap))
assert(originalFileManager.localFilesToDfsFiles
.equals(copiedFileManager.localFilesToDfsFiles))

// Add new values to original file manager hash maps
originalFileManager.localFilesToDfsFiles
.put("002.sst", file2)
withTempDir { dir =>
val dfsRootDir = dir.getAbsolutePath
val originalFileManager = new RocksDBFileManager(
dfsRootDir, Utils.createTempDir(), new Configuration)
val copiedFileManager = originalFileManager.deepCopy()

origVersionToRocksDBFilesMap
.put(2L, Seq(file2))
// Save a version of checkpoint files
val cpFiles = Seq(
"001.sst" -> 10,
"002.sst" -> 10,
"003.sst" -> 10
)
saveCheckpointFiles(originalFileManager, cpFiles, 1, 101)

// Check deep copied and original file manager states differ
assert(origVersionToRocksDBFilesMap.containsKey(2L))
assert(!copiedVersionToRocksDBFilesMap.containsKey(2L))
// Ensure checkpoint metrics are different
assert(originalFileManager.latestSaveCheckpointMetrics.filesCopied == 3L)
assert(originalFileManager.latestSaveCheckpointMetrics.bytesCopied == 30L)

assert(originalFileManager.localFilesToDfsFiles.containsKey("002.sst"))
assert(!copiedFileManager.localFilesToDfsFiles.containsKey("002.sst"))
assert(copiedFileManager.latestSaveCheckpointMetrics.filesCopied == 0L)
assert(copiedFileManager.latestSaveCheckpointMetrics.bytesCopied == 0L)
}
}

testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write changelog") {
Expand Down Expand Up @@ -2346,12 +2327,6 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}

def listFiles(file: String): Seq[File] = listFiles(new File(file))

def getPrivateField[T](obj: Any, fieldName: String): T = {
val field = obj.getClass.getDeclaredField(fieldName)
field.setAccessible(true)
field.get(obj).asInstanceOf[T]
}
}

object RocksDBSuite {
Expand Down