Change the config for a specific operator only
xuanyuanking committed Jun 17, 2020
commit 01007fb9f03c003bfc00d2e2358c9029b83f16e6
@@ -1246,16 +1246,6 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val STATE_STORE_FORMAT_VALIDATION_CHECK_VALUE =
buildConf("spark.sql.streaming.stateStore.formatValidation.checkValue")
.internal()
.doc("When true, check if the value UnsafeRow from the state store is valid or not when " +
"running streaming queries. For some operations, we won't check the value format since " +
"the state store save fake values, e.g. Deduplicate.")
.version("3.1.0")
.booleanConf
.createWithDefault(true)

val FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION =
buildConf("spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion")
.internal()
@@ -2767,9 +2757,6 @@ class SQLConf extends Serializable with Logging {

def stateStoreFormatValidationEnabled: Boolean = getConf(STATE_STORE_FORMAT_VALIDATION_ENABLED)

def stateStoreFormatValidationCheckValue: Boolean =
getConf(STATE_STORE_FORMAT_VALIDATION_CHECK_VALUE)

def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)

def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
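Removing the session-wide flag is the heart of this commit: under the old SQLConf entry, StreamingDeduplicateExec had to mutate the shared session conf at the driver, which disabled value checking for every stateful operator in the query, not just itself. The sketch below (distilled from this diff, not copied from it) contrasts the two approaches:

// Before: a session-global toggle, flipped from StreamingDeduplicateExec.doExecute(),
// silently turning value checks off for all operators in the query.
sqlContext.sessionState.conf.setConf(SQLConf.STATE_STORE_FORMAT_VALIDATION_CHECK_VALUE, false)

// After: the flag travels only with this operator's state store configuration;
// the rest of the query keeps the default behavior.
val extraOptions = Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "false")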
@@ -20,7 +20,9 @@ package org.apache.spark.sql.execution.streaming.state
import org.apache.spark.sql.internal.SQLConf

/** A class that contains configuration parameters for [[StateStore]]s. */
class StateStoreConf(@transient private val sqlConf: SQLConf)
class StateStoreConf(
@transient private val sqlConf: SQLConf,
extraOptions: Map[String, String] = Map.empty)
extends Serializable {

def this() = this(new SQLConf)
@@ -47,16 +49,21 @@ class StateStoreConf(@transient private val sqlConf: SQLConf)
val formatValidationEnabled: Boolean = sqlConf.stateStoreFormatValidationEnabled

/** Whether to validate the value format when format validation is enabled. */
val formatValidationCheckValue: Boolean = sqlConf.stateStoreFormatValidationCheckValue
val formatValidationCheckValue: Boolean =
extraOptions.getOrElse(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG, "true") == "true"

/**
* Additional configurations related to state store. This will capture all configs in
* SQLConf that start with `spark.sql.streaming.stateStore.` */
* SQLConf that start with `spark.sql.streaming.stateStore.` and extraOptions for a specific
* operator.
*/
val confs: Map[String, String] =
sqlConf.getAllConfs.filter(_._1.startsWith("spark.sql.streaming.stateStore."))
sqlConf.getAllConfs.filter(_._1.startsWith("spark.sql.streaming.stateStore.")) ++ extraOptions
}

object StateStoreConf {
val FORMAT_VALIDATION_CHECK_VALUE_CONFIG = "formatValidationCheckValue"

val empty = new StateStoreConf()

def apply(conf: SQLConf): StateStoreConf = new StateStoreConf(conf)
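A minimal sketch (not part of the patch) of how the override resolves, given the "true" default above: a plain SQLConf keeps value checking on, and a per-operator entry in extraOptions switches it off.

import org.apache.spark.sql.internal.SQLConf

val sqlConf = new SQLConf
// No extra options: the "true" default keeps value-format checking enabled.
assert(new StateStoreConf(sqlConf).formatValidationCheckValue)

// A per-operator override, as StreamingDeduplicateExec passes in the last hunk below.
val opts = Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "false")
assert(!new StateStoreConf(sqlConf, opts).formatValidationCheckValue)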
@@ -45,10 +45,11 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
valueSchema: StructType,
indexOrdinal: Option[Int],
sessionState: SessionState,
@transient private val storeCoordinator: Option[StateStoreCoordinatorRef])
@transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
extraOptions: Map[String, String] = Map.empty)
extends RDD[U](dataRDD) {

private val storeConf = new StateStoreConf(sessionState.conf)
private val storeConf = new StateStoreConf(sessionState.conf, extraOptions)

// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
private val hadoopConfBroadcast = dataRDD.context.broadcast(
@@ -55,7 +55,8 @@ package object state {
valueSchema: StructType,
indexOrdinal: Option[Int],
sessionState: SessionState,
storeCoordinator: Option[StateStoreCoordinatorRef])(
storeCoordinator: Option[StateStoreCoordinatorRef],
extraOptions: Map[String, String] = Map.empty)(
storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] = {

val cleanedF = dataRDD.sparkContext.clean(storeUpdateFunction)
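Since extraOptions defaults to Map.empty both here and in StateStoreRDD, existing call sites compile unchanged; only an operator that needs an override passes the extra argument, which StateStoreRDD folds into its StateStoreConf. A hypothetical call site (stateInfo, keySchema, valueSchema, sessionState, and coordinator are illustrative stand-ins, not names from this patch):

dataRDD.mapPartitionsWithStateStore(
  stateInfo,
  keySchema,
  valueSchema,
  indexOrdinal = None,
  sessionState,
  Some(coordinator),
  extraOptions = Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "false")) { (store, iter) =>
  // The store handed to this function was built from a StateStoreConf that
  // already includes the per-operator options.
  iter
}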
@@ -455,17 +455,16 @@ case class StreamingDeduplicateExec(
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver

// We won't check value row in state store since the value StreamingDeduplicateExec.EMPTY_ROW
// is unrelated to the output schema.
sqlContext.sessionState.conf.setConf(SQLConf.STATE_STORE_FORMAT_VALIDATION_CHECK_VALUE, false)

child.execute().mapPartitionsWithStateStore(
getStateInfo,
keyExpressions.toStructType,
child.output.toStructType,
indexOrdinal = None,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
Some(sqlContext.streams.stateStoreCoordinator),
// We won't check value row in state store since the value StreamingDeduplicateExec.EMPTY_ROW
// is unrelated to the output schema.
Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "false")) { (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
val numOutputRows = longMetric("numOutputRows")
val numTotalStateRows = longMetric("numTotalStateRows")
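For context on why Deduplicate opts out: it stores a sentinel value per key rather than real rows. In Spark's source the sentinel is, roughly, a projection of a single null field (quoted from memory, not from this diff):

object StreamingDeduplicateExec {
  // A one-column NullType row: its layout never matches child.output.toStructType,
  // so value-format validation would always reject it as corrupt.
  private val EMPTY_ROW =
    UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
}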