diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 088242b4246e..e1b6e34639a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.{mutable, Map}
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.jdk.CollectionConverters._
 import scala.ref.WeakReference
 import scala.util.Try
@@ -74,7 +74,11 @@ class RocksDB(
     loggingId: String = "",
     useColumnFamilies: Boolean = false) extends Logging {
 
-  case class RocksDBSnapshot(checkpointDir: File, version: Long, numKeys: Long) {
+  case class RocksDBSnapshot(
+      checkpointDir: File,
+      version: Long,
+      numKeys: Long,
+      capturedFileMappings: RocksDBFileMappings) {
     def close(): Unit = {
       silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
     }
@@ -82,6 +86,7 @@ class RocksDB(
 
   @volatile private var latestSnapshot: Option[RocksDBSnapshot] = None
   @volatile private var lastSnapshotVersion = 0L
+  private val oldSnapshots = new ListBuffer[RocksDBSnapshot]
 
   RocksDBLoader.loadLibrary()
 
@@ -181,6 +186,9 @@ class RocksDB(
     try {
       if (loadedVersion != version) {
        closeDB()
+        // deep copy is needed to avoid race condition
+        // between maintenance and task threads
+        fileManager.copyFileMapping()
        val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
        val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
        loadedVersion = latestSnapshotVersion
@@ -189,7 +197,6 @@ class RocksDB(
 
        if (lastSnapshotVersion > latestSnapshotVersion) {
          // discard any newer snapshots
          lastSnapshotVersion = 0L
-          latestSnapshot = None
        }
        openDB()
@@ -588,10 +595,17 @@ class RocksDB(
        // inside the uploadSnapshot() called below.
        // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
        // during state store maintenance.
-        latestSnapshot.foreach(_.close())
-        latestSnapshot = Some(
-          RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
-        lastSnapshotVersion = newVersion
+        synchronized {
+          if (latestSnapshot.isDefined) {
+            oldSnapshots += latestSnapshot.get
+          }
+          latestSnapshot = Some(
+            RocksDBSnapshot(checkpointDir,
+              newVersion,
+              numKeysOnWritingVersion,
+              fileManager.captureFileMapReference()))
+          lastSnapshotVersion = newVersion
+        }
      }
    }
 
@@ -643,16 +657,23 @@
  }
 
  private def uploadSnapshot(): Unit = {
+    var oldSnapshotsImmutable: List[RocksDBSnapshot] = Nil
    val localCheckpoint = synchronized {
      val checkpoint = latestSnapshot
      latestSnapshot = None
+
+      // Convert the mutable list buffer to an immutable list to prevent a race with
+      // commit(), which may append another old snapshot concurrently
+      oldSnapshotsImmutable = oldSnapshots.toList
+      oldSnapshots.clear()
+
      checkpoint
    }
    localCheckpoint match {
-      case Some(RocksDBSnapshot(localDir, version, numKeys)) =>
+      case Some(RocksDBSnapshot(localDir, version, numKeys, capturedFileMappings)) =>
        try {
          val uploadTime = timeTakenMs {
-            fileManager.saveCheckpointToDfs(localDir, version, numKeys)
+            fileManager.saveCheckpointToDfs(localDir, version, numKeys, capturedFileMappings)
            fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
          }
          logInfo(log"${MDC(LogKeys.LOG_ID, loggingId)}: Upload snapshot of version " +
@@ -660,6 +681,12 @@
            log" time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms")
        } finally {
          localCheckpoint.foreach(_.close())
+
+          // Clean up old snapshots that were replaced before they could be uploaded
+          for (snapshot <- oldSnapshotsImmutable) {
+            snapshot.close()
+          }
+
        }
      case _ =>
    }
  }
@@ -681,20 +708,7 @@
 
  def doMaintenance(): Unit = {
    if (enableChangelogCheckpointing) {
-      // There is race to update latestSnapshot between load(), commit()
-      // and uploadSnapshot().
-      // The load method will reset latestSnapshot to discard any snapshots taken
-      // from newer versions (when a old version is reloaded).
-      // commit() method deletes the existing snapshot while creating a new snapshot.
-      // In order to ensure that the snapshot being uploaded would not be modified
-      // concurrently, we need to synchronize the snapshot access between task thread
-      // and maintenance thread.
-      acquire(StoreMaintenance)
-      try {
-        uploadSnapshot()
-      } finally {
-        release(StoreMaintenance)
-      }
+      uploadSnapshot()
    }
    val cleanupTime = timeTakenMs {
      fileManager.deleteOldVersions(conf.minVersionsToRetain)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 0a460ece2400..402dddda1480 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -133,16 +133,6 @@ class RocksDBFileManager(
 
  import RocksDBImmutableFile._
 
-  private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
-
-
-  // used to keep a mapping of the exact Dfs file that was used to create a local SST file.
-  // The reason this is a separate map because versionToRocksDBFiles can contain multiple similar
-  // SST files to a particular local file (for example 1.sst can map to 1-UUID1.sst in v1 and
-  // 1-UUID2.sst in v2). We need to capture the exact file used to ensure Version ID compatibility
-  // across SST files and RocksDB manifest.
-  private[sql] val localFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
-
  private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
  private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
  private val onlyZipFiles = new PathFilter {
@@ -157,6 +147,29 @@ class RocksDBFileManager(
  private def codec = CompressionCodec.createCodec(sparkConf, codecName)
 
  @volatile private var rootDirChecked: Boolean = false
+  @volatile private var fileMappings = RocksDBFileMappings(
+    new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
+    new ConcurrentHashMap[String, RocksDBImmutableFile]
+  )
+
+  /**
+   * Make a deep copy of versionToRocksDBFiles and localFilesToDfsFiles so that the task
+   * thread does not overwrite the file mapping captured for a snapshot while the background
+   * maintenance thread is uploading it.
+   */
+  def copyFileMapping(): Unit = {
+    val newVersionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
+    val newLocalFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
+
+    newVersionToRocksDBFiles.putAll(fileMappings.versionToRocksDBFiles)
+    newLocalFilesToDfsFiles.putAll(fileMappings.localFilesToDfsFiles)
+
+    fileMappings = RocksDBFileMappings(newVersionToRocksDBFiles, newLocalFilesToDfsFiles)
+  }
+
+  def captureFileMapReference(): RocksDBFileMappings = {
+    fileMappings
+  }
 
  def getChangeLogWriter(
      version: Long,
@@ -204,11 +217,15 @@ class RocksDBFileManager(
  def latestSaveCheckpointMetrics: RocksDBFileManagerMetrics = saveCheckpointMetrics
 
  /** Save all the files in given local checkpoint directory as a committed version in DFS */
-  def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
+  def saveCheckpointToDfs(
+      checkpointDir: File,
+      version: Long,
+      numKeys: Long,
+      capturedFileMappings: RocksDBFileMappings): Unit = {
    logFilesInDir(checkpointDir, log"Saving checkpoint files " +
      log"for version ${MDC(LogKeys.VERSION_NUM, version)}")
    val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
-    val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
+    val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles, capturedFileMappings)
    val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
    val metadataFile = localMetadataFile(checkpointDir)
    metadata.writeToFile(metadataFile)
@@ -243,10 +260,10 @@ class RocksDBFileManager(
    // The unique ids of SST files are checked when opening a rocksdb instance. The SST files
    // in larger versions can't be reused even if they have the same size and name because
    // they belong to another rocksdb instance.
-    versionToRocksDBFiles.keySet().removeIf(_ >= version)
+    fileMappings.versionToRocksDBFiles.keySet().removeIf(_ >= version)
    val metadata = if (version == 0) {
      if (localDir.exists) Utils.deleteRecursively(localDir)
-      localFilesToDfsFiles.clear()
+      fileMappings.localFilesToDfsFiles.clear()
      localDir.mkdirs()
      RocksDBCheckpointMetadata(Seq.empty, 0)
    } else {
@@ -260,7 +277,7 @@ class RocksDBFileManager(
      logInfo(log"Read metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" +
        log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}")
      loadImmutableFilesFromDfs(metadata.immutableFiles, localDir)
-      versionToRocksDBFiles.put(version, metadata.immutableFiles)
+      fileMappings.versionToRocksDBFiles.put(version, metadata.immutableFiles)
      metadataFile.delete()
      metadata
    }
@@ -417,9 +434,9 @@ class RocksDBFileManager(
    // Resolve RocksDB files for all the versions and find the max version each file is used
    val fileToMaxUsedVersion = new mutable.HashMap[String, Long]
    sortedSnapshotVersions.foreach { version =>
-      val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
+      val files = Option(fileMappings.versionToRocksDBFiles.get(version)).getOrElse {
        val newResolvedFiles = getImmutableFilesFromVersionZip(version)
-        versionToRocksDBFiles.put(version, newResolvedFiles)
+        fileMappings.versionToRocksDBFiles.put(version, newResolvedFiles)
        newResolvedFiles
      }
      files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
@@ -466,7 +483,7 @@ class RocksDBFileManager(
        val versionFile = dfsBatchZipFile(version)
        try {
          fm.delete(versionFile)
-          versionToRocksDBFiles.remove(version)
+          fileMappings.versionToRocksDBFiles.remove(version)
          logDebug(s"Deleted version $version")
        } catch {
          case e: Exception =>
@@ -487,7 +504,8 @@ class RocksDBFileManager(
  /** Save immutable files to DFS directory */
  private def saveImmutableFilesToDfs(
      version: Long,
-      localFiles: Seq[File]): Seq[RocksDBImmutableFile] = {
+      localFiles: Seq[File],
+      capturedFileMappings: RocksDBFileMappings): Seq[RocksDBImmutableFile] = {
    // Get the immutable files used in previous versions, as some of those uploaded files can be
    // reused for this version
    logInfo(log"Saving RocksDB files to DFS for ${MDC(LogKeys.VERSION_NUM, version)}")
@@ -497,7 +515,8 @@ class RocksDBFileManager(
    var filesReused = 0L
 
    val immutableFiles = localFiles.map { localFile =>
-      val existingDfsFile = localFilesToDfsFiles.asScala.get(localFile.getName)
+      val existingDfsFile =
+        capturedFileMappings.localFilesToDfsFiles.asScala.get(localFile.getName)
      if (existingDfsFile.isDefined && existingDfsFile.get.sizeBytes == localFile.length()) {
        val dfsFile = existingDfsFile.get
        filesReused += 1
@@ -521,7 +540,7 @@ class RocksDBFileManager(
        bytesCopied += localFileSize
 
        val immutableDfsFile = RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
-        localFilesToDfsFiles.put(localFileName, immutableDfsFile)
+        capturedFileMappings.localFilesToDfsFiles.put(localFileName, immutableDfsFile)
 
        immutableDfsFile
      }
@@ -530,7 +549,7 @@ class RocksDBFileManager(
      log"(${MDC(LogKeys.NUM_BYTES, bytesCopied)} bytes) from local to" +
      log" DFS for version ${MDC(LogKeys.VERSION_NUM, version)}. " +
      log"${MDC(LogKeys.NUM_FILES_REUSED, filesReused)} files reused without copying.")
-    versionToRocksDBFiles.put(version, immutableFiles)
+    capturedFileMappings.versionToRocksDBFiles.put(version, immutableFiles)
 
    // Cleanup locally deleted files from the localFilesToDfsFiles map
    // Locally, SST Files can be deleted due to RocksDB compaction. These files need
@@ -570,7 +589,7 @@ class RocksDBFileManager(
      .foreach { existingFile =>
        val existingFileSize = existingFile.length()
        val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
-        val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
+        val prevDfsFile = fileMappings.localFilesToDfsFiles.asScala.get(existingFile.getName)
        val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
          requiredFile.get.dfsFileName == prevDfsFile.get.dfsFileName &&
            existingFile.length() == requiredFile.get.sizeBytes
@@ -580,7 +599,7 @@ class RocksDBFileManager(
 
        if (!isSameFile) {
          existingFile.delete()
-          localFilesToDfsFiles.remove(existingFile.getName)
+          fileMappings.localFilesToDfsFiles.remove(existingFile.getName)
          logInfo(log"Deleted local file ${MDC(LogKeys.FILE_NAME, existingFile)} " +
            log"with size ${MDC(LogKeys.NUM_BYTES, existingFileSize)} mapped" +
            log" to previous dfsFile ${MDC(LogKeys.DFS_FILE, prevDfsFile.getOrElse("null"))}")
@@ -612,7 +631,7 @@ class RocksDBFileManager(
          }
          filesCopied += 1
          bytesCopied += localFileSize
-          localFilesToDfsFiles.put(localFileName, file)
+          fileMappings.localFilesToDfsFiles.put(localFileName, file)
          logInfo(log"Copied ${MDC(LogKeys.DFS_FILE, dfsFile)} to " +
            log"${MDC(LogKeys.FILE_NAME, localFile)} - " +
            log"${MDC(LogKeys.NUM_BYTES, localFileSize)} bytes")
@@ -633,13 +652,13 @@ class RocksDBFileManager(
  private def removeLocallyDeletedSSTFilesFromDfsMapping(localFiles: Seq[File]): Unit = {
    // clean up deleted SST files from the localFilesToDfsFiles Map
    val currentLocalFiles = localFiles.map(_.getName).toSet
-    val mappingsToClean = localFilesToDfsFiles.asScala
+    val mappingsToClean = fileMappings.localFilesToDfsFiles.asScala
      .keys
      .filterNot(currentLocalFiles.contains)
 
    mappingsToClean.foreach { f =>
      logInfo(log"cleaning ${MDC(LogKeys.FILE_NAME, f)} from the localFilesToDfsFiles map")
-      localFilesToDfsFiles.remove(f)
+      fileMappings.localFilesToDfsFiles.remove(f)
    }
  }
 
@@ -749,6 +768,20 @@
  }
}
 
+/**
+ * Tracks the RocksDB file mappings between the local and remote (DFS) directories.
+ * @param versionToRocksDBFiles Mapping of the RocksDB immutable files used by each version,
+ *                              needed for maintenance.
+ * @param localFilesToDfsFiles Mapping of a local SST file to the exact DFS file it was created
+ *                             from. This is a separate map because versionToRocksDBFiles can map
+ *                             several DFS files to one local file name across versions (1.sst can
+ *                             map to 1-UUID1.sst in v1 and 1-UUID2.sst in v2); we need the exact
+ *                             file to keep version IDs consistent across SST files and manifest.
+ */
+case class RocksDBFileMappings(
+    versionToRocksDBFiles: ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
+    localFilesToDfsFiles: ConcurrentHashMap[String, RocksDBImmutableFile])
+
/**
 * Metrics regarding RocksDB file sync between local and DFS.
 */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 6086fd43846f..ea54fb997ca2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -19,8 +19,11 @@ package org.apache.spark.sql.execution.streaming.state
 
 import java.io._
 import java.nio.charset.Charset
+import java.util.concurrent.Executors
 
 import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
 import scala.language.implicitConversions
 
 import org.apache.commons.io.FileUtils
@@ -874,6 +877,41 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
    )
  }
 
+  testWithChangelogCheckpointingEnabled("RocksDBFileManager: " +
+    "background snapshot upload doesn't acquire RocksDB instance lock") {
+    // Create a single-threaded ExecutionContext so doMaintenance runs on a separate thread
+    implicit val ec: ExecutionContext = ExecutionContext
+      .fromExecutor(Executors.newSingleThreadExecutor())
+
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(lockAcquireTimeoutMs = 10000, minDeltasForSnapshot = 0)
+    new File(remoteDir).delete() // to make sure that the directory gets created
+
+    withDB(remoteDir, conf = conf) { db =>
+      db.load(0)
+      db.put("0", "0")
+      db.commit()
+
+      // Acquire the instance lock by loading a new version
+      db.load(1)
+      db.put("1", "1")
+
+      // Run doMaintenance in another thread
+      val maintenanceFuture = Future {
+        db.doMaintenance()
+      }
+
+      val timeout = 5.seconds
+
+      // Ensure that the maintenance task completes without being blocked by the task thread
+      ThreadUtils.awaitResult(maintenanceFuture, timeout)
+      assert(snapshotVersionsPresent(remoteDir) == Seq(1))
+
+      // Commit to release the instance lock
+      db.commit()
+    }
+  }
+
  testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write changelog") {
    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
    val fileManager = new RocksDBFileManager(
@@ -1941,7 +1979,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
  }
 
  testWithChangelogCheckpointingEnabled("time travel 4 -" +
-    " validate successful RocksDB load") {
+    " validate successful RocksDB load when metadata file is overwritten") {
    val remoteDir = Utils.createTempDir().toString
    val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
    new File(remoteDir).delete() // to make sure that the directory gets created
@@ -1956,8 +1994,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
      db.load(1)
      db.put("3", "3")
 
-      // do maintenance - upload any latest snapshots so far
-      // would fail to acquire lock and no snapshots would be uploaded
+      // upload the latest snapshot created so far
      db.doMaintenance()
      db.commit() // upload newly created snapshot 2.zip
 
@@ -1969,6 +2006,47 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
    }
  }
 
+  testWithChangelogCheckpointingEnabled("time travel 5 -" +
+    " validate successful RocksDB load when metadata file is not overwritten") {
+    // Ensure commit() doesn't modify the latestSnapshot that doMaintenance() will upload
+    val fmClass = "org.apache.spark.sql.execution.streaming.state." +
+      "NoOverwriteFileSystemBasedCheckpointFileManager"
+    withTempDir { dir =>
+      val conf = dbConf.copy(minDeltasForSnapshot = 0) // create snapshot every commit
+      val hadoopConf = new Configuration()
+      hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fmClass)
+
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db =>
+        db.load(0)
+        db.put("a", "1")
+        db.commit()
+
+        // load previous version, and recreate the snapshot
+        db.load(0)
+        db.put("a", "1")
+
+        // upload version 1 snapshot created above
+        db.doMaintenance()
+        assert(snapshotVersionsPresent(remoteDir) == Seq(1))
+
+        db.commit() // create snapshot again
+
+        // load version 1 - should succeed
+        withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf) { db =>
+        }
+
+        // upload recently created snapshot
+        db.doMaintenance()
+        assert(snapshotVersionsPresent(remoteDir) == Seq(1))
+
+        // load version 1 again - should succeed
+        withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf) { db =>
+        }
+      }
+    }
+  }
+
  test("validate Rocks DB SST files do not have a VersionIdMismatch" +
    " when metadata file is not overwritten - scenario 1") {
    val fmClass = "org.apache.spark.sql.execution.streaming.state." +
@@ -2268,7 +2346,11 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
      numKeys: Int): Unit = {
    val checkpointDir = Utils.createTempDir().getAbsolutePath // local dir to create checkpoints
    generateFiles(checkpointDir, fileToLengths)
-    fileManager.saveCheckpointToDfs(checkpointDir, version, numKeys)
+    fileManager.saveCheckpointToDfs(
+      checkpointDir,
+      version,
+      numKeys,
+      fileManager.captureFileMapReference())
  }
 
  def loadAndVerifyCheckpointFiles(
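The change above replaces the maintenance-thread lock with a copy-on-write handoff of the RocksDB file mappings: load() deep-copies the mappings before the task thread mutates them, commit() stores the current reference inside the snapshot, and the background upload reads only that captured reference. The sketch below illustrates the same pattern in isolation; it is a simplified, assumption-laden example, and the MappingHolder / FileMappings names and the String-based file types are hypothetical, not part of the Spark API.

import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified classes illustrating the copy-on-write file-mapping handoff.
final case class FileMappings(
    versionToFiles: ConcurrentHashMap[Long, Seq[String]],
    localToDfsFiles: ConcurrentHashMap[String, String])

final class MappingHolder {
  // Volatile reference so the maintenance thread always sees the latest published copy.
  @volatile private var mappings: FileMappings = FileMappings(
    new ConcurrentHashMap[Long, Seq[String]](),
    new ConcurrentHashMap[String, String]())

  // Task thread: deep-copy the maps before mutating them for a new version, then
  // publish the fresh copy. Any reference captured earlier keeps an unchanging view.
  def copyBeforeWrite(): Unit = {
    val newVersionToFiles = new ConcurrentHashMap[Long, Seq[String]]()
    val newLocalToDfs = new ConcurrentHashMap[String, String]()
    newVersionToFiles.putAll(mappings.versionToFiles)
    newLocalToDfs.putAll(mappings.localToDfsFiles)
    mappings = FileMappings(newVersionToFiles, newLocalToDfs)
  }

  // Commit path: hand the current reference to the snapshot; the background upload
  // then reads (and updates) only this captured copy, without any instance lock.
  def capture(): FileMappings = mappings
}

In this sketch the task thread calls copyBeforeWrite() at the start of each new version (as load() does via copyFileMapping()), and the maintenance thread only ever touches the reference returned by capture() (as uploadSnapshot() does with the snapshot's capturedFileMappings), so neither side needs to hold the RocksDB instance lock during the upload.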