Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rule is being applied
  • Loading branch information
ericm-db committed Jun 13, 2024
commit 2adbb02d5f2d924854059175b81bc4129ef902df
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,12 @@ class IncrementalExecution(
}

object PopulateSchemaV3Rule extends SparkPlanPartialRule with Logging {
logError(s"### PopulateSchemaV3Rule, batchId = $currentBatchId")
override val rule: PartialFunction[SparkPlan, SparkPlan] = {
case tws: TransformWithStateExec =>
case tws: TransformWithStateExec if isFirstBatch && currentBatchId != 0 =>
val stateSchemaV3File = new StateSchemaV3File(
hadoopConf, tws.stateSchemaFilePath().toString)
logError(s"### trying to get schema from file: ${tws.stateSchemaFilePath()}")
stateSchemaV3File.getLatest() match {
case Some((_, schemaJValue)) =>
logError("### PASSING SCHEMA TO OPERATOR")
logError(s"### schemaJValue: $schemaJValue")
tws.copy(columnFamilyJValue = Some(schemaJValue))
case None => tws
}
Expand Down Expand Up @@ -471,7 +467,6 @@ class IncrementalExecution(
}

override def apply(plan: SparkPlan): SparkPlan = {
logError(s"### applying rules to plan")
val planWithStateOpId = plan transform composedRule
val planWithSchema = planWithStateOpId transform PopulateSchemaV3Rule.rule
// Need to check before write to metadata because we need to detect add operator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -899,39 +899,38 @@ class MicroBatchExecution(
*/
protected def markMicroBatchEnd(execCtx: MicroBatchExecutionContext): Unit = {
watermarkTracker.updateWatermark(execCtx.executionPlan.executedPlan)
val shouldWriteMetadatas = execCtx.previousContext match {
case Some(prevCtx)
if prevCtx.executionPlan.runId == execCtx.executionPlan.runId =>
false
case _ => true
}

if (shouldWriteMetadatas) {
execCtx.executionPlan.executedPlan.collect {
case tws: TransformWithStateExec =>
val schema = tws.getColumnFamilyJValue()
val metadata = tws.operatorStateMetadata()
val id = metadata.operatorInfo.operatorId
val schemaFile = stateSchemaLogs(id)
if (!schemaFile.add(execCtx.batchId, schema)) {
throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
}
}
execCtx.executionPlan.executedPlan.collect {
case s: StateStoreWriter =>
val metadata = s.operatorStateMetadata()
val id = metadata.operatorInfo.operatorId
val metadataFile = operatorStateMetadataLogs(id)
if (!metadataFile.add(execCtx.batchId, metadata)) {
throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
}
}
}
execCtx.reportTimeTaken("commitOffsets") {
if (!commitLog.add(execCtx.batchId, CommitMetadata(watermarkTracker.currentWatermark))) {
throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
}
val shouldWriteMetadatas = execCtx.previousContext match {
case Some(prevCtx)
if prevCtx.executionPlan.runId == execCtx.executionPlan.runId =>
false
case _ => true
}

if (shouldWriteMetadatas) {
execCtx.executionPlan.executedPlan.collect {
case tws: TransformWithStateExec =>
val schema = tws.getColumnFamilyJValue()
val metadata = tws.operatorStateMetadata()
val id = metadata.operatorInfo.operatorId
val schemaFile = stateSchemaLogs(id)
logError(s"Writing schema for operator $id at path ${schemaFile.metadataPath}")
if (!schemaFile.add(execCtx.batchId, schema)) {
throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
}
}
execCtx.executionPlan.executedPlan.collect {
case s: StateStoreWriter =>
val metadata = s.operatorStateMetadata()
val id = metadata.operatorInfo.operatorId
val metadataFile = operatorStateMetadataLogs(id)
if (!metadataFile.add(execCtx.batchId, metadata)) {
throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
}
}
}
}
committedOffsets ++= execCtx.endOffsets
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,7 @@ case class TransformWithStateExec(
}

def columnFamilySchemas(): List[ColumnFamilySchema] = {
val columnFamilySchemas = ColumnFamilySchemaV1.fromJValue(columnFamilyJValue)
columnFamilySchemas.foreach {
case c1: ColumnFamilySchemaV1 => logError(s"### colFamilyName:" +
s"${c1.columnFamilyName}")
}
columnFamilySchemas
ColumnFamilySchemaV1.fromJValue(columnFamilyJValue)
}

override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -919,14 +919,16 @@ class TransformWithStateSuite extends StateStoreMetricsTest
OutputMode.Append())

// verify that query with ttl enabled after restart fails
testStream(result, OutputMode.Append())(
testStream(result2, OutputMode.Append())(
StartStream(
Trigger.ProcessingTime("1 second"),
triggerClock = clock,
checkpointLocation = chkptDir.getCanonicalPath
),
AddData(inputData, "a"),
AdvanceManualClock(1 * 1000)
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("a", "1")),
StopStream
)
}
}
Expand Down