File: RocksDB.scala

@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy

 import scala.collection.{mutable, Map}
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.jdk.CollectionConverters._
 import scala.ref.WeakReference
 import scala.util.Try
@@ -74,14 +74,20 @@ class RocksDB(
     loggingId: String = "",
     useColumnFamilies: Boolean = false) extends Logging {

-  case class RocksDBSnapshot(checkpointDir: File, version: Long, numKeys: Long) {
+  case class RocksDBSnapshot(
+    checkpointDir: File,

Comment (Contributor): nit: 2 more spaces (4 spaces for params in multi-lines)

+    version: Long,
+    numKeys: Long,
+    capturedFileMappings: RocksDBFileMappings) {
     def close(): Unit = {
       silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
     }
   }

   @volatile private var latestSnapshot: Option[RocksDBSnapshot] = None
   @volatile private var lastSnapshotVersion = 0L
+  private val oldSnapshots = new ListBuffer[Option[RocksDBSnapshot]]

   RocksDBLoader.loadLibrary()
@@ -181,6 +187,9 @@
     try {
       if (loadedVersion != version) {
         closeDB()
+        // A deep copy is needed to avoid a race condition
+        // between the maintenance and task threads.
+        fileManager.copyFileMapping()
         val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
         val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
         loadedVersion = latestSnapshotVersion
@@ -189,7 +198,6 @@
         if (lastSnapshotVersion > latestSnapshotVersion) {
           // discard any newer snapshots
           lastSnapshotVersion = 0L
-          latestSnapshot = None
         }
         openDB()

@@ -588,9 +596,12 @@
        // inside the uploadSnapshot() called below.
        // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
        // during state store maintenance.
-        latestSnapshot.foreach(_.close())
+        oldSnapshots += latestSnapshot

Comment (Contributor): Do we need to check if latestSnapshot is None before we append it to the list?

Comment (Contributor): This is a List[Option[Snapshot]], so checking is not needed. But I feel changing it to a List[Snapshot] and doing the null check makes the code much cleaner.

        latestSnapshot = Some(
-          RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
+          RocksDBSnapshot(checkpointDir,
+            newVersion,
+            numKeysOnWritingVersion,
+            fileManager.captureFileMapReference()))

Comment (Contributor): I think we need to synchronize this part to prevent a race with the maintenance thread on mutating latestSnapshot.

        lastSnapshotVersion = newVersion
      }
    }
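The commit() path above no longer closes the previous snapshot eagerly; it retires it into oldSnapshots for the maintenance thread to drain later. A minimal standalone sketch of that hand-off pattern, using a hypothetical Snapshot type and object name rather than the real Spark classes:

import scala.collection.mutable.ListBuffer

object SnapshotHandoff {
  final case class Snapshot(version: Long) {
    def close(): Unit = println(s"closed snapshot $version")
  }

  private var latest: Option[Snapshot] = None
  private val retired = new ListBuffer[Option[Snapshot]]

  // Task thread: retire the previous snapshot instead of closing it in place,
  // so an in-flight upload of that snapshot never observes a closed handle.
  def commit(version: Long): Unit = synchronized {
    retired += latest
    latest = Some(Snapshot(version))
  }

  // Maintenance thread: take an immutable copy of the buffer under the lock,
  // then close the retired snapshots without holding the lock.
  def maintenance(): Unit = {
    val toClose = synchronized {
      val copy = retired.toList
      retired.clear()
      copy
    }
    toClose.foreach(_.foreach(_.close()))
  }
}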
@@ -649,17 +660,31 @@
       checkpoint
     }
     localCheckpoint match {
-      case Some(RocksDBSnapshot(localDir, version, numKeys)) =>
+      case Some(RocksDBSnapshot(localDir, version, numKeys, capturedFileMappings)) =>
         try {
           val uploadTime = timeTakenMs {
-            fileManager.saveCheckpointToDfs(localDir, version, numKeys)
+            fileManager.saveCheckpointToDfs(localDir, version, numKeys, capturedFileMappings)
             fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
           }
           logInfo(log"${MDC(LogKeys.LOG_ID, loggingId)}: Upload snapshot of version " +
             log"${MDC(LogKeys.VERSION_NUM, version)}," +
             log" time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms")
         } finally {
           localCheckpoint.foreach(_.close())
+
+          // Clean up old snapshots.
+          // Convert the mutable ListBuffer to an immutable list to prevent
+          // a race condition with commit(), where old snapshots are appended.
+          var oldSnapshotsImmutable: List[Option[RocksDBSnapshot]] = Nil
+          synchronized {
+            oldSnapshotsImmutable = oldSnapshots.toList
+            oldSnapshots.clear()
+          }
+
+          for (snapshot <- oldSnapshotsImmutable) {
+            snapshot.foreach(_.close())
+          }
         }
       case _ =>
     }
@@ -681,20 +706,7 @@

   def doMaintenance(): Unit = {
     if (enableChangelogCheckpointing) {
-      // There is race to update latestSnapshot between load(), commit()
-      // and uploadSnapshot().
-      // The load method will reset latestSnapshot to discard any snapshots taken
-      // from newer versions (when a old version is reloaded).
-      // commit() method deletes the existing snapshot while creating a new snapshot.
-      // In order to ensure that the snapshot being uploaded would not be modified
-      // concurrently, we need to synchronize the snapshot access between task thread
-      // and maintenance thread.
-      acquire(StoreMaintenance)
-      try {
-        uploadSnapshot()
-      } finally {
-        release(StoreMaintenance)
-      }
+      uploadSnapshot()
     }
     val cleanupTime = timeTakenMs {
       fileManager.deleteOldVersions(conf.minVersionsToRetain)
File: RocksDBFileManager.scala

@@ -133,16 +133,6 @@ class RocksDBFileManager(

   import RocksDBImmutableFile._

-  private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
-
-
-  // used to keep a mapping of the exact Dfs file that was used to create a local SST file.
-  // The reason this is a separate map because versionToRocksDBFiles can contain multiple similar
-  // SST files to a particular local file (for example 1.sst can map to 1-UUID1.sst in v1 and
-  // 1-UUID2.sst in v2). We need to capture the exact file used to ensure Version ID compatibility
-  // across SST files and RocksDB manifest.
-  private[sql] val localFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
-
   private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
   private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
   private val onlyZipFiles = new PathFilter {
@@ -157,6 +147,29 @@
   private def codec = CompressionCodec.createCodec(sparkConf, codecName)

   @volatile private var rootDirChecked: Boolean = false
+  @volatile private var fileMappings = RocksDBFileMappings(
+    new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
+    new ConcurrentHashMap[String, RocksDBImmutableFile]
+  )
+
+  /**
+   * Make a deep copy of versionToRocksDBFiles and localFilesToDfsFiles to prevent
+   * the current task thread from overwriting the file mappings whenever the background
+   * maintenance thread attempts to upload a snapshot.
+   */
+  def copyFileMapping(): Unit = {
+    val newVersionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
+    val newLocalFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
+
+    newVersionToRocksDBFiles.putAll(fileMappings.versionToRocksDBFiles)
+    newLocalFilesToDfsFiles.putAll(fileMappings.localFilesToDfsFiles)
+
+    fileMappings = RocksDBFileMappings(newVersionToRocksDBFiles, newLocalFilesToDfsFiles)
+  }
+
+  def captureFileMapReference(): RocksDBFileMappings = {
+    fileMappings
+  }

Comment (Contributor): I didn't quite get the purpose of this function.

Comment (Author): It functions as a get method for the RocksDBFileMappings private var. I named it captureFileMapReference for code readability in RocksDB, where it is used.

Comment (Contributor): I believe Scala has an accessor style guide (https://docs.scala-lang.org/style/naming-conventions.html#accessorsmutators). Even by Java's or most other languages' conventions, a getter would be better named getFileMappings(). Since the PR is already closed, you don't have to fix it; it's not a big deal. I mention it just to complete the discussion loop.

   def getChangeLogWriter(
     version: Long,
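Taken together, copyFileMapping() and captureFileMapReference() implement a copy-on-write discipline: the maintenance thread captures a stable reference to the mappings at commit time, and the task thread swaps in a deep copy before mutating anything during a load. A compact sketch of the same idea, with a simplified CowMap holder and placeholder string keys (none of these names are from the PR):

import java.util.concurrent.ConcurrentHashMap

// Simplified copy-on-write holder: readers capture a stable reference,
// writers replace the whole map instead of mutating the captured one.
class CowMap[K, V] {
  @volatile private var current = new ConcurrentHashMap[K, V]()

  // Cheap reference capture; the returned map is never swapped out from
  // under the caller, only superseded by a fresh copy.
  def capture(): ConcurrentHashMap[K, V] = current

  def copyOnWrite(): Unit = {
    val fresh = new ConcurrentHashMap[K, V]()
    fresh.putAll(current)
    current = fresh  // previously captured references remain untouched
  }

  def put(k: K, v: V): Unit = current.put(k, v)
}

object CowMapDemo extends App {
  val m = new CowMap[String, String]
  m.put("1.sst", "1-uuid1.sst")
  val snapshotView = m.capture()  // maintenance thread's view
  m.copyOnWrite()                 // task thread reloads another version
  m.put("1.sst", "1-uuid2.sst")   // mutates only the new copy
  assert(snapshotView.get("1.sst") == "1-uuid1.sst")
}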
@@ -204,11 +217,15 @@
   def latestSaveCheckpointMetrics: RocksDBFileManagerMetrics = saveCheckpointMetrics

   /** Save all the files in given local checkpoint directory as a committed version in DFS */
-  def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
+  def saveCheckpointToDfs(
+    checkpointDir: File,

Comment (Contributor): nit: same here, 2 more spaces

+    version: Long,
+    numKeys: Long,
+    capturedFileMappings: RocksDBFileMappings): Unit = {
     logFilesInDir(checkpointDir, log"Saving checkpoint files " +
       log"for version ${MDC(LogKeys.VERSION_NUM, version)}")
     val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
-    val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
+    val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles, capturedFileMappings)
     val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
     val metadataFile = localMetadataFile(checkpointDir)
     metadata.writeToFile(metadataFile)
@@ -243,10 +260,10 @@
     // The unique ids of SST files are checked when opening a rocksdb instance. The SST files
     // in larger versions can't be reused even if they have the same size and name because
     // they belong to another rocksdb instance.
-    versionToRocksDBFiles.keySet().removeIf(_ >= version)
+    fileMappings.versionToRocksDBFiles.keySet().removeIf(_ >= version)
     val metadata = if (version == 0) {
       if (localDir.exists) Utils.deleteRecursively(localDir)
-      localFilesToDfsFiles.clear()
+      fileMappings.localFilesToDfsFiles.clear()
       localDir.mkdirs()
       RocksDBCheckpointMetadata(Seq.empty, 0)
     } else {
@@ -260,7 +277,7 @@
       logInfo(log"Read metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" +
         log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}")
       loadImmutableFilesFromDfs(metadata.immutableFiles, localDir)
-      versionToRocksDBFiles.put(version, metadata.immutableFiles)
+      fileMappings.versionToRocksDBFiles.put(version, metadata.immutableFiles)
       metadataFile.delete()
       metadata
     }
@@ -417,9 +434,9 @@
     // Resolve RocksDB files for all the versions and find the max version each file is used
     val fileToMaxUsedVersion = new mutable.HashMap[String, Long]
     sortedSnapshotVersions.foreach { version =>
-      val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
+      val files = Option(fileMappings.versionToRocksDBFiles.get(version)).getOrElse {
         val newResolvedFiles = getImmutableFilesFromVersionZip(version)
-        versionToRocksDBFiles.put(version, newResolvedFiles)
+        fileMappings.versionToRocksDBFiles.put(version, newResolvedFiles)
         newResolvedFiles
       }
       files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
@@ -466,7 +483,7 @@
       val versionFile = dfsBatchZipFile(version)
       try {
         fm.delete(versionFile)
-        versionToRocksDBFiles.remove(version)
+        fileMappings.versionToRocksDBFiles.remove(version)
         logDebug(s"Deleted version $version")
       } catch {
         case e: Exception =>
@@ -487,7 +504,8 @@
   /** Save immutable files to DFS directory */
   private def saveImmutableFilesToDfs(
       version: Long,
-      localFiles: Seq[File]): Seq[RocksDBImmutableFile] = {
+      localFiles: Seq[File],
+      capturedFileMappings: RocksDBFileMappings): Seq[RocksDBImmutableFile] = {
     // Get the immutable files used in previous versions, as some of those uploaded files can be
     // reused for this version
     logInfo(log"Saving RocksDB files to DFS for ${MDC(LogKeys.VERSION_NUM, version)}")
Expand All @@ -497,7 +515,8 @@ class RocksDBFileManager(
var filesReused = 0L

val immutableFiles = localFiles.map { localFile =>
val existingDfsFile = localFilesToDfsFiles.asScala.get(localFile.getName)
val existingDfsFile =
capturedFileMappings.localFilesToDfsFiles.asScala.get(localFile.getName)
if (existingDfsFile.isDefined && existingDfsFile.get.sizeBytes == localFile.length()) {
val dfsFile = existingDfsFile.get
filesReused += 1
@@ -521,7 +540,7 @@
         bytesCopied += localFileSize

         val immutableDfsFile = RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
-        localFilesToDfsFiles.put(localFileName, immutableDfsFile)
+        capturedFileMappings.localFilesToDfsFiles.put(localFileName, immutableDfsFile)

         immutableDfsFile
       }
@@ -530,7 +549,7 @@
       log"(${MDC(LogKeys.NUM_BYTES, bytesCopied)} bytes) from local to" +
       log" DFS for version ${MDC(LogKeys.VERSION_NUM, version)}. " +
       log"${MDC(LogKeys.NUM_FILES_REUSED, filesReused)} files reused without copying.")
-    versionToRocksDBFiles.put(version, immutableFiles)
+    capturedFileMappings.versionToRocksDBFiles.put(version, immutableFiles)

     // Cleanup locally deleted files from the localFilesToDfsFiles map
     // Locally, SST Files can be deleted due to RocksDB compaction. These files need
@@ -570,7 +589,7 @@
       .foreach { existingFile =>
         val existingFileSize = existingFile.length()
         val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
-        val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
+        val prevDfsFile = fileMappings.localFilesToDfsFiles.asScala.get(existingFile.getName)
         val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
           requiredFile.get.dfsFileName == prevDfsFile.get.dfsFileName &&
             existingFile.length() == requiredFile.get.sizeBytes
@@ -580,7 +599,7 @@

         if (!isSameFile) {
           existingFile.delete()
-          localFilesToDfsFiles.remove(existingFile.getName)
+          fileMappings.localFilesToDfsFiles.remove(existingFile.getName)
           logInfo(log"Deleted local file ${MDC(LogKeys.FILE_NAME, existingFile)} " +
             log"with size ${MDC(LogKeys.NUM_BYTES, existingFileSize)} mapped" +
             log" to previous dfsFile ${MDC(LogKeys.DFS_FILE, prevDfsFile.getOrElse("null"))}")
@@ -612,7 +631,7 @@
         }
         filesCopied += 1
         bytesCopied += localFileSize
-        localFilesToDfsFiles.put(localFileName, file)
+        fileMappings.localFilesToDfsFiles.put(localFileName, file)
         logInfo(log"Copied ${MDC(LogKeys.DFS_FILE, dfsFile)} to " +
           log"${MDC(LogKeys.FILE_NAME, localFile)} - " +
           log"${MDC(LogKeys.NUM_BYTES, localFileSize)} bytes")
@@ -633,13 +652,13 @@
   private def removeLocallyDeletedSSTFilesFromDfsMapping(localFiles: Seq[File]): Unit = {
     // clean up deleted SST files from the localFilesToDfsFiles Map
     val currentLocalFiles = localFiles.map(_.getName).toSet
-    val mappingsToClean = localFilesToDfsFiles.asScala
+    val mappingsToClean = fileMappings.localFilesToDfsFiles.asScala
       .keys
       .filterNot(currentLocalFiles.contains)

     mappingsToClean.foreach { f =>
       logInfo(log"cleaning ${MDC(LogKeys.FILE_NAME, f)} from the localFilesToDfsFiles map")
-      localFilesToDfsFiles.remove(f)
+      fileMappings.localFilesToDfsFiles.remove(f)
     }
   }
@@ -749,6 +768,20 @@
     }
   }

+/**
+ * Tracks file mappings in RocksDB across local and remote directories.
+ *
+ * @param versionToRocksDBFiles mapping of RocksDB files used across versions, for maintenance
+ * @param localFilesToDfsFiles mapping of the exact DFS file used to create a local SST file.
+ *        This is a separate map because versionToRocksDBFiles can contain multiple similar
+ *        SST files for a particular local file (for example, 1.sst can map to 1-UUID1.sst
+ *        in v1 and 1-UUID2.sst in v2). We need to capture the exact file used to ensure
+ *        version ID compatibility across SST files and the RocksDB manifest.
+ */
+case class RocksDBFileMappings(
+  versionToRocksDBFiles: ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],

Comment (Contributor): nit: 2 more spaces

+  localFilesToDfsFiles: ConcurrentHashMap[String, RocksDBImmutableFile])

 /**
  * Metrics regarding RocksDB file sync between local and DFS.
  */
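A toy illustration of the RocksDBFileMappings doc comment above, using plain Scala maps instead of the real types: versionToRocksDBFiles alone cannot say which DFS file backs the current local 1.sst, which is exactly what localFilesToDfsFiles records.

// Toy illustration only; these are not the real Spark structures.
// versionToRocksDBFiles legitimately holds two different DFS files for the
// same local name across versions, so it cannot identify which DFS file the
// *current* local 1.sst came from; localFilesToDfsFiles can.
val versionToRocksDBFiles = Map(
  1L -> Seq("1-uuid1.sst"), // version 1 uploaded local 1.sst as 1-uuid1.sst
  2L -> Seq("1-uuid2.sst")  // version 2 re-uploaded it as 1-uuid2.sst
)
val localFilesToDfsFiles = Map(
  "1.sst" -> "1-uuid2.sst"  // the exact DFS file backing local 1.sst right now
)
// Reusing 1-uuid1.sst for the current 1.sst would break RocksDB's version-id
// check between the SST file and the manifest; the exact mapping prevents that.
assert(localFilesToDfsFiles("1.sst") == "1-uuid2.sst")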