Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
validating state variable creation
  • Loading branch information
ericm-db committed Jun 14, 2024
commit e935292aae4bf0320c9971f10246907d857161bb
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ class StatefulProcessorHandleImpl(
timeMode: TimeMode,
isStreaming: Boolean = true,
batchTimestampMs: Option[Long] = None,
metrics: Map[String, SQLMetric] = Map.empty)
metrics: Map[String, SQLMetric] = Map.empty,
existingColumnFamilies: Map[String, ColumnFamilySchema] = Map.empty)
extends StatefulProcessorHandle with Logging {
import StatefulProcessorHandleState._

Expand Down Expand Up @@ -132,6 +133,19 @@ class StatefulProcessorHandleImpl(

def getHandleState: StatefulProcessorHandleState = currState

private def verifyStateVariableCreation(columnFamilySchema: ColumnFamilySchema): Unit = {
columnFamilySchema match {
case c1: ColumnFamilySchemaV1 if existingColumnFamilies.contains(c1.columnFamilyName) =>
val existingColumnFamily = existingColumnFamilies(c1.columnFamilyName)
if (existingColumnFamily.json != columnFamilySchema.json) {
throw new RuntimeException(
s"State variable with name ${c1.columnFamilyName} already exists " +
s"with different schema.")
}
case _ =>
}
}

override def getValueState[T](
stateName: String,
valEncoder: Encoder[T]): ValueState[T] = {
Expand All @@ -143,6 +157,7 @@ class StatefulProcessorHandleImpl(
case None =>
stateVariables.add(new StateVariableInfo(stateName, ValueState, false))
val colFamilySchema = ValueStateImpl.columnFamilySchema(stateName)
verifyStateVariableCreation(colFamilySchema)
columnFamilySchemas.add(colFamilySchema)
null
}
Expand All @@ -164,6 +179,7 @@ class StatefulProcessorHandleImpl(
case None =>
stateVariables.add(new StateVariableInfo(stateName, ValueState, true))
val colFamilySchema = ValueStateImplWithTTL.columnFamilySchema(stateName)
verifyStateVariableCreation(colFamilySchema)
columnFamilySchemas.add(colFamilySchema)
null
}
Expand Down Expand Up @@ -254,9 +270,11 @@ class StatefulProcessorHandleImpl(
* @param stateName - name of the state variable
*/
override def deleteIfExists(stateName: String): Unit = {
verifyStateVarOperations("delete_if_exists")
if (store.get.removeColFamilyIfExists(stateName)) {
incrementMetric("numDeletedStateVars")
if (store.isDefined) {
verifyStateVarOperations("delete_if_exists")
if (store.get.removeColFamilyIfExists(stateName)) {
incrementMetric("numDeletedStateVars")
}
}
}

Expand All @@ -269,6 +287,7 @@ class StatefulProcessorHandleImpl(
case None =>
stateVariables.add(new StateVariableInfo(stateName, ListState, false))
val colFamilySchema = ListStateImpl.columnFamilySchema(stateName)
verifyStateVariableCreation(colFamilySchema)
columnFamilySchemas.add(colFamilySchema)
null
}
Expand Down Expand Up @@ -306,6 +325,7 @@ class StatefulProcessorHandleImpl(
case None =>
stateVariables.add(new StateVariableInfo(stateName, ListState, true))
val colFamilySchema = ListStateImplWithTTL.columnFamilySchema(stateName)
verifyStateVariableCreation(colFamilySchema)
columnFamilySchemas.add(colFamilySchema)
null
}
Expand All @@ -323,6 +343,7 @@ class StatefulProcessorHandleImpl(
case None =>
stateVariables.add(new StateVariableInfo(stateName, ValueState, false))
val colFamilySchema = MapStateImpl.columnFamilySchema(stateName)
verifyStateVariableCreation(colFamilySchema)
columnFamilySchemas.add(colFamilySchema)
null
}
Expand All @@ -345,6 +366,7 @@ class StatefulProcessorHandleImpl(
case None =>
stateVariables.add(new StateVariableInfo(stateName, MapState, true))
val colFamilySchema = MapStateImplWithTTL.columnFamilySchema(stateName)
verifyStateVariableCreation(colFamilySchema)
columnFamilySchemas.add(colFamilySchema)
null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,9 +384,14 @@ case class TransformWithStateExec(

validateTimeMode()

val existingColumnFamilies = columnFamilySchemas().map {
case c1: ColumnFamilySchemaV1 =>
c1.columnFamilyName -> c1
}.toMap

val driverProcessorHandle = new StatefulProcessorHandleImpl(
None, getStateInfo.queryRunId, keyEncoder, timeMode,
isStreaming, batchTimestampMs, metrics)
isStreaming, batchTimestampMs, metrics, existingColumnFamilies)

driverProcessorHandle.setHandleState(StatefulProcessorHandleState.PRE_INIT)
statefulProcessor.setHandle(driverProcessorHandle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -926,8 +926,13 @@ class TransformWithStateSuite extends StateStoreMetricsTest
),
AddData(inputData, "a"),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("a", "1")),
StopStream
Execute { q =>
val e = intercept[Exception] {
q.processAllAvailable()
}
assert(e.getMessage.contains("State variable with name" +
" countState already exists with different schema"))
}
)
}
}
Expand Down