OP-667: [SPARK-24634][SS] Add a new metric regarding number of rows later than watermark plus allowed delay apache#24936
jainshashank24 committed Sep 10, 2020
commit 4c6cf797f8cfa740f6412edfb0fc802d6b851f0c
5 changes: 5 additions & 0 deletions docs/structured-streaming-programming-guide.md
@@ -1674,6 +1674,11 @@ Any of the stateful operation(s) after any of below stateful operations can have
As Spark cannot check the state function of `mapGroupsWithState`/`flatMapGroupsWithState`, Spark assumes that the state function
emits late rows if the operator uses Append mode.

Spark provides two ways to check the number of late rows on stateful operators, which can help you identify the issue:

1. On the Spark UI: check the metrics of the "CountLateRows" node on the query execution details page in the SQL tab.
2. On the Streaming Query Listener: check "numLateInputRows" in "stateOperators" in the QueryProgressEvent (see the sketch below).
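
For reference, a minimal listener sketch (illustrative only, not part of this change; it assumes a running query and a `SparkSession` named `spark`):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // numLateInputRows is the field added by this change.
    event.progress.stateOperators.foreach { op =>
      println(s"late input rows: ${op.numLateInputRows}")
    }
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})
```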

There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -429,7 +429,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
aggregateExpressions.map(expr => expr.asInstanceOf[AggregateExpression]),
rewrittenResultExpressions,
stateVersion,
-            planLater(child))
+            CountLateRowsExec(None, planLater(child)))

case _ => Nil
}
@@ -441,7 +441,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object StreamingDeduplicationStrategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case Deduplicate(keys, child) if child.isStreaming =>
-        StreamingDeduplicateExec(keys, planLater(child)) :: Nil
+        StreamingDeduplicateExec(keys, CountLateRowsExec(None, planLater(child))) :: Nil

case _ => Nil
}
@@ -492,7 +492,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

val stateVersion = conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION)
new StreamingSymmetricHashJoinExec(leftKeys, rightKeys, joinType, condition,
-          stateVersion, planLater(left), planLater(right)) :: Nil
+          stateVersion, CountLateRowsExec(None, planLater(left)),
+          CountLateRowsExec(None, planLater(right))) :: Nil

case Join(left, right, _, _, _) if left.isStreaming && right.isStreaming =>
throw new AnalysisException(
@@ -625,10 +626,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case FlatMapGroupsWithState(
func, keyDeser, valueDeser, groupAttr, dataAttr, outputAttr, stateEnc, outputMode, _,
timeout, child) =>

val stateVersion = conf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)
val execPlan = FlatMapGroupsWithStateExec(
func, keyDeser, valueDeser, groupAttr, dataAttr, outputAttr, None, stateEnc, stateVersion,
-          outputMode, timeout, batchTimestampMs = None, eventTimeWatermark = None, planLater(child))
+          outputMode, timeout, batchTimestampMs = None, eventTimeWatermark = None,
+          CountLateRowsExec(None, planLater(child)))

execPlan :: Nil
case _ =>
Nil
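All four strategies above apply the same pattern: wrap each streaming child of a stateful operator so that late input is counted before the operator consumes it. A condensed sketch of that pattern (names from this diff; `child` stands for whichever logical child is being planned):

```scala
// The watermark is unknown at planning time, hence None; IncrementalExecution
// (next file) substitutes the value computed for each batch.
CountLateRowsExec(eventTimeWatermark = None, child = planLater(child))
```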
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -132,6 +132,9 @@ class IncrementalExecution(
}

override def apply(plan: SparkPlan): SparkPlan = plan transform {
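      // Fill in this batch's watermark: it was unknown at planning time, so the
      // strategies created CountLateRowsExec with eventTimeWatermark = None.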
case m: CountLateRowsExec =>
m.copy(eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs))

case StateStoreSaveExec(keys, None, None, None, stateFormatVersion,
UnaryExecNode(agg,
StateStoreRestoreExec(_, None, _, child))) =>
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -222,7 +222,7 @@ trait ProgressReporter extends Logging {
lastExecution.executedPlan.collect {
case p if p.isInstanceOf[StateStoreWriter] =>
val progress = p.asInstanceOf[StateStoreWriter].getProgress()
-        if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0)
+        if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0, newNumLateInputRows = 0)
}
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
@@ -37,6 +37,84 @@ import org.apache.spark.sql.streaming.{OutputMode, StateOperatorProgress}
import org.apache.spark.sql.types._
import org.apache.spark.util.{CompletionIterator, NextIterator, Utils}

/**
 * Passes rows through unchanged while counting, in the "numLateRows" metric,
 * every input row whose event time is older than the current watermark, i.e.
 * rows arriving later than the watermark plus the allowed delay. Late rows are
 * counted here, not dropped. Created with eventTimeWatermark = None at planning
 * time; IncrementalExecution fills in the per-batch value.
 */
case class CountLateRowsExec(
    eventTimeWatermark: Option[Long] = None,
    child: SparkPlan)
  extends UnaryExecNode with WatermarkSupport with CodegenSupport {

// No need to determine key expressions here.
override def keyExpressions: Seq[Attribute] = Seq.empty

override def output: Seq[Attribute] = child.output

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def outputPartitioning: Partitioning = child.outputPartitioning

override lazy val metrics = Map(
"numLateRows" -> SQLMetrics.createMetric(sparkContext,
"number of input rows later than watermark plus allowed delay"))

  override protected def doExecute(): RDD[InternalRow] = {
    // Interpreted path: pass each row through unchanged, bumping the metric
    // when the watermark predicate flags the row as late.
    val numLateRows = longMetric("numLateRows")
    child.execute().mapPartitionsWithIndex { (_, iter) =>
watermarkPredicateForData match {
case Some(pred) =>
iter.map { row =>
val r = pred.eval(row)
if (r) numLateRows += 1
row
}

case None => iter
}
}
}

  // Whole-stage codegen delegates to the child; doConsume below injects the
  // metric update into the generated pipeline.
  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    child.asInstanceOf[CodegenSupport].inputRDDs()
  }

override protected def doProduce(ctx: CodegenContext): String = {
child.asInstanceOf[CodegenSupport].produce(ctx, this)
}

override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
val numLateRows = metricTerm(ctx, "numLateRows")

val generated = watermarkExpression match {
case Some(expr) =>
val bound = BindReferences.bindReference(expr, child.output)
val evaluated = evaluateRequiredVariables(child.output, input, expr.references)

// Generate the code for the predicate.
val ev = ExpressionCanonicalizer.execute(bound).genCode(ctx)
val nullCheck = if (bound.nullable) {
s"${ev.isNull} || "
} else {
s""
}

s"""
|$evaluated
|${ev.code}
|if (${nullCheck}${ev.value}) {
| $numLateRows.add(1);
|}
""".stripMargin

case None => ""
}

// Note: wrap in "do { } while(false);", so the generated checks can jump out with "continue;"
s"""
|do {
| $generated
| ${consume(ctx, input)}
|} while(false);
""".stripMargin
}
}

/** Used to identify the state store for a given operator. */
case class StatefulOperatorStateInfo(
@@ -96,11 +174,16 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] =
new java.util.HashMap(customMetrics.mapValues(long2Long).asJava)

    // Sum the "numLateRows" metric of the first CountLateRowsExec found under
    // each child, so operators with two wrapped inputs (e.g. stream-stream
    // joins) report late rows from both sides.
    val numLateInputRows = self.children.flatMap(_.collectFirst {
      case d: CountLateRowsExec => d
    }).map(_.metrics("numLateRows").value).sum

new StateOperatorProgress(
numRowsTotal = longMetric("numTotalStateRows").value,
numRowsUpdated = longMetric("numUpdatedStateRows").value,
numLateInputRows = numLateInputRows,
memoryUsedBytes = longMetric("stateMemory").value,
-      javaConvertedCustomMetrics
+      customMetrics = javaConvertedCustomMetrics
)
}

sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
class StateOperatorProgress private[sql](
val numRowsTotal: Long,
val numRowsUpdated: Long,
val numLateInputRows: Long,
val memoryUsedBytes: Long,
val customMetrics: ju.Map[String, JLong] = new ju.HashMap()
) extends Serializable {
@@ -52,12 +53,14 @@ class StateOperatorProgress private[sql](
/** The pretty (i.e. indented) JSON representation of this progress. */
def prettyJson: String = pretty(render(jsonValue))

-  private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, customMetrics)
+  private[sql] def copy(newNumRowsUpdated: Long, newNumLateInputRows: Long): StateOperatorProgress =
+    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, newNumLateInputRows, memoryUsedBytes,
+      customMetrics)

private[sql] def jsonValue: JValue = {
("numRowsTotal" -> JInt(numRowsTotal)) ~
("numRowsUpdated" -> JInt(numRowsUpdated)) ~
("numLateInputRows" -> JInt(numLateInputRows)) ~
("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
("customMetrics" -> {
if (!customMetrics.isEmpty) {
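For illustration, a sketch of how the new field surfaces on a running query (values are made up; `query` is assumed to be an active StreamingQuery):

```scala
// Sketch only; "query" is an assumed handle to a running streaming query.
val op = query.lastProgress.stateOperators(0)
println(op.numLateInputRows) // e.g. 1

// query.lastProgress.prettyJson would then include (illustrative values):
// "stateOperators" : [ {
//   "numRowsTotal" : 2,
//   "numRowsUpdated" : 0,
//   "numLateInputRows" : 1,
//   "memoryUsedBytes" : 1120
// } ]
```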
sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -298,9 +298,11 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
AddData(inputData, 25), // Advance watermark to 15 seconds
CheckNewAnswer((10, 5)),
assertNumStateRows(2),
assertNumLateRows(0),
AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
-      assertNumStateRows(2)
+      assertNumStateRows(2),
+      assertNumLateRows(1)
)
}

@@ -321,12 +323,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
AddData(inputData, 25), // Advance watermark to 15 seconds
CheckNewAnswer((25, 1)),
assertNumStateRows(2),
assertNumLateRows(0),
      AddData(inputData, 10, 25), // Ignore 10 as it's less than watermark
CheckNewAnswer((25, 2)),
assertNumStateRows(2),
assertNumLateRows(1),
AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
-      assertNumStateRows(2)
+      assertNumStateRows(2),
+      assertNumLateRows(1)
)
}

@@ -377,8 +382,10 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
testStream(df)(
AddData(inputData, 10, 11, 12, 13, 14, 15),
CheckAnswer(),
assertNumLateRows(0),
AddData(inputData, 25), // Advance watermark to 15 seconds
CheckAnswer((10, 5)),
assertNumLateRows(0),
StopStream,
AssertOnQuery { q => // purge commit and clear the sink
val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
@@ -389,12 +396,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
StartStream(),
AddData(inputData, 10, 27, 30), // Advance watermark to 20 seconds, 10 should be ignored
CheckAnswer((15, 1)),
assertNumLateRows(1),
StopStream,
StartStream(),
AddData(inputData, 17), // Watermark should still be 20 seconds, 17 should be ignored
CheckAnswer((15, 1)),
assertNumLateRows(1),
AddData(inputData, 40), // Advance watermark to 30 seconds, emit first data 25
-      CheckNewAnswer((25, 2))
+      CheckNewAnswer((25, 2)),
+      assertNumLateRows(0)
)
}

@@ -486,18 +496,24 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
.agg(count("*") as 'count)
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])

-    // No eviction when asked to compute complete results.
+    // No state eviction when asked to compute complete results.
+    // It still counts late input rows, though.
testStream(windowedAggregation, OutputMode.Complete)(
AddData(inputData, 10, 11, 12),
CheckAnswer((10, 3)),
assertNumLateRows(0),
AddData(inputData, 25),
CheckAnswer((10, 3), (25, 1)),
assertNumLateRows(0),
AddData(inputData, 25),
CheckAnswer((10, 3), (25, 2)),
assertNumLateRows(0),
AddData(inputData, 10),
CheckAnswer((10, 4), (25, 2)),
assertNumLateRows(1),
AddData(inputData, 25),
-      CheckAnswer((10, 4), (25, 3))
+      CheckAnswer((10, 4), (25, 3)),
+      assertNumLateRows(0)
)
}

@@ -783,6 +799,13 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
true
}

private def assertNumLateRows(numLateRows: Long): AssertOnQuery = AssertOnQuery { q =>
q.processAllAvailable()
val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
assert(progressWithData.stateOperators(0).numLateInputRows === numLateRows)
true
}

/** Assert event stats generated on that last batch with data in it */
private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
Execute("AssertEventStats") { q =>
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -626,20 +626,20 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
testStream(result, Update)(
AddData(inputData, "a"),
CheckNewAnswer(("a", "1")),
-      assertNumStateRows(total = 1, updated = 1),
+      assertNumStateRows(total = 1, updated = 1, lateInput = 0),
AddData(inputData, "a", "b"),
CheckNewAnswer(("a", "2"), ("b", "1")),
-      assertNumStateRows(total = 2, updated = 2),
+      assertNumStateRows(total = 2, updated = 2, lateInput = 0),
StopStream,
StartStream(),
AddData(inputData, "a", "b"), // should remove state for "a" and not return anything for a
CheckNewAnswer(("b", "2")),
-      assertNumStateRows(total = 1, updated = 2),
+      assertNumStateRows(total = 1, updated = 2, lateInput = 0),
StopStream,
StartStream(),
AddData(inputData, "a", "c"), // should recreate state for "a" and return count as 1 and
CheckNewAnswer(("a", "1"), ("c", "1")),
-      assertNumStateRows(total = 3, updated = 2)
+      assertNumStateRows(total = 3, updated = 2, lateInput = 0)
)
}

@@ -768,25 +768,25 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
AddData(inputData, "a"),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("a", "1")),
-      assertNumStateRows(total = 1, updated = 1),
+      assertNumStateRows(total = 1, updated = 1, lateInput = 0),

AddData(inputData, "b"),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("b", "1")),
-      assertNumStateRows(total = 2, updated = 1),
+      assertNumStateRows(total = 2, updated = 1, lateInput = 0),

AddData(inputData, "b"),
AdvanceManualClock(10 * 1000),
CheckNewAnswer(("a", "-1"), ("b", "2")),
-      assertNumStateRows(total = 1, updated = 2),
+      assertNumStateRows(total = 1, updated = 2, lateInput = 0),

StopStream,
StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),

AddData(inputData, "c"),
AdvanceManualClock(11 * 1000),
CheckNewAnswer(("b", "-1"), ("c", "1")),
-      assertNumStateRows(total = 1, updated = 2),
+      assertNumStateRows(total = 1, updated = 2, lateInput = 0),

AdvanceManualClock(12 * 1000),
AssertOnQuery { _ => clock.getTimeMillis() == 35000 },
Expand All @@ -798,7 +798,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
}
},
CheckNewAnswer(("c", "-1")),
-      assertNumStateRows(total = 0, updated = 1)
+      assertNumStateRows(total = 0, updated = 1, lateInput = 0)
)
}

@@ -978,20 +978,20 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
testStream(result, Update)(
AddData(inputData, "a"),
CheckNewAnswer(("a", "1")),
-      assertNumStateRows(total = 1, updated = 1),
+      assertNumStateRows(total = 1, updated = 1, lateInput = 0),
AddData(inputData, "a", "b"),
CheckNewAnswer(("a", "2"), ("b", "1")),
-      assertNumStateRows(total = 2, updated = 2),
+      assertNumStateRows(total = 2, updated = 2, lateInput = 0),
StopStream,
StartStream(),
AddData(inputData, "a", "b"), // should remove state for "a" and return count as -1
CheckNewAnswer(("a", "-1"), ("b", "2")),
-      assertNumStateRows(total = 1, updated = 2),
+      assertNumStateRows(total = 1, updated = 2, lateInput = 0),
StopStream,
StartStream(),
AddData(inputData, "a", "c"), // should recreate state for "a" and return count as 1
CheckNewAnswer(("a", "1"), ("c", "1")),
-      assertNumStateRows(total = 3, updated = 2)
+      assertNumStateRows(total = 3, updated = 2, lateInput = 0)
)
}
