diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 301d978c9038..6c0447e1a325 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.{mutable, Map}
+import scala.collection.mutable.ListBuffer
 import scala.ref.WeakReference
 import scala.util.Try
 
@@ -57,7 +58,11 @@ class RocksDB(
     hadoopConf: Configuration = new Configuration,
     loggingId: String = "") extends Logging {
 
-  case class RocksDBSnapshot(checkpointDir: File, version: Long, numKeys: Long) {
+  case class RocksDBSnapshot(
+      checkpointDir: File,
+      version: Long,
+      numKeys: Long,
+      capturedFileMappings: RocksDBFileMappings) {
     def close(): Unit = {
       silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
     }
@@ -65,6 +70,7 @@ class RocksDB(
 
   @volatile private var latestSnapshot: Option[RocksDBSnapshot] = None
   @volatile private var lastSnapshotVersion = 0L
+  private val oldSnapshots = new ListBuffer[RocksDBSnapshot]
 
   RocksDBLoader.loadLibrary()
 
@@ -148,6 +154,9 @@ class RocksDB(
     try {
       if (loadedVersion != version) {
         closeDB()
+        // deep copy is needed to avoid race condition
+        // between maintenance and task threads
+        fileManager.copyFileMapping()
         val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
         val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
         loadedVersion = latestSnapshotVersion
@@ -156,7 +165,6 @@ class RocksDB(
         if (lastSnapshotVersion > latestSnapshotVersion) {
           // discard any newer snapshots
           lastSnapshotVersion = 0L
-          latestSnapshot = None
         }
 
         openDB()
@@ -368,10 +376,17 @@ class RocksDB(
         // inside the uploadSnapshot() called below.
        // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
        // during state store maintenance.
-        latestSnapshot.foreach(_.close())
-        latestSnapshot = Some(
-          RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
-        lastSnapshotVersion = newVersion
+        synchronized {
+          if (latestSnapshot.isDefined) {
+            oldSnapshots += latestSnapshot.get
+          }
+          latestSnapshot = Some(
+            RocksDBSnapshot(checkpointDir,
+              newVersion,
+              numKeysOnWritingVersion,
+              fileManager.captureFileMapReference()))
+          lastSnapshotVersion = newVersion
+        }
       }
     }
 
@@ -421,22 +436,34 @@
   }
 
   private def uploadSnapshot(): Unit = {
+    var oldSnapshotsImmutable: List[RocksDBSnapshot] = Nil
     val localCheckpoint = synchronized {
       val checkpoint = latestSnapshot
       latestSnapshot = None
+
+      // Convert mutable list buffer to immutable to prevent
+      // race condition with commit where old snapshot is added
+      oldSnapshotsImmutable = oldSnapshots.toList
+      oldSnapshots.clear()
+
       checkpoint
     }
     localCheckpoint match {
-      case Some(RocksDBSnapshot(localDir, version, numKeys)) =>
+      case Some(RocksDBSnapshot(localDir, version, numKeys, capturedFileMappings)) =>
         try {
           val uploadTime = timeTakenMs {
-            fileManager.saveCheckpointToDfs(localDir, version, numKeys)
+            fileManager.saveCheckpointToDfs(localDir, version, numKeys, capturedFileMappings)
             fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
           }
           logInfo(s"$loggingId: Upload snapshot of version $version," +
             s" time taken: $uploadTime ms")
         } finally {
           localCheckpoint.foreach(_.close())
+
+          // Clean up old snapshots that were replaced before they could be uploaded
+          for (snapshot <- oldSnapshotsImmutable) {
+            snapshot.close()
+          }
         }
       case _ =>
     }
@@ -457,20 +484,7 @@
 
   def doMaintenance(): Unit = {
     if (enableChangelogCheckpointing) {
-      // There is race to update latestSnapshot between load(), commit()
-      // and uploadSnapshot().
-      // The load method will reset latestSnapshot to discard any snapshots taken
-      // from newer versions (when a old version is reloaded).
-      // commit() method deletes the existing snapshot while creating a new snapshot.
-      // In order to ensure that the snapshot being uploaded would not be modified
-      // concurrently, we need to synchronize the snapshot access between task thread
-      // and maintenance thread.
-      acquire()
-      try {
-        uploadSnapshot()
-      } finally {
-        release()
-      }
+      uploadSnapshot()
     }
     val cleanupTime = timeTakenMs {
       fileManager.deleteOldVersions(conf.minVersionsToRetain)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index c527a6a03ae9..b4fe3e22e888 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -131,16 +131,6 @@ class RocksDBFileManager(
 
   import RocksDBImmutableFile._
 
-  private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
-
-
-  // used to keep a mapping of the exact Dfs file that was used to create a local SST file.
-  // The reason this is a separate map because versionToRocksDBFiles can contain multiple similar
-  // SST files to a particular local file (for example 1.sst can map to 1-UUID1.sst in v1 and
-  // 1-UUID2.sst in v2). We need to capture the exact file used to ensure Version ID compatibility
-  // across SST files and RocksDB manifest.
-  private[sql] val localFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
-
   private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
   private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
   private val onlyZipFiles = new PathFilter {
@@ -154,6 +144,30 @@ class RocksDBFileManager(
 
   private def codec = CompressionCodec.createCodec(sparkConf, codecName)
 
+  @volatile private var fileMappings = RocksDBFileMappings(
+    new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
+    new ConcurrentHashMap[String, RocksDBImmutableFile]
+  )
+
+  /**
+   * Make a deep copy of versionToRocksDBFiles and localFilesToDfsFiles so that the
+   * current task thread does not overwrite the file mapping while the background
+   * maintenance thread is uploading a snapshot
+   */
+  def copyFileMapping(): Unit = {
+    val newVersionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
+    val newLocalFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
+
+    newVersionToRocksDBFiles.putAll(fileMappings.versionToRocksDBFiles)
+    newLocalFilesToDfsFiles.putAll(fileMappings.localFilesToDfsFiles)
+
+    fileMappings = RocksDBFileMappings(newVersionToRocksDBFiles, newLocalFilesToDfsFiles)
+  }
+
+  def captureFileMapReference(): RocksDBFileMappings = {
+    fileMappings
+  }
+
   def getChangeLogWriter(version: Long): StateStoreChangelogWriter = {
     val rootDir = new Path(dfsRootDir)
     val changelogFile = dfsChangelogFile(version)
@@ -185,10 +199,14 @@ class RocksDBFileManager(
   def latestSaveCheckpointMetrics: RocksDBFileManagerMetrics = saveCheckpointMetrics
 
   /** Save all the files in given local checkpoint directory as a committed version in DFS */
-  def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
+  def saveCheckpointToDfs(
+      checkpointDir: File,
+      version: Long,
+      numKeys: Long,
+      capturedFileMappings: RocksDBFileMappings): Unit = {
     logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
     val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
-    val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
+    val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles, capturedFileMappings)
     val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
     val metadataFile = localMetadataFile(checkpointDir)
     metadata.writeToFile(metadataFile)
@@ -219,10 +237,10 @@ class RocksDBFileManager(
     // The unique ids of SST files are checked when opening a rocksdb instance. The SST files
     // in larger versions can't be reused even if they have the same size and name because
     // they belong to another rocksdb instance.
-    versionToRocksDBFiles.keySet().removeIf(_ >= version)
+    fileMappings.versionToRocksDBFiles.keySet().removeIf(_ >= version)
     val metadata = if (version == 0) {
       if (localDir.exists) Utils.deleteRecursively(localDir)
-      localFilesToDfsFiles.clear()
+      fileMappings.localFilesToDfsFiles.clear()
       localDir.mkdirs()
       RocksDBCheckpointMetadata(Seq.empty, 0)
     } else {
@@ -235,7 +253,7 @@ class RocksDBFileManager(
       val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
       logInfo(s"Read metadata for version $version:\n${metadata.prettyJson}")
       loadImmutableFilesFromDfs(metadata.immutableFiles, localDir)
-      versionToRocksDBFiles.put(version, metadata.immutableFiles)
+      fileMappings.versionToRocksDBFiles.put(version, metadata.immutableFiles)
       metadataFile.delete()
       metadata
     }
@@ -389,9 +407,9 @@ class RocksDBFileManager(
     // Resolve RocksDB files for all the versions and find the max version each file is used
     val fileToMaxUsedVersion = new mutable.HashMap[String, Long]
     sortedSnapshotVersions.foreach { version =>
-      val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
+      val files = Option(fileMappings.versionToRocksDBFiles.get(version)).getOrElse {
         val newResolvedFiles = getImmutableFilesFromVersionZip(version)
-        versionToRocksDBFiles.put(version, newResolvedFiles)
+        fileMappings.versionToRocksDBFiles.put(version, newResolvedFiles)
         newResolvedFiles
       }
       files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
@@ -436,7 +454,7 @@ class RocksDBFileManager(
       val versionFile = dfsBatchZipFile(version)
       try {
         fm.delete(versionFile)
-        versionToRocksDBFiles.remove(version)
+        fileMappings.versionToRocksDBFiles.remove(version)
         logDebug(s"Deleted version $version")
       } catch {
         case e: Exception =>
@@ -455,7 +473,8 @@ class RocksDBFileManager(
   /** Save immutable files to DFS directory */
   private def saveImmutableFilesToDfs(
       version: Long,
-      localFiles: Seq[File]): Seq[RocksDBImmutableFile] = {
+      localFiles: Seq[File],
+      capturedFileMappings: RocksDBFileMappings): Seq[RocksDBImmutableFile] = {
     // Get the immutable files used in previous versions, as some of those uploaded files can be
     // reused for this version
     logInfo(s"Saving RocksDB files to DFS for $version")
@@ -465,7 +484,8 @@ class RocksDBFileManager(
     var filesReused = 0L
 
     val immutableFiles = localFiles.map { localFile =>
-      val existingDfsFile = localFilesToDfsFiles.asScala.get(localFile.getName)
+      val existingDfsFile =
+        capturedFileMappings.localFilesToDfsFiles.asScala.get(localFile.getName)
       if (existingDfsFile.isDefined && existingDfsFile.get.sizeBytes == localFile.length()) {
         val dfsFile = existingDfsFile.get
         filesReused += 1
@@ -487,14 +507,14 @@ class RocksDBFileManager(
         bytesCopied += localFileSize
 
        val immutableDfsFile = RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
-        localFilesToDfsFiles.put(localFileName, immutableDfsFile)
+        capturedFileMappings.localFilesToDfsFiles.put(localFileName, immutableDfsFile)
 
         immutableDfsFile
       }
     }
     logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" +
       s" DFS for version $version. $filesReused files reused without copying.")
 
-    versionToRocksDBFiles.put(version, immutableFiles)
+    capturedFileMappings.versionToRocksDBFiles.put(version, immutableFiles)
 
     // Cleanup locally deleted files from the localFilesToDfsFiles map
     // Locally, SST Files can be deleted due to RocksDB compaction. These files need
@@ -534,7 +554,7 @@ class RocksDBFileManager(
       .foreach { existingFile =>
         val existingFileSize = existingFile.length()
         val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
-        val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
+        val prevDfsFile = fileMappings.localFilesToDfsFiles.asScala.get(existingFile.getName)
         val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
           requiredFile.get.dfsFileName == prevDfsFile.get.dfsFileName &&
             existingFile.length() == requiredFile.get.sizeBytes
@@ -544,7 +564,7 @@ class RocksDBFileManager(
 
         if (!isSameFile) {
           existingFile.delete()
-          localFilesToDfsFiles.remove(existingFile.getName)
+          fileMappings.localFilesToDfsFiles.remove(existingFile.getName)
           logInfo(s"Deleted local file $existingFile with size $existingFileSize mapped" +
             s" to previous dfsFile ${prevDfsFile.getOrElse("null")}")
         } else {
@@ -574,7 +594,7 @@ class RocksDBFileManager(
         }
         filesCopied += 1
         bytesCopied += localFileSize
-        localFilesToDfsFiles.put(localFileName, file)
+        fileMappings.localFilesToDfsFiles.put(localFileName, file)
         logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
       } else {
         filesReused += 1
@@ -592,13 +612,13 @@ class RocksDBFileManager(
   private def removeLocallyDeletedSSTFilesFromDfsMapping(localFiles: Seq[File]): Unit = {
     // clean up deleted SST files from the localFilesToDfsFiles Map
     val currentLocalFiles = localFiles.map(_.getName).toSet
-    val mappingsToClean = localFilesToDfsFiles.asScala
+    val mappingsToClean = fileMappings.localFilesToDfsFiles.asScala
       .keys
       .filterNot(currentLocalFiles.contains)
 
     mappingsToClean.foreach { f =>
       logInfo(s"cleaning $f from the localFilesToDfsFiles map")
-      localFilesToDfsFiles.remove(f)
+      fileMappings.localFilesToDfsFiles.remove(f)
     }
   }
 
@@ -705,6 +725,20 @@ class RocksDBFileManager(
   }
 }
 
+/**
+ * Tracks file mappings in RocksDB across local and remote directories.
+ * @param versionToRocksDBFiles mapping of the RocksDB files used by each version, for maintenance
+ * @param localFilesToDfsFiles mapping from a local SST file to the exact DFS file it came from
+ * localFilesToDfsFiles is a separate map because versionToRocksDBFiles can map several similar
+ * DFS files to the same local file (for example, 1.sst can map to 1-UUID1.sst in v1 and
+ * 1-UUID2.sst in v2). We need to capture the exact file used to ensure Version ID
+ * compatibility across SST files and the RocksDB manifest.
+ */
+
+case class RocksDBFileMappings(
+    versionToRocksDBFiles: ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
+    localFilesToDfsFiles: ConcurrentHashMap[String, RocksDBImmutableFile])
+
 /**
  * Metrics regarding RocksDB file sync between local and DFS.
 */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 89b4925db707..973c1e0cb3b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -19,8 +19,11 @@ package org.apache.spark.sql.execution.streaming.state
 
 import java.io._
 import java.nio.charset.Charset
+import java.util.concurrent.Executors
 
 import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
 import scala.language.implicitConversions
 
 import org.apache.commons.io.FileUtils
@@ -477,6 +480,41 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithChangelogCheckpointingEnabled("RocksDBFileManager: " +
+    "background snapshot upload doesn't acquire RocksDB instance lock") {
+    // Create a custom ExecutionContext
+    implicit val ec: ExecutionContext = ExecutionContext
+      .fromExecutor(Executors.newSingleThreadExecutor())
+
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(lockAcquireTimeoutMs = 10000, minDeltasForSnapshot = 0)
+    new File(remoteDir).delete() // to make sure that the directory gets created
+
+    withDB(remoteDir, conf = conf) { db =>
+      db.load(0)
+      db.put("0", "0")
+      db.commit()
+
+      // Acquire lock
+      db.load(1)
+      db.put("1", "1")
+
+      // Run doMaintenance in another thread
+      val maintenanceFuture = Future {
+        db.doMaintenance()
+      }
+
+      val timeout = 5.seconds
+
+      // Ensure that maintenance task runs without being blocked by task thread
+      ThreadUtils.awaitResult(maintenanceFuture, timeout)
+      assert(snapshotVersionsPresent(remoteDir) == Seq(1))
+
+      // Release lock
+      db.commit()
+    }
+  }
+
   testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write changelog") {
     val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
     val fileManager = new RocksDBFileManager(
@@ -1290,7 +1328,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
   }
 
   testWithChangelogCheckpointingEnabled("time travel 4 -" +
-    " validate successful RocksDB load") {
+    " validate successful RocksDB load when metadata file is overwritten") {
     val remoteDir = Utils.createTempDir().toString
     val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
     new File(remoteDir).delete() // to make sure that the directory gets created
@@ -1305,8 +1343,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     db.load(1)
     db.put("3", "3")
 
-    // do maintenance - upload any latest snapshots so far
-    // would fail to acquire lock and no snapshots would be uploaded
+    // upload any latest snapshots so far
     db.doMaintenance()
     db.commit() // upload newly created snapshot 2.zip
 
@@ -1318,6 +1355,47 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithChangelogCheckpointingEnabled("time travel 5 -" +
+    " validate successful RocksDB load when metadata file is not overwritten") {
+    // Ensure commit doesn't modify the latestSnapshot that doMaintenance will upload
+    val fmClass = "org.apache.spark.sql.execution.streaming.state." +
+      "NoOverwriteFileSystemBasedCheckpointFileManager"
+    withTempDir { dir =>
+      val conf = dbConf.copy(minDeltasForSnapshot = 0) // create snapshot every commit
+      val hadoopConf = new Configuration()
+      hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fmClass)
+
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db =>
+        db.load(0)
+        db.put("a", "1")
+        db.commit()
+
+        // load previous version, and recreate the snapshot
+        db.load(0)
+        db.put("a", "1")
+
+        // upload version 1 snapshot created above
+        db.doMaintenance()
+        assert(snapshotVersionsPresent(remoteDir) == Seq(1))
+
+        db.commit() // create snapshot again
+
+        // load version 1 - should succeed
+        withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf) { db =>
+        }
+
+        // upload recently created snapshot
+        db.doMaintenance()
+        assert(snapshotVersionsPresent(remoteDir) == Seq(1))
+
+        // load version 1 again - should succeed
+        withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf) { db =>
+        }
+      }
+    }
+  }
+
   test("validate Rocks DB SST files do not have a VersionIdMismatch" +
     " when metadata file is not overwritten - scenario 1") {
     val fmClass = "org.apache.spark.sql.execution.streaming.state." +
@@ -1614,7 +1692,11 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
       numKeys: Int): Unit = {
     val checkpointDir = Utils.createTempDir().getAbsolutePath // local dir to create checkpoints
     generateFiles(checkpointDir, fileToLengths)
-    fileManager.saveCheckpointToDfs(checkpointDir, version, numKeys)
+    fileManager.saveCheckpointToDfs(
+      checkpointDir,
+      version,
+      numKeys,
+      fileManager.captureFileMapReference())
   }
 
   def loadAndVerifyCheckpointFiles(