Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
added test case and error msg
  • Loading branch information
ericm-db committed Jul 16, 2024
commit bc24c1fa3c5a5bce8a198e2fe6e0808610e808eb
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -3804,6 +3804,12 @@
],
"sqlState" : "42802"
},
"STATE_STORE_INVALID_CONFIG_AFTER_RESTART" : {
"message" : [
"<configName> <oldConfig> is not equal to <newConfig>. Please set <configName> to <oldConfig>, or restart with a new checkpoint directory."
],
"sqlState" : "42K06"
},
"STATE_STORE_INVALID_PROVIDER" : {
"message" : [
"The given State Store Provider <inputClass> does not extend org.apache.spark.sql.execution.streaming.state.StateStoreProvider."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,12 @@ class IncrementalExecution(
checkpointLocation, tws.getStateInfo.operatorId.toString))
val operatorStateMetadataLog = new OperatorStateMetadataLog(sparkSession,
metadataPath.toString)
// If metadata from a previous run is present, validate the new metadata against it before adding it to the log.
operatorStateMetadataLog.getLatest() match {
case Some((_, oldMetadata)) =>
tws.validateMetadatas(oldMetadata, metadata)
case None =>
}
operatorStateMetadataLog.add(currentBatchId, metadata)
case _ =>
val metadataWriter = new OperatorStateMetadataWriter(new Path(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import java.util.concurrent.TimeUnit.NANOSECONDS

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.json4s.{DefaultFormats, JString}
import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
import org.json4s.JString
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods.{compact, render}

import org.apache.spark.broadcast.Broadcast
Expand Down Expand Up @@ -431,10 +432,39 @@ case class TransformWithStateExec(
}
}

/**
 * Validates that the operator properties recorded in previously written metadata match the
 * properties of the metadata about to be written.
 *
 * Only pairs where both arguments are [[OperatorStateMetadataV2]] are checked; any other
 * combination is accepted without validation. For V2 metadata the `timeMode` and
 * `outputMode` entries of `operatorPropertiesJson` must be equal in both versions.
 *
 * @param oldMetadata metadata read back from the operator metadata log
 * @param newMetadata metadata generated for the current run
 * @throws StateStoreInvalidConfigAfterRestart if `timeMode` or `outputMode` differs
 * @throws NoSuchElementException if either JSON document lacks one of the checked keys
 */
def validateMetadatas(
    oldMetadata: OperatorStateMetadata,
    newMetadata: OperatorStateMetadata): Unit = {
  (oldMetadata, newMetadata) match {
    // Operator properties only exist on OperatorStateMetadataV2, so both sides must be V2.
    case (oldMetadataV2: OperatorStateMetadataV2, newMetadataV2: OperatorStateMetadataV2) =>
      implicit val formats: DefaultFormats.type = DefaultFormats
      val oldJsonProps =
        JsonMethods.parse(oldMetadataV2.operatorPropertiesJson).extract[Map[String, Any]]
      val newJsonProps =
        JsonMethods.parse(newMetadataV2.operatorPropertiesJson).extract[Map[String, Any]]

      // timeMode is checked first so that it is the one reported when both differ.
      Seq("timeMode", "outputMode").foreach { configName =>
        // String.valueOf instead of an unchecked cast: a non-string or null JSON value is
        // compared by its string form rather than failing with a ClassCastException.
        val oldValue = String.valueOf(oldJsonProps(configName))
        val newValue = String.valueOf(newJsonProps(configName))
        if (oldValue != newValue) {
          throw new StateStoreInvalidConfigAfterRestart(configName, oldValue, newValue)
        }
      }
    case _ =>
      // Nothing to validate unless both versions are V2 metadata.
  }
}

/** Metadata of this stateful operator and its states stores. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,23 @@ object StateStoreErrors {
StateStoreProviderDoesNotSupportFineGrainedReplay = {
new StateStoreProviderDoesNotSupportFineGrainedReplay(inputClass)
}

/**
 * Creates the exception describing a configuration whose value differs between the
 * previous run and the current one.
 *
 * @param configName name of the configuration that differs
 * @param oldConfig value supplied as the old configuration
 * @param newConfig value supplied as the new configuration
 * @return a new [[StateStoreInvalidConfigAfterRestart]] built from the three arguments
 */
def invalidConfigChangedAfterRestart(
    configName: String,
    oldConfig: String,
    newConfig: String): StateStoreInvalidConfigAfterRestart =
  new StateStoreInvalidConfigAfterRestart(configName, oldConfig, newConfig)
}

/**
 * Exception carrying the STATE_STORE_INVALID_CONFIG_AFTER_RESTART error class.
 *
 * The three constructor arguments are forwarded unchanged as the `configName`, `oldConfig`
 * and `newConfig` message parameters.
 *
 * @param configName name of the configuration that differs
 * @param oldConfig value reported as the old configuration
 * @param newConfig value reported as the new configuration
 */
class StateStoreInvalidConfigAfterRestart(
    configName: String,
    oldConfig: String,
    newConfig: String)
  extends SparkUnsupportedOperationException(
    errorClass = "STATE_STORE_INVALID_CONFIG_AFTER_RESTART",
    messageParameters = Map(
      "configName" -> configName,
      "oldConfig" -> oldConfig,
      "newConfig" -> newConfig))

class StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider: String)
extends SparkUnsupportedOperationException(
errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.{Dataset, Encoders, Row}
import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA}
import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, OperatorInfoV1, OperatorStateMetadataV2, POJOTestClass, PrefixKeyScanStateEncoderSpec, RocksDBStateStoreProvider, StatefulProcessorCannotPerformOperationWithInvalidHandleState, StateSchemaV3File, StateStoreMetadataV2, StateStoreMultipleColumnFamiliesNotSupportedException, StateStoreValueSchemaNotCompatible, TestClass}
import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, OperatorInfoV1, OperatorStateMetadataV2, POJOTestClass, PrefixKeyScanStateEncoderSpec, RocksDBStateStoreProvider, StatefulProcessorCannotPerformOperationWithInvalidHandleState, StateSchemaV3File, StateStoreInvalidConfigAfterRestart, StateStoreMetadataV2, StateStoreMultipleColumnFamiliesNotSupportedException, StateStoreValueSchemaNotCompatible, TestClass}
import org.apache.spark.sql.functions.timestamp_seconds
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.StreamManualClock
Expand Down Expand Up @@ -1175,6 +1175,42 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}

// Restarting a transformWithState query from an existing checkpoint with a different
// operator outputMode must fail. Only outputMode is varied here; timeMode stays the same
// in both runs, so the test name mentions outputMode only.
test("test that different outputMode after query restart fails") {
  withSQLConf(
    SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
    SQLConf.SHUFFLE_PARTITIONS.key ->
      TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
    withTempDir { checkpointDir =>
      val inputData = MemoryStream[String]
      val checkpointPath = checkpointDir.getCanonicalPath

      // Builds the same pipeline for both runs; only the operator's output mode varies.
      def runningCount(operatorOutputMode: OutputMode) = inputData.toDS()
        .groupByKey(x => x)
        .transformWithState(
          new RunningCountStatefulProcessor(),
          TimeMode.None(),
          operatorOutputMode)

      // First run: process one batch with Update mode so metadata lands in the checkpoint.
      testStream(runningCount(OutputMode.Update()), OutputMode.Update())(
        StartStream(checkpointLocation = checkpointPath),
        AddData(inputData, "a"),
        CheckNewAnswer(("a", "1")),
        StopStream
      )

      // Second run: same checkpoint, but the operator is now built with Append mode.
      testStream(runningCount(OutputMode.Append()), OutputMode.Update())(
        StartStream(checkpointLocation = checkpointPath),
        AddData(inputData, "a"),
        ExpectFailure[StateStoreInvalidConfigAfterRestart] { t =>
          assert(t.getMessage.contains("outputMode"))
          assert(t.getMessage.contains("is not equal"))
        }
      )
    }
  }
}

test("transformWithState - verify StateSchemaV3 writes correct SQL schema of key/value") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
Expand Down