Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
import javax.annotation.concurrent.GuardedBy

import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.jdk.CollectionConverters._
import scala.ref.WeakReference
import scala.util.Try
Expand Down Expand Up @@ -74,14 +74,19 @@ class RocksDB(
loggingId: String = "",
useColumnFamilies: Boolean = false) extends Logging {

case class RocksDBSnapshot(checkpointDir: File, version: Long, numKeys: Long) {
case class RocksDBSnapshot(
checkpointDir: File,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: 2 more spaces (4 spaces for params in multi-lines)

version: Long,
numKeys: Long,
capturedFileMappings: RocksDBFileMappings) {
def close(): Unit = {
silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
}
}

@volatile private var latestSnapshot: Option[RocksDBSnapshot] = None
@volatile private var lastSnapshotVersion = 0L
private val oldSnapshots = new ListBuffer[RocksDBSnapshot]

RocksDBLoader.loadLibrary()

Expand Down Expand Up @@ -181,6 +186,9 @@ class RocksDB(
try {
if (loadedVersion != version) {
closeDB()
// deep copy is needed to avoid race condition
// between maintenance and task threads
fileManager.copyFileMapping()
val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
loadedVersion = latestSnapshotVersion
Expand All @@ -189,7 +197,6 @@ class RocksDB(
if (lastSnapshotVersion > latestSnapshotVersion) {
// discard any newer snapshots
lastSnapshotVersion = 0L
latestSnapshot = None
}
openDB()

Expand Down Expand Up @@ -588,10 +595,17 @@ class RocksDB(
// inside the uploadSnapshot() called below.
// If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
// during state store maintenance.
latestSnapshot.foreach(_.close())
latestSnapshot = Some(
RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
lastSnapshotVersion = newVersion
synchronized {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we trying to synchronize updates between task thread/maintenance thread here? If so, we need a lock - just synchronized would not work. This code can only be executed by one thread anyways (task thread that holds the acquire lock - so synchronized does not do anything in my opinion).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the race condition you are thinking about? Is calling close() on a snapshot being uploaded? In that case we can defer the close to the maintenance thread.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I missed your comment @chaoqin-li1123. The race condition is basically assigning to latestSnapshot variable. We assign the value of variable to newly created snapshot here, and set the variable to None in maintenance thread. Both these need to be synchronized, I think.

It seems to have been fixed in the latest revision.

if (latestSnapshot.isDefined) {
oldSnapshots += latestSnapshot.get
}
latestSnapshot = Some(
RocksDBSnapshot(checkpointDir,
newVersion,
numKeysOnWritingVersion,
fileManager.captureFileMapReference()))
lastSnapshotVersion = newVersion
}
}
}

Expand Down Expand Up @@ -643,23 +657,36 @@ class RocksDB(
}

private def uploadSnapshot(): Unit = {
var oldSnapshotsImmutable: List[RocksDBSnapshot] = Nil
val localCheckpoint = synchronized {
val checkpoint = latestSnapshot
latestSnapshot = None
Comment on lines 662 to 663
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[unrelated to this PR - just a note here]. I think if we fail to write to DFS (lets say due to some timeout/network error), we lose the snapshot as we assign the variable to None here. Probably better to assign this variable to None after a successful upload.

However, I think its inherently complex to do what I mentioned in above paragraph, because the task thread can change the latestSnapshot to a new snapshot while upload is happening. I guess maybe its okay to skip a snapshot on transient error (for simplicity) and upload the next snapshot.

Curious, if you have any further thoughts on this @riyaverm-db @chaoqin-li1123 .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Snapshot uploading is best effort attempt anyway. It should be fine we we skip some uploading due to transient error.


// Convert mutable list buffer to immutable to prevent
// race condition with commit where old snapshot is added
oldSnapshotsImmutable = oldSnapshots.toList
oldSnapshots.clear()

checkpoint
}
localCheckpoint match {
case Some(RocksDBSnapshot(localDir, version, numKeys)) =>
case Some(RocksDBSnapshot(localDir, version, numKeys, capturedFileMappings)) =>
try {
val uploadTime = timeTakenMs {
fileManager.saveCheckpointToDfs(localDir, version, numKeys)
fileManager.saveCheckpointToDfs(localDir, version, numKeys, capturedFileMappings)
fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
}
logInfo(log"${MDC(LogKeys.LOG_ID, loggingId)}: Upload snapshot of version " +
log"${MDC(LogKeys.VERSION_NUM, version)}," +
log" time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms")
} finally {
localCheckpoint.foreach(_.close())

// Clean up old latestSnapshots
for (snapshot <- oldSnapshotsImmutable) {
snapshot.close()
}

}
case _ =>
}
Expand All @@ -681,20 +708,7 @@ class RocksDB(

def doMaintenance(): Unit = {
if (enableChangelogCheckpointing) {
// There is race to update latestSnapshot between load(), commit()
// and uploadSnapshot().
// The load method will reset latestSnapshot to discard any snapshots taken
// from newer versions (when a old version is reloaded).
// commit() method deletes the existing snapshot while creating a new snapshot.
// In order to ensure that the snapshot being uploaded would not be modified
// concurrently, we need to synchronize the snapshot access between task thread
// and maintenance thread.
acquire(StoreMaintenance)
try {
uploadSnapshot()
} finally {
release(StoreMaintenance)
}
uploadSnapshot()
}
val cleanupTime = timeTakenMs {
fileManager.deleteOldVersions(conf.minVersionsToRetain)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,6 @@ class RocksDBFileManager(

import RocksDBImmutableFile._

private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]


// used to keep a mapping of the exact Dfs file that was used to create a local SST file.
// The reason this is a separate map because versionToRocksDBFiles can contain multiple similar
// SST files to a particular local file (for example 1.sst can map to 1-UUID1.sst in v1 and
// 1-UUID2.sst in v2). We need to capture the exact file used to ensure Version ID compatibility
// across SST files and RocksDB manifest.
private[sql] val localFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]

private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
private val onlyZipFiles = new PathFilter {
Expand All @@ -157,6 +147,29 @@ class RocksDBFileManager(
private def codec = CompressionCodec.createCodec(sparkConf, codecName)

@volatile private var rootDirChecked: Boolean = false
@volatile private var fileMappings = RocksDBFileMappings(
new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
new ConcurrentHashMap[String, RocksDBImmutableFile]
)

/**
* Make a deep copy of versionToRocksDBFiles and localFilesToDfsFiles to avoid
* current task thread from overwriting the file mapping whenever background maintenance
* thread attempts to upload a snapshot
*/
def copyFileMapping() : Unit = {
val newVersionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
val newLocalFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]

newVersionToRocksDBFiles.putAll(fileMappings.versionToRocksDBFiles)
newLocalFilesToDfsFiles.putAll(fileMappings.localFilesToDfsFiles)

fileMappings = RocksDBFileMappings(newVersionToRocksDBFiles, newLocalFilesToDfsFiles)
}

def captureFileMapReference(): RocksDBFileMappings = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't quite get the purpose of this function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It functions as a get method for the RocksDBFileMapping private var. I named it captureFileMapReference for code readability in RocksDB where it is used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe Scala has a accessor style guild: https://docs.scala-lang.org/style/naming-conventions.html#accessorsmutators
Even if follow Java or other most languages' convention, getter is better to be named getFileMappings().
Since the PR is already closed. You don't have to fix it. It's not a big deal. I mentioned the comment just to complete the discussion loop.

fileMappings
}

def getChangeLogWriter(
version: Long,
Expand Down Expand Up @@ -204,11 +217,15 @@ class RocksDBFileManager(
def latestSaveCheckpointMetrics: RocksDBFileManagerMetrics = saveCheckpointMetrics

/** Save all the files in given local checkpoint directory as a committed version in DFS */
def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
def saveCheckpointToDfs(
checkpointDir: File,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: same here, 2 more spaces

version: Long,
numKeys: Long,
capturedFileMappings: RocksDBFileMappings): Unit = {
logFilesInDir(checkpointDir, log"Saving checkpoint files " +
log"for version ${MDC(LogKeys.VERSION_NUM, version)}")
val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles, capturedFileMappings)
val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
val metadataFile = localMetadataFile(checkpointDir)
metadata.writeToFile(metadataFile)
Expand Down Expand Up @@ -243,10 +260,10 @@ class RocksDBFileManager(
// The unique ids of SST files are checked when opening a rocksdb instance. The SST files
// in larger versions can't be reused even if they have the same size and name because
// they belong to another rocksdb instance.
versionToRocksDBFiles.keySet().removeIf(_ >= version)
fileMappings.versionToRocksDBFiles.keySet().removeIf(_ >= version)
val metadata = if (version == 0) {
if (localDir.exists) Utils.deleteRecursively(localDir)
localFilesToDfsFiles.clear()
fileMappings.localFilesToDfsFiles.clear()
localDir.mkdirs()
RocksDBCheckpointMetadata(Seq.empty, 0)
} else {
Expand All @@ -260,7 +277,7 @@ class RocksDBFileManager(
logInfo(log"Read metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" +
log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}")
loadImmutableFilesFromDfs(metadata.immutableFiles, localDir)
versionToRocksDBFiles.put(version, metadata.immutableFiles)
fileMappings.versionToRocksDBFiles.put(version, metadata.immutableFiles)
metadataFile.delete()
metadata
}
Expand Down Expand Up @@ -417,9 +434,9 @@ class RocksDBFileManager(
// Resolve RocksDB files for all the versions and find the max version each file is used
val fileToMaxUsedVersion = new mutable.HashMap[String, Long]
sortedSnapshotVersions.foreach { version =>
val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
val files = Option(fileMappings.versionToRocksDBFiles.get(version)).getOrElse {
val newResolvedFiles = getImmutableFilesFromVersionZip(version)
versionToRocksDBFiles.put(version, newResolvedFiles)
fileMappings.versionToRocksDBFiles.put(version, newResolvedFiles)
newResolvedFiles
}
files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
Expand Down Expand Up @@ -466,7 +483,7 @@ class RocksDBFileManager(
val versionFile = dfsBatchZipFile(version)
try {
fm.delete(versionFile)
versionToRocksDBFiles.remove(version)
fileMappings.versionToRocksDBFiles.remove(version)
logDebug(s"Deleted version $version")
} catch {
case e: Exception =>
Expand All @@ -487,7 +504,8 @@ class RocksDBFileManager(
/** Save immutable files to DFS directory */
private def saveImmutableFilesToDfs(
version: Long,
localFiles: Seq[File]): Seq[RocksDBImmutableFile] = {
localFiles: Seq[File],
capturedFileMappings: RocksDBFileMappings): Seq[RocksDBImmutableFile] = {
// Get the immutable files used in previous versions, as some of those uploaded files can be
// reused for this version
logInfo(log"Saving RocksDB files to DFS for ${MDC(LogKeys.VERSION_NUM, version)}")
Expand All @@ -497,7 +515,8 @@ class RocksDBFileManager(
var filesReused = 0L

val immutableFiles = localFiles.map { localFile =>
val existingDfsFile = localFilesToDfsFiles.asScala.get(localFile.getName)
val existingDfsFile =
capturedFileMappings.localFilesToDfsFiles.asScala.get(localFile.getName)
if (existingDfsFile.isDefined && existingDfsFile.get.sizeBytes == localFile.length()) {
val dfsFile = existingDfsFile.get
filesReused += 1
Expand All @@ -521,7 +540,7 @@ class RocksDBFileManager(
bytesCopied += localFileSize

val immutableDfsFile = RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
localFilesToDfsFiles.put(localFileName, immutableDfsFile)
capturedFileMappings.localFilesToDfsFiles.put(localFileName, immutableDfsFile)

immutableDfsFile
}
Expand All @@ -530,7 +549,7 @@ class RocksDBFileManager(
log"(${MDC(LogKeys.NUM_BYTES, bytesCopied)} bytes) from local to" +
log" DFS for version ${MDC(LogKeys.VERSION_NUM, version)}. " +
log"${MDC(LogKeys.NUM_FILES_REUSED, filesReused)} files reused without copying.")
versionToRocksDBFiles.put(version, immutableFiles)
capturedFileMappings.versionToRocksDBFiles.put(version, immutableFiles)

// Cleanup locally deleted files from the localFilesToDfsFiles map
// Locally, SST Files can be deleted due to RocksDB compaction. These files need
Expand Down Expand Up @@ -570,7 +589,7 @@ class RocksDBFileManager(
.foreach { existingFile =>
val existingFileSize = existingFile.length()
val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
val prevDfsFile = fileMappings.localFilesToDfsFiles.asScala.get(existingFile.getName)
val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
requiredFile.get.dfsFileName == prevDfsFile.get.dfsFileName &&
existingFile.length() == requiredFile.get.sizeBytes
Expand All @@ -580,7 +599,7 @@ class RocksDBFileManager(

if (!isSameFile) {
existingFile.delete()
localFilesToDfsFiles.remove(existingFile.getName)
fileMappings.localFilesToDfsFiles.remove(existingFile.getName)
logInfo(log"Deleted local file ${MDC(LogKeys.FILE_NAME, existingFile)} " +
log"with size ${MDC(LogKeys.NUM_BYTES, existingFileSize)} mapped" +
log" to previous dfsFile ${MDC(LogKeys.DFS_FILE, prevDfsFile.getOrElse("null"))}")
Expand Down Expand Up @@ -612,7 +631,7 @@ class RocksDBFileManager(
}
filesCopied += 1
bytesCopied += localFileSize
localFilesToDfsFiles.put(localFileName, file)
fileMappings.localFilesToDfsFiles.put(localFileName, file)
logInfo(log"Copied ${MDC(LogKeys.DFS_FILE, dfsFile)} to " +
log"${MDC(LogKeys.FILE_NAME, localFile)} - " +
log"${MDC(LogKeys.NUM_BYTES, localFileSize)} bytes")
Expand All @@ -633,13 +652,13 @@ class RocksDBFileManager(
private def removeLocallyDeletedSSTFilesFromDfsMapping(localFiles: Seq[File]): Unit = {
// clean up deleted SST files from the localFilesToDfsFiles Map
val currentLocalFiles = localFiles.map(_.getName).toSet
val mappingsToClean = localFilesToDfsFiles.asScala
val mappingsToClean = fileMappings.localFilesToDfsFiles.asScala
.keys
.filterNot(currentLocalFiles.contains)

mappingsToClean.foreach { f =>
logInfo(log"cleaning ${MDC(LogKeys.FILE_NAME, f)} from the localFilesToDfsFiles map")
localFilesToDfsFiles.remove(f)
fileMappings.localFilesToDfsFiles.remove(f)
}
}

Expand Down Expand Up @@ -749,6 +768,20 @@ class RocksDBFileManager(
}
}

/**
* Track file mappings in RocksDB across local and remote directories
* @param versionToRocksDBFiles Mapping of RocksDB files used across versions for maintenance
* @param localFilesToDfsFiles Mapping of the exact Dfs file used to create a local SST file
* The reason localFilesToDfsFiles is a separate map because versionToRocksDBFiles can contain
* multiple similar SST files to a particular local file (for example 1.sst can map to 1-UUID1.sst
* in v1 and 1-UUID2.sst in v2). We need to capture the exact file used to ensure Version ID
* compatibility across SST files and RocksDB manifest.
*/

case class RocksDBFileMappings(
versionToRocksDBFiles: ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: 2 more spaces

localFilesToDfsFiles: ConcurrentHashMap[String, RocksDBImmutableFile])

/**
* Metrics regarding RocksDB file sync between local and DFS.
*/
Expand Down
Loading