Changes from all commits (30 commits)
b84d385  [SPARK-28901] Support cancel SQL operation. (AngersZhuuuu, Aug 28, 2019)
07d5679  commit code (AngersZhuuuu, Aug 28, 2019)
187a4a6  the same as other method (AngersZhuuuu, Aug 29, 2019)
44ba723  fix conflict between cancel and close status (AngersZhuuuu, Aug 29, 2019)
e8f902e  use old cancel method, a little mistake of code (AngersZhuuuu, Aug 29, 2019)
d4d1943  fix call cancel before run execute() (AngersZhuuuu, Aug 30, 2019)
7d77b0c  remove duplicated code (AngersZhuuuu, Aug 30, 2019)
72b885d  fix throw exception (AngersZhuuuu, Aug 30, 2019)
d2d6cc5  close before cancel or finish, also reduce totalRunning (AngersZhuuuu, Aug 30, 2019)
4c9d5f1  fix other operation (AngersZhuuuu, Aug 30, 2019)
7b43b59  fix conflicts between cancel and finish (AngersZhuuuu, Aug 30, 2019)
3744fc9  move try block to start of execute() (AngersZhuuuu, Aug 30, 2019)
5070161  fix scala style (AngersZhuuuu, Aug 30, 2019)
41ab7d7  fix code style (AngersZhuuuu, Aug 30, 2019)
87fa08f  fix error (AngersZhuuuu, Aug 30, 2019)
63e8b59  remove PREPARED (AngersZhuuuu, Aug 30, 2019)
bea260a  remove empty line (AngersZhuuuu, Aug 30, 2019)
7e56c14  fix scala style (AngersZhuuuu, Sep 2, 2019)
8b25006  remove sync operation judge terminal (AngersZhuuuu, Sep 2, 2019)
ccd7de9  add empty line (AngersZhuuuu, Sep 2, 2019)
c6651f1  revert (AngersZhuuuu, Sep 2, 2019)
00f3553  save code (AngersZhuuuu, Sep 3, 2019)
8b84e04  fix scala style and concurrency problem (AngersZhuuuu, Sep 3, 2019)
ff5ac96  clear job group in same thread (AngersZhuuuu, Sep 3, 2019)
28174bd  remove all the totalRunning and onlineSessionNum vars (AngersZhuuuu, Sep 3, 2019)
1cbf7cc  add onStatementError for background case (AngersZhuuuu, Sep 3, 2019)
61c9c73  fix code style (AngersZhuuuu, Sep 3, 2019)
f720963  to do all the initialization bookkeeping first. (AngersZhuuuu, Sep 3, 2019)
c8d2ffc  fix log (AngersZhuuuu, Sep 3, 2019)
536756b  code style fix (AngersZhuuuu, Sep 3, 2019)
@@ -138,7 +138,7 @@ object HiveThriftServer2 extends Logging {
}

private[thriftserver] object ExecutionState extends Enumeration {
val STARTED, COMPILED, FAILED, FINISHED, CLOSED = Value
val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value
type ExecutionState = Value
}

@@ -174,16 +174,31 @@ object HiveThriftServer2 extends Logging {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
server.stop()
}
private var onlineSessionNum: Int = 0
private val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
private val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
private val retainedStatements = conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT)
private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)
private var totalRunning = 0

def getOnlineSessionNum: Int = synchronized { onlineSessionNum }
def getOnlineSessionNum: Int = synchronized {
sessionList.count(_._2.finishTimestamp == 0)
}

def getTotalRunning: Int = synchronized { totalRunning }
def isExecutionActive(execInfo: ExecutionInfo): Boolean = {
!(execInfo.state == ExecutionState.FAILED ||
execInfo.state == ExecutionState.CANCELED ||
execInfo.state == ExecutionState.CLOSED)
}

/**
* When an error or a cancellation occurs, we set the finishTimestamp of the statement.
* Therefore, when we count the number of running statements, we need to exclude errors and
* cancellations and count all statements that have not been closed so far.
*/
def getTotalRunning: Int = synchronized {
executionList.count {
case (_, v) => isExecutionActive(v)
}
}
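
For illustration: the change above replaces the maintained counters (onlineSessionNum, totalRunning) with counts derived on demand from the tracked maps, so no increment/decrement can be missed on a state transition. A minimal, self-contained sketch of that counting pattern — CountingSketch, ExecInfo and its State values are illustrative stand-ins, not the PR's actual classes:

    import scala.collection.mutable

    object CountingSketch {
      // Illustrative stand-ins for the listener's ExecutionState and ExecutionInfo.
      object State extends Enumeration {
        val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value
      }

      case class ExecInfo(var state: State.Value, var finishTimestamp: Long = 0L)

      private val executionList = new mutable.LinkedHashMap[String, ExecInfo]

      // FAILED and CANCELED set finishTimestamp but are not CLOSED, so "running"
      // must be defined as: not FAILED, not CANCELED, and not CLOSED.
      private def isActive(e: ExecInfo): Boolean =
        !(e.state == State.FAILED || e.state == State.CANCELED || e.state == State.CLOSED)

      def getTotalRunning: Int = synchronized {
        executionList.count { case (_, e) => isActive(e) }
      }

      def main(args: Array[String]): Unit = {
        executionList.put("q1", ExecInfo(State.STARTED))
        executionList.put("q2", ExecInfo(State.CANCELED, System.currentTimeMillis))
        assert(getTotalRunning == 1) // q2 is terminal, so only q1 counts
      }
    }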

def getSessionList: Seq[SessionInfo] = synchronized { sessionList.values.toSeq }

@@ -208,14 +223,12 @@ object HiveThriftServer2 extends Logging {
synchronized {
val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
sessionList.put(sessionId, info)
onlineSessionNum += 1
trimSessionIfNecessary()
}
}

def onSessionClosed(sessionId: String): Unit = synchronized {
sessionList(sessionId).finishTimestamp = System.currentTimeMillis
onlineSessionNum -= 1
trimSessionIfNecessary()
}

@@ -231,28 +244,29 @@ object HiveThriftServer2 extends Logging {
trimExecutionIfNecessary()
sessionList(sessionId).totalExecution += 1
executionList(id).groupId = groupId
totalRunning += 1
}

def onStatementParsed(id: String, executionPlan: String): Unit = synchronized {
executionList(id).executePlan = executionPlan
executionList(id).state = ExecutionState.COMPILED
}

def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
synchronized {
executionList(id).finishTimestamp = System.currentTimeMillis
executionList(id).detail = errorMessage
executionList(id).state = ExecutionState.FAILED
totalRunning -= 1
trimExecutionIfNecessary()
}
def onStatementCanceled(id: String): Unit = synchronized {
executionList(id).finishTimestamp = System.currentTimeMillis
executionList(id).state = ExecutionState.CANCELED
trimExecutionIfNecessary()
}

def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = synchronized {
executionList(id).finishTimestamp = System.currentTimeMillis
executionList(id).detail = errorMsg
executionList(id).state = ExecutionState.FAILED
trimExecutionIfNecessary()
}

def onStatementFinish(id: String): Unit = synchronized {
executionList(id).finishTimestamp = System.currentTimeMillis
executionList(id).state = ExecutionState.FINISHED
totalRunning -= 1
trimExecutionIfNecessary()
}

@@ -72,9 +72,8 @@ private[hive] class SparkExecuteStatementOperation(

override def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
logDebug(s"CLOSING $statementId")
logInfo(s"Close statement with $statementId")
cleanup(OperationState.CLOSED)
sqlContext.sparkContext.clearJobGroup()
Contributor: Nit: make that logDebug a logInfo(s"Closed statement with $statementId"), to make it consistent with other logs.

Author: Maybe logInfo(s"Close statement with $statementId")?

Contributor: I'm fine with both Close and Closed. I see that you already committed Closed.

HiveThriftServer2.listener.onOperationClosed(statementId)
}

@@ -159,6 +158,14 @@ private[hive] class SparkExecuteStatementOperation(

override def runInternal(): Unit = {
setState(OperationState.PENDING)
statementId = UUID.randomUUID().toString
logInfo(s"Submitting query '$statement' with $statementId")
HiveThriftServer2.listener.onStatementStart(
Contributor: Could you add logInfo(s"Submitting query '$statement' with $statementId") here?

Author: Done.

Contributor: ocd formatting nit: move setHasResultSet(true) below statementId, logInfo and onStatementStart, to do all the initialization bookkeeping first.

statementId,
parentSession.getSessionHandle.getSessionId.toString,
statement,
statementId,
parentSession.getUsername)
setHasResultSet(true) // avoid no resultset for async run

if (!runInBackground) {
@@ -201,33 +208,38 @@ private[hive] class SparkExecuteStatementOperation(
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
logError("Error submitting query in background, query rejected", rejected)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
throw new HiveSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
case NonFatal(e) =>
logError(s"Error executing query in background", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw new HiveSQLException(e)
}
}
}

private def execute(): Unit = withSchedulerPool {
statementId = UUID.randomUUID().toString
logInfo(s"Running query '$statement' with $statementId")
setState(OperationState.RUNNING)
// Always use the latest class loader provided by executionHive's state.
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

HiveThriftServer2.listener.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
statement,
statementId,
parentSession.getUsername)
sqlContext.sparkContext.setJobGroup(statementId, statement)
try {
synchronized {
if (getStatus.getState.isTerminal) {
logInfo(s"Query with $statementId in terminal state before it started running")
return
} else {
logInfo(s"Running query with $statementId")
setState(OperationState.RUNNING)
}
}
// Always use the latest class loader provided by executionHive's state.
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

Author: @juliuszsompolski calling cancel here for a statement run in sync mode seems unable to stop the running task.

sqlContext.sparkContext.setJobGroup(statementId, statement)
Author: How about moving the judgement to this place? I think setState(RUNNING) after setJobGroup is OK, since we can consider setting the class loader and setJobGroup as preparation work for running:

    sqlContext.sparkContext.setJobGroup(statementId, statement)
    synchronized {
      if (getStatus.getState.isTerminal) {
        logInfo(s"Query with $statementId in terminal state before it started running")
        return
      } else {
        logInfo(s"Running query with $statementId")
        setState(OperationState.RUNNING)
      }
    }

Contributor: @AngersZhuuuu I don't think it will help. It's not just about setJobGroup, but about actually starting the job. You can setJobGroup, then cancelJobGroup, and then start running Jobs in that Job Group. What cancelJobGroup does is only cancel Jobs that are currently running in the Job group; it doesn't prevent further jobs from being started. So I think it can still go past here, and then be cancelled before it actually starts the Jobs. I think it would be safer to handle it from the catch block.

Author: But regarding "You can setJobGroup, then cancelJobGroup, and then start running Jobs in that Job Group": the job would be under no JobGroup, since when cancelJobGroup is called, the jobGroup entry in localProperties has been cleared.

Contributor: Let's have

  1. Thread1 be in execute() after this synchronized block, but before the Jobs have started.
  2. Thread2 be a user connection calling cancel(). It cancels all jobs in the job group, and notifies Thread1.
  3. But before Thread1 gets the notification and throws InterruptedException, it starts some Jobs.
  4. Then Thread1 gets InterruptedException, and exits through the catch and finally block. It does a clearJobGroup in the finally, but that doesn't cancel the Jobs started in 3. These Jobs keep running after Thread1 exits, and nobody cancels them.

I think that the only way to prevent this is to call another cancelJobGroup from the catch block when an exception comes.

Author: Got your point. Even with the cancel coming before setJobGroup, there will be an interval between the cancel and the execute thread being interrupted, so jobs started in that window won't be canceled by the time the execute thread gets into the catch block.
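
For illustration, a minimal sketch of the defensive re-cancel suggested above, assuming a live SparkContext; executeWithDefensiveCancel and runStatement are hypothetical names, not the PR's final code:

    import org.apache.spark.SparkContext

    object DefensiveCancelSketch {
      def executeWithDefensiveCancel(sc: SparkContext, statementId: String)
                                    (runStatement: => Unit): Unit = {
        sc.setJobGroup(statementId, "sketch", interruptOnCancel = true)
        try {
          runStatement() // jobs may start *after* a concurrent cancelJobGroup()
        } catch {
          case e: Throwable =>
            // cancelJobGroup() only cancels jobs already running in the group;
            // it does not prevent new ones from starting. Re-cancelling here
            // sweeps up any jobs that slipped in between the cancel and the
            // interrupt.
            if (statementId != null) {
              sc.cancelJobGroup(statementId)
            }
            throw e
        } finally {
          sc.clearJobGroup() // clear the thread-local job group where it was set
        }
      }
    }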

result = sqlContext.sql(statement)
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
@@ -249,32 +261,43 @@
}
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
} catch {
case e: HiveSQLException =>
if (getStatus().getState() == OperationState.CANCELED) {
return
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
Contributor: @AngersZhuuuu I think I found one more problem: if cancel() and close() are called very quickly after the query is started, they may both call cleanup() before the Spark Jobs are started. Then sqlContext.sparkContext.cancelJobGroup(statementId) does nothing. But the execute thread can then start the jobs, only afterwards get interrupted, and exit through here — and no one will cancel those jobs; they will keep running even though this execution has exited.

I think it can be fixed by:

      case e: Throwable =>
        // In any case, cancel any remaining running jobs.
        // E.g. a cancel() operation could have called cleanup() which canceled the Jobs before
        // they started.
        if (statementId != null) {
          sqlContext.sparkContext.cancelJobGroup(statementId)
        }

Author: Got your point. When cleanup() runs, SparkContext may not have set up the job group yet, while the execute thread then starts executing and sets up the job group. cleanup() can cancel the background thread's task, but it can't guarantee the cancelJobGroup takes effect, since it may be called before SparkContext sets up the job group. But a cancelJobGroup here still seems unable to stop the execute() method itself from running.

val currentState = getStatus().getState()
if (currentState.isTerminal) {
Contributor: ocd nit: if (getStatus.getState.isTerminal) would make it consistent with other places in the file, and adding the val currentState is not needed, as it's accessed only once anyway.

Author: We should show currentState in: [image]

// This may happen if the execution was cancelled, and then closed from another thread.
logWarning(s"Ignore exception in terminal state with $statementId: $e")
} else {
logError(s"Error executing query with $statementId, currentState $currentState, ", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
if (e.isInstanceOf[HiveSQLException]) {
throw e.asInstanceOf[HiveSQLException]
} else {
throw new HiveSQLException("Error running query: " + e.toString, e)
}
}
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
val currentState = getStatus().getState()
logError(s"Error executing query, currentState $currentState, ", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw new HiveSQLException(e.toString)
} finally {
synchronized {
Contributor: collateral fix: please move sqlContext.sparkContext.clearJobGroup() from close() to this finally block (outside synchronized) — setJobGroup is called from this thread, so clearJobGroup should also be called from here.

Author: Missed this point, fixed it.
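
The reason this is a thread-affinity issue: job-group information lives in SparkContext's thread-local properties, so clearJobGroup() only affects the calling thread. A small sketch of that behavior, assuming a live SparkContext and Scala 2.12+ (ThreadLocalJobGroupSketch is a hypothetical name; spark.jobGroup.id is the property key setJobGroup writes):

    import org.apache.spark.SparkContext

    object ThreadLocalJobGroupSketch {
      def demo(sc: SparkContext): Unit = {
        sc.setJobGroup("stmt-1", "set on the execute thread")

        val closer = new Thread(() => {
          // A clearJobGroup() from another thread (e.g. close()) touches only
          // that thread's copy of the local properties; the execute thread
          // keeps its group.
          sc.clearJobGroup()
        })
        closer.start()
        closer.join()

        // Still "stmt-1": only the thread that called setJobGroup can clear it.
        println(sc.getLocalProperty("spark.jobGroup.id"))
        sc.clearJobGroup() // hence the move into this thread's finally block
      }
    }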

if (!getStatus.getState.isTerminal) {
setState(OperationState.FINISHED)
HiveThriftServer2.listener.onStatementFinish(statementId)
}
}
Contributor: nit: I think this could become a finally { synchronized { ... } } block; the if check will make sure that it doesn't go to FINISHED after another state.

Author: Reasonable, I added too much control.

sqlContext.sparkContext.clearJobGroup()
}
setState(OperationState.FINISHED)
HiveThriftServer2.listener.onStatementFinish(statementId)
}

override def cancel(): Unit = {
logInfo(s"Cancel '$statement' with $statementId")
cleanup(OperationState.CANCELED)
synchronized {
if (!getStatus.getState.isTerminal) {
logInfo(s"Cancel query with $statementId")
cleanup(OperationState.CANCELED)
HiveThriftServer2.listener.onStatementCanceled(statementId)
}
}
}
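
For illustration, a minimal sketch of the guarded transition that cancel() now performs: take the lock, re-check for a terminal state, and only then transition, so a cancel() racing with finish() or close() degrades to a no-op. GuardedCancelSketch and its states are illustrative stand-ins for Hive's OperationState machinery:

    object GuardedCancelSketch {
      sealed trait OpState { def isTerminal: Boolean }
      case object Running  extends OpState { val isTerminal = false }
      case object Finished extends OpState { val isTerminal = true }
      case object Canceled extends OpState { val isTerminal = true }

      private var state: OpState = Running

      def cancel(): Unit = synchronized {
        if (!state.isTerminal) { // lost the race to finish()/close()? then no-op
          state = Canceled
          println("canceled")
        }
      }

      def finish(): Unit = synchronized {
        if (!state.isTerminal) {
          state = Finished
          println("finished")
        }
      }

      def main(args: Array[String]): Unit = {
        finish()
        cancel() // prints nothing: the state is already terminal
      }
    }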

private def cleanup(state: OperationState) {