Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b84d385
[SPARK-28901] Support cancel SQL operation.
AngersZhuuuu Aug 28, 2019
07d5679
commit code
AngersZhuuuu Aug 28, 2019
187a4a6
the same as other ethod
AngersZhuuuu Aug 29, 2019
44ba723
fix conflict between cancel and close status
AngersZhuuuu Aug 29, 2019
e8f902e
use old cancel method, a little mistake of code
AngersZhuuuu Aug 29, 2019
d4d1943
fix call cancel before run execut()
AngersZhuuuu Aug 30, 2019
7d77b0c
remove dunplicated code
AngersZhuuuu Aug 30, 2019
72b885d
fix throw exception
AngersZhuuuu Aug 30, 2019
d2d6cc5
close before cancel or finish, also reduce totalRunning
AngersZhuuuu Aug 30, 2019
4c9d5f1
fix other operation
AngersZhuuuu Aug 30, 2019
7b43b59
fix conflicts between cancel and finish
AngersZhuuuu Aug 30, 2019
3744fc9
move try block to satrt of execute(0
AngersZhuuuu Aug 30, 2019
5070161
fix scala style
AngersZhuuuu Aug 30, 2019
41ab7d7
fix code style
AngersZhuuuu Aug 30, 2019
87fa08f
fix error
AngersZhuuuu Aug 30, 2019
63e8b59
remove PREPARED
AngersZhuuuu Aug 30, 2019
bea260a
remove empty line
AngersZhuuuu Aug 30, 2019
7e56c14
fix scala style
AngersZhuuuu Sep 2, 2019
8b25006
remove sync operation judge terminal
AngersZhuuuu Sep 2, 2019
ccd7de9
add empty line
AngersZhuuuu Sep 2, 2019
c6651f1
revert
AngersZhuuuu Sep 2, 2019
00f3553
save code
AngersZhuuuu Sep 3, 2019
8b84e04
fix scala style and concurence problem
AngersZhuuuu Sep 3, 2019
ff5ac96
clear job group in same thread
AngersZhuuuu Sep 3, 2019
28174bd
remove all the totalRunning and onlineSessionNum vars
AngersZhuuuu Sep 3, 2019
1cbf7cc
add onStatementError for background case
AngersZhuuuu Sep 3, 2019
61c9c73
fix code style
AngersZhuuuu Sep 3, 2019
f720963
to do all the initialization bookkeeping first.
AngersZhuuuu Sep 3, 2019
c8d2ffc
fixlog
AngersZhuuuu Sep 3, 2019
536756b
code style fix
AngersZhuuuu Sep 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix error
  • Loading branch information
AngersZhuuuu committed Aug 30, 2019
commit 87fa08f21e824a93c775c4bc9beb4d1a12efc436
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ object HiveThriftServer2 extends Logging {
}


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

promise final nit: remove double empty line

def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = synchronized {
def onStatementError(id: String,
errorMessage: String,
errorTrace: String): Unit = synchronized {
executionList(id).finishTimestamp = System.currentTimeMillis
executionList(id).detail = errorMessage
executionList(id).state = ExecutionState.FAILED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,10 @@ private[hive] class SparkExecuteStatementOperation(
parentSession.getUsername)

if (!runInBackground) {
executeWhenNotTerminalStatus()
if (getStatus.getState.isTerminal) {
return
}
execute()
} else {
val sparkServiceUGI = Utils.getUGI()

Expand All @@ -182,7 +185,10 @@ private[hive] class SparkExecuteStatementOperation(
override def run(): Unit = {
registerCurrentOperationLog()
try {
executeWhenNotTerminalStatus()
if (getStatus.getState.isTerminal) {
return
}
execute()
} catch {
case e: HiveSQLException =>
setOperationException(e)
Expand Down Expand Up @@ -219,12 +225,6 @@ private[hive] class SparkExecuteStatementOperation(
}
}

private def executeWhenNotTerminalStatus(): Unit = {
if(!getStatus.getState.isTerminal) {
execute()
}
}

private def execute(): Unit = withSchedulerPool {
try {
logInfo(s"Running query '$statement' with $statementId")
Expand Down Expand Up @@ -261,9 +261,7 @@ private[hive] class SparkExecuteStatementOperation(
// HiveServer will silently swallow them.
case e: Throwable =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AngersZhuuuu
I think I found one more problem:
If cancel() and close() is called very quickly after the query is started, then they may both call cleanup() before Spark Jobs are started. Then sqlContext.sparkContext.cancelJobGroup(statementId) does nothing.
But then the execute thread can start the jobs, and only then get interrupted and exit through here. But then it will exit here, and no-one will cancel these jobs and they will keep running even though this execution has exited.

I think it can be fixed by:

      case e: Throwable =>
        // In any case, cancel any remaining running jobs.
        // E.g. a cancel() operation could have called cleanup() which canceled the Jobs before
        // they started.
        if (statementId != null) {
          sqlContext.sparkContext.cancelJobGroup(statementId)
        }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got you point.
When cleanup().
SparkContext haven't setup jobGroup.
But execute thread start execute and setup jobGroup.
cleanup() can cancel background thread task but can't promise cancelJobGroup since it may be called before sparkContext setupJobGroup

But cancelJobGroup here seem can't stop execute() method run.

val currentState = getStatus().getState()
if (currentState == OperationState.CANCELED ||
currentState == OperationState.CLOSED ||
currentState == OperationState.FINISHED) {
if (currentState.isTerminal) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ocd nit: if (getStatus.getState.isTerminal) would make it consistent with other places in the file, and adding the val currentState now is not needed, as it's accessed only once anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ocd nit: if (getStatus.getState.isTerminal) would make it consistent with other places in the file, and adding the val currentState now is not needed, as it's accessed only once anyway.

We should show currentState in
image

// This may happen if the execution was cancelled, and then closed from another thread.
logWarning(s"Ignore exception in terminal state with $statementId: $e")
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you either don't need the else or the return
I would keep the return and get rid of the else to save on indentation.

Expand All @@ -290,8 +288,9 @@ private[hive] class SparkExecuteStatementOperation(
override def cancel(): Unit = {
synchronized {
if (!getStatus.getState.isTerminal) {
setState(OperationState.FINISHED)
HiveThriftServer2.listener.onStatementFinish(statementId)
logInfo(s"Cancel '$statement' with $statementId")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you do logInfo(s"Cancel query with $statementId")? I think it's enough to log the full statement at submission time.

cleanup(OperationState.CANCELED)
HiveThriftServer2.listener.onStatementCanceled(statementId)
}
}
}
Expand Down