-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-28901][SQL] SparkThriftServer's Cancel SQL Operation show it in JDBC Tab UI #25611
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 29 commits
b84d385
07d5679
187a4a6
44ba723
e8f902e
d4d1943
7d77b0c
72b885d
d2d6cc5
4c9d5f1
7b43b59
3744fc9
5070161
41ab7d7
87fa08f
63e8b59
bea260a
7e56c14
8b25006
ccd7de9
c6651f1
00f3553
8b84e04
ff5ac96
28174bd
1cbf7cc
61c9c73
f720963
c8d2ffc
536756b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -72,9 +72,8 @@ private[hive] class SparkExecuteStatementOperation( | |
|
|
||
| override def close(): Unit = { | ||
| // RDDs will be cleaned automatically upon garbage collection. | ||
| logDebug(s"CLOSING $statementId") | ||
| logInfo(s"Closed statement with $statementId") | ||
| cleanup(OperationState.CLOSED) | ||
| sqlContext.sparkContext.clearJobGroup() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: make that logDebug
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
maybe
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm fine with both Close and Closed. I see that you already committed Closed. |
||
| HiveThriftServer2.listener.onOperationClosed(statementId) | ||
| } | ||
|
|
||
|
|
@@ -159,6 +158,14 @@ private[hive] class SparkExecuteStatementOperation( | |
|
|
||
| override def runInternal(): Unit = { | ||
| setState(OperationState.PENDING) | ||
| statementId = UUID.randomUUID().toString | ||
| logInfo(s"Submitting query '$statement' with $statementId") | ||
| HiveThriftServer2.listener.onStatementStart( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
done
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ocd formatting nit: move |
||
| statementId, | ||
| parentSession.getSessionHandle.getSessionId.toString, | ||
| statement, | ||
| statementId, | ||
| parentSession.getUsername) | ||
| setHasResultSet(true) // avoid no resultset for async run | ||
|
|
||
| if (!runInBackground) { | ||
|
|
@@ -201,33 +208,38 @@ private[hive] class SparkExecuteStatementOperation( | |
| setBackgroundHandle(backgroundHandle) | ||
| } catch { | ||
| case rejected: RejectedExecutionException => | ||
| logError("Error submitting query in background, query rejected", rejected) | ||
| setState(OperationState.ERROR) | ||
| HiveThriftServer2.listener.onStatementError( | ||
| statementId, rejected.getMessage, SparkUtils.exceptionString(rejected)) | ||
| throw new HiveSQLException("The background threadpool cannot accept" + | ||
| " new task for execution, please retry the operation", rejected) | ||
| case NonFatal(e) => | ||
| logError(s"Error executing query in background", e) | ||
| setState(OperationState.ERROR) | ||
| HiveThriftServer2.listener.onStatementError( | ||
| statementId, e.getMessage, SparkUtils.exceptionString(e)) | ||
| throw new HiveSQLException(e) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private def execute(): Unit = withSchedulerPool { | ||
| statementId = UUID.randomUUID().toString | ||
| logInfo(s"Running query '$statement' with $statementId") | ||
| setState(OperationState.RUNNING) | ||
| // Always use the latest class loader provided by executionHive's state. | ||
| val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader | ||
| Thread.currentThread().setContextClassLoader(executionHiveClassLoader) | ||
|
|
||
| HiveThriftServer2.listener.onStatementStart( | ||
| statementId, | ||
| parentSession.getSessionHandle.getSessionId.toString, | ||
| statement, | ||
| statementId, | ||
| parentSession.getUsername) | ||
| sqlContext.sparkContext.setJobGroup(statementId, statement) | ||
| try { | ||
| synchronized { | ||
| if (getStatus.getState.isTerminal) { | ||
| logInfo(s"Query with $statementId in terminal state before it started running") | ||
| return | ||
| } else { | ||
| logInfo(s"Running query with $statementId") | ||
| setState(OperationState.RUNNING) | ||
| } | ||
| } | ||
| // Always use the latest class loader provided by executionHive's state. | ||
| val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader | ||
| Thread.currentThread().setContextClassLoader(executionHiveClassLoader) | ||
|
|
||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @juliuszsompolski call cancel here for run statement in sync mode. seems can't stop task running . |
||
| sqlContext.sparkContext.setJobGroup(statementId, statement) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about move judgement to this place . I think
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @AngersZhuuuu
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But when
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's have
I think that the only way to prevent this, is to call another cancelJobGroup from the catch block when an exception comes.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Get your point. |
||
| result = sqlContext.sql(statement) | ||
| logDebug(result.queryExecution.toString()) | ||
| result.queryExecution.logical match { | ||
|
|
@@ -249,32 +261,43 @@ private[hive] class SparkExecuteStatementOperation( | |
| } | ||
| dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray | ||
| } catch { | ||
| case e: HiveSQLException => | ||
| if (getStatus().getState() == OperationState.CANCELED) { | ||
| return | ||
| // Actually do need to catch Throwable as some failures don't inherit from Exception and | ||
| // HiveServer will silently swallow them. | ||
| case e: Throwable => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @AngersZhuuuu I think it can be fixed by:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got you point. But cancelJobGroup here seem can't stop |
||
| val currentState = getStatus().getState() | ||
| if (currentState.isTerminal) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ocd nit:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| // This may happen if the execution was cancelled, and then closed from another thread. | ||
| logWarning(s"Ignore exception in terminal state with $statementId: $e") | ||
| } else { | ||
| logError(s"Error executing query with $statementId, currentState $currentState, ", e) | ||
| setState(OperationState.ERROR) | ||
| HiveThriftServer2.listener.onStatementError( | ||
| statementId, e.getMessage, SparkUtils.exceptionString(e)) | ||
| throw e | ||
| if (e.isInstanceOf[HiveSQLException]) { | ||
| throw e.asInstanceOf[HiveSQLException] | ||
| } else { | ||
| throw new HiveSQLException("Error running query: " + e.toString, e) | ||
| } | ||
| } | ||
| // Actually do need to catch Throwable as some failures don't inherit from Exception and | ||
| // HiveServer will silently swallow them. | ||
| case e: Throwable => | ||
| val currentState = getStatus().getState() | ||
| logError(s"Error executing query, currentState $currentState, ", e) | ||
| setState(OperationState.ERROR) | ||
| HiveThriftServer2.listener.onStatementError( | ||
| statementId, e.getMessage, SparkUtils.exceptionString(e)) | ||
| throw new HiveSQLException(e.toString) | ||
| } finally { | ||
| synchronized { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. collateral fix: please move
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Miss this point , fix it. |
||
| if (!getStatus.getState.isTerminal) { | ||
| setState(OperationState.FINISHED) | ||
| HiveThriftServer2.listener.onStatementFinish(statementId) | ||
| } | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I think this could become a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Reasonable, I add too much control. |
||
| sqlContext.sparkContext.clearJobGroup() | ||
| } | ||
| setState(OperationState.FINISHED) | ||
| HiveThriftServer2.listener.onStatementFinish(statementId) | ||
| } | ||
|
|
||
| override def cancel(): Unit = { | ||
| logInfo(s"Cancel '$statement' with $statementId") | ||
| cleanup(OperationState.CANCELED) | ||
| synchronized { | ||
| if (!getStatus.getState.isTerminal) { | ||
| logInfo(s"Cancel query with $statementId") | ||
| cleanup(OperationState.CANCELED) | ||
| HiveThriftServer2.listener.onStatementCanceled(statementId) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private def cleanup(state: OperationState) { | ||
|
|
||

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
promise final nit: remove double empty line