
Commit 9a71c4c

HeartSaVioR authored and gengliangwang committed
[SPARK-36619][SS] Fix bugs around prefix-scan for HDFS backed state store and RocksDB state store
### What changes were proposed in this pull request?

This PR proposes to fix bugs around prefix scan in both the HDFS backed state store and the RocksDB state store.

> HDFS backed state store

We did a "shallow copy" when copying the prefix map, which left the values of the prefix map (mutable Sets) as the "same instances" across multiple versions. This PR fixes it by creating a new mutable Set and copying the elements (a minimal illustration follows below).

> RocksDB state store

Prefix-scan iterators were only closed in RocksDB.rollback(), which is only called from RocksDBStateStore.abort(). While `RocksDBStateStore.abort()` is called for streaming session window (it has two physical plans, one for reading and one for writing), other stateful operators have a single read-write physical plan and call either commit or abort, so the iterators are never closed on commit. These unclosed iterators can be "reused" and produce incorrect output. This PR ensures that prefix-scan iterators are also reset when loading RocksDB, whereas previously this happened only on rollback.

### Why are the changes needed?

See the section above for an explanation of the bugs and their fixes.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Modified a UT which fails without this PR and passes with it.

Closes apache#33870 from HeartSaVioR/SPARK-36619.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
(cherry picked from commit 60a72c9)
Signed-off-by: Gengliang Wang <[email protected]>
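To make the shallow-copy bug concrete, here is a minimal, self-contained Scala sketch (illustrative toy code, not Spark's classes; all names are made up) of both the hazard and the fix:

```scala
import scala.collection.mutable

object ShallowCopyDemo extends App {
  // "Version 1" of a prefix map: prefix -> mutable set of full keys.
  val v1 = mutable.HashMap("prefix" -> mutable.HashSet("key-a"))

  // Shallow copy: the new map holds the *same* Set instance as v1.
  val v2 = mutable.HashMap[String, mutable.HashSet[String]]()
  v2 ++= v1

  v2("prefix") += "key-b"                 // intended to touch only version 2 ...
  assert(v1("prefix").contains("key-b"))  // ... but version 1 sees it too

  // The fix: copy each Set into a fresh instance while copying the map.
  val v3 = mutable.HashMap[String, mutable.HashSet[String]]()
  v1.foreach { case (prefix, keys) =>
    val fresh = new mutable.HashSet[String]()
    fresh ++= keys
    v3.put(prefix, fresh)
  }
  v3("prefix") += "key-c"
  assert(!v1("prefix").contains("key-c")) // the older version is now isolated
}
```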
1 parent 8be53c3 · commit 9a71c4c

3 files changed: +60 −18 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreMap.scala

Lines changed: 8 additions & 1 deletion
```diff
@@ -129,7 +129,14 @@ class PrefixScannableHDFSBackedStateStoreMap(
     other match {
       case o: PrefixScannableHDFSBackedStateStoreMap =>
         map.putAll(o.map)
-        prefixKeyToKeysMap.putAll(o.prefixKeyToKeysMap)
+        o.prefixKeyToKeysMap.asScala.foreach { case (prefixKey, keySet) =>
+          // Create a copy of the Set here: shallow-copying the prefix key map would leave
+          // the two maps sharing the same Set "instances" as values, so modifying the
+          // prefix map of a newer version would also affect that of an older version.
+          val newSet = new mutable.HashSet[UnsafeRow]()
+          newSet ++= keySet
+          prefixKeyToKeysMap.put(prefixKey, newSet)
+        }

       case _ => other.iterator().foreach { pair => put(pair.key, pair.value) }
     }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 9 additions & 4 deletions
```diff
@@ -120,6 +120,8 @@ class RocksDB(
       if (conf.resetStatsOnLoad) {
         nativeStats.reset
       }
+      // Reset resources to prevent side effects from the previously loaded version.
+      closePrefixScanIterators()
       writeBatch.clear()
       logInfo(s"Loaded $version")
     } catch {
@@ -290,8 +292,7 @@
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
-    prefixScanReuseIter.entrySet().asScala.foreach(_.getValue.close())
-    prefixScanReuseIter.clear()
+    closePrefixScanIterators()
     writeBatch.clear()
     numKeysOnWritingVersion = numKeysOnLoadedVersion
     release()
@@ -307,8 +308,7 @@

   /** Release all resources */
   def close(): Unit = {
-    prefixScanReuseIter.entrySet().asScala.foreach(_.getValue.close())
-    prefixScanReuseIter.clear()
+    closePrefixScanIterators()
     try {
       closeDB()

@@ -411,6 +411,11 @@
     acquireLock.notifyAll()
   }

+  private def closePrefixScanIterators(): Unit = {
+    prefixScanReuseIter.entrySet().asScala.foreach(_.getValue.close())
+    prefixScanReuseIter.clear()
+  }
+
   private def getDBProperty(property: String): Long = {
     db.getProperty(property).toLong
   }
```
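For context on why load() must also clean up: RocksDB.scala keeps prefix-scan iterators in a reuse map so repeated scans can share an iterator. Below is a hedged sketch of that pattern (simplified generic code, not the actual class; it assumes, as the diff suggests, one cached iterator per thread) showing why the close-and-clear helper has to run on every load, not only on rollback or close:

```scala
import java.util.concurrent.ConcurrentHashMap

// Simplified sketch of an iterator reuse cache; not the real RocksDB class.
class IteratorReuseCache[I <: AutoCloseable] {
  private val reuseCache = new ConcurrentHashMap[Long, I]()

  // Reuse one iterator per calling thread, creating it on first use.
  def getOrCreate(create: => I): I =
    reuseCache.computeIfAbsent(Thread.currentThread().getId, _ => create)

  // Must be invoked from load(), rollback() and close(): otherwise a commit
  // followed by loading another version leaves stale iterators in the cache,
  // and reusing them produces results from the previously loaded version.
  def closeAll(): Unit = {
    reuseCache.values().forEach(_.close())
    reuseCache.clear()
  }
}
```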

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala

Lines changed: 43 additions & 13 deletions
```diff
@@ -803,27 +803,57 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     // Verify state before starting a new set of updates
     assert(getLatestData(provider).isEmpty)

-    val store = provider.getStore(0)
+    var store = provider.getStore(0)

-    val key1 = Seq("a", "b", "c")
-    val key2 = Seq(1, 2, 3)
-    val keys = for (k1 <- key1; k2 <- key2) yield (k1, k2)
+    def putCompositeKeys(keys: Seq[(String, Int)]): Unit = {
+      val randomizedKeys = scala.util.Random.shuffle(keys.toList)
+      randomizedKeys.foreach { case (key1, key2) =>
+        put(store, key1, key2, key2)
+      }
+    }

-    val randomizedKeys = scala.util.Random.shuffle(keys.toList)
+    def verifyScan(key1: Seq[String], key2: Seq[Int]): Unit = {
+      key1.foreach { k1 =>
+        val keyValueSet = store.prefixScan(dataToPrefixKeyRow(k1)).map { pair =>
+          rowPairToDataPair(pair.withRows(pair.key.copy(), pair.value.copy()))
+        }.toSet

-    randomizedKeys.foreach { case (key1, key2) =>
-      put(store, key1, key2, key2)
+        assert(keyValueSet === key2.map(k2 => ((k1, k2), k2)).toSet)
+      }
     }

-    key1.foreach { k1 =>
-      val keyValueSet = store.prefixScan(dataToPrefixKeyRow(k1)).map { pair =>
-        rowPairToDataPair(pair.withRows(pair.key.copy(), pair.value.copy()))
-      }.toSet
+    val key1AtVersion0 = Seq("a", "b", "c")
+    val key2AtVersion0 = Seq(1, 2, 3)
+    val keysAtVersion0 = for (k1 <- key1AtVersion0; k2 <- key2AtVersion0) yield (k1, k2)

-      assert(keyValueSet === key2.map(k2 => ((k1, k2), k2)).toSet)
-    }
+    putCompositeKeys(keysAtVersion0)
+    verifyScan(key1AtVersion0, key2AtVersion0)

     assert(store.prefixScan(dataToPrefixKeyRow("non-exist")).isEmpty)
+
+    // commit, and load version 1 (the version just committed)
+    store.commit()
+    store = provider.getStore(1)
+
+    // before putting the new key-value pairs, verify prefix scan works for existing keys
+    verifyScan(key1AtVersion0, key2AtVersion0)
+
+    val key1AtVersion1 = Seq("c", "d")
+    val key2AtVersion1 = Seq(4, 5, 6)
+    val keysAtVersion1 = for (k1 <- key1AtVersion1; k2 <- key2AtVersion1) yield (k1, k2)
+
+    // put new key-value pairs, and verify that prefix scan reflects the changes
+    putCompositeKeys(keysAtVersion1)
+    verifyScan(Seq("c"), Seq(1, 2, 3, 4, 5, 6))
+    verifyScan(Seq("d"), Seq(4, 5, 6))
+
+    // abort, and load version 1 again (keysAtVersion1 should be rolled back)
+    store.abort()
+    store = provider.getStore(1)
+
+    // prefix scan should not reflect the uncommitted changes
+    verifyScan(key1AtVersion0, key2AtVersion0)
+    verifyScan(Seq("d"), Seq.empty)
   }

   testWithAllCodec("numKeys metrics") {
```
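The updated test drives the versioned store lifecycle: load, put, then commit or abort, then reload. As a reading aid, here is a self-contained toy model of that lifecycle (not the Spark API; all names are made up), mirroring what the assertions above check:

```scala
import scala.collection.mutable

// Toy versioned store: staged writes become visible only via commit(),
// and abort() drops them so a reload sees the last committed data.
class ToyVersionedStore {
  private val committed = mutable.Map[Long, Map[String, Int]](0L -> Map.empty)
  private var staged = Map.empty[String, Int]
  private var loadedVersion = 0L

  def load(version: Long): Unit = { staged = committed(version); loadedVersion = version }
  def put(k: String, v: Int): Unit = staged += (k -> v)
  def commit(): Long = { committed(loadedVersion + 1) = staged; loadedVersion + 1 }
  def abort(): Unit = staged = committed(loadedVersion)
  def get(k: String): Option[Int] = staged.get(k)
}

object LifecycleDemo extends App {
  val store = new ToyVersionedStore
  store.load(0); store.put("a", 1)
  val v1 = store.commit()                 // version 1 now contains "a"

  store.load(v1); store.put("d", 4)
  store.abort()                           // uncommitted "d" is discarded

  store.load(v1)                          // reload committed version 1
  assert(store.get("a").contains(1) && store.get("d").isEmpty)
}
```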
