diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index abb53cf3429f..36d4ac095e10 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -138,7 +138,7 @@ object HiveThriftServer2 extends Logging {
   }
 
   private[thriftserver] object ExecutionState extends Enumeration {
-    val STARTED, COMPILED, FAILED, FINISHED, CLOSED = Value
+    val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value
     type ExecutionState = Value
   }
 
@@ -174,16 +174,31 @@ object HiveThriftServer2 extends Logging {
     override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
       server.stop()
     }
-    private var onlineSessionNum: Int = 0
     private val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
     private val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
     private val retainedStatements = conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT)
     private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)
-    private var totalRunning = 0
 
-    def getOnlineSessionNum: Int = synchronized { onlineSessionNum }
+    def getOnlineSessionNum: Int = synchronized {
+      sessionList.count(_._2.finishTimestamp == 0)
+    }
 
-    def getTotalRunning: Int = synchronized { totalRunning }
+    def isExecutionActive(execInfo: ExecutionInfo): Boolean = {
+      !(execInfo.state == ExecutionState.FAILED ||
+        execInfo.state == ExecutionState.CANCELED ||
+        execInfo.state == ExecutionState.CLOSED)
+    }
+
+    /**
+     * When an error or a cancellation occurs, we set the finishTimestamp of the statement.
+     * Therefore, when we count the number of running statements, we need to exclude errors and
+     * cancellations and count all statements that have not been closed so far.
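+     * In other words, a FINISHED statement still counts as running until onOperationClosed
+     * marks it CLOSED; only FAILED, CANCELED and CLOSED are excluded (see isExecutionActive).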
+     */
+    def getTotalRunning: Int = synchronized {
+      executionList.count {
+        case (_, v) => isExecutionActive(v)
+      }
+    }
 
     def getSessionList: Seq[SessionInfo] = synchronized { sessionList.values.toSeq }
 
@@ -208,14 +223,12 @@ object HiveThriftServer2 extends Logging {
       synchronized {
         val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
         sessionList.put(sessionId, info)
-        onlineSessionNum += 1
         trimSessionIfNecessary()
       }
     }
 
     def onSessionClosed(sessionId: String): Unit = synchronized {
       sessionList(sessionId).finishTimestamp = System.currentTimeMillis
-      onlineSessionNum -= 1
       trimSessionIfNecessary()
     }
 
@@ -231,7 +244,6 @@ object HiveThriftServer2 extends Logging {
       trimExecutionIfNecessary()
       sessionList(sessionId).totalExecution += 1
       executionList(id).groupId = groupId
-      totalRunning += 1
     }
 
     def onStatementParsed(id: String, executionPlan: String): Unit = synchronized {
@@ -239,20 +251,22 @@ object HiveThriftServer2 extends Logging {
       executionList(id).state = ExecutionState.COMPILED
     }
 
-    def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
-      synchronized {
-        executionList(id).finishTimestamp = System.currentTimeMillis
-        executionList(id).detail = errorMessage
-        executionList(id).state = ExecutionState.FAILED
-        totalRunning -= 1
-        trimExecutionIfNecessary()
-      }
+    def onStatementCanceled(id: String): Unit = synchronized {
+      executionList(id).finishTimestamp = System.currentTimeMillis
+      executionList(id).state = ExecutionState.CANCELED
+      trimExecutionIfNecessary()
+    }
+
+    def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = synchronized {
+      executionList(id).finishTimestamp = System.currentTimeMillis
+      executionList(id).detail = errorMsg
+      executionList(id).state = ExecutionState.FAILED
+      trimExecutionIfNecessary()
     }
 
     def onStatementFinish(id: String): Unit = synchronized {
       executionList(id).finishTimestamp = System.currentTimeMillis
       executionList(id).state = ExecutionState.FINISHED
-      totalRunning -= 1
       trimExecutionIfNecessary()
     }
 
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 2f011c25fe2c..69e85484ccf8 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -72,9 +72,8 @@ private[hive] class SparkExecuteStatementOperation(
 
   override def close(): Unit = {
     // RDDs will be cleaned automatically upon garbage collection.
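+    // The job group is cleared in execute()'s finally block, so close() only has to
+    // transition the operation state via cleanup() and notify the UI listener.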
- logDebug(s"CLOSING $statementId") + logInfo(s"Close statement with $statementId") cleanup(OperationState.CLOSED) - sqlContext.sparkContext.clearJobGroup() HiveThriftServer2.listener.onOperationClosed(statementId) } @@ -159,6 +158,14 @@ private[hive] class SparkExecuteStatementOperation( override def runInternal(): Unit = { setState(OperationState.PENDING) + statementId = UUID.randomUUID().toString + logInfo(s"Submitting query '$statement' with $statementId") + HiveThriftServer2.listener.onStatementStart( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + statement, + statementId, + parentSession.getUsername) setHasResultSet(true) // avoid no resultset for async run if (!runInBackground) { @@ -201,33 +208,38 @@ private[hive] class SparkExecuteStatementOperation( setBackgroundHandle(backgroundHandle) } catch { case rejected: RejectedExecutionException => + logError("Error submitting query in background, query rejected", rejected) setState(OperationState.ERROR) + HiveThriftServer2.listener.onStatementError( + statementId, rejected.getMessage, SparkUtils.exceptionString(rejected)) throw new HiveSQLException("The background threadpool cannot accept" + " new task for execution, please retry the operation", rejected) case NonFatal(e) => logError(s"Error executing query in background", e) setState(OperationState.ERROR) + HiveThriftServer2.listener.onStatementError( + statementId, e.getMessage, SparkUtils.exceptionString(e)) throw new HiveSQLException(e) } } } private def execute(): Unit = withSchedulerPool { - statementId = UUID.randomUUID().toString - logInfo(s"Running query '$statement' with $statementId") - setState(OperationState.RUNNING) - // Always use the latest class loader provided by executionHive's state. - val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader - Thread.currentThread().setContextClassLoader(executionHiveClassLoader) - - HiveThriftServer2.listener.onStatementStart( - statementId, - parentSession.getSessionHandle.getSessionId.toString, - statement, - statementId, - parentSession.getUsername) - sqlContext.sparkContext.setJobGroup(statementId, statement) try { + synchronized { + if (getStatus.getState.isTerminal) { + logInfo(s"Query with $statementId in terminal state before it started running") + return + } else { + logInfo(s"Running query with $statementId") + setState(OperationState.RUNNING) + } + } + // Always use the latest class loader provided by executionHive's state. + val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader + Thread.currentThread().setContextClassLoader(executionHiveClassLoader) + + sqlContext.sparkContext.setJobGroup(statementId, statement) result = sqlContext.sql(statement) logDebug(result.queryExecution.toString()) result.queryExecution.logical match { @@ -249,32 +261,43 @@ private[hive] class SparkExecuteStatementOperation( } dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray } catch { - case e: HiveSQLException => - if (getStatus().getState() == OperationState.CANCELED) { - return + // Actually do need to catch Throwable as some failures don't inherit from Exception and + // HiveServer will silently swallow them. + case e: Throwable => + val currentState = getStatus().getState() + if (currentState.isTerminal) { + // This may happen if the execution was cancelled, and then closed from another thread. 
+ logWarning(s"Ignore exception in terminal state with $statementId: $e") } else { + logError(s"Error executing query with $statementId, currentState $currentState, ", e) setState(OperationState.ERROR) HiveThriftServer2.listener.onStatementError( statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw e + if (e.isInstanceOf[HiveSQLException]) { + throw e.asInstanceOf[HiveSQLException] + } else { + throw new HiveSQLException("Error running query: " + e.toString, e) + } } - // Actually do need to catch Throwable as some failures don't inherit from Exception and - // HiveServer will silently swallow them. - case e: Throwable => - val currentState = getStatus().getState() - logError(s"Error executing query, currentState $currentState, ", e) - setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw new HiveSQLException(e.toString) + } finally { + synchronized { + if (!getStatus.getState.isTerminal) { + setState(OperationState.FINISHED) + HiveThriftServer2.listener.onStatementFinish(statementId) + } + } + sqlContext.sparkContext.clearJobGroup() } - setState(OperationState.FINISHED) - HiveThriftServer2.listener.onStatementFinish(statementId) } override def cancel(): Unit = { - logInfo(s"Cancel '$statement' with $statementId") - cleanup(OperationState.CANCELED) + synchronized { + if (!getStatus.getState.isTerminal) { + logInfo(s"Cancel query with $statementId") + cleanup(OperationState.CANCELED) + HiveThriftServer2.listener.onStatementCanceled(statementId) + } + } } private def cleanup(state: OperationState) {