Skip to content
Prev Previous commit
Next Next commit
rebase
  • Loading branch information
ericm-db committed Jul 16, 2024
commit 43bf78fd82db85867a5213981e82cb2f33d9dcef
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import scala.io.{Source => IOSource}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion

Expand All @@ -40,7 +39,7 @@ import org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVers
*/
class StateSchemaV3File(
hadoopConf: Configuration,
path: String) extends Logging {
path: String) {

val metadataPath = new Path(path)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{NextIterator, ThreadUtils, Utils}

Expand Down Expand Up @@ -292,14 +293,14 @@ object KeyStateEncoderSpec {
// match on type
m("keyStateEncoderType").asInstanceOf[String] match {
case "NoPrefixKeyStateEncoderSpec" =>
NoPrefixKeyStateEncoderSpec(keySchema)
NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA)
case "RangeKeyScanStateEncoderSpec" =>
val orderingOrdinals = m("orderingOrdinals").
asInstanceOf[List[_]].map(_.asInstanceOf[BigInt].toInt)
RangeKeyScanStateEncoderSpec(keySchema, orderingOrdinals)
case "PrefixKeyScanStateEncoderSpec" =>
val numColsPrefixKey = m("numColsPrefixKey").asInstanceOf[BigInt]
PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey.toInt)
PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, numColsPrefixKey.toInt)
}
}
}
Expand Down