Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Revert pr changes
  • Loading branch information
riyaverm-db committed Jun 25, 2024
commit 28451ca8c8f91f10eb6d67517c6d23ad72c88ced
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,12 @@ class RocksDB(
loadedVersion = latestSnapshotVersion

// reset last snapshot version
if (lastSnapshotVersion > latestSnapshotVersion) {
// discard any newer snapshots
lastSnapshotVersion = 0L
latestSnapshot = None
}
lastSnapshotVersion = 0L
// if (lastSnapshotVersion > latestSnapshotVersion) {
// // discard any newer snapshots
// lastSnapshotVersion = 0L
// latestSnapshot = None
// }
openDB()

numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
Expand Down Expand Up @@ -584,14 +585,16 @@ class RocksDB(
// background operations.
val cp = Checkpoint.create(db)
cp.createCheckpoint(checkpointDir.toString)
// if changelog checkpointing is disabled, the snapshot is uploaded synchronously
// inside the uploadSnapshot() called below.
// If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
// during state store maintenance.
latestSnapshot.foreach(_.close())
latestSnapshot = Some(
RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
lastSnapshotVersion = newVersion
synchronized {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we trying to synchronize updates between the task thread and the maintenance thread here? If so, we need a lock - just `synchronized` would not work. This code can only be executed by one thread anyway (the task thread that holds the acquire lock), so in my opinion `synchronized` does not do anything here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the race condition you are thinking about? Is it calling close() on a snapshot that is being uploaded? In that case we can defer the close to the maintenance thread.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I missed your comment @chaoqin-li1123. The race condition is basically in the assignment to the latestSnapshot variable. We assign the newly created snapshot to the variable here, and set the variable to None in the maintenance thread. Both of these need to be synchronized, I think.

It seems to have been fixed in the latest revision.

// if changelog checkpointing is disabled, the snapshot is uploaded synchronously
// inside the uploadSnapshot() called below.
// If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
// during state store maintenance.
latestSnapshot.foreach(_.close())
latestSnapshot = Some(
RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
lastSnapshotVersion = newVersion
}
}
}

Expand Down Expand Up @@ -681,20 +684,21 @@ class RocksDB(

def doMaintenance(): Unit = {
if (enableChangelogCheckpointing) {
// There is race to update latestSnapshot between load(), commit()
// and uploadSnapshot().
// The load method will reset latestSnapshot to discard any snapshots taken
// from newer versions (when an old version is reloaded).
// commit() method deletes the existing snapshot while creating a new snapshot.
// In order to ensure that the snapshot being uploaded would not be modified
// concurrently, we need to synchronize the snapshot access between task thread
// and maintenance thread.
acquire(StoreMaintenance)
try {
uploadSnapshot()
} finally {
release(StoreMaintenance)
}
uploadSnapshot()
// // There is race to update latestSnapshot between load(), commit()
// // and uploadSnapshot().
// // The load method will reset latestSnapshot to discard any snapshots taken
// // from newer versions (when an old version is reloaded).
// // commit() method deletes the existing snapshot while creating a new snapshot.
// // In order to ensure that the snapshot being uploaded would not be modified
// // concurrently, we need to synchronize the snapshot access between task thread
// // and maintenance thread.
// acquire(StoreMaintenance)
// try {
// uploadSnapshot()
// } finally {
// release(StoreMaintenance)
// }
}
val cleanupTime = timeTakenMs {
fileManager.deleteOldVersions(conf.minVersionsToRetain)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,50 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared

sqlConf.setConf(SQLConf.STATE_STORE_PROVIDER_CLASS, classOf[RocksDBStateStoreProvider].getName)

// Reproduction scenario: version 2 is committed twice from version 1 with different
// contents ("b" -> "2" first, then "c" -> "3"), with doMaintenance() called after each
// commit of version 2, and finally version 2 is loaded again.
// NOTE(review): this test contains no assertions, so it can only fail if one of the calls
// below throws. Presumably the final load(2) is where the mismatch is expected to
// surface - confirm, and add explicit assertions on the loaded state.
testWithColumnFamilies(
"MyRocksDB: reproduce mismatch condition",
TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled =>
val remoteDir = Utils.createTempDir().toString
val conf = RocksDBConf().copy()
new File(remoteDir).delete() // to make sure that the directory gets created

withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
// Branch A, step 1: write ("a", "1") and commit. No doMaintenance() call is made here.
db.put("a", "1")
db.commit() // trigger checkpointing, version 1 created

// Branch A, step 2: reload version 1, add ("b", "2") and commit.
db.load(1)
db.put("b", "2")
db.commit() // trigger checkpointing, version 2 created

// Load branch A's version 2, then run maintenance on it.
db.load(2)
db.doMaintenance() // upload snapshot

// Branch B: go back to version 1 and commit a different version 2 containing ("c", "3")
// instead of ("b", "2").
db.load(1) // load v1
db.put("c", "3")
db.commit() // trigger checkpointing, version 2 created again

// Second maintenance run, this time after branch B's commit of version 2.
db.doMaintenance() // try uploading snapshot for branch B again

// Reload version 2 after both branches have committed it; the result is not checked.
db.load(2)

}

// Disabled variant of the scenario, kept for reference. It reopens the DB at version 2,
// creates the divergent branch B there, and asserts on the resulting version and contents.
// Creates a divergent branch B
// withDB(remoteDir, version = 2, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
// // Emulate background process trying uploading snapshot from branch A
// db.doMaintenance()
// // Rollback to the previous version 1
// db.load(1)
// db.put("c", "3")
// db.commit() // trigger checkpointing, version 2 created again
// assert(db.getLatestVersion() === 2)
// assert(db.iterator().map(toStr).toSet === Set(("a", "1"), ("c", "3")))
// db.doMaintenance() // try uploading snapshot for branch B
// }
}

testWithColumnFamilies(
"RocksDB: check changelog and snapshot version",
TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled =>
Expand Down