Replace logging an error with throwing an exception
Efim Poberezkin committed May 22, 2018
commit 0919b3f7542aa0a807b0ac56e0da1366f347bb54
org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -17,8 +17,10 @@

 package org.apache.spark.sql.execution.streaming.continuous

+import java.lang.Thread.UncaughtExceptionHandler
 import java.util.UUID
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
 import java.util.function.UnaryOperator

 import scala.collection.JavaConverters._
@@ -233,9 +235,15 @@ class ContinuousExecution(
             }
             false
           } else if (isActive) {
-            currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
-            logInfo(s"New epoch $currentBatchId is starting.")
-            true
+            val maxBacklogExceeded = epochEndpoint.askSync[Boolean](CheckIfMaxBacklogIsExceeded)
+            if (maxBacklogExceeded) {
+              throw new IllegalStateException(
+                "Size of the epochs queue has exceeded maximum allowed epoch backlog.")
@yanlin-Lynn commented on May 23, 2018:
Throwing an exception will stop epochUpdateThread, but won't the application keep running?
I think it's better to block and wait for old epochs to be committed.

Contributor replied:
Agreed that the code as written won't shut down the stream. But I think it does make sense to kill the stream rather than waiting for old epochs. If we end up with a large backlog it's almost surely because some partition isn't making any progress, so I wouldn't expect Spark to ever be able to catch up.
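
The thread-lifecycle point in this exchange can be seen in isolation. Below is a minimal standalone sketch (not Spark code; all names are illustrative) of why an exception thrown in a spawned thread does not by itself stop the application:

```scala
// A worker thread's uncaught exception kills only that thread; the spawning
// thread keeps running unless it explicitly observes the failure.
object LostExceptionDemo {
  def main(args: Array[String]): Unit = {
    val worker = new Thread(new Runnable {
      override def run(): Unit =
        throw new IllegalStateException("epoch backlog exceeded")
    }, "epoch-update-worker")
    worker.start()
    worker.join() // waits for the worker; does not rethrow its exception
    println("main thread is still running") // reached despite the failure
  }
}
```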

+            } else {
+              currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
+              logInfo(s"New epoch $currentBatchId is starting.")
+              true
+            }
           } else {
             false
           }
@@ -248,7 +256,12 @@
       }
     }, s"epoch update thread for $prettyIdString")

+    val throwableReference: AtomicReference[Throwable] = new AtomicReference[Throwable]()
     try {
+      epochUpdateThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
+        override def uncaughtException(thread: Thread, throwable: Throwable): Unit =
+          throwableReference.set(throwable)
+      })
       epochUpdateThread.setDaemon(true)
       epochUpdateThread.start()

@@ -268,6 +281,11 @@
       epochUpdateThread.interrupt()
       epochUpdateThread.join()

+      val throwable: Throwable = throwableReference.get()
+      if (throwable != null && throwable.isInstanceOf[IllegalStateException]) {
+        throw throwable.asInstanceOf[IllegalStateException]
+      }
+
       stopSources()
       sparkSession.sparkContext.cancelJobGroup(runId.toString)
     }
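
Taken together, the hunks above implement a capture-and-rethrow pattern: the epoch update thread's uncaught exception is stashed in an AtomicReference and rethrown on the calling thread after join(). A self-contained sketch of just that pattern (illustrative names, not Spark's):

```scala
import java.lang.Thread.UncaughtExceptionHandler
import java.util.concurrent.atomic.AtomicReference

// Run `body` on a daemon thread and rethrow any failure on the calling thread.
object PropagateFailure {
  def runAndRethrow(body: () => Unit): Unit = {
    val failure = new AtomicReference[Throwable]()
    val worker = new Thread(new Runnable {
      override def run(): Unit = body()
    }, "worker")
    // Capture the worker's uncaught exception instead of losing it.
    worker.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = failure.set(e)
    })
    worker.setDaemon(true)
    worker.start()
    worker.join()
    // Surface the captured failure to the caller, as the diff above does
    // after join() for IllegalStateException.
    val e = failure.get()
    if (e != null) throw e
  }
}
```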
org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
@@ -45,6 +45,11 @@ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
  */
 private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage

+/**
+ * Returns boolean indicating if size of the epochs queue has exceeded maximum epoch backlog.
+ */
+private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
Contributor commented:
I'm not sure we need to make a side-channel in the RPC handler for this. I'd try to just make the query fail when the condition is reached in the first place.

@spaced4ndy (author) replied on May 23, 2018:
Do you mean making the query fail right from EpochCoordinator? If so, that's what I wanted to do, but I didn't figure out how to terminate the query with an exception.
EpochCoordinator takes query: ContinuousExecution as a parameter, but I don't see a suitable method on query; the closest I found is stop(), I guess.
Or am I looking in a completely wrong direction? Please give a hint.

Contributor replied:
I think we'd probably want to add some method like private[streaming] stopWithException(e) to ContinuousExecution.

@spaced4ndy (author) replied on May 24, 2018:
Okay, I thought about something like this but wasn't sure it was fine to do within the scope of this change. Thanks.
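
A rough sketch of how the suggested stopWithException helper might be shaped (hypothetical: the method name comes from the comment above, and everything else here is assumed, since this commit does not add it):

```scala
package example.streaming

// Hypothetical sketch (not Spark code): the "record the cause, then reuse the
// normal stop path" idea from the review thread. All names are illustrative.
class QueryRunner {
  @volatile private var failureCause: Option[Throwable] = None

  def stop(): Unit = {
    // normal shutdown path: stop sources, cancel jobs, etc.
  }

  private[streaming] def stopWithException(e: Throwable): Unit = {
    failureCause = Some(e) // remember why the query died
    stop()                 // then shut down through the usual path
  }

  def awaitTermination(): Unit =
    failureCause.foreach(e => throw e) // rethrow the recorded failure, if any
}
```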


+
 // Init messages
 /**
  * Set the reader and writer partition counts. Tasks may not be started until the coordinator
@@ -125,6 +130,7 @@ private[continuous] class EpochCoordinator(

   private val maxEpochBacklog = session.sqlContext.conf.maxEpochBacklog

+  private var maxEpochBacklogExceeded: Boolean = false
   private var queryWritesStopped: Boolean = false

   private var numReaderPartitions: Int = _
@@ -156,8 +162,7 @@
       // otherwise commit it.
       if (lastCommittedEpoch != epoch - 1) {
         if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
-          logError("Epochs queue has reached maximum epoch backlog. " +
-            "Not remembering this and further epochs until size of the queue decreases.")
+          maxEpochBacklogExceeded = true
         } else {
           logDebug(s"Epoch $epoch has received commits from all partitions " +
             s"and is waiting for epoch ${epoch - 1} to be committed first.")
@@ -253,5 +258,8 @@
     case StopContinuousExecutionWrites =>
       queryWritesStopped = true
       context.reply(())
+
+    case CheckIfMaxBacklogIsExceeded =>
+      context.reply(maxEpochBacklogExceeded)
   }
 }