Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
removing ': Array[StateStoreMetadata]'
  • Loading branch information
ericm-db committed Jul 9, 2024
commit ef86e37d1f45573ad2d32f8a11f31b49c1124417
Original file line number Diff line number Diff line change
Expand Up @@ -212,20 +212,31 @@ class StateMetadataPartitionReader(
private[state] lazy val stateMetadata: Iterator[StateMetadataTableEntry] = {
allOperatorStateMetadata.flatMap { operatorStateMetadata =>
require(operatorStateMetadata.version == 1 || operatorStateMetadata.version == 2)
val operatorProperties = operatorStateMetadata match {
case _: OperatorStateMetadataV1 => ""
case v2: OperatorStateMetadataV2 => v2.operatorPropertiesJson
}
operatorStateMetadata.stateStoreInfo.map { stateStoreMetadata =>
StateMetadataTableEntry(operatorStateMetadata.operatorInfo.operatorId,
operatorStateMetadata.operatorInfo.operatorName,
stateStoreMetadata.storeName,
stateStoreMetadata.numPartitions,
if (batchIds.nonEmpty) batchIds.head else -1,
if (batchIds.nonEmpty) batchIds.last else -1,
operatorProperties,
stateStoreMetadata.numColsPrefixKey
)
operatorStateMetadata match {
case v1: OperatorStateMetadataV1 =>
v1.stateStoreInfo.map { stateStoreMetadata =>
StateMetadataTableEntry(v1.operatorInfo.operatorId,
v1.operatorInfo.operatorName,
stateStoreMetadata.storeName,
stateStoreMetadata.numPartitions,
if (batchIds.nonEmpty) batchIds.head else -1,
if (batchIds.nonEmpty) batchIds.last else -1,
"",
stateStoreMetadata.numColsPrefixKey
)
}
case v2: OperatorStateMetadataV2 =>
v2.stateStoreInfo.map { stateStoreMetadata =>
StateMetadataTableEntry(v2.operatorInfo.operatorId,
v2.operatorInfo.operatorName,
stateStoreMetadata.storeName,
stateStoreMetadata.numPartitions,
if (batchIds.nonEmpty) batchIds.head else -1,
if (batchIds.nonEmpty) batchIds.last else -1,
v2.operatorPropertiesJson,
stateStoreMetadata.numColsPrefixKey
)
}
}
}
}.iterator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ case class StreamingSymmetricHashJoinExec(
override def operatorStateMetadata(): OperatorStateMetadata = {
val info = getStateInfo
val operatorInfo = OperatorInfoV1(info.operatorId, shortName)
val stateStoreInfo: Array[StateStoreMetadata] =
val stateStoreInfo =
stateStoreNames.map(StateStoreMetadataV1(_, 0, info.numPartitions)).toArray
OperatorStateMetadataV1(operatorInfo, stateStoreInfo)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ case class TransformWithStateExec(
val operatorInfo = OperatorInfoV1(info.operatorId, shortName)
// stateSchemaFilePath should be populated at this point
assert(info.stateSchemaPath.isDefined)
val stateStoreInfo: Array[StateStoreMetadata] =
val stateStoreInfo =
Array(StateStoreMetadataV2(
StateStoreId.DEFAULT_STORE_NAME, 0, info.numPartitions, info.stateSchemaPath.get))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,17 @@ trait OperatorStateMetadata {
def version: Int

def operatorInfo: OperatorInfo

def stateStoreInfo: Array[StateStoreMetadata]
}

case class OperatorStateMetadataV1(
operatorInfo: OperatorInfoV1,
stateStoreInfo: Array[StateStoreMetadata]) extends OperatorStateMetadata {
stateStoreInfo: Array[StateStoreMetadataV1]) extends OperatorStateMetadata {
override def version: Int = 1
}

case class OperatorStateMetadataV2(
operatorInfo: OperatorInfoV1,
stateStoreInfo: Array[StateStoreMetadata],
stateStoreInfo: Array[StateStoreMetadataV2],
operatorPropertiesJson: String) extends OperatorStateMetadata {
override def version: Int = 2
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp
def operatorStateMetadata(): OperatorStateMetadata = {
val info = getStateInfo
val operatorInfo = OperatorInfoV1(info.operatorId, shortName)
val stateStoreInfo: Array[StateStoreMetadata] =
val stateStoreInfo =
Array(StateStoreMetadataV1(StateStoreId.DEFAULT_STORE_NAME, 0, info.numPartitions))
OperatorStateMetadataV1(operatorInfo, stateStoreInfo)
}
Expand Down Expand Up @@ -922,7 +922,7 @@ case class SessionWindowStateStoreSaveExec(
override def operatorStateMetadata(): OperatorStateMetadata = {
val info = getStateInfo
val operatorInfo = OperatorInfoV1(info.operatorId, shortName)
val stateStoreInfo: Array[StateStoreMetadata] = Array(StateStoreMetadataV1(
val stateStoreInfo = Array(StateStoreMetadataV1(
StateStoreId.DEFAULT_STORE_NAME, stateManager.getNumColsForPrefixKey, info.numPartitions))
OperatorStateMetadataV1(operatorInfo, stateStoreInfo)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
StopStream
)

val expectedStateStoreInfo: Array[StateStoreMetadata] = Array(
val expectedStateStoreInfo = Array(
StateStoreMetadataV1("left-keyToNumValues", 0, numShufflePartitions),
StateStoreMetadataV1("left-keyWithIndexToValue", 0, numShufflePartitions),
StateStoreMetadataV1("right-keyToNumValues", 0, numShufflePartitions),
Expand Down