Skip to content
Prev Previous commit
Next Next commit
Fixed synchronization bug
  • Loading branch information
tdas committed Apr 13, 2018
commit ef05009e491d1ffdca2a37ba0441ea8507756e3d
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
verify(state == UPDATING, "Cannot commit after already committed or aborted")

try {
finalizeDeltaFile(compressedStream)
loadedMaps.put(newVersion, mapToUpdate)
commitUpdates(newVersion, mapToUpdate, compressedStream)
state = COMMITTED
logInfo(s"Committed version $newVersion for $this to file $finalDeltaFile")
newVersion
Expand Down Expand Up @@ -250,6 +249,13 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit

private case class StoreFile(version: Long, path: Path, isSnapshot: Boolean)

private def commitUpdates(newVersion: Long, map: MapType, output: DataOutputStream): Unit = {
synchronized {
finalizeDeltaFile(output)
loadedMaps.put(newVersion, map)
}
}

/**
* Get iterator of all the data of the latest version of the store.
* Note that this will look up the files to determined the latest known version.
Expand Down