test case
ericm-db committed Jul 9, 2024
commit 07ccd55407896f88fb19efd5635e4af73aec9a2d
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.SparkRuntimeException
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Dataset, Encoders}
+import org.apache.spark.sql.{Dataset, Encoders, Row}
import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA}
@@ -804,6 +804,183 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}

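  // Reads the latest StateSchemaV3 file for the given operator from the checkpoint
  // and returns the column family schemas recorded for that operator.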
  private def fetchColumnFamilySchemas(
      checkpointDir: String,
      operatorId: Int): List[ColumnFamilySchema] = {
    fetchStateSchemaV3File(checkpointDir, operatorId).getLatest().get._2
  }

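  // The StateSchemaV3 file lives under state/<operatorId>/_metadata/schema in the
  // checkpoint directory.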
  private def fetchStateSchemaV3File(
      checkpointDir: String,
      operatorId: Int): StateSchemaV3File = {
    val hadoopConf = spark.sessionState.newHadoopConf()
    val stateChkptPath = new Path(checkpointDir, s"state/$operatorId")
    val stateSchemaPath = new Path(new Path(stateChkptPath, "_metadata"), "schema")
    new StateSchemaV3File(hadoopConf, stateSchemaPath.toString)
  }

  test("transformWithState - verify StateSchemaV3 file is written correctly") {
    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
      classOf[RocksDBStateStoreProvider].getName,
      SQLConf.SHUFFLE_PARTITIONS.key ->
        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
      withTempDir { chkptDir =>
        val inputData = MemoryStream[String]
        val result = inputData.toDS()
          .groupByKey(x => x)
          .transformWithState(new RunningCountStatefulProcessor(),
            TimeMode.None(),
            OutputMode.Update())

        testStream(result, OutputMode.Update())(
          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
          AddData(inputData, "a"),
          CheckNewAnswer(("a", "1")),
          StopStream
        )

        val columnFamilySchemas = fetchColumnFamilySchemas(chkptDir.getCanonicalPath, 0)
        assert(columnFamilySchemas.length == 1)

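        // RunningCountStatefulProcessor registers a single long-valued state variable
        // ("countState"), keyed by the grouping key with no prefix key encoding.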
        val expected = ColumnFamilySchemaV1(
          "countState",
          KEY_ROW_SCHEMA,
          NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA),
          false, Encoders.scalaLong.schema, None
        )
        val actual = columnFamilySchemas.head.asInstanceOf[ColumnFamilySchemaV1]
        assert(expected == actual)
      }
    }
  }

  test("transformWithState - verify StateSchemaV3 file is written correctly," +
    " multiple column families") {
    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
      classOf[RocksDBStateStoreProvider].getName,
      SQLConf.SHUFFLE_PARTITIONS.key ->
        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
      withTempDir { chkptDir =>
        val inputData = MemoryStream[(String, String)]
        val result = inputData.toDS()
          .groupByKey(x => x._1)
          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
            TimeMode.None(),
            OutputMode.Update())

        testStream(result, OutputMode.Update())(
          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
          AddData(inputData, ("a", "str1")),
          CheckNewAnswer(("a", "1", "")),
          StopStream
        )

        val columnFamilySchemas = fetchColumnFamilySchemas(chkptDir.getCanonicalPath, 0)
        assert(columnFamilySchemas.length == 2)

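        // One schema entry per state variable used by RunningCountMostRecentStatefulProcessor:
        // a long-valued "countState" and a string-valued "mostRecent".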
        val expected = List(
          ColumnFamilySchemaV1(
            "countState",
            KEY_ROW_SCHEMA,
            NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA),
            false,
            Encoders.scalaLong.schema,
            None
          ),
          ColumnFamilySchemaV1(
            "mostRecent",
            KEY_ROW_SCHEMA,
            NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA),
            false,
            Encoders.STRING.schema,
            None
          )
        )
        val actual = columnFamilySchemas.map(_.asInstanceOf[ColumnFamilySchemaV1])
        assert(expected == actual)
      }
    }
  }

  test("transformWithState - verify that StateSchemaV3 files are purged") {
    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
      classOf[RocksDBStateStoreProvider].getName,
      SQLConf.SHUFFLE_PARTITIONS.key ->
        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
      withTempDir { chkptDir =>
        val inputData = MemoryStream[String]
        val result = inputData.toDS()
          .groupByKey(x => x)
          .transformWithState(new RunningCountStatefulProcessor(),
            TimeMode.None(),
            OutputMode.Update())

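        // Run three batches so that, without purging, three StateSchemaV3 files would
        // have been written.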
        testStream(result, OutputMode.Update())(
          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
          AddData(inputData, "a"),
          CheckNewAnswer(("a", "1")),
          StopStream,
          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
          AddData(inputData, "a"),
          CheckNewAnswer(("a", "2")),
          StopStream,
          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
          AddData(inputData, "a"),
          CheckNewAnswer(),
          StopStream
        )
        // Without purging there would be one StateSchemaV3 file per batch (three in
        // total); with MIN_BATCHES_TO_RETAIN set to 1, only one file should remain.
        val batchesWithSchemaV3File = fetchStateSchemaV3File(
          chkptDir.getCanonicalPath, 0).listBatchesOnDisk
        assert(batchesWithSchemaV3File.length == 1)
        // Make sure that only the latest batch has the schema file
        assert(batchesWithSchemaV3File.head == 2)
      }
    }
  }

  test("transformWithState - verify that OperatorStateMetadataV2" +
    " file is being written correctly") {
    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
      classOf[RocksDBStateStoreProvider].getName,
      SQLConf.SHUFFLE_PARTITIONS.key ->
        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
      withTempDir { checkpointDir =>
        val inputData = MemoryStream[String]
        val result = inputData.toDS()
          .groupByKey(x => x)
          .transformWithState(new RunningCountStatefulProcessor(),
            TimeMode.None(),
            OutputMode.Update())

        testStream(result, OutputMode.Update())(
          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
          AddData(inputData, "a"),
          CheckNewAnswer(("a", "1")),
          StopStream,
          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
          AddData(inputData, "a"),
          CheckNewAnswer(("a", "2")),
          StopStream
        )

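        // Each row is (operatorId, operatorName, stateStoreName, numPartitions,
        // minBatchId, maxBatchId); one metadata entry is expected per committed batch.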
        val df = spark.read.format("state-metadata").load(checkpointDir.toString)
        checkAnswer(df, Seq(
          Row(0, "transformWithStateExec", "default", 5, 0L, 0L),
          Row(0, "transformWithStateExec", "default", 5, 1L, 1L)
        ))
        checkAnswer(df.select(df.metadataColumn("_operatorProperties")),
          Seq(
            Row("""{"timeMode":"NoTime","outputMode":"Update"}"""),
            Row("""{"timeMode":"NoTime","outputMode":"Update"}""")
          )
        )
      }
    }
  }

  test("transformWithState - verify StateSchemaV3 serialization and deserialization" +
    " works with one batch") {
    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->