[SPARK-24763][SS] Remove redundant key data from value in streaming aggregation

* Add an option to enable the new feature: removing redundant key data from the state value
* Modify code to respect the new option (turning the feature on/off)
* Modify tests to run with the feature both on and off
* Add a guard in OffsetSeqMetadata to prevent modifying the option after the query has started
HeartSaVioR committed Jul 20, 2018
commit 4252f41d86cefb974ed4da5f26ea4805a0fee2e4
@@ -871,6 +871,16 @@ object SQLConf {
.intConf
.createWithDefault(2)

val ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION =
Contributor:
I get worried when I see things described as "advanced features". What will go wrong if a user who's insufficiently advanced tries to use it?

@HeartSaVioR (Contributor, Author), Jul 10, 2018:
This is not compatible with the current stateful aggregation state format (that incompatibility is exactly the improvement in this patch), and there is no undo. So only a new query can turn this option on, and once end users enable it for a query, it must stay enabled unless they clear out the checkpoint as well as the state. (I've added the new option to OffsetSeqMetadata to remember the first setting, the same way the shuffle partition count is remembered.)

I'm seeing performance on par with, or even slightly better than, the baseline on a specific workload (published via the link in the description), but I cannot claim to have tried exhaustive workloads. I actually expected a trade-off between performance and state memory usage, so if other workloads do show that trade-off, end users may need to try this option in a non-production environment (for example, staging) first, to make sure that enabling it reduces state memory without breaking their performance expectations.

That's also why I made the change available as an option instead of changing the default behavior. If we applied it to the default behavior, we would need to provide state migration.
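
To illustrate with a schematic (plain Scala, not the actual UnsafeRow machinery; the column names here are made up):

// Schematic only: state layout for a row (k, v1, v2) grouped by key column k.
val row = Seq("key-1", 10, 20)          // child output: (k, v1, v2)
val keyOrdinals = Seq(0)                // ordinals of the grouping columns

val key = keyOrdinals.map(row)          // Seq("key-1")
// Current behavior: the state value is the whole row, so the key data is
// stored twice (once as the key, once again inside the value).
val oldValue = row
// With the option on: only the non-key columns are stored as the value ...
val newValue = row.indices.filterNot(keyOrdinals.contains).map(row)
// ... and the original row is rebuilt on read by re-joining key and value
// (plus a projection when the key columns are not a prefix of the output).
assert(key ++ newValue == row)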

Contributor:
Can this be a query option instead of a SparkConf, then? I worry it will be very hard to reason about the current scenario, where the conf appears to define how all state is stored, except that streams started with a different value will silently override it.

@HeartSaVioR (Contributor, Author):
I'm sorry, but I'm not sure I understand your suggestion correctly. I'd guess a SparkConf setting is easier to guard against modification after the query has started, via the existing approach of adding the conf to OffsetSeqMetadata, whereas I'm not sure how we could guard a query option against modification. I might be missing something here.
Could you elaborate a bit more? Thanks in advance!

@HeartSaVioR (Contributor, Author):
Also, the default value is false, so end users will become aware of the option's existence, and have a chance to read its explanation, before setting it to true.

We might elaborate a bit more in the config doc: spell out the trade-off between reduced memory usage and a possible performance hit, and suggest trying the option outside production before applying it there. If elaborating further makes us feel safer, I'm happy to update it.

buildConf("spark.sql.streaming.advanced.removeRedundantInStatefulAggregation")
.internal()
.doc("ADVANCED: When true, stateful aggregation tries to remove redundant data " +
"between key and value in state. Enabling this option helps minimizing state size, " +
"but no longer be compatible with state with disabling this option." +
"You can't change this option after starting the query.")
.booleanConf
.createWithDefault(false)
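
For reference, a usage sketch (assumes the conf is set on the session before the query first starts; the source, sink, and checkpoint path below are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("demo").getOrCreate()
// Must be set before the first start of the query; afterwards the value
// recorded in the checkpoint (OffsetSeqMetadata) wins on restart.
spark.conf.set(
  "spark.sql.streaming.advanced.removeRedundantInStatefulAggregation", "true")

val counts = spark.readStream.format("rate").load()
  .groupBy("value").count()               // stateful streaming aggregation

counts.writeStream
  .outputMode("update")
  .format("console")
  .option("checkpointLocation", "/tmp/demo-checkpoint")   // placeholder path
  .start()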

val UNSUPPORTED_OPERATION_CHECK_ENABLED =
buildConf("spark.sql.streaming.unsupportedOperationCheck")
.internal()
@@ -1618,6 +1628,9 @@ class SQLConf extends Serializable with Logging {
def advancedPartitionPredicatePushdownEnabled: Boolean =
getConf(ADVANCED_PARTITION_PREDICATE_PUSHDOWN)

def advancedRemoveRedundantInStatefulAggregation: Boolean =
getConf(ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION)

def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)

def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
@@ -89,7 +89,7 @@ object OffsetSeqMetadata extends Logging {
private implicit val format = Serialization.formats(NoTypeHints)
private val relevantSQLConfs = Seq(
SHUFFLE_PARTITIONS, STATE_STORE_PROVIDER_CLASS, STREAMING_MULTIPLE_WATERMARK_POLICY,
-   FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)
+   FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION)

/**
* Default values of relevant configurations that are used for backward compatibility.
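A minimal sketch of the mechanism this relies on (simplified; assumed behavior, not the actual OffsetSeqMetadata code): confs listed as relevant are captured into the offset log on the first run and re-applied on restart, so the first-run value pins the behavior for the life of the checkpoint.

// Simplified model of "relevant conf" pinning; names are illustrative.
case class OffsetMetadata(conf: Map[String, String])

def effectiveConf(
    sessionConf: Map[String, String],
    checkpointed: Option[OffsetMetadata],
    relevantKeys: Set[String]): Map[String, String] =
  checkpointed match {
    // On restart, values recorded at the first start override the session's.
    case Some(meta) => sessionConf ++ meta.conf.filter { case (k, _) => relevantKeys(k) }
    case None => sessionConf
  }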
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
- import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, Predicate}
+ import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner, Predicate}
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
@@ -204,30 +204,64 @@ case class StateStoreRestoreExec(
child: SparkPlan)
extends UnaryExecNode with StateStoreReader {

val removeRedundant: Boolean = sqlContext.conf.advancedRemoveRedundantInStatefulAggregation
if (removeRedundant) {
log.info("Advanced option removeRedundantInStatefulAggregation activated!")
}

val valueExpressions: Seq[Attribute] = if (removeRedundant) {
child.output.diff(keyExpressions)
} else {
child.output
}
val keyValueJoinedExpressions: Seq[Attribute] = keyExpressions ++ valueExpressions
val needToProjectToRestoreValue: Boolean = keyValueJoinedExpressions != child.output
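
To make the projection condition concrete, a plain-Scala illustration (assumed semantics; the column names are made up): the joined key-plus-value row only matches the child's column order when the grouping columns happen to be a prefix of the output.

val output = Seq("k", "a", "b")

val keyFirst = Seq("k")
assert(keyFirst ++ output.diff(keyFirst) == output)   // already in output order

val keyMiddle = Seq("a")
val joined = keyMiddle ++ output.diff(keyMiddle)      // Seq("a", "k", "b")
assert(joined != output)   // joined row must be projected back to output order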

override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")

child.execute().mapPartitionsWithStateStore(
getStateInfo,
keyExpressions.toStructType,
-   child.output.toStructType,
+   valueExpressions.toStructType,
indexOrdinal = None,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
val joiner = GenerateUnsafeRowJoiner.create(StructType.fromAttributes(keyExpressions),
StructType.fromAttributes(valueExpressions))
val restoreValueProject = GenerateUnsafeProjection.generate(
keyValueJoinedExpressions, child.output)

val hasInput = iter.hasNext
if (!hasInput && keyExpressions.isEmpty) {
// If our `keyExpressions` are empty, we're getting a global aggregation. In that case
// the `HashAggregateExec` will output a 0 value for the partial merge. We need to
// restore the value, so that we don't overwrite our state with a 0 value, but rather
// merge the 0 with existing state.
// In this case the value should represent origin row, so no need to restore.
Contributor:
What does this mean? I think this is not needed anymore.

@HeartSaVioR (Contributor, Author):
Yes, it can be removed now. Will remove.

store.iterator().map(_.value)
} else {
iter.flatMap { row =>
val key = getKey(row)
val savedState = store.get(key)
val restoredRow = if (removeRedundant) {
if (savedState == null) {
savedState
} else {
val joinedRow = joiner.join(key, savedState)
if (needToProjectToRestoreValue) {
restoreValueProject(joinedRow)
} else {
joinedRow
}
}
} else {
savedState
}

numOutputRows += 1
- Option(savedState).toSeq :+ row
+ Option(restoredRow).toSeq :+ row
}
}
}
@@ -257,6 +291,19 @@ case class StateStoreSaveExec(
child: SparkPlan)
extends UnaryExecNode with StateStoreWriter with WatermarkSupport {

val removeRedundant: Boolean = sqlContext.conf.advancedRemoveRedundantInStatefulAggregation
if (removeRedundant) {
log.info("Advanced option removeRedundantInStatefulAggregation activated!")
}

val valueExpressions: Seq[Attribute] = if (removeRedundant) {
child.output.diff(keyExpressions)
} else {
child.output
}
val keyValueJoinedExpressions: Seq[Attribute] = keyExpressions ++ valueExpressions
val needToProjectToRestoreValue: Boolean = keyValueJoinedExpressions != child.output

override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
assert(outputMode.nonEmpty,
@@ -265,11 +312,17 @@
child.execute().mapPartitionsWithStateStore(
getStateInfo,
keyExpressions.toStructType,
-   child.output.toStructType,
+   valueExpressions.toStructType,
indexOrdinal = None,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
val getValue = GenerateUnsafeProjection.generate(valueExpressions, child.output)
val joiner = GenerateUnsafeRowJoiner.create(StructType.fromAttributes(keyExpressions),
StructType.fromAttributes(valueExpressions))
val restoreValueProject = GenerateUnsafeProjection.generate(
keyValueJoinedExpressions, child.output)

val numOutputRows = longMetric("numOutputRows")
val numUpdatedStateRows = longMetric("numUpdatedStateRows")
val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
@@ -283,7 +336,12 @@
while (iter.hasNext) {
val row = iter.next().asInstanceOf[UnsafeRow]
val key = getKey(row)
- store.put(key, row)
+ val value = if (removeRedundant) {
+   getValue(row)
+ } else {
+   row
+ }
+ store.put(key, value)
numUpdatedStateRows += 1
}
}
@@ -294,7 +352,18 @@
setStoreMetrics(store)
store.iterator().map { rowPair =>
numOutputRows += 1
- rowPair.value
+
+ if (removeRedundant) {
+   val joinedRow = joiner.join(rowPair.key, rowPair.value)
+   if (needToProjectToRestoreValue) {
+     restoreValueProject(joinedRow)
+   } else {
+     joinedRow
+   }
+ } else {
+   rowPair.value
+ }

}

// Update and output only rows being evicted from the StateStore
@@ -305,7 +374,12 @@
while (filteredIter.hasNext) {
val row = filteredIter.next().asInstanceOf[UnsafeRow]
val key = getKey(row)
- store.put(key, row)
+ val value = if (removeRedundant) {
+   getValue(row)
+ } else {
+   row
+ }
+ store.put(key, value)
numUpdatedStateRows += 1
}
}
@@ -320,7 +394,17 @@
val rowPair = rangeIter.next()
if (watermarkPredicateForKeys.get.eval(rowPair.key)) {
store.remove(rowPair.key)
- removedValueRow = rowPair.value
+
+ if (removeRedundant) {
+   val joinedRow = joiner.join(rowPair.key, rowPair.value)
+   removedValueRow = if (needToProjectToRestoreValue) {
+     restoreValueProject(joinedRow)
+   } else {
+     joinedRow
+   }
+ } else {
+   removedValueRow = rowPair.value
+ }
}
}
if (removedValueRow == null) {
@@ -353,7 +437,12 @@
if (baseIterator.hasNext) {
val row = baseIterator.next().asInstanceOf[UnsafeRow]
val key = getKey(row)
- store.put(key, row)
+ val value = if (removeRedundant) {
+   getValue(row)
+ } else {
+   row
+ }
+ store.put(key, value)
numOutputRows += 1
numUpdatedStateRows += 1
row