Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
tests pass
  • Loading branch information
ericm-db committed Jun 14, 2024
commit 95d5b2a862e14da493e66589510de7201752e744
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSch
import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, StateStore, StateStoreErrors}
import org.apache.spark.sql.streaming.ListState

object ListStateImpl {
  /**
   * Builds the column family schema used to back a list state variable.
   *
   * @param stateName name of the state variable; becomes the column family name
   * @return a [[ColumnFamilySchemaV1]] keyed by `KEY_ROW_SCHEMA` with
   *         `VALUE_ROW_SCHEMA` values and a no-prefix key encoder
   */
  def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = {
    val keyEncoderSpec = NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA)
    // NOTE(review): trailing `true` flag presumably enables multiple values
    // per key (list semantics) — confirm against ColumnFamilySchemaV1.
    new ColumnFamilySchemaV1(stateName, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, keyEncoderSpec, true)
  }
}
/**
* Provides concrete implementation for list of values associated with a state variable
* used in the streaming transformWithState operator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoP
import org.apache.spark.sql.streaming.{ListState, TTLConfig}
import org.apache.spark.util.NextIterator

object ListStateImplWithTTL {
  /**
   * Builds the column family schema used to back a TTL-enabled list state
   * variable.
   *
   * @param stateName name of the state variable; becomes the column family name
   * @return a [[ColumnFamilySchemaV1]] keyed by `KEY_ROW_SCHEMA` with
   *         TTL-carrying values (`VALUE_ROW_SCHEMA_WITH_TTL`) and a
   *         no-prefix key encoder
   */
  def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = {
    val keyEncoderSpec = NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA)
    // NOTE(review): trailing `true` flag presumably enables multiple values
    // per key (list semantics) — confirm against ColumnFamilySchemaV1.
    new ColumnFamilySchemaV1(stateName, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL, keyEncoderSpec, true)
  }
}
/**
* Class that provides a concrete implementation for a list state state associated with state
* variables (with ttl expiration support) used in the streaming transformWithState operator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSch
import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors, UnsafeRowPair}
import org.apache.spark.sql.streaming.MapState

object MapStateImpl {
  /**
   * Builds the column family schema used to back a map state variable.
   *
   * @param stateName name of the state variable; becomes the column family name
   * @return a [[ColumnFamilySchemaV1]] keyed by `COMPOSITE_KEY_ROW_SCHEMA`
   *         (grouping key + user key) with `VALUE_ROW_SCHEMA` values and a
   *         prefix-scan key encoder over the first key column
   */
  def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = {
    // Prefix scan over 1 leading column lets us enumerate all user keys
    // belonging to a single grouping key.
    val keyEncoderSpec = PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, 1)
    new ColumnFamilySchemaV1(stateName, COMPOSITE_KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, keyEncoderSpec, false)
  }
}

class MapStateImpl[K, V](
store: StateStore,
stateName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, Pre
import org.apache.spark.sql.streaming.{MapState, TTLConfig}
import org.apache.spark.util.NextIterator

object MapStateImplWithTTL {
  /**
   * Builds the column family schema used to back a TTL-enabled map state
   * variable.
   *
   * @param stateName name of the state variable; becomes the column family name
   * @return a [[ColumnFamilySchemaV1]] keyed by `COMPOSITE_KEY_ROW_SCHEMA`
   *         (grouping key + user key) with TTL-carrying values
   *         (`VALUE_ROW_SCHEMA_WITH_TTL`) and a prefix-scan key encoder over
   *         the first key column
   */
  def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = {
    // Prefix scan over 1 leading column lets us enumerate all user keys
    // belonging to a single grouping key.
    val keyEncoderSpec = PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, 1)
    new ColumnFamilySchemaV1(stateName, COMPOSITE_KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL, keyEncoderSpec, false)
  }
}
/**
* Class that provides a concrete implementation for map state associated with state
* variables (with ttl expiration support) used in the streaming transformWithState operator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class StatefulProcessorHandleImpl(
new ValueStateImpl[T](store, stateName, keyEncoder, valEncoder)
case None =>
stateVariables.add(new StateVariableInfo(stateName, ValueState, false))
val colFamilySchema = resultState.columnFamilySchema
val colFamilySchema = ValueStateImpl.columnFamilySchema(stateName)
columnFamilySchemas.add(colFamilySchema)
null
}
Expand All @@ -163,7 +163,7 @@ class StatefulProcessorHandleImpl(
valueStateWithTTL
case None =>
stateVariables.add(new StateVariableInfo(stateName, ValueState, true))
val colFamilySchema = resultState.columnFamilySchema
val colFamilySchema = ValueStateImplWithTTL.columnFamilySchema(stateName)
columnFamilySchemas.add(colFamilySchema)
null
}
Expand Down Expand Up @@ -268,6 +268,8 @@ class StatefulProcessorHandleImpl(
new ListStateImpl[T](store, stateName, keyEncoder, valEncoder)
case None =>
stateVariables.add(new StateVariableInfo(stateName, ListState, false))
val colFamilySchema = ListStateImpl.columnFamilySchema(stateName)
columnFamilySchemas.add(colFamilySchema)
null
}
}
Expand Down Expand Up @@ -303,7 +305,7 @@ class StatefulProcessorHandleImpl(
listStateWithTTL
case None =>
stateVariables.add(new StateVariableInfo(stateName, ListState, true))
val colFamilySchema = resultState.columnFamilySchema
val colFamilySchema = ListStateImplWithTTL.columnFamilySchema(stateName)
columnFamilySchemas.add(colFamilySchema)
null
}
Expand All @@ -320,7 +322,7 @@ class StatefulProcessorHandleImpl(
new MapStateImpl[K, V](store, stateName, keyEncoder, userKeyEnc, valEncoder)
case None =>
stateVariables.add(new StateVariableInfo(stateName, ValueState, false))
val colFamilySchema = resultState.columnFamilySchema
val colFamilySchema = MapStateImpl.columnFamilySchema(stateName)
columnFamilySchemas.add(colFamilySchema)
null
}
Expand All @@ -342,7 +344,7 @@ class StatefulProcessorHandleImpl(
mapStateWithTTL
case None =>
stateVariables.add(new StateVariableInfo(stateName, MapState, true))
val colFamilySchema = resultState.columnFamilySchema
val colFamilySchema = MapStateImplWithTTL.columnFamilySchema(stateName)
columnFamilySchemas.add(colFamilySchema)
null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSch
import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, StateStore}
import org.apache.spark.sql.streaming.{TTLConfig, ValueState}

object ValueStateImplWithTTL {
  /**
   * Builds the column family schema used to back a TTL-enabled value state
   * variable.
   *
   * @param stateName name of the state variable; becomes the column family name
   * @return a [[ColumnFamilySchemaV1]] keyed by `KEY_ROW_SCHEMA` with
   *         TTL-carrying values (`VALUE_ROW_SCHEMA_WITH_TTL`) and a
   *         no-prefix key encoder
   */
  def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = {
    val keyEncoderSpec = NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA)
    // NOTE(review): trailing `false` flag presumably disables multiple values
    // per key (single-value semantics) — confirm against ColumnFamilySchemaV1.
    new ColumnFamilySchemaV1(stateName, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL, keyEncoderSpec, false)
  }
}

/**
* Class that provides a concrete implementation for a single value state associated with state
* variables (with ttl expiration support) used in the streaming transformWithState operator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,10 @@ import scala.reflect.ClassTag
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, Path}
import org.json4s.{Formats, NoTypeHints}
import org.json4s.JsonAST.JValue
import org.json4s.jackson.Serialization

import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, MetadataVersionUtil}
import org.apache.spark.util.AccumulatorV2

/**
* Metadata for a state store instance.
Expand Down