core/src/main/resources/error/error-classes.json (7 additions, 0 deletions)
@@ -202,6 +202,13 @@
"Another instance of this query was just started by a concurrent session."
]
},
"CONCURRENT_STREAM_LOG_UPDATE" : {
"message" : [
"Concurrent update to the log. Multiple streaming jobs detected for <batchId>.",
"Please make sure only one streaming job runs on a specific checkpoint location at a time."
],
"sqlState" : "40000"
},
"CONNECT" : {
"message" : [
"Generic Spark Connect error."
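Note for readers unfamiliar with Spark's error-class framework: entries in error-classes.json are message templates keyed by error class, with <param> placeholders filled from messageParameters at construction time. A minimal sketch of what this registration buys, using the standard SparkThrowable accessors (illustrative values only, not code from this PR):

import org.apache.spark.SparkException

// Construct the error the same way the new helper below does
// ("42" is a made-up batch id for illustration).
val e = new SparkException(
  errorClass = "CONCURRENT_STREAM_LOG_UPDATE",
  messageParameters = Map("batchId" -> "42"),
  cause = null)

// SparkThrowable exposes classification that a bare
// IllegalStateException never carried:
assert(e.getErrorClass == "CONCURRENT_STREAM_LOG_UPDATE")
assert(e.getSqlState == "40000")
// e.getMessage renders the template with <batchId> substituted, e.g.
// "[CONCURRENT_STREAM_LOG_UPDATE] Concurrent update to the log.
//  Multiple streaming jobs detected for 42. ..."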
@@ -1409,6 +1409,13 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
messageParameters = Map.empty[String, String])
}

+  def concurrentStreamLogUpdate(batchId: Long): Throwable = {
+    new SparkException(
+      errorClass = "CONCURRENT_STREAM_LOG_UPDATE",
+      messageParameters = Map("batchId" -> batchId.toString),
+      cause = null)
+  }
+
def cannotParseJsonArraysAsStructsError(): SparkRuntimeException = {
new SparkRuntimeException(
errorClass = "_LEGACY_ERROR_TEMP_2132",
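A hedged sketch of how the new builder might be exercised from a unit test; the intercept helper is ScalaTest's, and the surrounding suite scaffolding is assumed rather than part of this change:

import org.apache.spark.SparkException
import org.apache.spark.sql.errors.QueryExecutionErrors

val ex = intercept[SparkException] {
  throw QueryExecutionErrors.concurrentStreamLogUpdate(7L)
}
assert(ex.getErrorClass == "CONCURRENT_STREAM_LOG_UPDATE")
// The template's <batchId> placeholder is bound to the stringified id.
assert(ex.getMessageParameters.get("batchId") == "7")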
@@ -23,6 +23,7 @@ import java.util.concurrent.{CompletableFuture, ConcurrentLinkedDeque, ThreadPoo
import scala.collection.JavaConverters._

import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.errors.QueryExecutionErrors

/**
* Implementation of CommitLog to perform asynchronous writes to storage
@@ -54,9 +55,7 @@ class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService:
if (ret) {
batchId
} else {
-        throw new IllegalStateException(
-          s"Concurrent update to the log. Multiple streaming jobs detected for $batchId"
-        )
+        throw QueryExecutionErrors.concurrentStreamLogUpdate(batchId)
}
})

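The add call above returns false when another writer already claimed the batch; that boolean, not an exception from storage, is what detects the concurrent job. A self-contained sketch of the contract, where every name is a hypothetical stand-in rather than real AsyncCommitLog internals:

import java.util.concurrent.ConcurrentHashMap

val inMemoryLog = new ConcurrentHashMap[Long, String]()

// First writer for a batchId wins; any later writer observes `false`
// and, after this PR, surfaces CONCURRENT_STREAM_LOG_UPDATE.
def addSketch(batchId: Long, metadata: String): Boolean =
  inMemoryLog.putIfAbsent(batchId, metadata) == null

assert(addSketch(0L, "commit"))   // first add succeeds
assert(!addSketch(0L, "commit"))  // duplicate add is rejected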
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._

import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.util.{Clock, SystemClock}

/**
@@ -90,9 +91,7 @@ class AsyncOffsetSeqLog(
if (ret) {
batchId
} else {
-        throw new IllegalStateException(
-          s"Concurrent update to the log. Multiple streaming jobs detected for $batchId"
-        )
+        throw QueryExecutionErrors.concurrentStreamLogUpdate(batchId)
}
})
pendingOffsetWrites.put(batchId, future)
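Note where the throw sits: inside the completion stage of an asynchronous write, so the error reaches callers only when the CompletableFuture is joined, wrapped in a CompletionException. A minimal stand-alone illustration (the failing write is simulated; none of this is AsyncOffsetSeqLog code):

import java.util.concurrent.{CompletableFuture, CompletionException}

// Simulate a log write that loses the race and completes with `false`.
val write = CompletableFuture.supplyAsync(() => java.lang.Boolean.FALSE)

val future = write.thenApply[String] { ret =>
  if (ret) "ok"
  else throw new IllegalStateException("stand-in for concurrentStreamLogUpdate")
}

// The exception surfaces at join time, wrapped by the future machinery.
val sawFailure =
  try { future.join(); false }
  catch { case _: CompletionException => true }
assert(sawFailure)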
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.util.{Clock, ThreadUtils}

@@ -194,9 +195,7 @@ class AsyncProgressTrackingMicroBatchExecution(
} else {
if (!commitLog.addInMemory(
currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) {
-          throw new IllegalStateException(
-            s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId"
-          )
+          throw QueryExecutionErrors.concurrentStreamLogUpdate(currentBatchId)
}
}
offsetLog.removeAsyncOffsetWrite(currentBatchId)
@@ -760,9 +760,11 @@ class MicroBatchExecution(
* checkpointing to offset log and any microbatch startup tasks.
*/
protected def markMicroBatchStart(): Unit = {
-    assert(offsetLog.add(currentBatchId,
-      availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
-      s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
+    if (!offsetLog.add(currentBatchId,
+        availableOffsets.toOffsetSeq(sources, offsetSeqMetadata))) {
+      throw QueryExecutionErrors.concurrentStreamLogUpdate(currentBatchId)
+    }
+
logInfo(s"Committed offsets for batch $currentBatchId. " +
s"Metadata ${offsetSeqMetadata.toString}")
}
@@ -780,9 +782,9 @@
protected def markMicroBatchEnd(): Unit = {
watermarkTracker.updateWatermark(lastExecution.executedPlan)
reportTimeTaken("commitOffsets") {
-      assert(commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)),
-        "Concurrent update to the commit log. Multiple streaming jobs detected for " +
-          s"$currentBatchId")
+      if (!commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) {
+        throw QueryExecutionErrors.concurrentStreamLogUpdate(currentBatchId)
+      }
}
committedOffsets ++= availableOffsets
}
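The MicroBatchExecution changes differ slightly from the others: they replace assert(...) rather than an explicit IllegalStateException. Beyond error classification, this matters because Scala's assert raises a bare java.lang.AssertionError and is elided entirely under -Xdisable-assertions, so the consistency check itself could vanish from builds compiled that way. A contrast sketch with illustrative values, not the real call sites:

import org.apache.spark.sql.errors.QueryExecutionErrors

val currentBatchId = 42L
val added = false // stand-in for offsetLog.add(...) losing the race

// Before: elidable under -Xdisable-assertions, throws an unclassified
// AssertionError.
//   assert(added, s"Concurrent update to the log ... for $currentBatchId")

// After: always enforced, throws a SparkException carrying an error class
// and SQLSTATE 40000 that callers and tooling can match on.
if (!added) {
  throw QueryExecutionErrors.concurrentStreamLogUpdate(currentBatchId)
}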