Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b84d385
[SPARK-28901] Support cancel SQL operation.
AngersZhuuuu Aug 28, 2019
07d5679
commit code
AngersZhuuuu Aug 28, 2019
187a4a6
the same as other ethod
AngersZhuuuu Aug 29, 2019
44ba723
fix conflict between cancel and close status
AngersZhuuuu Aug 29, 2019
e8f902e
use old cancel method, a little mistake of code
AngersZhuuuu Aug 29, 2019
d4d1943
fix call cancel before run execut()
AngersZhuuuu Aug 30, 2019
7d77b0c
remove dunplicated code
AngersZhuuuu Aug 30, 2019
72b885d
fix throw exception
AngersZhuuuu Aug 30, 2019
d2d6cc5
close before cancel or finish, also reduce totalRunning
AngersZhuuuu Aug 30, 2019
4c9d5f1
fix other operation
AngersZhuuuu Aug 30, 2019
7b43b59
fix conflicts between cancel and finish
AngersZhuuuu Aug 30, 2019
3744fc9
move try block to satrt of execute(0
AngersZhuuuu Aug 30, 2019
5070161
fix scala style
AngersZhuuuu Aug 30, 2019
41ab7d7
fix code style
AngersZhuuuu Aug 30, 2019
87fa08f
fix error
AngersZhuuuu Aug 30, 2019
63e8b59
remove PREPARED
AngersZhuuuu Aug 30, 2019
bea260a
remove empty line
AngersZhuuuu Aug 30, 2019
7e56c14
fix scala style
AngersZhuuuu Sep 2, 2019
8b25006
remove sync operation judge terminal
AngersZhuuuu Sep 2, 2019
ccd7de9
add empty line
AngersZhuuuu Sep 2, 2019
c6651f1
revert
AngersZhuuuu Sep 2, 2019
00f3553
save code
AngersZhuuuu Sep 3, 2019
8b84e04
fix scala style and concurence problem
AngersZhuuuu Sep 3, 2019
ff5ac96
clear job group in same thread
AngersZhuuuu Sep 3, 2019
28174bd
remove all the totalRunning and onlineSessionNum vars
AngersZhuuuu Sep 3, 2019
1cbf7cc
add onStatementError for background case
AngersZhuuuu Sep 3, 2019
61c9c73
fix code style
AngersZhuuuu Sep 3, 2019
f720963
to do all the initialization bookkeeping first.
AngersZhuuuu Sep 3, 2019
c8d2ffc
fixlog
AngersZhuuuu Sep 3, 2019
536756b
code style fix
AngersZhuuuu Sep 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ object HiveThriftServer2 extends Logging {
}

private[thriftserver] object ExecutionState extends Enumeration {
val STARTED, COMPILED, FAILED, FINISHED, CLOSED = Value
val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value
type ExecutionState = Value
}

Expand Down Expand Up @@ -239,6 +239,15 @@ object HiveThriftServer2 extends Logging {
executionList(id).state = ExecutionState.COMPILED
}

def onStatementCanceled(id: String): Unit = {
synchronized {
executionList(id).finishTimestamp = System.currentTimeMillis
executionList(id).state = ExecutionState.CANCELED
totalRunning -= 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can happen that cancel() will race with query finishing, and then both onStatementCanceled() and onStatementFinish() will do totalRunning -= 1, resulting in negative count.

Instead of keeping a counter, for sessions and queries, it is better to do something like:

    def getOnlineSessionNum: Int = synchronized {
      sessionList.count(_._2.finishTimestamp == 0)
    }

    def isExecutionActive(execInfo: ExecutionInfo): Boolean = {
      !(execInfo.state == ExecutionState.FAILED ||
        execInfo.state == ExecutionState.CANCELED ||
        execInfo.closeTimestamp != 0)
    }

    /**
     * When an error or a cancellation occurs, we set the finishTimestamp of the statement.
     * Therefore, when we count the number of running statements, we need to exclude errors and
     * cancellations and count all statements that have not been closed so far.
     */
    def getTotalRunning: Int = synchronized {
      executionList.count {
        case (_, v) => isExecutionActive(v)
      }
    }

(Note that we should also consider statements that are CLOSED instead of FINISHED as not running anymore, because a FINISHED statement is still fetching results.)
(Same for sessions counters - though quite an obscure, there can be a race between a session timeout and a client closing session)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@juliuszsompolski if we call close() before FINISH, onOperationClose() won't do
totalRunning -= 1, also a bug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@juliuszsompolski Can help to look at logic again

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AngersZhuuuu I think it's easy to get something wrong with the totalRunning counter. Even if we covered all the cases now, it's hard to reason about all the possible paths, and easy to break. I would suggest to count it on the fly based on status, like I proposed before.

    def getOnlineSessionNum: Int = synchronized {
      sessionList.count(_._2.finishTimestamp == 0)
    }

    def isExecutionActive(execInfo: ExecutionInfo): Boolean = {
      !(execInfo.state == ExecutionState.FAILED ||
        execInfo.state == ExecutionState.CANCELED ||
        execInfo.closeTimestamp != 0)
    }

    /**
     * When an error or a cancellation occurs, we set the finishTimestamp of the statement.
     * Therefore, when we count the number of running statements, we need to exclude errors and
     * cancellations and count all statements that have not been closed so far.
     */
    def getTotalRunning: Int = synchronized {
      executionList.count {
        case (_, v) => isExecutionActive(v)
      }
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def isExecutionActive(execInfo: ExecutionInfo): Boolean = {
!(execInfo.state == ExecutionState.FAILED ||
execInfo.state == ExecutionState.CANCELED ||
execInfo.state == ExecutionState.CLOSED)
}

make isExecutionActive() like this? to keep same code style

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can cover all situation, very good ideal

trimExecutionIfNecessary()
}
}

def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
synchronized {
executionList(id).finishTimestamp = System.currentTimeMillis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,15 @@ private[hive] class SparkExecuteStatementOperation(
// HiveServer will silently swallow them.
case e: Throwable =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AngersZhuuuu
I think I found one more problem:
If cancel() and close() is called very quickly after the query is started, then they may both call cleanup() before Spark Jobs are started. Then sqlContext.sparkContext.cancelJobGroup(statementId) does nothing.
But then the execute thread can start the jobs, and only then get interrupted and exit through here. But then it will exit here, and no-one will cancel these jobs and they will keep running even though this execution has exited.

I think it can be fixed by:

      case e: Throwable =>
        // In any case, cancel any remaining running jobs.
        // E.g. a cancel() operation could have called cleanup() which canceled the Jobs before
        // they started.
        if (statementId != null) {
          sqlContext.sparkContext.cancelJobGroup(statementId)
        }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got you point.
When cleanup().
SparkContext haven't setup jobGroup.
But execute thread start execute and setup jobGroup.
cleanup() can cancel background thread task but can't promise cancelJobGroup since it may be called before sparkContext setupJobGroup

But cancelJobGroup here seem can't stop execute() method run.

val currentState = getStatus().getState()
logError(s"Error executing query, currentState $currentState, ", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw new HiveSQLException(e.toString)
if (currentState == OperationState.CANCELED) {
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you either don't need the else or the return
I would keep the return and get rid of the else to save on indentation.

} else {
logError(s"Error executing query, currentState $currentState, ", e)
Copy link
Contributor

@juliuszsompolski juliuszsompolski Sep 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Error executing query with $statementId

setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw new HiveSQLException(e.toString)
}
}
setState(OperationState.FINISHED)
HiveThriftServer2.listener.onStatementFinish(statementId)
Expand All @@ -275,6 +279,7 @@ private[hive] class SparkExecuteStatementOperation(
override def cancel(): Unit = {
logInfo(s"Cancel '$statement' with $statementId")
cleanup(OperationState.CANCELED)
HiveThriftServer2.listener.onStatementCanceled(statementId)
}

private def cleanup(state: OperationState) {
Expand Down