Skip to content
Closed

init #24

Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9f038aa
[SPARK-50112] Moving Avro files to sql/core so they can be used by Tr…
ericm-db Oct 25, 2024
28c3dbd
moving scala to scala dir
ericm-db Oct 25, 2024
2e33fd1
adding deprecated one
ericm-db Oct 25, 2024
b037859
init
ericm-db Oct 25, 2024
c1db91d
adding enum
ericm-db Oct 25, 2024
a30a29d
feedback and test
ericm-db Oct 25, 2024
2ebf6a8
creating utils class
ericm-db Oct 25, 2024
0559480
micheal feedback
ericm-db Oct 31, 2024
d3845a5
ValueState post-refactor
ericm-db Nov 1, 2024
35b3b0d
multivalue state encoder
ericm-db Nov 1, 2024
dcf0df7
encodeToUnsafeRow avro method
ericm-db Nov 2, 2024
dfc6b1e
using correct val
ericm-db Nov 4, 2024
5b98aa6
comments
ericm-db Nov 4, 2024
0d37ffd
calling encodeUnsafeRow
ericm-db Nov 4, 2024
9a1f825
merge into upstream/master
ericm-db Nov 5, 2024
5c8dd33
Merge remote-tracking branch 'upstream/master' into avro
ericm-db Nov 5, 2024
9b8dd5d
[SPARK-50127] Implement Avro encoding for MapState and PrefixKeyScanS…
ericm-db Nov 7, 2024
448ea76
making schema conversion lazy
ericm-db Nov 7, 2024
386fbf1
batch succeeds
ericm-db Nov 7, 2024
896e24f
actually enabling ttl
ericm-db Nov 7, 2024
15c5f71
including hidden files
ericm-db Nov 7, 2024
1f5e5f7
testWithEncodingTypes
ericm-db Nov 7, 2024
1826d5a
no longer relying on unsaferow
ericm-db Nov 8, 2024
c5ef895
everything but batch works
ericm-db Nov 8, 2024
e22e1a2
splitting it up
ericm-db Nov 8, 2024
730cae0
easy feedback to address
ericm-db Nov 9, 2024
754ce6c
batch works
ericm-db Nov 9, 2024
b6dbfdb
added test suite for non-contiguous ordinals
ericm-db Nov 11, 2024
e6f0b7a
using negative/null val marker
ericm-db Nov 11, 2024
ca660c0
removing log line
ericm-db Nov 11, 2024
41de8ae
getAvroEnc
ericm-db Nov 11, 2024
c49acd2
init
ericm-db Nov 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
comments
  • Loading branch information
ericm-db committed Nov 4, 2024
commit 5b98aa68937a600bb276c0774db0637c78684198
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import org.apache.spark.sql.types.StructType
* @param keyExprEnc - Spark SQL encoder for key
* @param valEncoder - Spark SQL encoder for value
* @param metrics - metrics to be updated as part of stateful processing
* @param avroEnc - optional Avro serializer and deserializer for this state variable that
* is used by the StateStore to encode state in Avro format
* @tparam S - data type of object that will be stored in the list
*/
class ListStateImpl[S](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,18 @@ object StateStoreColumnFamilySchemaUtils {
new StateStoreColumnFamilySchemaUtils(initializeAvroSerde)
}

/**
* This class is used to create the StateStoreColumnFamilySchema for each state variable
* from the driver.
*
* @param initializeAvroSerde Whether or not to create the Avro serializers and deserializers
* for this state type.
*/
class StateStoreColumnFamilySchemaUtils(initializeAvroSerde: Boolean) {

/**
* If initializeAvroSerde is true, this method will create an Avro Serializer and Deserializer
* for a particular key and value schema.
*/
private def getAvroSerde(
keySchema: StructType, valSchema: StructType): Option[AvroEncoderSpec] = {
if (initializeAvroSerde) {
Expand Down Expand Up @@ -87,14 +97,12 @@ class StateStoreColumnFamilySchemaUtils(initializeAvroSerde: Boolean) {
valEncoder: Encoder[V],
hasTtl: Boolean): StateStoreColFamilySchema = {
val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema)
val valSchema = getValueSchemaWithTTL(valEncoder.schema, hasTtl)
StateStoreColFamilySchema(
stateName,
compositeKeySchema,
getValueSchemaWithTTL(valEncoder.schema, hasTtl),
Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)),
Some(userKeyEnc.schema),
avroEnc = getAvroSerde(compositeKeySchema, valSchema))
Some(userKeyEnc.schema))
}

def getTimerStateSchema(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class QueryInfoImpl(
* @param isStreaming - defines whether the query is streaming or batch
* @param batchTimestampMs - timestamp for the current batch if available
* @param metrics - metrics to be updated as part of stateful processing
* @param schemas - StateStoreColumnFamilySchemas that include Avro serializers and deserializers
* for each state variable, if Avro encoding is enabled for this query
*/
class StatefulProcessorHandleImpl(
store: StateStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import org.apache.spark.sql.streaming.ValueState
* @param keyExprEnc - Spark SQL encoder for key
* @param valEncoder - Spark SQL encoder for value
* @param metrics - metrics to be updated as part of stateful processing
* @param avroEnc - optional Avro serializer and deserializer for this state variable that
* is used by the StateStore to encode state in Avro format
* @tparam S - data type of object that will be stored
*/
class ValueStateImpl[S](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,22 @@ object RocksDBStateEncoder {
encodedBytes
}

/**
* This method takes an UnsafeRow and serializes it to a byte array using Avro encoding.
*/
def encodeUnsafeRow(
row: UnsafeRow,
avroSerializer: AvroSerializer,
valueAvroType: Schema,
out: ByteArrayOutputStream): Array[Byte] = {
// InternalRow -> Avro.GenericDataRecord
val avroData =
avroSerializer.serialize(row) // InternalRow -> GenericDataRecord
avroSerializer.serialize(row)
out.reset()
val encoder = EncoderFactory.get().directBinaryEncoder(out, null)
val writer = new GenericDatumWriter[Any](
valueAvroType) // Defining Avro writer for this struct type
writer.write(avroData, encoder) // GenericDataRecord -> bytes
writer.write(avroData, encoder) // Avro.GenericDataRecord -> byte array
encoder.flush()
out.toByteArray
}
Expand All @@ -180,17 +184,23 @@ object RocksDBStateEncoder {
}
}


/**
* This method takes a byte array written using Avro encoding and
* deserializes it to an UnsafeRow using the Avro deserializer.
*/
def decodeToUnsafeRow(
valueBytes: Array[Byte],
avroDeserializer: AvroDeserializer,
valueAvroType: Schema,
valueProj: UnsafeProjection): UnsafeRow = {
val reader = new GenericDatumReader[Any](valueAvroType)
val decoder = DecoderFactory.get().binaryDecoder(valueBytes, 0, valueBytes.length, null)
val genericData = reader.read(null, decoder) // bytes -> GenericDataRecord
// bytes -> Avro.GenericDataRecord
val genericData = reader.read(null, decoder)
// Avro.GenericDataRecord -> InternalRow
val internalRow = avroDeserializer.deserialize(
genericData).orNull.asInstanceOf[InternalRow]
// InternalRow -> UnsafeRow
valueProj.apply(internalRow)
}

Expand All @@ -214,6 +224,8 @@ object RocksDBStateEncoder {
* @param keySchema - schema of the key to be encoded
* @param numColsPrefixKey - number of columns to be used for prefix key
* @param useColumnFamilies - if column family is enabled for this encoder
* @param avroEnc - Avro serializers and deserializers for this StateEncoder; defined only
* if Avro encoding is specified
*/
class PrefixKeyScanStateEncoder(
keySchema: StructType,
Expand Down Expand Up @@ -308,7 +320,6 @@ class PrefixKeyScanStateEncoder(
}

override def supportPrefixKeyScan: Boolean = true

}

/**
Expand Down Expand Up @@ -341,6 +352,8 @@ class PrefixKeyScanStateEncoder(
* @param keySchema - schema of the key to be encoded
* @param orderingOrdinals - the ordinals for which the range scan is constructed
* @param useColumnFamilies - if column family is enabled for this encoder
* @param avroEnc - Avro serializers and deserializers for this StateEncoder; defined only
* if Avro encoding is specified
*/
class RangeKeyScanStateEncoder(
keySchema: StructType,
Expand Down Expand Up @@ -700,6 +713,7 @@ class RangeKeyScanStateEncoder(
* The bytes of an UnsafeRow are written unmodified, starting from offset 1
* (offset 0 is the version byte of value 0). That is, if the unsafe row has N bytes,
* then the generated byte array will be N+1 bytes.
* If avroEnc is specified, Avro encoding is used for this column family's keys.
*/
class NoPrefixKeyStateEncoder(
keySchema: StructType,
Expand All @@ -711,6 +725,7 @@ class NoPrefixKeyStateEncoder(
import RocksDBStateEncoder._

// Reusable objects
private val usingAvroEncoding = avroEnc.isDefined
private val keyRow = new UnsafeRow(keySchema.size)
private val keyAvroType = SchemaConverters.toAvroType(keySchema)

Expand All @@ -720,7 +735,7 @@ class NoPrefixKeyStateEncoder(
} else {
// If avroEnc is defined, we know that we need to use Avro to
// encode this UnsafeRow to Avro bytes
val bytesToEncode = if (avroEnc.isDefined) {
val bytesToEncode = if (usingAvroEncoding) {
val avroData = avroEnc.get.keySerializer.serialize(row)
out.reset()
val encoder = EncoderFactory.get().directBinaryEncoder(out, null)
Expand Down Expand Up @@ -782,6 +797,7 @@ class NoPrefixKeyStateEncoder(
* This encoder supports RocksDB StringAppendOperator merge operator. Values encoded can be
* merged in RocksDB using merge operation, and all merged values can be read using decodeValues
* operation.
* If the avroEnc is specified, we are using Avro encoding for this column family's values
*/
class MultiValuedStateEncoder(
valueSchema: StructType,
Expand All @@ -790,14 +806,15 @@ class MultiValuedStateEncoder(

import RocksDBStateEncoder._

private val usingAvroEncoding = avroEnc.isDefined
// Reusable objects
private val out = new ByteArrayOutputStream
private val valueRow = new UnsafeRow(valueSchema.size)
private val valueAvroType = SchemaConverters.toAvroType(valueSchema)
private val valueProj = UnsafeProjection.create(valueSchema)

override def encodeValue(row: UnsafeRow): Array[Byte] = {
val bytes = if (avroEnc.isDefined) {
val bytes = if (usingAvroEncoding) {
encodeUnsafeRow(row, avroEnc.get.valueSerializer, valueAvroType, out)
} else {
encodeUnsafeRow(row)
Expand All @@ -820,7 +837,7 @@ class MultiValuedStateEncoder(
val encodedValue = new Array[Byte](numBytes)
Platform.copyMemory(valueBytes, java.lang.Integer.BYTES + Platform.BYTE_ARRAY_OFFSET,
encodedValue, Platform.BYTE_ARRAY_OFFSET, numBytes)
if (avroEnc.isDefined) {
if (usingAvroEncoding) {
decodeToUnsafeRow(
encodedValue, avroEnc.get.valueDeserializer, valueAvroType, valueProj)
} else {
Expand Down Expand Up @@ -851,7 +868,7 @@ class MultiValuedStateEncoder(

pos += numBytes
pos += 1 // eat the delimiter character
if (avroEnc.isDefined) {
if (usingAvroEncoding) {
decodeToUnsafeRow(
encodedValue, avroEnc.get.valueDeserializer, valueAvroType, valueProj)
} else {
Expand All @@ -876,6 +893,7 @@ class MultiValuedStateEncoder(
* The bytes of an UnsafeRow are written unmodified, starting from offset 1
* (offset 0 is the version byte of value 0). That is, if the unsafe row has N bytes,
* then the generated byte array will be N+1 bytes.
* If avroEnc is specified, Avro encoding is used for this column family's values.
*/
class SingleValueStateEncoder(
valueSchema: StructType,
Expand All @@ -884,14 +902,15 @@ class SingleValueStateEncoder(

import RocksDBStateEncoder._

private val usingAvroEncoding = avroEnc.isDefined
// Reusable objects
private val out = new ByteArrayOutputStream
private val valueRow = new UnsafeRow(valueSchema.size)
private val valueAvroType = SchemaConverters.toAvroType(valueSchema)
private val valueProj = UnsafeProjection.create(valueSchema)

override def encodeValue(row: UnsafeRow): Array[Byte] = {
if (avroEnc.isDefined) {
if (usingAvroEncoding) {
encodeUnsafeRow(row, avroEnc.get.valueSerializer, valueAvroType, out)
} else {
encodeUnsafeRow(row)
Expand All @@ -908,7 +927,7 @@ class SingleValueStateEncoder(
if (valueBytes == null) {
return null
}
if (avroEnc.isDefined) {
if (usingAvroEncoding) {
decodeToUnsafeRow(
valueBytes, avroEnc.get.valueDeserializer, valueAvroType, valueProj)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ case class StateSchemaValidationResult(
schemaPath: String
)

// Avro encoder spec that is used by the RocksDBStateStoreProvider and RocksDBStateEncoder
// to serialize UnsafeRows to Avro-encoded byte arrays and to deserialize them back.
case class AvroEncoderSpec(
keySerializer: AvroSerializer,
keyDeserializer: AvroDeserializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ trait StateStore extends ReadStateStore {

/**
* Create column family with given name, if absent.
*
* If Avro encoding is enabled for this query, we expect the avroEncoderSpec to
* be defined so that the Key and Value StateEncoders will use this.
* @return column family ID
*/
def createColFamilyIfAbsent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,13 +296,12 @@ class TransformWithListStateSuite extends StreamTest
AddData(inputData, InputRow("k5", "append", "v4")),
AddData(inputData, InputRow("k5", "put", "v5,v6")),
AddData(inputData, InputRow("k5", "emitAllInState", "")),
CheckNewAnswer(("k5", "v5"), ("k5", "v6"))
// TODO: Uncomment once we have implemented ListStateMetrics for Avro encoding
// Execute { q =>
// assert(q.lastProgress.stateOperators(0).customMetrics.get("numListStateVars") > 0)
// assert(q.lastProgress.stateOperators(0).numRowsUpdated === 2)
// assert(q.lastProgress.stateOperators(0).numRowsRemoved === 2)
// }
CheckNewAnswer(("k5", "v5"), ("k5", "v6")),
Execute { q =>
assert(q.lastProgress.stateOperators(0).customMetrics.get("numListStateVars") > 0)
assert(q.lastProgress.stateOperators(0).numRowsUpdated === 2)
assert(q.lastProgress.stateOperators(0).numRowsRemoved === 2)
}
)
}
}
Expand Down
Loading