init #24 (Closed)

Changes from 1 commit
Commits (32)
9f038aa  [SPARK-50112] Moving Avro files to sql/core so they can be used by Tr… (ericm-db, Oct 25, 2024)
28c3dbd  moving scala to scala dir (ericm-db, Oct 25, 2024)
2e33fd1  adding deprecated one (ericm-db, Oct 25, 2024)
b037859  init (ericm-db, Oct 25, 2024)
c1db91d  adding enum (ericm-db, Oct 25, 2024)
a30a29d  feedback and test (ericm-db, Oct 25, 2024)
2ebf6a8  creating utils class (ericm-db, Oct 25, 2024)
0559480  micheal feedback (ericm-db, Oct 31, 2024)
d3845a5  ValueState post-refactor (ericm-db, Nov 1, 2024)
35b3b0d  multivalue state encoder (ericm-db, Nov 1, 2024)
dcf0df7  encodeToUnsafeRow avro method (ericm-db, Nov 2, 2024)
dfc6b1e  using correct val (ericm-db, Nov 4, 2024)
5b98aa6  comments (ericm-db, Nov 4, 2024)
0d37ffd  calling encodeUnsafeRow (ericm-db, Nov 4, 2024)
9a1f825  merge into upstream/master (ericm-db, Nov 5, 2024)
5c8dd33  Merge remote-tracking branch 'upstream/master' into avro (ericm-db, Nov 5, 2024)
9b8dd5d  [SPARK-50127] Implement Avro encoding for MapState and PrefixKeyScanS… (ericm-db, Nov 7, 2024)
448ea76  making schema conversion lazy (ericm-db, Nov 7, 2024)
386fbf1  batch succeeds (ericm-db, Nov 7, 2024)
896e24f  actually enabling ttl (ericm-db, Nov 7, 2024)
15c5f71  including hidden files (ericm-db, Nov 7, 2024)
1f5e5f7  testWithEncodingTypes (ericm-db, Nov 7, 2024)
1826d5a  no longer relying on unsaferow (ericm-db, Nov 8, 2024)
c5ef895  everything but batch works (ericm-db, Nov 8, 2024)
e22e1a2  splitting it up (ericm-db, Nov 8, 2024)
730cae0  easy feedback to address (ericm-db, Nov 9, 2024)
754ce6c  batch works (ericm-db, Nov 9, 2024)
b6dbfdb  added test suite for non-contiguous ordinals (ericm-db, Nov 11, 2024)
e6f0b7a  using negative/null val marker (ericm-db, Nov 11, 2024)
ca660c0  removing log line (ericm-db, Nov 11, 2024)
41de8ae  getAvroEnc (ericm-db, Nov 11, 2024)
c49acd2  init (ericm-db, Nov 5, 2024)
easy feedback to address
ericm-db committed Nov 9, 2024
commit 730cae08f5d6e3308a0f56ee6a06cec385b814a8
@@ -259,13 +259,13 @@ class IncrementalExecution(
     }
   }
 
-  object StateStoreColumnFamilySchemas extends SparkPlanPartialRule {
+  object StateStoreColumnFamilySchemasRule extends SparkPlanPartialRule {
     override val rule: PartialFunction[SparkPlan, SparkPlan] = {
       case statefulOp: StatefulOperator =>
         statefulOp match {
-          case transformWithStateExec: TransformWithStateExec =>
-            transformWithStateExec.copy(
-              columnFamilySchemas = transformWithStateExec.getColFamilySchemas()
+          case op: TransformWithStateExec =>
+            op.copy(
+              columnFamilySchemas = op.getColFamilySchemas()
             )
           case _ => statefulOp
         }
@@ -565,7 +565,7 @@
     // The rule below doesn't change the plan but can cause the side effect that
     // metadata/schema is written in the checkpoint directory of stateful operator.
     planWithStateOpId transform StateSchemaAndOperatorMetadataRule.rule
-    val planWithStateSchemas = planWithStateOpId transform StateStoreColumnFamilySchemas.rule
+    val planWithStateSchemas = planWithStateOpId transform StateStoreColumnFamilySchemasRule.rule
     simulateWatermarkPropagation(planWithStateSchemas)
     planWithStateSchemas transform WatermarkPropagationRule.rule
   }
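For context on the hunks above: they only rename the partial rule object and shorten a pattern variable. The rule itself is a PartialFunction applied over the physical plan via transform. Below is a minimal, self-contained sketch of that pattern, using toy Plan classes as stand-ins rather than Spark's actual SparkPlan API.

// Toy stand-ins for SparkPlan nodes; not Spark's actual classes.
sealed trait Plan { def children: Seq[Plan] }
case class StatefulOp(schemas: Map[String, String], children: Seq[Plan]) extends Plan
case class OtherOp(children: Seq[Plan]) extends Plan

object PartialRuleSketch {
  // Apply a partial rule top-down, leaving nodes the rule does not cover unchanged.
  def transform(plan: Plan, rule: PartialFunction[Plan, Plan]): Plan = {
    val applied = rule.applyOrElse(plan, identity[Plan])
    applied match {
      case s: StatefulOp => s.copy(children = s.children.map(transform(_, rule)))
      case o: OtherOp => o.copy(children = o.children.map(transform(_, rule)))
    }
  }

  // Rough analogue of the renamed rule: fill in schemas on the stateful operator.
  val fillSchemas: PartialFunction[Plan, Plan] = {
    case s: StatefulOp if s.schemas.isEmpty => s.copy(schemas = Map("default" -> "avro"))
  }

  def main(args: Array[String]): Unit = {
    val plan = OtherOp(Seq(StatefulOp(Map.empty, Nil)))
    println(transform(plan, fillSchemas))
    // OtherOp(List(StatefulOp(Map(default -> avro),List())))
  }
}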
@@ -303,56 +303,6 @@ object StreamingSymmetricHashJoinHelper extends Logging {
     }
   }
 
-  /**
-   * A custom RDD that allows partitions to be "zipped" together, while ensuring the tasks'
-   * preferred location is based on which executors have the required join state stores already
-   * loaded. This class is a variant of [[org.apache.spark.rdd.ZippedPartitionsRDD2]] which only
-   * changes signature of `f` by taking in a map of column family schemas. This is used for
-   * passing the column family schemas when there is initial state for the TransformWithStateExec
-   * operator
-   */
-  class StateStoreAwareZipPartitionsRDDWithSchemas[A: ClassTag, B: ClassTag, V: ClassTag](
-      sc: SparkContext,
-      var f: (Int, Iterator[A], Iterator[B], Map[String, StateStoreColFamilySchema]) => Iterator[V],
-      var rdd1: RDD[A],
-      var rdd2: RDD[B],
-      stateInfo: StatefulOperatorStateInfo,
-      stateStoreNames: Seq[String],
-      @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
-      schemas: Map[String, StateStoreColFamilySchema])
-    extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) {
-
-    /**
-     * Set the preferred location of each partition using the executor that has the related
-     * [[StateStoreProvider]] already loaded.
-     */
-    override def getPreferredLocations(partition: Partition): Seq[String] = {
-      stateStoreNames.flatMap { storeName =>
-        val stateStoreProviderId = StateStoreProviderId(stateInfo, partition.index, storeName)
-        storeCoordinator.flatMap(_.getLocation(stateStoreProviderId))
-      }.distinct
-    }
-
-    override def compute(s: Partition, context: TaskContext): Iterator[V] = {
-      val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
-      if (partitions(0).index != partitions(1).index) {
-        throw new IllegalStateException(s"Partition ID should be same in both side: " +
-          s"left ${partitions(0).index} , right ${partitions(1).index}")
-      }
-
-      val partitionId = partitions(0).index
-      f(partitionId, rdd1.iterator(partitions(0), context),
-        rdd2.iterator(partitions(1), context), schemas)
-    }
-
-    override def clearDependencies(): Unit = {
-      super.clearDependencies()
-      rdd1 = null
-      rdd2 = null
-      f = null
-    }
-  }
-
   implicit class StateStoreAwareZipPartitionsHelper[T: ClassTag](dataRDD: RDD[T]) {
     /**
      * Function used by `StreamingSymmetricHashJoinExec` to zip together the partitions of two
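The class deleted above was a schema-carrying variant of the state-store-aware zip-partitions RDD: it pairs same-index partitions of two RDDs and hands both iterators, plus the column family schema map, to the user function f. A rough sketch of that pairing logic, with plain collections standing in for RDDs (hypothetical names, not Spark's API):

object ZipPartitionsSketch {
  // Pair up same-index "partitions" of two datasets and apply f to each pair,
  // passing a shared schema map along, as the removed RDD variant did.
  def zipPartitions[A, B, V](
      left: Seq[Seq[A]],
      right: Seq[Seq[B]],
      schemas: Map[String, String])(
      f: (Int, Iterator[A], Iterator[B], Map[String, String]) => Iterator[V]): Seq[Seq[V]] = {
    // Mirrors the index check in compute(): both sides must line up partition by partition.
    require(left.size == right.size, "both sides must have the same number of partitions")
    left.indices.map { partitionId =>
      f(partitionId, left(partitionId).iterator, right(partitionId).iterator, schemas).toSeq
    }
  }

  def main(args: Array[String]): Unit = {
    val out = zipPartitions(Seq(Seq(1, 2), Seq(3)), Seq(Seq("a", "b"), Seq("c")),
      Map("default" -> "avro")) { (id, as, bs, schemas) =>
      as.zip(bs).map { case (a, b) => s"$id: $a-$b (${schemas.size} schema(s))" }
    }
    out.flatten.foreach(println)
  }
}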
@@ -685,7 +685,9 @@ class RangeKeyScanStateEncoder(
   }
 
   def encodePrefixKeyForRangeScan(
-      row: UnsafeRow, avroType: Schema): Array[Byte] = {
+      row: UnsafeRow,
+      avroType: Schema
+  ): Array[Byte] = {
     val record = new GenericData.Record(avroType)
     var fieldIdx = 0
     rangeScanKeyFieldsWithOrdinal.zipWithIndex.foreach { case (fieldWithOrdinal, idx) =>
@@ -887,11 +889,11 @@
  * It uses the first byte of the generated byte array to store the version the describes how the
  * row is encoded in the rest of the byte array. Currently, the default version is 0,
  *
- * If the avroEnc is specified, we are using Avro encoding for this column family's keys
  * VERSION 0: [ VERSION (1 byte) | ROW (N bytes) ]
  * The bytes of a UnsafeRow is written unmodified to starting from offset 1
  * (offset 0 is the version byte of value 0). That is, if the unsafe row has N bytes,
  * then the generated array byte will be N+1 bytes.
+ * If the avroEnc is specified, we are using Avro encoding for this column family's keys
  */
 class NoPrefixKeyStateEncoder(
     keySchema: StructType,
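The comment touched in the hunk above describes the VERSION 0 key layout: one version byte followed by the row bytes unchanged, so an N-byte row becomes an N+1-byte array. A minimal sketch of that framing on raw byte arrays, assuming plain Array[Byte] input rather than the actual UnsafeRow-based encoder:

object VersionedKeyFraming {
  val Version0: Byte = 0

  // Encode: [ VERSION (1 byte) | ROW (N bytes) ], N + 1 bytes total.
  def encode(rowBytes: Array[Byte]): Array[Byte] = {
    val out = new Array[Byte](rowBytes.length + 1)
    out(0) = Version0
    System.arraycopy(rowBytes, 0, out, 1, rowBytes.length)
    out
  }

  // Decode: check the version byte, then return the remaining N bytes.
  def decode(encoded: Array[Byte]): Array[Byte] = {
    require(encoded.nonEmpty && encoded(0) == Version0, s"unsupported version: ${encoded.headOption}")
    java.util.Arrays.copyOfRange(encoded, 1, encoded.length)
  }

  def main(args: Array[String]): Unit = {
    val row = Array[Byte](1, 2, 3)
    val enc = encode(row)
    assert(enc.length == row.length + 1)
    assert(decode(enc).sameElements(row))
    println(enc.mkString(", ")) // 0, 1, 2, 3
  }
}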
@@ -89,49 +89,6 @@ package object state {
         extraOptions,
         useMultipleValuesPerKey)
     }
-
-    /** Map each partition of an RDD along with data in a [[StateStore]] that passes the
-     * column family schemas to the storeUpdateFunction. Used to pass Avro encoders/decoders
-     * to executors */
-    def mapPartitionsWithStateStoreWithSchemas[U: ClassTag](
-        stateInfo: StatefulOperatorStateInfo,
-        keySchema: StructType,
-        valueSchema: StructType,
-        keyStateEncoderSpec: KeyStateEncoderSpec,
-        sessionState: SessionState,
-        storeCoordinator: Option[StateStoreCoordinatorRef],
-        useColumnFamilies: Boolean = false,
-        extraOptions: Map[String, String] = Map.empty,
-        useMultipleValuesPerKey: Boolean = false,
-        columnFamilySchemas: Map[String, StateStoreColFamilySchema] = Map.empty)(
-        storeUpdateFunction: (StateStore, Iterator[T], Map[String, StateStoreColFamilySchema]) => Iterator[U]): StateStoreRDD[T, U] = {
-
-      val cleanedF = dataRDD.sparkContext.clean(storeUpdateFunction)
-      val wrappedF = (store: StateStore, iter: Iterator[T]) => {
-        // Abort the state store in case of error
-        TaskContext.get().addTaskCompletionListener[Unit](_ => {
-          if (!store.hasCommitted) store.abort()
-        })
-        cleanedF(store, iter, columnFamilySchemas)
-      }
-
-      new StateStoreRDD(
-        dataRDD,
-        wrappedF,
-        stateInfo.checkpointLocation,
-        stateInfo.queryRunId,
-        stateInfo.operatorId,
-        stateInfo.storeVersion,
-        stateInfo.stateStoreCkptIds,
-        keySchema,
-        valueSchema,
-        keyStateEncoderSpec,
-        sessionState,
-        storeCoordinator,
-        useColumnFamilies,
-        extraOptions,
-        useMultipleValuesPerKey)
-    }
     // scalastyle:on
 
     /** Map each partition of an RDD along with data in a [[ReadStateStore]]. */
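The helper deleted above mostly wrapped the user's storeUpdateFunction: it threaded the column family schema map through and registered a task-completion hook that aborts the store if it never committed. A toy sketch of that wrapping pattern, with a simplified Store trait and a plain callback standing in for TaskContext (illustrative names only, not Spark's API):

object WrappedStoreUpdateSketch {
  // Simplified stand-in for StateStore.
  trait Store {
    def hasCommitted: Boolean
    def abort(): Unit
  }

  // Wrap the user's update function so that the schema map is passed through and
  // the store is aborted when the task finishes without a commit.
  def wrap[T, U](
      schemas: Map[String, String],
      onTaskCompletion: (() => Unit) => Unit)(
      f: (Store, Iterator[T], Map[String, String]) => Iterator[U]): (Store, Iterator[T]) => Iterator[U] = {
    (store, iter) => {
      onTaskCompletion(() => if (!store.hasCommitted) store.abort())
      f(store, iter, schemas)
    }
  }

  def main(args: Array[String]): Unit = {
    var listeners = List.empty[() => Unit]
    val store = new Store {
      def hasCommitted: Boolean = false
      def abort(): Unit = println("store aborted")
    }
    val wrapped = wrap[Int, String](Map("default" -> "avro"), cb => listeners ::= cb) {
      (_, iter, schemas) => iter.map(i => s"$i with ${schemas.size} schema(s)")
    }
    wrapped(store, Iterator(1, 2)).foreach(println)
    // Simulate task completion: the store never committed, so it gets aborted.
    listeners.foreach(_.apply())
  }
}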