From 6b45086ab65d0842addd6fe234059d096d6b5b7f Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Fri, 1 Aug 2025 11:09:50 -0700
Subject: [PATCH] [SPARK-53068][SQL][TESTS] Mark `TransformWith*Suite` as `SlowSQLTest` in `sql/core`

---
 .../streaming/TransformWithStateInPySparkStateServerSuite.scala | 2 ++
 .../spark/sql/streaming/TransformWithListStateSuite.scala       | 2 ++
 .../spark/sql/streaming/TransformWithListStateTTLSuite.scala    | 2 ++
 .../apache/spark/sql/streaming/TransformWithMapStateSuite.scala | 2 ++
 .../spark/sql/streaming/TransformWithMapStateTTLSuite.scala     | 2 ++
 .../spark/sql/streaming/TransformWithStateAvroSuite.scala       | 2 ++
 .../spark/sql/streaming/TransformWithStateChainingSuite.scala   | 2 ++
 .../spark/sql/streaming/TransformWithStateClusterSuite.scala    | 2 ++
 .../sql/streaming/TransformWithStateInitialStateSuite.scala     | 2 ++
 .../apache/spark/sql/streaming/TransformWithStateSuite.scala    | 2 ++
 .../spark/sql/streaming/TransformWithStateUnsafeRowSuite.scala  | 2 ++
 .../spark/sql/streaming/TransformWithValueStateTTLSuite.scala   | 2 ++
 12 files changed, 24 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServerSuite.scala
index 1321ba528e11..76c9a1fbbd4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServerSuite.scala
@@ -35,7 +35,9 @@ import org.apache.spark.sql.execution.streaming.state.StateMessage
 import org.apache.spark.sql.execution.streaming.state.StateMessage.{AppendList, AppendValue, Clear, ContainsKey, DeleteTimer, Exists, ExpiryTimerRequest, Get, GetProcessingTime, GetValue, GetWatermark, HandleState, Keys, ListStateCall, ListStateGet, ListStatePut, ListTimers, MapStateCall, ParseStringSchema, RegisterTimer, RemoveKey, SetHandleState, StateCallCommand, StatefulProcessorCall, TimerRequest, TimerStateCallCommand, TimerValueRequest, UpdateValue, UtilsRequest, Values, ValueStateCall, ValueStateUpdate}
 import org.apache.spark.sql.streaming.{ListState, MapState, TTLConfig, ValueState}
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.tags.SlowSQLTest
 
+@SlowSQLTest
 class TransformWithStateInPySparkStateServerSuite extends SparkFunSuite with BeforeAndAfterEach {
   val stateName = "test"
   val iteratorId = "testId"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala
index 2fbc1d6afbbc..efa95d152bb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithEncodingTypes, AlsoTestWithRocksDBFeatures, RocksDBStateStoreProvider}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.tags.SlowSQLTest
 
 case class InputRow(key: String, action: String, value: String)
 
@@ -221,6 +222,7 @@ class ToggleSaveAndEmitProcessor
   }
 }
 
+@SlowSQLTest
 class TransformWithListStateSuite extends StreamTest
   with AlsoTestWithRocksDBFeatures with AlsoTestWithEncodingTypes {
   import testImplicits._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala
index bd3667b16591..b6e7c6596075 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.streaming.{ListStateImplWithTTL, MemoryStr
 import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.tags.SlowSQLTest
 
 // MultiStatefulVariableTTLProcessor is a StatefulProcessor that consumes a stream of
 // strings and returns a stream of pairs.
@@ -164,6 +165,7 @@ class ListStateTTLProcessor(ttlConfig: TTLConfig)
  * Test suite for testing list state with TTL.
  * We use the base TTL suite with a list state processor.
  */
+@SlowSQLTest
 class TransformWithListStateTTLSuite extends TransformWithStateTTLTest
   with StateStoreMetricsTest {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala
index c157f0a00839..62effb8d7d18 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithEncodingTypes, AlsoTestWithRocksDBFeatures, RocksDBStateStoreProvider}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.tags.SlowSQLTest
 
 case class InputMapRow(key: String, action: String, value: (String, String))
 
@@ -138,6 +139,7 @@ class EvolvedMapStateProcessor extends StatefulProcessor[String, String, (String
  * Class that adds integration tests for MapState types used in arbitrary stateful
  * operators such as transformWithState.
  */
+@SlowSQLTest
 class TransformWithMapStateSuite extends StreamTest
   with AlsoTestWithEncodingTypes with AlsoTestWithRocksDBFeatures {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateTTLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateTTLSuite.scala
index c845059f20fe..ef2e4f5a919b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateTTLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateTTLSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.streaming.{MapStateImplWithTTL, MemoryStre
 import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.tags.SlowSQLTest
 
 class MapStateSingleKeyTTLProcessor(ttlConfig: TTLConfig)
   extends StatefulProcessor[String, InputEvent, OutputEvent] {
@@ -173,6 +174,7 @@ class MapStateTTLProcessor(ttlConfig: TTLConfig)
   }
 }
 
+@SlowSQLTest
 class TransformWithMapStateTTLSuite extends TransformWithStateTTLTest {
   import testImplicits._
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateAvroSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateAvroSuite.scala
index ce0f2113eac5..8da9edda02ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateAvroSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateAvroSuite.scala
@@ -33,7 +33,9 @@ import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadataV2,
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.tags.SlowSQLTest
 
+@SlowSQLTest
 class TransformWithStateAvroSuite extends TransformWithStateSuite {
   import testImplicits._
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala
index beb229ee513e..553e28d4c0e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution}
 import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithEncodingTypes, AlsoTestWithRocksDBFeatures, RocksDBStateStoreProvider}
 import org.apache.spark.sql.functions.window
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.tags.SlowSQLTest
 
 case class InputEventRow(
     key: String,
@@ -103,6 +104,7 @@ case class AggEventRow(
     window: Window,
     count: Long)
 
+@SlowSQLTest
 class TransformWithStateChainingSuite extends StreamTest
   with AlsoTestWithEncodingTypes with AlsoTestWithRocksDBFeatures {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateClusterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateClusterSuite.scala
index 4f6c5268a7be..3f0bb060d36d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateClusterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateClusterSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.LocalSparkSession.withSparkSession
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.tags.SlowSQLTest
 
 case class FruitState(
     name: String,
@@ -132,6 +133,7 @@ trait TransformWithStateClusterSuiteBase extends SparkFunSuite {
  * Test suite spawning local cluster with multiple executors to test serde of stateful
 * processors along with use of implicit encoders, if applicable in transformWithState operator.
  */
+@SlowSQLTest
 class TransformWithStateClusterSuite extends StreamTest with TransformWithStateClusterSuiteBase {
   testWithAndWithoutImplicitEncoders("streaming with transformWithState - " +
     "without initial state") { (spark, useImplicits) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
index 056453fe2db1..e7435fb02aad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithEncodingTypes
 import org.apache.spark.sql.functions.{col, timestamp_seconds}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.tags.SlowSQLTest
 
 case class InitInputRow(key: String, action: String, value: Double)
 case class InputRowForInitialState(
@@ -360,6 +361,7 @@ class StatefulProcessorWithInitialStateEventTimerClass
  * Class that adds tests for transformWithState stateful
 * streaming operator with user-defined initial state
  */
+@SlowSQLTest
 class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
   with AlsoTestWithEncodingTypes with AlsoTestWithRocksDBFeatures {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index ece3b8bf942b..6127bf5395f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.functions.timestamp_seconds
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.types._
+import org.apache.spark.tags.SlowSQLTest
 
 object TransformWithStateSuiteUtils {
   val NUM_SHUFFLE_PARTITIONS = 5
@@ -2552,6 +2553,7 @@ abstract class TransformWithStateSuite extends StateStoreMetricsTest
   }
 }
 
+@SlowSQLTest
 class TransformWithStateValidationSuite extends StateStoreMetricsTest {
   import testImplicits._
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateUnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateUnsafeRowSuite.scala
index b8d29560521a..1988a458ed1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateUnsafeRowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateUnsafeRowSuite.scala
@@ -25,7 +25,9 @@ import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.execution.streaming.state.{RocksDBStateStoreProvider, StateStoreValueSchemaNotCompatible}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.tags.SlowSQLTest
 
+@SlowSQLTest
 class TransformWithStateUnsafeRowSuite extends TransformWithStateSuite {
   import testImplicits._
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala
index 2b33b3feb307..3070de83b67b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.types._
+import org.apache.spark.tags.SlowSQLTest
 
 object TTLInputProcessFunction {
   def processRow(
@@ -185,6 +186,7 @@ class TTLProcessorWithCompositeTypes(
   }
 }
 
+@SlowSQLTest
 class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest {
   import testImplicits._