diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 24f972a5006b..88d83806572b 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -202,6 +202,13 @@ "Another instance of this query was just started by a concurrent session." ] }, + "CONCURRENT_STREAM_LOG_UPDATE" : { + "message" : [ + "Concurrent update to the log. Multiple streaming jobs detected for <batchId>.", + "Please make sure only one streaming job runs on a specific checkpoint location at a time." + ], + "sqlState" : "40000" + }, "CONNECT" : { "message" : [ "Generic Spark Connect error." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 52e8c7df91e6..835788321eab 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1409,6 +1409,13 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { messageParameters = Map.empty[String, String]) } + def concurrentStreamLogUpdate(batchId: Long): Throwable = { + new SparkException( + errorClass = "CONCURRENT_STREAM_LOG_UPDATE", + messageParameters = Map("batchId" -> batchId.toString), + cause = null) + } + def cannotParseJsonArraysAsStructsError(): SparkRuntimeException = { new SparkRuntimeException( errorClass = "_LEGACY_ERROR_TEMP_2132", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala index e9ad8bed27c6..495f2f7ac0bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala @@ -23,6 +23,7 @@ import 
java.util.concurrent.{CompletableFuture, ConcurrentLinkedDeque, ThreadPoo import scala.collection.JavaConverters._ import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.errors.QueryExecutionErrors /** * Implementation of CommitLog to perform asynchronous writes to storage @@ -54,9 +55,7 @@ class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService: if (ret) { batchId } else { - throw new IllegalStateException( - s"Concurrent update to the log. Multiple streaming jobs detected for $batchId" - ) + throw QueryExecutionErrors.concurrentStreamLogUpdate(batchId) } }) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala index 4dd49951436a..240a64ec7b0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters._ import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.util.{Clock, SystemClock} /** @@ -90,9 +91,7 @@ class AsyncOffsetSeqLog( if (ret) { batchId } else { - throw new IllegalStateException( - s"Concurrent update to the log. 
Multiple streaming jobs detected for $batchId" - ) + throw QueryExecutionErrors.concurrentStreamLogUpdate(batchId) } }) pendingOffsetWrites.put(batchId, future) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala index f7c7aab65e20..56cdba881753 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.streaming.Trigger import org.apache.spark.util.{Clock, ThreadUtils} @@ -194,9 +195,7 @@ class AsyncProgressTrackingMicroBatchExecution( } else { if (!commitLog.addInMemory( currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) { - throw new IllegalStateException( - s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId" - ) + throw QueryExecutionErrors.concurrentStreamLogUpdate(currentBatchId) } } offsetLog.removeAsyncOffsetWrite(currentBatchId) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 65a703281489..691cea9edde0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -760,9 +760,11 @@ class MicroBatchExecution( * checkpointing to offset log and any microbatch startup tasks. 
*/ protected def markMicroBatchStart(): Unit = { - assert(offsetLog.add(currentBatchId, - availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)), - s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") + if (!offsetLog.add(currentBatchId, + availableOffsets.toOffsetSeq(sources, offsetSeqMetadata))) { + throw QueryExecutionErrors.concurrentStreamLogUpdate(currentBatchId) + } + logInfo(s"Committed offsets for batch $currentBatchId. " + s"Metadata ${offsetSeqMetadata.toString}") } @@ -780,9 +782,9 @@ class MicroBatchExecution( protected def markMicroBatchEnd(): Unit = { watermarkTracker.updateWatermark(lastExecution.executedPlan) reportTimeTaken("commitOffsets") { - assert(commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)), - "Concurrent update to the commit log. Multiple streaming jobs detected for " + - s"$currentBatchId") + if (!commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) { + throw QueryExecutionErrors.concurrentStreamLogUpdate(currentBatchId) + } } committedOffsets ++= availableOffsets }