@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.{mutable, Map}
+import scala.collection.mutable.ListBuffer
 import scala.ref.WeakReference
 import scala.util.Try
 
@@ -57,14 +58,19 @@ class RocksDB(
     hadoopConf: Configuration = new Configuration,
     loggingId: String = "") extends Logging {
 
-  case class RocksDBSnapshot(checkpointDir: File, version: Long, numKeys: Long) {
+  case class RocksDBSnapshot(
+      checkpointDir: File,
+      version: Long,
+      numKeys: Long,
+      capturedFileMappings: RocksDBFileMappings) {
     def close(): Unit = {
       silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
     }
   }
 
   @volatile private var latestSnapshot: Option[RocksDBSnapshot] = None
   @volatile private var lastSnapshotVersion = 0L
+  private val oldSnapshots = new ListBuffer[RocksDBSnapshot]
 
   RocksDBLoader.loadLibrary()
 
@@ -148,6 +154,9 @@
     try {
       if (loadedVersion != version) {
         closeDB()
+        // A deep copy is needed to avoid a race condition
+        // between the maintenance and task threads.
+        fileManager.copyFileMapping()
         val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
         val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
         loadedVersion = latestSnapshotVersion
@@ -156,7 +165,6 @@
         if (lastSnapshotVersion > latestSnapshotVersion) {
           // discard any newer snapshots
           lastSnapshotVersion = 0L
-          latestSnapshot = None
         }
         openDB()
 
@@ -368,10 +376,17 @@
         // inside the uploadSnapshot() called below.
         // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
         // during state store maintenance.
-        latestSnapshot.foreach(_.close())
-        latestSnapshot = Some(
-          RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
-        lastSnapshotVersion = newVersion
+        synchronized {
+          if (latestSnapshot.isDefined) {
+            oldSnapshots += latestSnapshot.get
+          }
+          latestSnapshot = Some(
+            RocksDBSnapshot(checkpointDir,
+              newVersion,
+              numKeysOnWritingVersion,
+              fileManager.captureFileMapReference()))
+          lastSnapshotVersion = newVersion
+        }
       }
     }
 
@@ -421,22 +436,34 @@
   }
 
   private def uploadSnapshot(): Unit = {
+    var oldSnapshotsImmutable: List[RocksDBSnapshot] = Nil
     val localCheckpoint = synchronized {
       val checkpoint = latestSnapshot
       latestSnapshot = None
+
+      // Convert the mutable list buffer to an immutable list to prevent a
+      // race with commit(), which may add another old snapshot concurrently.
+      oldSnapshotsImmutable = oldSnapshots.toList
+      oldSnapshots.clear()
+
       checkpoint
     }
     localCheckpoint match {
-      case Some(RocksDBSnapshot(localDir, version, numKeys)) =>
+      case Some(RocksDBSnapshot(localDir, version, numKeys, capturedFileMappings)) =>
         try {
           val uploadTime = timeTakenMs {
-            fileManager.saveCheckpointToDfs(localDir, version, numKeys)
+            fileManager.saveCheckpointToDfs(localDir, version, numKeys, capturedFileMappings)
             fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
           }
           logInfo(s"$loggingId: Upload snapshot of version $version," +
             s" time taken: $uploadTime ms")
         } finally {
           localCheckpoint.foreach(_.close())
+
+          // Clean up the old snapshots superseded by commit().
+          for (snapshot <- oldSnapshotsImmutable) {
+            snapshot.close()
+          }
         }
       case _ =>
     }
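
The two synchronized blocks above are the two halves of one handoff: commit() parks a superseded snapshot in oldSnapshots instead of closing it (its files may still be mid-upload), and uploadSnapshot() takes ownership of both the latest snapshot and the parked ones before doing any slow work. Below is a minimal, self-contained sketch of that pattern, with a simplified stand-in Snapshot type and println in place of the real upload; it is not the actual Spark code.

import scala.collection.mutable.ListBuffer

object SnapshotHandoffSketch {
  // Simplified stand-in for RocksDBSnapshot.
  final case class Snapshot(version: Long) {
    def close(): Unit = println(s"closed snapshot $version")
  }

  @volatile private var latest: Option[Snapshot] = None
  private val superseded = new ListBuffer[Snapshot]

  // Task thread: replace the latest snapshot, deferring cleanup.
  def commit(next: Snapshot): Unit = synchronized {
    latest.foreach(superseded += _) // park it, don't close it
    latest = Some(next)
  }

  // Maintenance thread: take ownership of everything under the lock,
  // then upload and close with no lock held.
  def upload(): Unit = {
    val (toUpload, toClose) = synchronized {
      val owned = (latest, superseded.toList) // immutable copy of the buffer
      latest = None
      superseded.clear()
      owned
    }
    try toUpload.foreach(s => println(s"uploaded snapshot ${s.version}"))
    finally toClose.foreach(_.close())
  }

  def main(args: Array[String]): Unit = {
    commit(Snapshot(1))
    commit(Snapshot(2)) // snapshot 1 is parked, not closed
    upload()            // uploads 2, then closes 1
  }
}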
@@ -457,20 +484,7 @@
 
   def doMaintenance(): Unit = {
     if (enableChangelogCheckpointing) {
-      // There is race to update latestSnapshot between load(), commit()
-      // and uploadSnapshot().
-      // The load method will reset latestSnapshot to discard any snapshots taken
-      // from newer versions (when a old version is reloaded).
-      // commit() method deletes the existing snapshot while creating a new snapshot.
-      // In order to ensure that the snapshot being uploaded would not be modified
-      // concurrently, we need to synchronize the snapshot access between task thread
-      // and maintenance thread.
-      acquire()
-      try {
-        uploadSnapshot()
-      } finally {
-        release()
-      }
+      uploadSnapshot()
     }
     val cleanupTime = timeTakenMs {
       fileManager.deleteOldVersions(conf.minVersionsToRetain)
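
With the handoff above in place, the coarse acquire()/release() around the upload appears unnecessary: the only shared state uploadSnapshot() touches is guarded by its own small synchronized block, so the slow DFS upload no longer holds up task threads. A condensed before/after sketch of the locking shape (names are hypothetical, not the actual Spark API):

object LockScopeSketch {
  private val dbLock = new Object // stands in for acquire()/release()
  @volatile private var latest: Option[String] = None

  // Old shape: the entire upload ran under the coarse DB lock, so a slow
  // DFS upload in the maintenance thread could block task threads.
  def doMaintenanceOld(): Unit = dbLock.synchronized { upload() }

  // New shape: only the snapshot handoff inside upload() is synchronized;
  // the slow part runs with no lock held.
  def doMaintenanceNew(): Unit = upload()

  private def upload(): Unit = {
    val snapshot = synchronized { val s = latest; latest = None; s }
    snapshot.foreach(v => println(s"uploading $v"))
  }
}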
@@ -131,16 +131,6 @@ class RocksDBFileManager(
 
   import RocksDBImmutableFile._
 
-  private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
-
-
-  // used to keep a mapping of the exact Dfs file that was used to create a local SST file.
-  // The reason this is a separate map because versionToRocksDBFiles can contain multiple similar
-  // SST files to a particular local file (for example 1.sst can map to 1-UUID1.sst in v1 and
-  // 1-UUID2.sst in v2). We need to capture the exact file used to ensure Version ID compatibility
-  // across SST files and RocksDB manifest.
-  private[sql] val localFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
-
   private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
   private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
   private val onlyZipFiles = new PathFilter {
@@ -154,6 +144,30 @@
 
   private def codec = CompressionCodec.createCodec(sparkConf, codecName)
 
+  @volatile private var fileMappings = RocksDBFileMappings(
+    new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
+    new ConcurrentHashMap[String, RocksDBImmutableFile]
+  )
+
+  /**
+   * Make a deep copy of versionToRocksDBFiles and localFilesToDfsFiles to prevent the
+   * current task thread from overwriting the file mappings while the background
+   * maintenance thread uses them to upload a snapshot.
+   */
+  def copyFileMapping(): Unit = {
+    val newVersionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
+    val newLocalFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
+
+    newVersionToRocksDBFiles.putAll(fileMappings.versionToRocksDBFiles)
+    newLocalFilesToDfsFiles.putAll(fileMappings.localFilesToDfsFiles)
+
+    fileMappings = RocksDBFileMappings(newVersionToRocksDBFiles, newLocalFilesToDfsFiles)
+  }
+
+  def captureFileMapReference(): RocksDBFileMappings = {
+    fileMappings
+  }
+
 def getChangeLogWriter(version: Long): StateStoreChangelogWriter = {
    val rootDir = new Path(dfsRootDir)
    val changelogFile = dfsChangelogFile(version)
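
copyFileMapping() is a copy-on-write publish: before loading a version, the task thread swaps a fresh RocksDBFileMappings instance into the @volatile field rather than mutating the published maps in place, so whatever instance the maintenance thread captured via captureFileMapReference() stays internally consistent. A minimal sketch of the same pattern with a simplified one-map type (names and values are hypothetical):

import java.util.concurrent.ConcurrentHashMap

object CopyOnWriteMappingSketch {
  // Simplified stand-in for RocksDBFileMappings.
  final case class Mappings(localToDfs: ConcurrentHashMap[String, String])

  @volatile private var mappings = Mappings(new ConcurrentHashMap[String, String])

  // Task thread, before loading a version: republish a private deep copy,
  // leaving the previously published instance frozen.
  def copyMappings(): Unit = {
    val copy = new ConcurrentHashMap[String, String]
    copy.putAll(mappings.localToDfs)
    mappings = Mappings(copy)
  }

  // Maintenance thread: grab whichever instance is currently published.
  def capture(): Mappings = mappings

  def main(args: Array[String]): Unit = {
    mappings.localToDfs.put("1.sst", "1-uuid1.sst")
    val captured = capture()      // maintenance thread holds this reference
    copyMappings()                // task thread republishes a copy
    mappings.localToDfs.put("1.sst", "1-uuid2.sst") // mutates the copy only
    println(captured.localToDfs.get("1.sst"))       // still prints 1-uuid1.sst
  }
}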
@@ -185,10 +199,14 @@
   def latestSaveCheckpointMetrics: RocksDBFileManagerMetrics = saveCheckpointMetrics
 
   /** Save all the files in given local checkpoint directory as a committed version in DFS */
-  def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
+  def saveCheckpointToDfs(
+      checkpointDir: File,
+      version: Long,
+      numKeys: Long,
+      capturedFileMappings: RocksDBFileMappings): Unit = {
     logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
     val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
-    val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
+    val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles, capturedFileMappings)
     val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
     val metadataFile = localMetadataFile(checkpointDir)
     metadata.writeToFile(metadataFile)
@@ -219,10 +237,10 @@
     // The unique ids of SST files are checked when opening a rocksdb instance. The SST files
     // in larger versions can't be reused even if they have the same size and name because
     // they belong to another rocksdb instance.
-    versionToRocksDBFiles.keySet().removeIf(_ >= version)
+    fileMappings.versionToRocksDBFiles.keySet().removeIf(_ >= version)
     val metadata = if (version == 0) {
       if (localDir.exists) Utils.deleteRecursively(localDir)
-      localFilesToDfsFiles.clear()
+      fileMappings.localFilesToDfsFiles.clear()
       localDir.mkdirs()
       RocksDBCheckpointMetadata(Seq.empty, 0)
     } else {
@@ -235,7 +253,7 @@
       val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
       logInfo(s"Read metadata for version $version:\n${metadata.prettyJson}")
       loadImmutableFilesFromDfs(metadata.immutableFiles, localDir)
-      versionToRocksDBFiles.put(version, metadata.immutableFiles)
+      fileMappings.versionToRocksDBFiles.put(version, metadata.immutableFiles)
       metadataFile.delete()
       metadata
     }
@@ -389,9 +407,9 @@
     // Resolve RocksDB files for all the versions and find the max version each file is used
     val fileToMaxUsedVersion = new mutable.HashMap[String, Long]
     sortedSnapshotVersions.foreach { version =>
-      val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
+      val files = Option(fileMappings.versionToRocksDBFiles.get(version)).getOrElse {
         val newResolvedFiles = getImmutableFilesFromVersionZip(version)
-        versionToRocksDBFiles.put(version, newResolvedFiles)
+        fileMappings.versionToRocksDBFiles.put(version, newResolvedFiles)
         newResolvedFiles
       }
       files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
@@ -436,7 +454,7 @@
       val versionFile = dfsBatchZipFile(version)
       try {
         fm.delete(versionFile)
-        versionToRocksDBFiles.remove(version)
+        fileMappings.versionToRocksDBFiles.remove(version)
         logDebug(s"Deleted version $version")
       } catch {
         case e: Exception =>
@@ -455,7 +473,8 @@
   /** Save immutable files to DFS directory */
   private def saveImmutableFilesToDfs(
       version: Long,
-      localFiles: Seq[File]): Seq[RocksDBImmutableFile] = {
+      localFiles: Seq[File],
+      capturedFileMappings: RocksDBFileMappings): Seq[RocksDBImmutableFile] = {
     // Get the immutable files used in previous versions, as some of those uploaded files can be
     // reused for this version
     logInfo(s"Saving RocksDB files to DFS for $version")
@@ -465,7 +484,8 @@
     var filesReused = 0L
 
     val immutableFiles = localFiles.map { localFile =>
-      val existingDfsFile = localFilesToDfsFiles.asScala.get(localFile.getName)
+      val existingDfsFile =
+        capturedFileMappings.localFilesToDfsFiles.asScala.get(localFile.getName)
       if (existingDfsFile.isDefined && existingDfsFile.get.sizeBytes == localFile.length()) {
         val dfsFile = existingDfsFile.get
         filesReused += 1
@@ -487,14 +507,14 @@
         bytesCopied += localFileSize
 
         val immutableDfsFile = RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
-        localFilesToDfsFiles.put(localFileName, immutableDfsFile)
+        capturedFileMappings.localFilesToDfsFiles.put(localFileName, immutableDfsFile)
 
         immutableDfsFile
       }
     }
     logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" +
       s" DFS for version $version. $filesReused files reused without copying.")
-    versionToRocksDBFiles.put(version, immutableFiles)
+    capturedFileMappings.versionToRocksDBFiles.put(version, immutableFiles)
 
     // Cleanup locally deleted files from the localFilesToDfsFiles map
     // Locally, SST Files can be deleted due to RocksDB compaction. These files need
@@ -534,7 +554,7 @@
       .foreach { existingFile =>
         val existingFileSize = existingFile.length()
         val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
-        val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
+        val prevDfsFile = fileMappings.localFilesToDfsFiles.asScala.get(existingFile.getName)
         val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
           requiredFile.get.dfsFileName == prevDfsFile.get.dfsFileName &&
             existingFile.length() == requiredFile.get.sizeBytes
@@ -544,7 +564,7 @@
 
         if (!isSameFile) {
           existingFile.delete()
-          localFilesToDfsFiles.remove(existingFile.getName)
+          fileMappings.localFilesToDfsFiles.remove(existingFile.getName)
           logInfo(s"Deleted local file $existingFile with size $existingFileSize mapped" +
             s" to previous dfsFile ${prevDfsFile.getOrElse("null")}")
         } else {
@@ -574,7 +594,7 @@
         }
         filesCopied += 1
         bytesCopied += localFileSize
-        localFilesToDfsFiles.put(localFileName, file)
+        fileMappings.localFilesToDfsFiles.put(localFileName, file)
         logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
       } else {
         filesReused += 1
@@ -592,13 +612,13 @@
   private def removeLocallyDeletedSSTFilesFromDfsMapping(localFiles: Seq[File]): Unit = {
     // clean up deleted SST files from the localFilesToDfsFiles Map
     val currentLocalFiles = localFiles.map(_.getName).toSet
-    val mappingsToClean = localFilesToDfsFiles.asScala
+    val mappingsToClean = fileMappings.localFilesToDfsFiles.asScala
       .keys
       .filterNot(currentLocalFiles.contains)
 
     mappingsToClean.foreach { f =>
       logInfo(s"cleaning $f from the localFilesToDfsFiles map")
-      localFilesToDfsFiles.remove(f)
+      fileMappings.localFilesToDfsFiles.remove(f)
     }
   }
 
@@ -705,6 +725,20 @@
     }
 }
 
+/**
+ * Track file mappings in RocksDB across local and remote directories.
+ *
+ * @param versionToRocksDBFiles mapping of the RocksDB files used across versions, for maintenance
+ * @param localFilesToDfsFiles mapping of the exact DFS file used to create a local SST file.
+ * localFilesToDfsFiles is a separate map because versionToRocksDBFiles can map multiple
+ * similar DFS files to a particular local file (for example, 1.sst can map to 1-UUID1.sst
+ * in v1 and 1-UUID2.sst in v2). We need to capture the exact file used to ensure version ID
+ * compatibility across SST files and the RocksDB manifest.
+ */
+case class RocksDBFileMappings(
+    versionToRocksDBFiles: ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
+    localFilesToDfsFiles: ConcurrentHashMap[String, RocksDBImmutableFile])
+
 /**
  * Metrics regarding RocksDB file sync between local and DFS.
  */
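
To make the doc comment's example concrete, here is a tiny runnable illustration of why both maps are needed (the ImmutableFile stand-in type and values are hypothetical): versionToRocksDBFiles legitimately holds different DFS files for the same local name across versions, while localFilesToDfsFiles pins the one DFS file the current local SST was actually created from.

import java.util.concurrent.ConcurrentHashMap

object FileMappingExampleSketch {
  // Simplified stand-in for RocksDBImmutableFile.
  final case class ImmutableFile(localName: String, dfsName: String)

  def main(args: Array[String]): Unit = {
    val versionToFiles = new ConcurrentHashMap[Long, Seq[ImmutableFile]]
    val localToDfs = new ConcurrentHashMap[String, ImmutableFile]

    // The same local name "1.sst" maps to different DFS files in v1 and v2,
    // e.g. after the local file was recreated between versions.
    versionToFiles.put(1L, Seq(ImmutableFile("1.sst", "1-UUID1.sst")))
    versionToFiles.put(2L, Seq(ImmutableFile("1.sst", "1-UUID2.sst")))

    // localFilesToDfsFiles records the single DFS file the *current* local
    // 1.sst came from, which is what version-ID checks must agree with.
    localToDfs.put("1.sst", ImmutableFile("1.sst", "1-UUID2.sst"))

    println(versionToFiles.get(1L))  // List(ImmutableFile(1.sst,1-UUID1.sst))
    println(localToDfs.get("1.sst")) // ImmutableFile(1.sst,1-UUID2.sst)
  }
}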