Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b84d385
[SPARK-28901] Support cancel SQL operation.
AngersZhuuuu Aug 28, 2019
07d5679
commit code
AngersZhuuuu Aug 28, 2019
187a4a6
the same as other ethod
AngersZhuuuu Aug 29, 2019
44ba723
fix conflict between cancel and close status
AngersZhuuuu Aug 29, 2019
e8f902e
use old cancel method, a little mistake of code
AngersZhuuuu Aug 29, 2019
d4d1943
fix call cancel before run execut()
AngersZhuuuu Aug 30, 2019
7d77b0c
remove dunplicated code
AngersZhuuuu Aug 30, 2019
72b885d
fix throw exception
AngersZhuuuu Aug 30, 2019
d2d6cc5
close before cancel or finish, also reduce totalRunning
AngersZhuuuu Aug 30, 2019
4c9d5f1
fix other operation
AngersZhuuuu Aug 30, 2019
7b43b59
fix conflicts between cancel and finish
AngersZhuuuu Aug 30, 2019
3744fc9
move try block to satrt of execute(0
AngersZhuuuu Aug 30, 2019
5070161
fix scala style
AngersZhuuuu Aug 30, 2019
41ab7d7
fix code style
AngersZhuuuu Aug 30, 2019
87fa08f
fix error
AngersZhuuuu Aug 30, 2019
63e8b59
remove PREPARED
AngersZhuuuu Aug 30, 2019
bea260a
remove empty line
AngersZhuuuu Aug 30, 2019
7e56c14
fix scala style
AngersZhuuuu Sep 2, 2019
8b25006
remove sync operation judge terminal
AngersZhuuuu Sep 2, 2019
ccd7de9
add empty line
AngersZhuuuu Sep 2, 2019
c6651f1
revert
AngersZhuuuu Sep 2, 2019
00f3553
save code
AngersZhuuuu Sep 3, 2019
8b84e04
fix scala style and concurence problem
AngersZhuuuu Sep 3, 2019
ff5ac96
clear job group in same thread
AngersZhuuuu Sep 3, 2019
28174bd
remove all the totalRunning and onlineSessionNum vars
AngersZhuuuu Sep 3, 2019
1cbf7cc
add onStatementError for background case
AngersZhuuuu Sep 3, 2019
61c9c73
fix code style
AngersZhuuuu Sep 3, 2019
f720963
to do all the initialization bookkeeping first.
AngersZhuuuu Sep 3, 2019
c8d2ffc
fixlog
AngersZhuuuu Sep 3, 2019
536756b
code style fix
AngersZhuuuu Sep 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
commit code
  • Loading branch information
AngersZhuuuu committed Aug 28, 2019
commit 07d56798f33f2e2ddd80da4f9114e139669a4224
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ object HiveThriftServer2 extends Logging {
}

private[thriftserver] object ExecutionState extends Enumeration {
val STARTED, COMPILED, FAILED, FINISHED, CLOSED = Value
val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value
type ExecutionState = Value
}

Expand Down Expand Up @@ -239,6 +239,14 @@ object HiveThriftServer2 extends Logging {
executionList(id).state = ExecutionState.COMPILED
}

def onStatementCanceled(id: String): Unit = synchronized {
executionList(id).finishTimestamp = System.currentTimeMillis
executionList(id).state = ExecutionState.CANCELED
totalRunning -= 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit leftover: you can remove all the totalRunning and onlineSessionNum vars.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit leftover: you can remove all the totalRunning and onlineSessionNum vars.

Good ideal, we have more save method to get this statistics.

// for progress bar
trimExecutionIfNecessary()
}

def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
synchronized {
executionList(id).finishTimestamp = System.currentTimeMillis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ private[hive] class SparkExecuteStatementOperation(
}
}

def cancelStatement(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
logDebug(s"CANCELING $statementId")
cleanup(OperationState.CANCELED)
sqlContext.sparkContext.clearJobGroup()
HiveThriftServer2.listener.onStatementCanceled(statementId)
}

override def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
logDebug(s"CLOSING $statementId")
Expand Down