Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ import org.apache.spark.sql.execution.streaming.state.StateMessage
import org.apache.spark.sql.execution.streaming.state.StateMessage.{AppendList, AppendValue, Clear, ContainsKey, DeleteTimer, Exists, ExpiryTimerRequest, Get, GetProcessingTime, GetValue, GetWatermark, HandleState, Keys, ListStateCall, ListStateGet, ListStatePut, ListTimers, MapStateCall, ParseStringSchema, RegisterTimer, RemoveKey, SetHandleState, StateCallCommand, StatefulProcessorCall, TimerRequest, TimerStateCallCommand, TimerValueRequest, UpdateValue, UtilsRequest, Values, ValueStateCall, ValueStateUpdate}
import org.apache.spark.sql.streaming.{ListState, MapState, TTLConfig, ValueState}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.tags.SlowSQLTest

@SlowSQLTest
class TransformWithStateInPySparkStateServerSuite extends SparkFunSuite with BeforeAndAfterEach {
val stateName = "test"
val iteratorId = "testId"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithEncodingTypes, AlsoTestWithRocksDBFeatures, RocksDBStateStoreProvider}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.tags.SlowSQLTest

case class InputRow(key: String, action: String, value: String)

Expand Down Expand Up @@ -221,6 +222,7 @@ class ToggleSaveAndEmitProcessor
}
}

@SlowSQLTest
class TransformWithListStateSuite extends StreamTest
with AlsoTestWithRocksDBFeatures with AlsoTestWithEncodingTypes {
import testImplicits._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.streaming.{ListStateImplWithTTL, MemoryStr
import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.tags.SlowSQLTest

// MultiStatefulVariableTTLProcessor is a StatefulProcessor that consumes a stream of
// strings and returns a stream of <string, count> pairs.
Expand Down Expand Up @@ -164,6 +165,7 @@ class ListStateTTLProcessor(ttlConfig: TTLConfig)
* Test suite for testing list state with TTL.
* We use the base TTL suite with a list state processor.
*/
@SlowSQLTest
class TransformWithListStateTTLSuite extends TransformWithStateTTLTest
with StateStoreMetricsTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithEncodingTypes, AlsoTestWithRocksDBFeatures, RocksDBStateStoreProvider}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.tags.SlowSQLTest

case class InputMapRow(key: String, action: String, value: (String, String))

Expand Down Expand Up @@ -138,6 +139,7 @@ class EvolvedMapStateProcessor extends StatefulProcessor[String, String, (String
* Class that adds integration tests for MapState types used in arbitrary stateful
* operators such as transformWithState.
*/
@SlowSQLTest
class TransformWithMapStateSuite extends StreamTest
with AlsoTestWithEncodingTypes
with AlsoTestWithRocksDBFeatures {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.streaming.{MapStateImplWithTTL, MemoryStre
import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.tags.SlowSQLTest

class MapStateSingleKeyTTLProcessor(ttlConfig: TTLConfig)
extends StatefulProcessor[String, InputEvent, OutputEvent] {
Expand Down Expand Up @@ -173,6 +174,7 @@ class MapStateTTLProcessor(ttlConfig: TTLConfig)
}
}

@SlowSQLTest
class TransformWithMapStateTTLSuite extends TransformWithStateTTLTest {

import testImplicits._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadataV2,
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.types.StructType
import org.apache.spark.tags.SlowSQLTest

@SlowSQLTest
class TransformWithStateAvroSuite extends TransformWithStateSuite {

import testImplicits._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution}
import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithEncodingTypes, AlsoTestWithRocksDBFeatures, RocksDBStateStoreProvider}
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.tags.SlowSQLTest

case class InputEventRow(
key: String,
Expand Down Expand Up @@ -103,6 +104,7 @@ case class AggEventRow(
window: Window,
count: Long)

@SlowSQLTest
class TransformWithStateChainingSuite extends StreamTest
with AlsoTestWithEncodingTypes
with AlsoTestWithRocksDBFeatures {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.sql.LocalSparkSession.withSparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.tags.SlowSQLTest

case class FruitState(
name: String,
Expand Down Expand Up @@ -132,6 +133,7 @@ trait TransformWithStateClusterSuiteBase extends SparkFunSuite {
* Test suite spawning local cluster with multiple executors to test serde of stateful
* processors along with use of implicit encoders, if applicable in transformWithState operator.
*/
@SlowSQLTest
class TransformWithStateClusterSuite extends StreamTest with TransformWithStateClusterSuiteBase {
testWithAndWithoutImplicitEncoders("streaming with transformWithState - " +
"without initial state") { (spark, useImplicits) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithEncodingTypes
import org.apache.spark.sql.functions.{col, timestamp_seconds}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.tags.SlowSQLTest

case class InitInputRow(key: String, action: String, value: Double)
case class InputRowForInitialState(
Expand Down Expand Up @@ -360,6 +361,7 @@ class StatefulProcessorWithInitialStateEventTimerClass
* Class that adds tests for transformWithState stateful
* streaming operator with user-defined initial state
*/
@SlowSQLTest
class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
with AlsoTestWithEncodingTypes with AlsoTestWithRocksDBFeatures {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.apache.spark.sql.functions.timestamp_seconds
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.types._
import org.apache.spark.tags.SlowSQLTest

object TransformWithStateSuiteUtils {
val NUM_SHUFFLE_PARTITIONS = 5
Expand Down Expand Up @@ -2552,6 +2553,7 @@ abstract class TransformWithStateSuite extends StateStoreMetricsTest
}
}

@SlowSQLTest
class TransformWithStateValidationSuite extends StateStoreMetricsTest {
import testImplicits._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.execution.streaming.state.{RocksDBStateStoreProvider, StateStoreValueSchemaNotCompatible}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.tags.SlowSQLTest

@SlowSQLTest
class TransformWithStateUnsafeRowSuite extends TransformWithStateSuite {

import testImplicits._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.types._
import org.apache.spark.tags.SlowSQLTest

object TTLInputProcessFunction {
def processRow(
Expand Down Expand Up @@ -185,6 +186,7 @@ class TTLProcessorWithCompositeTypes(
}
}

@SlowSQLTest
class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest {

import testImplicits._
Expand Down