Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
tests pass
  • Loading branch information
ericm-db committed Jul 9, 2024
commit 99609ee6d561009835e861d33b8828bca0f5d712
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ case class StateStoreMetadataV2(
numColsPrefixKey: Int,
numPartitions: Int,
stateSchemaFilePath: String)
extends StateStoreMetadata
extends StateStoreMetadata with Serializable

object StateStoreMetadataV2 {
private implicit val formats: Formats = Serialization.formats(NoTypeHints)
Expand All @@ -60,14 +60,14 @@ object StateStoreMetadataV2 {
/**
* Information about a stateful operator.
*/
trait OperatorInfo {
trait OperatorInfo extends Serializable {
def operatorId: Long
def operatorName: String
}

case class OperatorInfoV1(operatorId: Long, operatorName: String) extends OperatorInfo

trait OperatorStateMetadata {
trait OperatorStateMetadata extends Serializable {
def version: Int

def operatorInfo: OperatorInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,31 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}

test("transformWithState - verify OperatorStateMetadataV2 serialization and deserialization" +
" works") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
SQLConf.SHUFFLE_PARTITIONS.key ->
TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
withTempDir { checkpointDir =>
val metadata = OperatorStateMetadataV2(
OperatorInfoV1(0, "transformWithStateExec"),
Array(StateStoreMetadataV2("default", 0, 0, "path")),
"""{"timeMode":"NoTime","outputMode":"Update"}""")
val metadataLog = new OperatorStateMetadataLog(
spark.sessionState.newHadoopConf(),
checkpointDir.getCanonicalPath)
metadataLog.add(0, metadata)
assert(metadataLog.get(0).isDefined)
// assert that each of the fields are the same
val metadataV2 = metadataLog.get(0).get.asInstanceOf[OperatorStateMetadataV2]
assert(metadataV2.operatorInfo == metadata.operatorInfo)
assert(metadataV2.stateStoreInfo.sameElements(metadata.stateStoreInfo))
assert(metadataV2.operatorPropertiesJson == metadata.operatorPropertiesJson)
}
}
}

test("transformWithState - verify StateSchemaV3 serialization and deserialization" +
" works with one batch") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
Expand Down