Skip to content
Closed

init #24

Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9f038aa
[SPARK-50112] Moving Avro files to sql/core so they can be used by Tr…
ericm-db Oct 25, 2024
28c3dbd
moving scala to scala dir
ericm-db Oct 25, 2024
2e33fd1
adding deprecated one
ericm-db Oct 25, 2024
b037859
init
ericm-db Oct 25, 2024
c1db91d
adding enum
ericm-db Oct 25, 2024
a30a29d
feedback and test
ericm-db Oct 25, 2024
2ebf6a8
creating utils class
ericm-db Oct 25, 2024
0559480
micheal feedback
ericm-db Oct 31, 2024
d3845a5
ValueState post-refactor
ericm-db Nov 1, 2024
35b3b0d
multivalue state encoder
ericm-db Nov 1, 2024
dcf0df7
encodeToUnsafeRow avro method
ericm-db Nov 2, 2024
dfc6b1e
using correct val
ericm-db Nov 4, 2024
5b98aa6
comments
ericm-db Nov 4, 2024
0d37ffd
calling encodeUnsafeRow
ericm-db Nov 4, 2024
9a1f825
merge into upstream/master
ericm-db Nov 5, 2024
5c8dd33
Merge remote-tracking branch 'upstream/master' into avro
ericm-db Nov 5, 2024
9b8dd5d
[SPARK-50127] Implement Avro encoding for MapState and PrefixKeyScanS…
ericm-db Nov 7, 2024
448ea76
making schema conversion lazy
ericm-db Nov 7, 2024
386fbf1
batch succeeds
ericm-db Nov 7, 2024
896e24f
actually enabling ttl
ericm-db Nov 7, 2024
15c5f71
including hidden files
ericm-db Nov 7, 2024
1f5e5f7
testWithEncodingTypes
ericm-db Nov 7, 2024
1826d5a
no longer relying on unsaferow
ericm-db Nov 8, 2024
c5ef895
everything but batch works
ericm-db Nov 8, 2024
e22e1a2
splitting it up
ericm-db Nov 8, 2024
730cae0
easy feedback to address
ericm-db Nov 9, 2024
754ce6c
batch works
ericm-db Nov 9, 2024
b6dbfdb
added test suite for non-contiguous ordinals
ericm-db Nov 11, 2024
e6f0b7a
using negative/null val marker
ericm-db Nov 11, 2024
ca660c0
removing log line
ericm-db Nov 11, 2024
41de8ae
getAvroEnc
ericm-db Nov 11, 2024
c49acd2
init
ericm-db Nov 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
micheal feedback
  • Loading branch information
ericm-db committed Oct 31, 2024
commit 0559480044fa9f9fcb24c37971268b339adbe61c
Original file line number Diff line number Diff line change
Expand Up @@ -2211,7 +2211,7 @@ object SQLConf {
.version("4.0.0")
.stringConf
.checkValue(v => Set("UnsafeRow", "Avro").contains(v),
"Valid versions are 'UnsafeRow' and 'Avro'")
"Valid values are 'UnsafeRow' and 'Avro'")
.createWithDefault("UnsafeRow")

// The feature is still in development, so it is still internal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.sql.types.StructType
* @param stateName - name of logical state partition
* @param keyExprEnc - Spark SQL encoder for key
* @param valEncoder - Spark SQL encoder for value
* @param avroEnc - Optional Avro encoder and decoder to convert between S and Avro row
* @param metrics - metrics to be updated as part of stateful processing
* @tparam S - data type of object that will be stored in the list
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.spark.util.NextIterator
* @param valEncoder - Spark SQL encoder for value
* @param ttlConfig - TTL configuration for values stored in this state
* @param batchTimestampMs - current batch processing timestamp.
* @param avroEnc - Optional Avro encoder and decoder to convert between S and Avro row
* @param metrics - metrics to be updated as part of stateful processing
* @tparam S - data type of object that will be stored
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.sql.types.StructType
* @param stateName - name of logical state partition
* @param keyExprEnc - Spark SQL encoder for key
* @param valEncoder - Spark SQL encoder for value
* @param avroEnc - Optional Avro encoder and decoder to convert between K/V and Avro row
* @param metrics - metrics to be updated as part of stateful processing
* @tparam K - type of key for map state variable
* @tparam V - type of value for map state variable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.spark.util.NextIterator
* @param valEncoder - SQL encoder for state variable
* @param ttlConfig - the ttl configuration (time to live duration etc.)
* @param batchTimestampMs - current batch processing timestamp.
* @param avroEnc - Optional Avro encoder and decoder to convert between K/V and Avro row
* @param metrics - metrics to be updated as part of stateful processing
* @tparam K - type of key for map state variable
* @tparam V - type of value for map state variable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ object TransformWithStateKeyValueRowSchemaUtils {
}
}

/**
* A trait that defines encoding and decoding operations for state types in Structured Streaming.
* This encoder handles the conversion between user-defined types and storage types for both
* state keys and values, with optional TTL (Time-To-Live) support.
*
* @tparam V The user-defined value type to be stored in state
* @tparam S The storage type used to represent the state (e.g., UnsafeRow or Array[Byte])
*/
trait StateTypesEncoder[V, S] {
def encodeGroupingKey(): S
def encodeValue(value: V): S
Expand All @@ -78,7 +86,7 @@ trait StateTypesEncoder[V, S] {
* Helper class providing APIs to encode the grouping key, and user provided values
* to Spark [[UnsafeRow]].
*
* CAUTION: StateTypesEncoder class instance is *not* thread-safe.
* CAUTION: UnsafeRowTypesEncoder class instance is *not* thread-safe.
* This class reuses the keyProjection and valueProjection for encoding grouping
* key and state value respectively. As UnsafeProjection is not thread safe, this
* class is also not thread safe.
Expand Down Expand Up @@ -171,16 +179,13 @@ object UnsafeRowTypesEncoder {

/**
* Helper class providing APIs to encode the grouping key, and user provided values
* to Spark [[UnsafeRow]].
*
* CAUTION: StateTypesEncoder class instance is *not* thread-safe.
* This class reuses the keyProjection and valueProjection for encoding grouping
* key and state value respectively. As UnsafeProjection is not thread safe, this
* class is also not thread safe.
* to an Avro Byte Array.
*
* @param keyEncoder - SQL encoder for the grouping key, key type is implicit
* @param valEncoder - SQL encoder for value of type `S`
* @param stateName - name of logical state partition
* @param hasTtl - whether or not TTL is enabled for this state variable
* @param avroEnc - Avro encoder that should be specified to encode keys and values to byte arrays
* @tparam V - value type
*/
class AvroTypesEncoder[V](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,8 @@ object TransformWithStateExec {
queryRunId = UUID.randomUUID(),
operatorId = 0,
storeVersion = 0,
numPartitions = shufflePartitions
numPartitions = shufflePartitions,
stateStoreCkptIds = None
)

new TransformWithStateExec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.sql.streaming.ValueState
* @param keyExprEnc - Spark SQL encoder for key
* @param valEncoder - Spark SQL encoder for value
* @param metrics - metrics to be updated as part of stateful processing
* @param avroEnc - Optional Avro encoder and decoder to convert between S and Avro row
* @tparam S - data type of object that will be stored
*/
class ValueStateImpl[S](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.spark.sql.streaming.{TTLConfig, ValueState}
* @param valEncoder - Spark SQL encoder for value
* @param ttlConfig - TTL configuration for values stored in this state
* @param batchTimestampMs - current batch processing timestamp.
* @param avroEnc - Optional Avro encoder and decoder to convert between S and Avro row
* @param metrics - metrics to be updated as part of stateful processing
* @tparam S - data type of object that will be stored
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
throw StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", "HDFSStateStore")
}


override def get(key: Array[Byte], colFamilyName: String): Array[Byte] = {
throw StateStoreErrors.unsupportedOperationException("Byte array method", "HDFSStateStore")
}
Expand Down
Loading