Skip to content
Prev Previous commit
Next Next commit
Address comments
  • Loading branch information
xuanyuanking committed Jun 17, 2020
commit fd74ff9c337d06f4cb4ccfc638d837b5ea3d0e11
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ object UnsafeRowUtils {
if ((row.getLong(index) >> 32) != 0L) return false
case _ =>
}
case (field, index) if field.dataType == NullType =>
if (!row.isNullAt(index) || row.getLong(index) != 0L) return false
case _ =>
}
if (bitSetWidthInBytes + 8 * row.numFields + varLenFieldsSizeInBytes > rowSizeInBytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
@volatile private var storeConf: StateStoreConf = _
@volatile private var hadoopConf: Configuration = _
@volatile private var numberOfVersionsToRetainInMemory: Int = _
// TODO: The validation should be moved to a higher level so that it works for all state store
// implementations
@volatile private var isValidated = false
Copy link
Contributor

@cloud-fan cloud-fan Jun 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a TODO that this validation should be moved to a higher level so that it works for all state store implementations?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, add the TODO in fd74ff9.


private lazy val loadedMaps = new util.TreeMap[Long, MapType](Ordering[Long].reverse)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{ThreadUtils, Utils}

Expand Down Expand Up @@ -149,7 +148,7 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
* An exception thrown when an invalid UnsafeRow is detected in state store.
*/
class InvalidUnsafeRowException
extends SparkException("The streaming query failed by state format invalidation. " +
extends RuntimeException("The streaming query failed by state format invalidation. " +
"The following reasons may cause this: 1. An old Spark version wrote the checkpoint that is " +
"incompatible with the current one; 2. Broken checkpoint files; 3. The query is changed " +
"among restart. For the first case, you can try to restart the application without " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the first case: I think it's for the cases?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The resolution is for the first case. For the rest cases listing, they should be considered as user problems.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class StateStoreConf(

/** Whether validate the value format when the format invalidation enabled. */
val formatValidationCheckValue: Boolean =
extraOptions.getOrElse(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG, "false") == "true"
extraOptions.getOrElse(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG, "true") == "true"

/**
* Additional configurations related to state store. This will capture all configs in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ package object state {
valueSchema,
indexOrdinal,
sessionState,
storeCoordinator)
storeCoordinator,
extraOptions)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ case class StreamingDeduplicateExec(
Some(sqlContext.streams.stateStoreCoordinator),
// We won't check value row in state store since the value StreamingDeduplicateExec.EMPTY_ROW
// is unrelated to the output schema.
Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "true")) { (store, iter) =>
Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "false")) { (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
val numOutputRows = longMetric("numOutputRows")
val numTotalStateRows = longMetric("numTotalStateRows")
Expand Down