
Conversation

@AngersZhuuuu (Contributor) commented Aug 28, 2019

What changes were proposed in this pull request?

Currently, Spark Thrift Server can't cancel a running SQL job. When we use Hue to query through Spark Thrift Server, run a SQL statement, and then click the cancel button, the cancellation doesn't take effect in the backend. In the Spark JDBC UI tab the SQL's status stays COMPILED and its duration keeps increasing, which may confuse people.

[screenshot]

Why are the changes needed?

If the displayed SQL status can't reflect the statement's true status, it will confuse users.

Does this PR introduce any user-facing change?

Spark Thrift Server's UI tab will show a SQL statement's status as CANCELED when we cancel it.

How was this patch tested?

Manually tested.

UI tab status
[screenshot]

[screenshot]

Backend log
[screenshot]

@AngersZhuuuu (author):

Gentle ping @juliuszsompolski @wangyum @srowen

@AngersZhuuuu AngersZhuuuu changed the title [SPARK-28901][SQL] Support cancel SparkThriftServer's SQL Operation [SPARK-28901][SQL] SparkThriftServer's Cancel SQL Operation show it in JDBC Tab UI Aug 29, 2019
@juliuszsompolski (Contributor) left a comment:

I was looking into this at some point in the past but never finished. Here are some comments about the various race conditions between the cancellation and the query thread that I considered.

synchronized {
  executionList(id).finishTimestamp = System.currentTimeMillis
  executionList(id).state = ExecutionState.CANCELED
  totalRunning -= 1
@juliuszsompolski:

It can happen that cancel() races with the query finishing, and then both onStatementCanceled() and onStatementFinish() do totalRunning -= 1, resulting in a negative count.

Instead of keeping a counter for sessions and queries, it is better to do something like:

    def getOnlineSessionNum: Int = synchronized {
      sessionList.count(_._2.finishTimestamp == 0)
    }

    def isExecutionActive(execInfo: ExecutionInfo): Boolean = {
      !(execInfo.state == ExecutionState.FAILED ||
        execInfo.state == ExecutionState.CANCELED ||
        execInfo.closeTimestamp != 0)
    }

    /**
     * When an error or a cancellation occurs, we set the finishTimestamp of the statement.
     * Therefore, when we count the number of running statements, we need to exclude errors and
     * cancellations and count all statements that have not been closed so far.
     */
    def getTotalRunning: Int = synchronized {
      executionList.count {
        case (_, v) => isExecutionActive(v)
      }
    }

(Note that we should consider statements as no longer running once they are CLOSED rather than FINISHED, because a FINISHED statement is still fetching results.)
(The same goes for the session counters; though quite obscure, there can be a race between a session timeout and a client closing the session.)
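
To make the race concrete, here is a minimal sketch in plain Scala (stub callbacks, not the actual listener code) of how a bare counter drifts negative when both callbacks fire for the same statement, which the count-on-the-fly approach above avoids by construction:

    object CounterRaceSketch {
      // A bare counter, standing in for the listener's totalRunning field.
      private var totalRunning = 1 // one statement currently running

      // Both paths decrement; neither knows the other already ran.
      def onStatementCanceled(): Unit = synchronized { totalRunning -= 1 }
      def onStatementFinish(): Unit = synchronized { totalRunning -= 1 }

      def main(args: Array[String]): Unit = {
        // cancel() races with the query finishing; both callbacks end up firing:
        onStatementCanceled()
        onStatementFinish()
        println(totalRunning) // prints -1: the counter has drifted
      }
    }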

@AngersZhuuuu (author):

@juliuszsompolski If we call close() before FINISH, onOperationClose() won't do totalRunning -= 1 either, which is also a bug.

@AngersZhuuuu (author):

@juliuszsompolski Could you take a look at the logic again?

@juliuszsompolski:

@AngersZhuuuu I think it's easy to get something wrong with the totalRunning counter. Even if we covered all the cases now, it's hard to reason about all the possible paths, and easy to break. I would suggest counting it on the fly based on status, as I proposed before.

    def getOnlineSessionNum: Int = synchronized {
      sessionList.count(_._2.finishTimestamp == 0)
    }

    def isExecutionActive(execInfo: ExecutionInfo): Boolean = {
      !(execInfo.state == ExecutionState.FAILED ||
        execInfo.state == ExecutionState.CANCELED ||
        execInfo.closeTimestamp != 0)
    }

    /**
     * When an error or a cancellation occurs, we set the finishTimestamp of the statement.
     * Therefore, when we count the number of running statements, we need to exclude errors and
     * cancellations and count all statements that have not been closed so far.
     */
    def getTotalRunning: Int = synchronized {
      executionList.count {
        case (_, v) => isExecutionActive(v)
      }
    }

@AngersZhuuuu (author):

def isExecutionActive(execInfo: ExecutionInfo): Boolean = {
  !(execInfo.state == ExecutionState.FAILED ||
    execInfo.state == ExecutionState.CANCELED ||
    execInfo.state == ExecutionState.CLOSED)
}

Make isExecutionActive() like this, to keep the same code style?

@AngersZhuuuu (author):

It can cover all situations, very good idea.

@AngersZhuuuu (author):
@juliuszsompolski There were some conflicts in the process. I changed a lot to cover all the problems you mentioned, including calling close() before setState(FINISHED). Thanks for reviewing again.

setState(OperationState.PENDING)
setHasResultSet(true) // avoid no resultset for async run
statementId = UUID.randomUUID().toString
HiveThriftServer2.listener.onStatementPrepared(
@juliuszsompolski:

I think it's enough to move onStatementStarted here, without adding Prepared as a separate state. I don't think there's much value in distinguishing this from when the query starts running in the thread (it will go into a new state almost immediately after that).

To avoid having to modify all these other operations, I would just stick with STARTED.

@AngersZhuuuu (author):

Removed it. I've somewhat forgotten why I did this; moving onStatementStarted here is enough.

executionList(id).state = ExecutionState.CLOSED
if (lastState == ExecutionState.STARTED || lastState == ExecutionState.COMPILED) {
  totalRunning -= 1
}
Member:

does FINISHED need to be handled somewhere?

@AngersZhuuuu (author):

does FINISHED need to be handled somewhere?

This handles the situation where we call close() before the statement finishes: then totalRunning should be decremented here too. If we called cancel() or the statement finished normally, closeOperation won't do totalRunning -= 1 again. Since the FINISHED path already did totalRunning -= 1, we don't need to include FINISHED here.
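
As a self-contained sketch (stub enum and state map, not the listener's real fields) of the bookkeeping described above, the close path decrements only when no earlier transition already did:

    object CloseBookkeepingSketch {
      object ExecutionState extends Enumeration {
        val STARTED, COMPILED, FINISHED, CANCELED, FAILED, CLOSED = Value
      }
      import ExecutionState._

      private var totalRunning = 0
      private val executionState = scala.collection.mutable.Map[String, ExecutionState.Value]()

      def onStatementClosed(id: String): Unit = synchronized {
        val lastState = executionState(id)
        executionState(id) = CLOSED
        // FINISHED, CANCELED and FAILED transitions already decremented the
        // counter, so decrement here only for a close before the query ended.
        if (lastState == STARTED || lastState == COMPILED) {
          totalRunning -= 1
        }
      }
    }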

@wangyum (Member) commented Sep 2, 2019

ok to test

@juliuszsompolski (Contributor) left a comment:

Some more things that aren't near a diffed line to anchor a comment to:

  • close() does sqlContext.sparkContext.clearJobGroup(), but that's incorrect because it is called from a different thread than execute, which sets the job group. Could you move it to a finally in execute? (See the sketch after this list.)
  • In runInternal, in the catch block at https://github.com/apache/spark/pull/25611/files#diff-72dcd8f81a51c8a815159fdf0332acdcR212, when it fails to launch the background thread, you should call HiveThriftServer2.listener.onStatementError to notify of the error in that case as well.
  • nit: We log query submission etc. with logInfo. I think for consistency we could change the log in close to logInfo as well.
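
A minimal sketch of the first point, with stub types standing in for the Spark classes (the trait and method names here are assumptions, beyond setJobGroup/clearJobGroup themselves): clearJobGroup moves into a finally inside execute(), so it runs on the same thread that called setJobGroup, which close() cannot guarantee:

    object ClearJobGroupSketch {
      // Stand-in for the SparkContext job-group API used by the operation.
      trait JobGroupContext {
        def setJobGroup(groupId: String, description: String): Unit
        def clearJobGroup(): Unit
      }

      def execute(sc: JobGroupContext, statementId: String, statement: String): Unit = {
        sc.setJobGroup(statementId, statement)
        try {
          // ... run the query and materialize results ...
        } finally {
          // Same thread that set the group. close() runs on a different thread,
          // so clearing the group there touches the wrong thread's properties.
          sc.clearJobGroup()
        }
      }
    }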

// HiveServer will silently swallow them.
case e: Throwable =>
  val currentState = getStatus().getState()
  if (currentState.isTerminal) {
@juliuszsompolski:

ocd nit: if (getStatus.getState.isTerminal) would make it consistent with other places in the file, and adding the val currentState now is not needed, as it's accessed only once anyway.

@AngersZhuuuu (author):

ocd nit: if (getStatus.getState.isTerminal) would make it consistent with other places in the file, and adding the val currentState now is not needed, as it's accessed only once anyway.

We should show currentState in this log:
[screenshot]

sqlContext.sparkContext.setJobGroup(statementId, statement)
try {
  logInfo(s"Running query '$statement' with $statementId")
  setState(OperationState.RUNNING)
@juliuszsompolski:

I'd change that to

synchronized {
  if (getStatus.getState.isTerminal) {
    logInfo(s"Query with $statementId in terminal state before it started running")
    return
  } else {
    logInfo(s"Running query with $statementId")
    setState(OperationState.RUNNING)
  }
}

instead of the if in runInternal. Otherwise, the state can still be changed to cancelled between there and here.
(Note: I removed $statement from the log message; let's log it during submission.)

setState(OperationState.PENDING)
setHasResultSet(true) // avoid no resultset for async run
statementId = UUID.randomUUID().toString
HiveThriftServer2.listener.onStatementStart(
@juliuszsompolski:

Could you add logInfo(s"Submitting query '$statement' with $statementId") here?

@AngersZhuuuu (author):

Could you add logInfo(s"Submitting query '$statement' with $statementId") here?

done

@juliuszsompolski:

ocd formatting nit: move setHasResultSet(true) below statementId, logInfo and onStatementStart, to do all the initialization bookkeeping first.

cleanup(OperationState.CANCELED)
synchronized {
  if (!getStatus.getState.isTerminal) {
    logInfo(s"Cancel '$statement' with $statementId")
@juliuszsompolski:

Could you do logInfo(s"Cancel query with $statementId")? I think it's enough to log the full statement at submission time.

if (!getStatus.getState.isTerminal) {
  setState(OperationState.FINISHED)
  HiveThriftServer2.listener.onStatementFinish(statementId)
}
@juliuszsompolski:

nit: I think this could become a finally { synchronized { block; the if check will make sure that it doesn't go to finished after another state.

@AngersZhuuuu (author):

nit: I think this could become a finally { synchronized { block; the if check will make sure that it doesn't go to finished after another state.

Reasonable; I added too much control.

  logWarning(s"Ignore exception in terminal state with $statementId: $e")
  return
} else {
  logError(s"Error executing query, currentState $currentState, ", e)
@juliuszsompolski (Sep 2, 2019):

nit: Error executing query with $statementId

@AngersZhuuuu (author):
[screenshot]

onStatementError needs to be added to these catch blocks, otherwise the statement will be left in the STARTED state.

A new case; we should do this.

@juliuszsompolski (Contributor) left a comment:

LGTM with ocd formatting nits

(sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String])
extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
with Logging {
with Logging {
@juliuszsompolski:

nit: revert. Per https://github.com/databricks/scala-style-guide#spacing-and-indentation, with shouldn't get extra indentation compared to extends.

@AngersZhuuuu (author):

My local Scala style config may have some problem; it doesn't match that guide.

def onStatementError(
    id: String,
    errorMessage: String,
    errorTrace: String): Unit = synchronized {
@juliuszsompolski:

ocd nit: changing errorMessage to errorMsg would make it fit on one line.

Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

sqlContext.sparkContext.setJobGroup(statementId, statement)

@juliuszsompolski:

ocd nit: the empty lines would be more readable like this:

...
      }

      // Always use the latest class loader provided by executionHive's state.
      val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
      Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

      sqlContext.sparkContext.setJobGroup(statementId, statement)
      result = sqlContext.sql(statement)
...

It then separates the initialization part, the Hive class loader part and the Spark part with empty lines.

@AngersZhuuuu (author):

Considered it a lot; learned from it.

  setBackgroundHandle(backgroundHandle)
} catch {
  case rejected: RejectedExecutionException =>
    logError("Error submit query in background, query rejected", rejected)
@juliuszsompolski:

nit: "Error submit" -> "Error submitting"

if (currentState.isTerminal) {
  // This may happen if the execution was cancelled, and then closed from another thread.
  logWarning(s"Ignore exception in terminal state with $statementId: $e")
  return
@juliuszsompolski:

nit: you don't need both the else and the return. I would keep the return and get rid of the else to save on indentation.

} finally {
  synchronized {
    setState(OperationState.FINISHED)
    HiveThriftServer2.listener.onStatementFinish(statementId)
@juliuszsompolski:

Missed it: you still need to wrap this in if (!currentState.isTerminal) {, otherwise it will get executed in the finally after e.g. an ERROR, or you can have a race condition with cancel/close.
When I wrote to put it in the finally block, I meant that it could then sit together with clearJobGroup, but the if is still needed.

@AngersZhuuuu (author):

Adding the if here is the most direct way; otherwise we add too much control.
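
For illustration, a minimal self-contained sketch of the guarded finally under discussion (stub state type, not the real OperationState): the terminal-state check keeps FINISHED from overwriting a racing cancel()/close()/error:

    object GuardedFinallySketch {
      sealed trait State { def isTerminal: Boolean }
      case object Running extends State { val isTerminal = false }
      case object Finished extends State { val isTerminal = true }
      case object Canceled extends State { val isTerminal = true }

      @volatile private var state: State = Running

      def execute(runQuery: () => Unit, onFinish: () => Unit): Unit = {
        try {
          runQuery()
        } finally {
          synchronized {
            // Skip if cancel()/close()/error already reached a terminal state.
            if (!state.isTerminal) {
              state = Finished
              onFinish()
            }
          }
        }
      }
    }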

// RDDs will be cleaned automatically upon garbage collection.
logDebug(s"CLOSING $statementId")
cleanup(OperationState.CLOSED)
sqlContext.sparkContext.clearJobGroup()
@juliuszsompolski:

Nit: make that logDebug a logInfo(s"Closed statement with $statementId"), to make it consistent with other logs.

@AngersZhuuuu (author):

Nit: make that logDebug a logInfo(s"Closed statement with $statementId"), to make it consistent with other logs.

Maybe logInfo(s"Close statement with $statementId")?

@juliuszsompolski:

I'm fine with both Close and Closed. I see that you already committed Closed.

@juliuszsompolski (Contributor) left a comment:

LGTM. Thank you @AngersZhuuuu for bearing with all my comments! :-)

trimExecutionIfNecessary()
}


@juliuszsompolski:

promised final nit: remove the double empty line

@AngersZhuuuu (author):
@juliuszsompolski Thanks a lot for your review; this is the deepest code review I have had, and I learned a lot from it.

@gatorsmile (Member):
Thanks! Merged to master.

return
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
@juliuszsompolski:

@AngersZhuuuu
I think I found one more problem:
If cancel() and close() are called very quickly after the query is started, they may both call cleanup() before any Spark jobs are started. Then sqlContext.sparkContext.cancelJobGroup(statementId) does nothing.
But the execute thread can then start the jobs, and only afterwards get interrupted and exit through here. It will exit here, and no one will cancel these jobs; they will keep running even though this execution has exited.

I think it can be fixed by:

      case e: Throwable =>
        // In any case, cancel any remaining running jobs.
        // E.g. a cancel() operation could have called cleanup() which canceled the Jobs before
        // they started.
        if (statementId != null) {
          sqlContext.sparkContext.cancelJobGroup(statementId)
        }

@AngersZhuuuu (author):

Got your point.
When cleanup() runs, the SparkContext may not have set up the job group yet; the execute thread then starts executing and sets up the job group. So cleanup() can cancel the background thread's task, but it can't guarantee the cancelJobGroup takes effect, since it may be called before sparkContext.setJobGroup.

But cancelJobGroup here still seems unable to stop the execute() method from running.

// Always use the latest class loader provided by executionHive's state.
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

@AngersZhuuuu (author):

@juliuszsompolski Calling cancel here, for a statement run in sync mode, seems unable to stop the running task.

val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

sqlContext.sparkContext.setJobGroup(statementId, statement)
@AngersZhuuuu (author):

How about moving the judgement to this place? I think setState(RUNNING) after setJobGroup is OK, since we can consider setClassLoader and setJobGroup preparation work for running:

    sqlContext.sparkContext.setJobGroup(statementId, statement)
    synchronized {
      if (getStatus.getState.isTerminal) {
        logInfo(s"Query with $statementId in terminal state before it started running")
        return
      } else {
        logInfo(s"Running query with $statementId")
        setState(OperationState.RUNNING)
      }
    }

@juliuszsompolski:

@AngersZhuuuu
I don't think it will help. It's not just about setJobGroup, but about actually starting the job. You can setJobGroup, then cancelJobGroup, and then start running jobs in that job group.
What cancelJobGroup does is only cancel jobs that are currently running in the job group; it doesn't prevent further jobs from being started. So I think execution can still get past here, and then be cancelled before it actually starts the jobs.
I think it would be safer to handle it from the catch block.
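
As a hedged illustration of that semantics (a standalone demo against the public SparkContext API, not code from this PR), a job submitted after cancelJobGroup under the same group id still runs to completion:

    import org.apache.spark.sql.SparkSession

    object CancelJobGroupDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("cancel-demo").getOrCreate()
        val sc = spark.sparkContext

        sc.setJobGroup("g1", "demo group")
        sc.cancelJobGroup("g1") // nothing in "g1" is running yet, so this is a no-op

        // This job starts after the cancellation and completes normally:
        val sum = sc.parallelize(1 to 100).reduce(_ + _)
        println(s"sum = $sum") // 5050; the late job was not cancelled

        spark.stop()
      }
    }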

@AngersZhuuuu (author):

But in the case of "You can setJobGroup, then cancelJobGroup, and then start running Jobs in that Job Group":
the job then runs under no job group, since by the time cancelJobGroup is called, the jobGroup in localProperties has been cleared.

@juliuszsompolski:

Let's have:

  1. Thread1 be in execute() after this synchronized block, but before the jobs have started.
  2. Thread2 be a user connection calling cancel(). It cancels all jobs in the job group and notifies Thread1.
  3. But before Thread1 gets the notification and throws InterruptedException, it starts some jobs.
  4. Then Thread1 gets the InterruptedException and exits through the catch and finally blocks. It does a clearJobGroup in the finally, but that doesn't cancel the jobs started in step 3. Those jobs keep running after Thread1 exits, and nobody cancels them.

I think the only way to prevent this is to call cancelJobGroup again from the catch block when an exception comes.

@AngersZhuuuu (author):

Got your point.
cancel() can come before setJobGroup, but there will also be an interval between the cancellation and the execute thread being interrupted; jobs started in that interval won't be canceled once the execute thread gets into the catch block.

wangyum pushed a commit that referenced this pull request Sep 23, 2019
…d interrupted

### What changes were proposed in this pull request?
Discussed in #25611

If cancel() and close() are called very quickly after the query is started, they may both call cleanup() before Spark jobs are started. Then sqlContext.sparkContext.cancelJobGroup(statementId) does nothing.
But the execute thread can then start the jobs, and only afterwards get interrupted and exit through here. It will exit, and no one will cancel these jobs; they will keep running even though this execution has exited.

So when execute() is interrupted by cancel() and gets into the catch block, we should call cancelJobGroup again to make sure the jobs are canceled.

### Why are the changes needed?

### Does this PR introduce any user-facing change?
NO

### How was this patch tested?
MT

Closes #25743 from AngersZhuuuu/SPARK-29036.

Authored-by: angerszhu <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
