Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
adding todo
  • Loading branch information
ericm-db committed Jul 9, 2024
commit 58e194788a8c4a91ce2059ce33c9dabfcfd4672f
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ class IncrementalExecution(
val metadata = stateStoreWriter.operatorStateMetadata()
stateStoreWriter match {
case tws: TransformWithStateExec =>
logError(s"### checkpointLocation: $checkpointLocation")
val metadataPath = OperatorStateMetadataV2.metadataFilePath(new Path(
checkpointLocation, tws.getStateInfo.operatorId.toString))
val operatorStateMetadataLog = new OperatorStateMetadataLog(sparkSession,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ class OperatorStateMetadataLog(
case 1 =>
OperatorStateMetadataV1.serialize(fsDataOutputStream, metadata)
case 2 =>
logError(s"### stateSchemaPath: ${metadata.asInstanceOf[OperatorStateMetadataV2].
stateStoreInfo.head.stateSchemaFilePath}")
OperatorStateMetadataV2.serialize(fsDataOutputStream, metadata)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,32 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S
}
}

// Test fixture: identical to RunningCountStatefulProcessor except the running
// count is stored as an Int-encoded ValueState, used to exercise state-schema
// evolution between query restarts.
class RunningCountStatefulProcessorInt extends StatefulProcessor[String, String, (String, String)]
  with Logging {
  @transient protected var _countState: ValueState[Int] = _

  override def init(
      outputMode: OutputMode,
      timeMode: TimeMode): Unit = {
    // Register the per-key counter under the same column family name as the
    // String-based processor, but with an Int encoder.
    _countState = getHandle.getValueState[Int]("countState", Encoders.scalaInt)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues,
      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String)] = {
    // Increment the stored count; an absent state value counts as zero.
    val updatedCount = _countState.getOption().getOrElse(0) + 1
    if (updatedCount != 3) {
      _countState.update(updatedCount)
      Iterator.single((key, updatedCount.toString))
    } else {
      // On the third occurrence, drop the key's state and emit nothing.
      _countState.clear()
      Iterator.empty
    }
  }
}

// Class to verify stateful processor usage with adding processing time timers
class RunningCountStatefulProcessorWithProcTimeTimer extends RunningCountStatefulProcessor {
private def handleProcessingTimeBasedTimers(
Expand Down Expand Up @@ -1054,6 +1080,42 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}
}

// TODO: Enable this test and expect error to be thrown when
// github.com/apache/spark/pull/47257 is merged
ignore("test that invalid schema evolution fails query for column family") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
SQLConf.SHUFFLE_PARTITIONS.key ->
TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
withTempDir { checkpointDir =>
val inputData = MemoryStream[String]
val result1 = inputData.toDS()
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessor(),
TimeMode.None(),
OutputMode.Update())

testStream(result1, OutputMode.Update())(
StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
AddData(inputData, "a"),
CheckNewAnswer(("a", "1")),
StopStream
)
val result2 = inputData.toDS()
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessorInt(),
TimeMode.None(),
OutputMode.Update())
testStream(result2, OutputMode.Update())(
StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
AddData(inputData, "a"),
CheckNewAnswer(("a", "2")),
StopStream
)
}
}
}
}

class TransformWithStateValidationSuite extends StateStoreMetricsTest {
Expand Down