From b84d385aaa7c53f10a353b5c0509ac1af9f54a4d Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 28 Aug 2019 23:19:30 +0800 Subject: [PATCH 01/30] [SPARK-28901] Support cancel SQL operation. --- .../server/SparkSQLOperationManager.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 35f92547e781..645b99f605bf 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -145,6 +145,25 @@ private[thriftserver] class SparkSQLOperationManager() operation } + override def cancelOperation(opHandle: OperationHandle): Unit = { + val operation = getOperation(opHandle) + val opState = operation.getStatus.getState + if ((opState eq OperationState.CANCELED) || + (opState eq OperationState.CLOSED) || + (opState eq OperationState.FINISHED) || + (opState eq OperationState.ERROR) || + (opState eq OperationState.UNKNOWN)) { // Cancel should be a no-op in either cases + logDebug(opHandle + ": Operation is already aborted in state - " + opState) + } else { + logDebug(opHandle + ": Attempting to cancel from state - " + opState) + if (operation.isInstanceOf[SparkExecuteStatementOperation]) { + operation.asInstanceOf[SparkExecuteStatementOperation].cancelStatement() + } else { + operation.cancel() + } + } + } + def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = { val iterator = confMap.entrySet().iterator() while (iterator.hasNext) { From 07d56798f33f2e2ddd80da4f9114e139669a4224 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 28 Aug 2019 23:20:29 +0800 Subject: [PATCH 02/30] commit code --- 
.../sql/hive/thriftserver/HiveThriftServer2.scala | 10 +++++++++- .../thriftserver/SparkExecuteStatementOperation.scala | 8 ++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index abb53cf3429f..950990f08f83 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -138,7 +138,7 @@ object HiveThriftServer2 extends Logging { } private[thriftserver] object ExecutionState extends Enumeration { - val STARTED, COMPILED, FAILED, FINISHED, CLOSED = Value + val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value type ExecutionState = Value } @@ -239,6 +239,14 @@ object HiveThriftServer2 extends Logging { executionList(id).state = ExecutionState.COMPILED } + def onStatementCanceled(id: String): Unit = synchronized { + executionList(id).finishTimestamp = System.currentTimeMillis + executionList(id).state = ExecutionState.CANCELED + totalRunning -= 1 + // for progress bar + trimExecutionIfNecessary() + } + def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = { synchronized { executionList(id).finishTimestamp = System.currentTimeMillis diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 2f011c25fe2c..b5257a097860 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -70,6 
+70,14 @@ private[hive] class SparkExecuteStatementOperation( } } + def cancelStatement(): Unit = { + // RDDs will be cleaned automatically upon garbage collection. + logDebug(s"CANCELING $statementId") + cleanup(OperationState.CANCELED) + sqlContext.sparkContext.clearJobGroup() + HiveThriftServer2.listener.onStatementCanceled(statementId) + } + override def close(): Unit = { // RDDs will be cleaned automatically upon garbage collection. logDebug(s"CLOSING $statementId") From 187a4a6de3802f0167a073e35e71035906ba151c Mon Sep 17 00:00:00 2001 From: angerszhu Date: Thu, 29 Aug 2019 09:48:32 +0800 Subject: [PATCH 03/30] the same as other method --- .../sql/hive/thriftserver/HiveThriftServer2.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 950990f08f83..aea10d0d33e3 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -239,12 +239,13 @@ object HiveThriftServer2 extends Logging { executionList(id).state = ExecutionState.COMPILED } - def onStatementCanceled(id: String): Unit = synchronized { - executionList(id).finishTimestamp = System.currentTimeMillis - executionList(id).state = ExecutionState.CANCELED - totalRunning -= 1 - // for progress bar - trimExecutionIfNecessary() + def onStatementCanceled(id: String): Unit = { + synchronized { + executionList(id).finishTimestamp = System.currentTimeMillis + executionList(id).state = ExecutionState.CANCELED + totalRunning -= 1 + trimExecutionIfNecessary() + } } def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = { From 44ba7230fffbfc6028e00898c04d5988794d87ca Mon Sep 17 00:00:00 2001 From: 
angerszhu Date: Thu, 29 Aug 2019 11:09:55 +0800 Subject: [PATCH 04/30] fix conflict between cancel and close status --- .../SparkExecuteStatementOperation.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index b5257a097860..216ccbe7e3c1 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -270,11 +270,15 @@ private[hive] class SparkExecuteStatementOperation( // HiveServer will silently swallow them. case e: Throwable => val currentState = getStatus().getState() - logError(s"Error executing query, currentState $currentState, ", e) - setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw new HiveSQLException(e.toString) + if (currentState == OperationState.CANCELED) { + return + } else { + logError(s"Error executing query, currentState $currentState, ", e) + setState(OperationState.ERROR) + HiveThriftServer2.listener.onStatementError( + statementId, e.getMessage, SparkUtils.exceptionString(e)) + throw new HiveSQLException(e.toString) + } } setState(OperationState.FINISHED) HiveThriftServer2.listener.onStatementFinish(statementId) From e8f902e67f3c862f696f4fb405c36ae76c9bcb82 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Thu, 29 Aug 2019 14:25:37 +0800 Subject: [PATCH 05/30] use old cancel method, a little mistake of code --- .../SparkExecuteStatementOperation.scala | 9 +-------- .../server/SparkSQLOperationManager.scala | 19 ------------------- 2 files changed, 1 insertion(+), 27 deletions(-) diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 216ccbe7e3c1..fab7fcf1dfbb 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -70,14 +70,6 @@ private[hive] class SparkExecuteStatementOperation( } } - def cancelStatement(): Unit = { - // RDDs will be cleaned automatically upon garbage collection. - logDebug(s"CANCELING $statementId") - cleanup(OperationState.CANCELED) - sqlContext.sparkContext.clearJobGroup() - HiveThriftServer2.listener.onStatementCanceled(statementId) - } - override def close(): Unit = { // RDDs will be cleaned automatically upon garbage collection. logDebug(s"CLOSING $statementId") @@ -287,6 +279,7 @@ private[hive] class SparkExecuteStatementOperation( override def cancel(): Unit = { logInfo(s"Cancel '$statement' with $statementId") cleanup(OperationState.CANCELED) + HiveThriftServer2.listener.onStatementCanceled(statementId) } private def cleanup(state: OperationState) { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 645b99f605bf..35f92547e781 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -145,25 +145,6 @@ private[thriftserver] class SparkSQLOperationManager() operation } - override def cancelOperation(opHandle: OperationHandle): Unit = { - val operation = 
getOperation(opHandle) - val opState = operation.getStatus.getState - if ((opState eq OperationState.CANCELED) || - (opState eq OperationState.CLOSED) || - (opState eq OperationState.FINISHED) || - (opState eq OperationState.ERROR) || - (opState eq OperationState.UNKNOWN)) { // Cancel should be a no-op in either cases - logDebug(opHandle + ": Operation is already aborted in state - " + opState) - } else { - logDebug(opHandle + ": Attempting to cancel from state - " + opState) - if (operation.isInstanceOf[SparkExecuteStatementOperation]) { - operation.asInstanceOf[SparkExecuteStatementOperation].cancelStatement() - } else { - operation.cancel() - } - } - } - def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = { val iterator = confMap.entrySet().iterator() while (iterator.hasNext) { From d4d1943e222e62fcb1190d67bddceb49d6b89bde Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 30 Aug 2019 10:52:58 +0800 Subject: [PATCH 06/30] fix call cancel before run execut() --- .../hive/thriftserver/HiveThriftServer2.scala | 47 ++++++++++--------- .../SparkExecuteStatementOperation.scala | 38 ++++++++++----- 2 files changed, 51 insertions(+), 34 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index aea10d0d33e3..bc25a7b62a5f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -138,7 +138,7 @@ object HiveThriftServer2 extends Logging { } private[thriftserver] object ExecutionState extends Enumeration { - val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value + val PREPARED, STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value type ExecutionState = Value } @@ -219,18 +219,22 @@ object 
HiveThriftServer2 extends Logging { trimSessionIfNecessary() } - def onStatementStart( - id: String, - sessionId: String, - statement: String, - groupId: String, - userName: String = "UNKNOWN"): Unit = synchronized { + def onStatementPrepared( + id: String, + sessionId: String, + statement: String, + groupId: String, + userName: String = "UNKNOWN"): Unit = synchronized { val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName) - info.state = ExecutionState.STARTED + info.state = ExecutionState.PREPARED executionList.put(id, info) trimExecutionIfNecessary() sessionList(sessionId).totalExecution += 1 executionList(id).groupId = groupId + } + + def onStatementStart(id: String): Unit = synchronized { + executionList(id).state = ExecutionState.STARTED totalRunning += 1 } @@ -239,23 +243,20 @@ object HiveThriftServer2 extends Logging { executionList(id).state = ExecutionState.COMPILED } - def onStatementCanceled(id: String): Unit = { - synchronized { - executionList(id).finishTimestamp = System.currentTimeMillis - executionList(id).state = ExecutionState.CANCELED - totalRunning -= 1 - trimExecutionIfNecessary() - } + def onStatementCanceled(id: String): Unit = synchronized { + executionList(id).finishTimestamp = System.currentTimeMillis + executionList(id).state = ExecutionState.CANCELED + totalRunning -= 1 + trimExecutionIfNecessary() } - def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = { - synchronized { - executionList(id).finishTimestamp = System.currentTimeMillis - executionList(id).detail = errorMessage - executionList(id).state = ExecutionState.FAILED - totalRunning -= 1 - trimExecutionIfNecessary() - } + + def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = synchronized { + executionList(id).finishTimestamp = System.currentTimeMillis + executionList(id).detail = errorMessage + executionList(id).state = ExecutionState.FAILED + totalRunning -= 1 + 
trimExecutionIfNecessary() } def onStatementFinish(id: String): Unit = synchronized { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index fab7fcf1dfbb..1713e9efa253 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -160,9 +160,16 @@ private[hive] class SparkExecuteStatementOperation( override def runInternal(): Unit = { setState(OperationState.PENDING) setHasResultSet(true) // avoid no resultset for async run + statementId = UUID.randomUUID().toString + HiveThriftServer2.listener.onStatementPrepared( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + statement, + statementId, + parentSession.getUsername) if (!runInBackground) { - execute() + executeWhenNotTerminalStatus() } else { val sparkServiceUGI = Utils.getUGI() @@ -175,7 +182,7 @@ private[hive] class SparkExecuteStatementOperation( override def run(): Unit = { registerCurrentOperationLog() try { - execute() + executeWhenNotTerminalStatus() } catch { case e: HiveSQLException => setOperationException(e) @@ -212,20 +219,22 @@ private[hive] class SparkExecuteStatementOperation( } } + private def executeWhenNotTerminalStatus(): Unit = { + if(getStatus.getState != OperationState.CANCELED && + getStatus.getState != OperationState.CLOSED && + getStatus.getState != OperationState.FINISHED) { + execute() + } + } + private def execute(): Unit = withSchedulerPool { - statementId = UUID.randomUUID().toString logInfo(s"Running query '$statement' with $statementId") setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) - HiveThriftServer2.listener.onStatementStart( - statementId, - parentSession.getSessionHandle.getSessionId.toString, - statement, - statementId, - parentSession.getUsername) + HiveThriftServer2.listener.onStatementStart(statementId) sqlContext.sparkContext.setJobGroup(statementId, statement) try { result = sqlContext.sql(statement) @@ -250,7 +259,10 @@ private[hive] class SparkExecuteStatementOperation( dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray } catch { case e: HiveSQLException => - if (getStatus().getState() == OperationState.CANCELED) { + val currentState = getStatus().getState() + if (currentState == OperationState.CANCELED || + currentState == OperationState.CLOSED || + currentState == OperationState.FINISHED) { return } else { setState(OperationState.ERROR) @@ -262,7 +274,11 @@ private[hive] class SparkExecuteStatementOperation( // HiveServer will silently swallow them. case e: Throwable => val currentState = getStatus().getState() - if (currentState == OperationState.CANCELED) { + if (currentState == OperationState.CANCELED || + currentState == OperationState.CLOSED || + currentState == OperationState.FINISHED) { + // This may happen if the execution was cancelled, and then closed from another thread. 
+ logWarning(s"Ignore exception in terminal state with $statementId: $e") return } else { logError(s"Error executing query, currentState $currentState, ", e) From 7d77b0c90550f62fa90c2e523c4282b12eaa2700 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 30 Aug 2019 11:00:41 +0800 Subject: [PATCH 07/30] remove duplicated code --- .../SparkExecuteStatementOperation.scala | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 1713e9efa253..dc7fb200c9b3 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -258,18 +258,6 @@ private[hive] class SparkExecuteStatementOperation( } dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray } catch { - case e: HiveSQLException => - val currentState = getStatus().getState() - if (currentState == OperationState.CANCELED || - currentState == OperationState.CLOSED || - currentState == OperationState.FINISHED) { - return - } else { - setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw e - } // Actually do need to catch Throwable as some failures don't inherit from Exception and // HiveServer will silently swallow them. 
case e: Throwable => From 72b885d81accfd4988016d9e7a05a1f3a0e744f5 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 30 Aug 2019 11:08:28 +0800 Subject: [PATCH 08/30] fix throw exception --- .../hive/thriftserver/SparkExecuteStatementOperation.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index dc7fb200c9b3..93505d3a483f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -273,7 +273,11 @@ private[hive] class SparkExecuteStatementOperation( setState(OperationState.ERROR) HiveThriftServer2.listener.onStatementError( statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw new HiveSQLException(e.toString) + if (e.isInstanceOf[HiveSQLException]) { + throw e.asInstanceOf[HiveSQLException] + } else { + throw new HiveSQLException("Error running query: " + e.toString, e) + } } } setState(OperationState.FINISHED) From d2d6cc5d6f402629dbb74d743bf8ded373a76998 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 30 Aug 2019 11:49:31 +0800 Subject: [PATCH 09/30] close before cancel or finish, also reduce totalRunning --- .../spark/sql/hive/thriftserver/HiveThriftServer2.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index bc25a7b62a5f..129a5a21d80f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -268,7 +268,11 @@ object HiveThriftServer2 extends Logging { def onOperationClosed(id: String): Unit = synchronized { executionList(id).closeTimestamp = System.currentTimeMillis + val lastState = executionList(id).state executionList(id).state = ExecutionState.CLOSED + if (lastState == ExecutionState.STARTED || lastState == ExecutionState.COMPILED) { + totalRunning -= 1 + } } private def trimExecutionIfNecessary() = { From 4c9d5f196c7a293187022efcf578eccde2a28877 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 30 Aug 2019 11:49:45 +0800 Subject: [PATCH 10/30] fix other operation --- .../thriftserver/SparkGetCatalogsOperation.scala | 14 ++++++++------ .../thriftserver/SparkGetColumnsOperation.scala | 13 +++++++------ .../thriftserver/SparkGetFunctionsOperation.scala | 14 ++++++++------ .../thriftserver/SparkGetSchemasOperation.scala | 14 ++++++++------ .../thriftserver/SparkGetTableTypesOperation.scala | 14 ++++++++------ .../thriftserver/SparkGetTablesOperation.scala | 14 ++++++++------ 6 files changed, 47 insertions(+), 36 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala index cde99fd35bd5..6a412c6c40fd 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala @@ -50,18 +50,20 @@ private[hive] class SparkGetCatalogsOperation( statementId = UUID.randomUUID().toString val logMsg = "Listing catalogs" logInfo(s"$logMsg with $statementId") - setState(OperationState.RUNNING) - // Always use the latest class loader provided by executionHive's state. 
- val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader - Thread.currentThread().setContextClassLoader(executionHiveClassLoader) - - HiveThriftServer2.listener.onStatementStart( + HiveThriftServer2.listener.onStatementPrepared( statementId, parentSession.getSessionHandle.getSessionId.toString, logMsg, statementId, parentSession.getUsername) + setState(OperationState.RUNNING) + // Always use the latest class loader provided by executionHive's state. + val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader + Thread.currentThread().setContextClassLoader(executionHiveClassLoader) + + HiveThriftServer2.listener.onStatementStart(statementId) + try { if (isAuthV2Enabled) { authorizeMetaGets(HiveOperationType.GET_CATALOGS, null) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala index 89faff2f6f91..11f4d28dc480 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala @@ -71,18 +71,19 @@ private[hive] class SparkGetColumnsOperation( val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName" val logMsg = s"Listing columns '$cmdStr, columnName : $columnName'" logInfo(s"$logMsg with $statementId") + HiveThriftServer2.listener.onStatementPrepared( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + logMsg, + statementId, + parentSession.getUsername) setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) - HiveThriftServer2.listener.onStatementStart( - statementId, - parentSession.getSessionHandle.getSessionId.toString, - logMsg, - statementId, - parentSession.getUsername) + HiveThriftServer2.listener.onStatementStart(statementId) val schemaPattern = convertSchemaPattern(schemaName) val tablePattern = convertIdentifierPattern(tableName, true) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala index 462e57300e82..fd46d898a4b6 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala @@ -62,6 +62,13 @@ private[hive] class SparkGetFunctionsOperation( val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName" val logMsg = s"Listing functions '$cmdStr, functionName : $functionName'" logInfo(s"$logMsg with $statementId") + HiveThriftServer2.listener.onStatementPrepared( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + logMsg, + statementId, + parentSession.getUsername) + setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader @@ -80,12 +87,7 @@ private[hive] class SparkGetFunctionsOperation( authorizeMetaGets(HiveOperationType.GET_FUNCTIONS, privObjs, cmdStr) } - HiveThriftServer2.listener.onStatementStart( - statementId, - parentSession.getSessionHandle.getSessionId.toString, - logMsg, - statementId, - parentSession.getUsername) + HiveThriftServer2.listener.onStatementStart(statementId) try { matchingDbs.foreach { db => diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala index 87ef154bcc8a..458394c6a528 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala @@ -58,6 +58,13 @@ private[hive] class SparkGetSchemasOperation( val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName" val logMsg = s"Listing databases '$cmdStr'" logInfo(s"$logMsg with $statementId") + HiveThriftServer2.listener.onStatementPrepared( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + logMsg, + statementId, + parentSession.getUsername) + setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader @@ -67,12 +74,7 @@ private[hive] class SparkGetSchemasOperation( authorizeMetaGets(HiveOperationType.GET_TABLES, null, cmdStr) } - HiveThriftServer2.listener.onStatementStart( - statementId, - parentSession.getSessionHandle.getSessionId.toString, - logMsg, - statementId, - parentSession.getUsername) + HiveThriftServer2.listener.onStatementStart(statementId) try { val schemaPattern = convertSchemaPattern(schemaName) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala index 8f2257f77d2a..8cab4627f2ca 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala @@ -51,6 +51,13 @@ private[hive] class SparkGetTableTypesOperation( statementId = UUID.randomUUID().toString val logMsg = "Listing table types" logInfo(s"$logMsg with $statementId") + HiveThriftServer2.listener.onStatementPrepared( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + logMsg, + statementId, + parentSession.getUsername) + setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader @@ -60,12 +67,7 @@ private[hive] class SparkGetTableTypesOperation( authorizeMetaGets(HiveOperationType.GET_TABLETYPES, null) } - HiveThriftServer2.listener.onStatementStart( - statementId, - parentSession.getSessionHandle.getSessionId.toString, - logMsg, - statementId, - parentSession.getUsername) + HiveThriftServer2.listener.onStatementStart(statementId) try { val tableTypes = CatalogTableType.tableTypes.map(tableTypeString).toSet diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala index 6441dc50f49f..f2116cf96451 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala @@ -69,6 +69,13 @@ private[hive] class SparkGetTablesOperation( val tableTypesStr = if (tableTypes == null) "null" else tableTypes.asScala.mkString(",") val logMsg = s"Listing tables '$cmdStr, tableTypes : $tableTypesStr, tableName : $tableName'" logInfo(s"$logMsg with $statementId") + HiveThriftServer2.listener.onStatementPrepared( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + logMsg, + statementId, + parentSession.getUsername) + setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader @@ -85,12 +92,7 @@ private[hive] class SparkGetTablesOperation( authorizeMetaGets(HiveOperationType.GET_TABLES, privObjs, cmdStr) } - HiveThriftServer2.listener.onStatementStart( - statementId, - parentSession.getSessionHandle.getSessionId.toString, - logMsg, - statementId, - parentSession.getUsername) + HiveThriftServer2.listener.onStatementStart(statementId) try { // Tables and views From 7b43b59d191f84facfffa04f5b8604e2e8b17cf4 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 30 Aug 2019 11:52:33 +0800 Subject: [PATCH 11/30] fix conflicts between cancel and finish --- .../SparkExecuteStatementOperation.scala | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 93505d3a483f..6c2899d46104 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -280,14 +280,21 @@ private[hive] class SparkExecuteStatementOperation( } } } - setState(OperationState.FINISHED) - HiveThriftServer2.listener.onStatementFinish(statementId) + synchronized { + if (!getStatus.getState.isTerminal) { + setState(OperationState.FINISHED) + HiveThriftServer2.listener.onStatementFinish(statementId) + } + } } override def cancel(): Unit = { - logInfo(s"Cancel '$statement' with $statementId") - cleanup(OperationState.CANCELED) - HiveThriftServer2.listener.onStatementCanceled(statementId) + synchronized { + if (!getStatus.getState.isTerminal) { + setState(OperationState.FINISHED) + HiveThriftServer2.listener.onStatementFinish(statementId) + } + } } private def cleanup(state: 
OperationState) { From 3744fc9c62e5618a286b1c2e9aabe2e6b70d7afb Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 30 Aug 2019 12:02:03 +0800 Subject: [PATCH 12/30] move try block to satrt of execute(0 --- .../SparkExecuteStatementOperation.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 6c2899d46104..4f5067d24651 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -228,15 +228,16 @@ private[hive] class SparkExecuteStatementOperation( } private def execute(): Unit = withSchedulerPool { - logInfo(s"Running query '$statement' with $statementId") - setState(OperationState.RUNNING) - // Always use the latest class loader provided by executionHive's state. - val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader - Thread.currentThread().setContextClassLoader(executionHiveClassLoader) - - HiveThriftServer2.listener.onStatementStart(statementId) - sqlContext.sparkContext.setJobGroup(statementId, statement) try { + logInfo(s"Running query '$statement' with $statementId") + setState(OperationState.RUNNING) + // Always use the latest class loader provided by executionHive's state. 
+ val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader + Thread.currentThread().setContextClassLoader(executionHiveClassLoader) + + HiveThriftServer2.listener.onStatementStart(statementId) + sqlContext.sparkContext.setJobGroup(statementId, statement) + result = sqlContext.sql(statement) logDebug(result.queryExecution.toString()) result.queryExecution.logical match { From 507016193872cd74dd87d13087601022bac8ef09 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 30 Aug 2019 12:10:09 +0800 Subject: [PATCH 13/30] fix scala style --- .../sql/hive/thriftserver/HiveThriftServer2.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 129a5a21d80f..737c488529e1 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -220,11 +220,11 @@ object HiveThriftServer2 extends Logging { } def onStatementPrepared( - id: String, - sessionId: String, - statement: String, - groupId: String, - userName: String = "UNKNOWN"): Unit = synchronized { + id: String, + sessionId: String, + statement: String, + groupId: String, + userName: String = "UNKNOWN"): Unit = synchronized { val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName) info.state = ExecutionState.PREPARED executionList.put(id, info) From 41ab7d732bc300c58ece53c02de2a8d77ccb8901 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 30 Aug 2019 13:13:32 +0800 Subject: [PATCH 14/30] fix code style --- .../hive/thriftserver/SparkExecuteStatementOperation.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 4f5067d24651..a25f1de780e7 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -220,9 +220,7 @@ private[hive] class SparkExecuteStatementOperation( } private def executeWhenNotTerminalStatus(): Unit = { - if(getStatus.getState != OperationState.CANCELED && - getStatus.getState != OperationState.CLOSED && - getStatus.getState != OperationState.FINISHED) { + if(!getStatus.getState.isTerminal) { execute() } } From 87fa08f21e824a93c775c4bc9beb4d1a12efc436 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sat, 31 Aug 2019 00:23:04 +0800 Subject: [PATCH 15/30] fix error --- .../hive/thriftserver/HiveThriftServer2.scala | 4 ++- .../SparkExecuteStatementOperation.scala | 25 +++++++++---------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 737c488529e1..6e074dfd5ebf 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -251,7 +251,9 @@ object HiveThriftServer2 extends Logging { } - def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = synchronized { + def onStatementError(id: String, + errorMessage: String, + errorTrace: String): Unit = synchronized { executionList(id).finishTimestamp = System.currentTimeMillis executionList(id).detail = 
errorMessage executionList(id).state = ExecutionState.FAILED diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index a25f1de780e7..1c4f88829546 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -169,7 +169,10 @@ private[hive] class SparkExecuteStatementOperation( parentSession.getUsername) if (!runInBackground) { - executeWhenNotTerminalStatus() + if (getStatus.getState.isTerminal) { + return + } + execute() } else { val sparkServiceUGI = Utils.getUGI() @@ -182,7 +185,10 @@ private[hive] class SparkExecuteStatementOperation( override def run(): Unit = { registerCurrentOperationLog() try { - executeWhenNotTerminalStatus() + if (getStatus.getState.isTerminal) { + return + } + execute() } catch { case e: HiveSQLException => setOperationException(e) @@ -219,12 +225,6 @@ private[hive] class SparkExecuteStatementOperation( } } - private def executeWhenNotTerminalStatus(): Unit = { - if(!getStatus.getState.isTerminal) { - execute() - } - } - private def execute(): Unit = withSchedulerPool { try { logInfo(s"Running query '$statement' with $statementId") @@ -261,9 +261,7 @@ private[hive] class SparkExecuteStatementOperation( // HiveServer will silently swallow them. case e: Throwable => val currentState = getStatus().getState() - if (currentState == OperationState.CANCELED || - currentState == OperationState.CLOSED || - currentState == OperationState.FINISHED) { + if (currentState.isTerminal) { // This may happen if the execution was cancelled, and then closed from another thread. 
logWarning(s"Ignore exception in terminal state with $statementId: $e") return @@ -290,8 +288,9 @@ private[hive] class SparkExecuteStatementOperation( override def cancel(): Unit = { synchronized { if (!getStatus.getState.isTerminal) { - setState(OperationState.FINISHED) - HiveThriftServer2.listener.onStatementFinish(statementId) + logInfo(s"Cancel '$statement' with $statementId") + cleanup(OperationState.CANCELED) + HiveThriftServer2.listener.onStatementCanceled(statementId) } } } From 63e8b59cfb5a56716c94437e76e7c48a5bead37a Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sat, 31 Aug 2019 00:27:32 +0800 Subject: [PATCH 16/30] remove PREPARED --- .../sql/hive/thriftserver/HiveThriftServer2.scala | 10 +++------- .../SparkExecuteStatementOperation.scala | 3 +-- .../thriftserver/SparkGetCatalogsOperation.scala | 13 ++++++------- .../thriftserver/SparkGetColumnsOperation.scala | 13 ++++++------- .../thriftserver/SparkGetFunctionsOperation.scala | 13 ++++++------- .../thriftserver/SparkGetSchemasOperation.scala | 13 ++++++------- .../thriftserver/SparkGetTableTypesOperation.scala | 13 ++++++------- .../hive/thriftserver/SparkGetTablesOperation.scala | 13 ++++++------- 8 files changed, 40 insertions(+), 51 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 6e074dfd5ebf..a12da421518d 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -138,7 +138,7 @@ object HiveThriftServer2 extends Logging { } private[thriftserver] object ExecutionState extends Enumeration { - val PREPARED, STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value + val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value type ExecutionState = 
Value } @@ -219,22 +219,18 @@ object HiveThriftServer2 extends Logging { trimSessionIfNecessary() } - def onStatementPrepared( + def onStatementStart( id: String, sessionId: String, statement: String, groupId: String, userName: String = "UNKNOWN"): Unit = synchronized { val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName) - info.state = ExecutionState.PREPARED + info.state = ExecutionState.STARTED executionList.put(id, info) trimExecutionIfNecessary() sessionList(sessionId).totalExecution += 1 executionList(id).groupId = groupId - } - - def onStatementStart(id: String): Unit = synchronized { - executionList(id).state = ExecutionState.STARTED totalRunning += 1 } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 1c4f88829546..c3320a282c1c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -161,7 +161,7 @@ private[hive] class SparkExecuteStatementOperation( setState(OperationState.PENDING) setHasResultSet(true) // avoid no resultset for async run statementId = UUID.randomUUID().toString - HiveThriftServer2.listener.onStatementPrepared( + HiveThriftServer2.listener.onStatementStart( statementId, parentSession.getSessionHandle.getSessionId.toString, statement, @@ -233,7 +233,6 @@ private[hive] class SparkExecuteStatementOperation( val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) - HiveThriftServer2.listener.onStatementStart(statementId) sqlContext.sparkContext.setJobGroup(statementId, statement) result = sqlContext.sql(statement) diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala index 6a412c6c40fd..839205a5d421 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala @@ -50,19 +50,18 @@ private[hive] class SparkGetCatalogsOperation( statementId = UUID.randomUUID().toString val logMsg = "Listing catalogs" logInfo(s"$logMsg with $statementId") - HiveThriftServer2.listener.onStatementPrepared( - statementId, - parentSession.getSessionHandle.getSessionId.toString, - logMsg, - statementId, - parentSession.getUsername) setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) - HiveThriftServer2.listener.onStatementStart(statementId) + HiveThriftServer2.listener.onStatementStart( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + logMsg, + statementId, + parentSession.getUsername) try { if (isAuthV2Enabled) { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala index 11f4d28dc480..89faff2f6f91 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala @@ -71,19 +71,18 @@ private[hive] class SparkGetColumnsOperation( val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : 
$tableName" val logMsg = s"Listing columns '$cmdStr, columnName : $columnName'" logInfo(s"$logMsg with $statementId") - HiveThriftServer2.listener.onStatementPrepared( - statementId, - parentSession.getSessionHandle.getSessionId.toString, - logMsg, - statementId, - parentSession.getUsername) setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) - HiveThriftServer2.listener.onStatementStart(statementId) + HiveThriftServer2.listener.onStatementStart( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + logMsg, + statementId, + parentSession.getUsername) val schemaPattern = convertSchemaPattern(schemaName) val tablePattern = convertIdentifierPattern(tableName, true) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala index fd46d898a4b6..0f0061909e19 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala @@ -62,12 +62,6 @@ private[hive] class SparkGetFunctionsOperation( val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName" val logMsg = s"Listing functions '$cmdStr, functionName : $functionName'" logInfo(s"$logMsg with $statementId") - HiveThriftServer2.listener.onStatementPrepared( - statementId, - parentSession.getSessionHandle.getSessionId.toString, - logMsg, - statementId, - parentSession.getUsername) setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
@@ -87,7 +81,12 @@ private[hive] class SparkGetFunctionsOperation( authorizeMetaGets(HiveOperationType.GET_FUNCTIONS, privObjs, cmdStr) } - HiveThriftServer2.listener.onStatementStart(statementId) + HiveThriftServer2.listener.onStatementStart( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + logMsg, + statementId, + parentSession.getUsername) try { matchingDbs.foreach { db => diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala index 458394c6a528..b9171a0aa28c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala @@ -58,12 +58,6 @@ private[hive] class SparkGetSchemasOperation( val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName" val logMsg = s"Listing databases '$cmdStr'" logInfo(s"$logMsg with $statementId") - HiveThriftServer2.listener.onStatementPrepared( - statementId, - parentSession.getSessionHandle.getSessionId.toString, - logMsg, - statementId, - parentSession.getUsername) setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
@@ -74,7 +68,12 @@ private[hive] class SparkGetSchemasOperation( authorizeMetaGets(HiveOperationType.GET_TABLES, null, cmdStr) } - HiveThriftServer2.listener.onStatementStart(statementId) + HiveThriftServer2.listener.onStatementStart( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + logMsg, + statementId, + parentSession.getUsername) try { val schemaPattern = convertSchemaPattern(schemaName) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala index 8cab4627f2ca..456e1065308c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala @@ -51,12 +51,6 @@ private[hive] class SparkGetTableTypesOperation( statementId = UUID.randomUUID().toString val logMsg = "Listing table types" logInfo(s"$logMsg with $statementId") - HiveThriftServer2.listener.onStatementPrepared( - statementId, - parentSession.getSessionHandle.getSessionId.toString, - logMsg, - statementId, - parentSession.getUsername) setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
@@ -67,7 +61,12 @@ private[hive] class SparkGetTableTypesOperation( authorizeMetaGets(HiveOperationType.GET_TABLETYPES, null) } - HiveThriftServer2.listener.onStatementStart(statementId) + HiveThriftServer2.listener.onStatementStart( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + logMsg, + statementId, + parentSession.getUsername) try { val tableTypes = CatalogTableType.tableTypes.map(tableTypeString).toSet diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala index f2116cf96451..5e93a7a2bfe7 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala @@ -69,12 +69,6 @@ private[hive] class SparkGetTablesOperation( val tableTypesStr = if (tableTypes == null) "null" else tableTypes.asScala.mkString(",") val logMsg = s"Listing tables '$cmdStr, tableTypes : $tableTypesStr, tableName : $tableName'" logInfo(s"$logMsg with $statementId") - HiveThriftServer2.listener.onStatementPrepared( - statementId, - parentSession.getSessionHandle.getSessionId.toString, - logMsg, - statementId, - parentSession.getUsername) setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
@@ -92,7 +86,12 @@ private[hive] class SparkGetTablesOperation( authorizeMetaGets(HiveOperationType.GET_TABLES, privObjs, cmdStr) } - HiveThriftServer2.listener.onStatementStart(statementId) + HiveThriftServer2.listener.onStatementStart( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + logMsg, + statementId, + parentSession.getUsername) try { // Tables and views From bea260a8bdc0022ec8c1e47c36956d63b7828f5d Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sat, 31 Aug 2019 00:29:40 +0800 Subject: [PATCH 17/30] remove empty line --- .../spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala | 1 - .../spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala | 1 - .../spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala | 1 - .../spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala | 1 - .../sql/hive/thriftserver/SparkGetTableTypesOperation.scala | 1 - .../spark/sql/hive/thriftserver/SparkGetTablesOperation.scala | 1 - 6 files changed, 6 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala index 839205a5d421..cde99fd35bd5 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala @@ -50,7 +50,6 @@ private[hive] class SparkGetCatalogsOperation( statementId = UUID.randomUUID().toString val logMsg = "Listing catalogs" logInfo(s"$logMsg with $statementId") - setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala index 89faff2f6f91..17a0d563609b 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala @@ -71,7 +71,6 @@ private[hive] class SparkGetColumnsOperation( val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName" val logMsg = s"Listing columns '$cmdStr, columnName : $columnName'" logInfo(s"$logMsg with $statementId") - setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala index 0f0061909e19..462e57300e82 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala @@ -62,7 +62,6 @@ private[hive] class SparkGetFunctionsOperation( val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName" val logMsg = s"Listing functions '$cmdStr, functionName : $functionName'" logInfo(s"$logMsg with $statementId") - setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala index b9171a0aa28c..87ef154bcc8a 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala @@ -58,7 +58,6 @@ private[hive] class SparkGetSchemasOperation( val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName" val logMsg = s"Listing databases '$cmdStr'" logInfo(s"$logMsg with $statementId") - setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala index 456e1065308c..8f2257f77d2a 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala @@ -51,7 +51,6 @@ private[hive] class SparkGetTableTypesOperation( statementId = UUID.randomUUID().toString val logMsg = "Listing table types" logInfo(s"$logMsg with $statementId") - setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala index 5e93a7a2bfe7..6441dc50f49f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala @@ -69,7 +69,6 @@ private[hive] class SparkGetTablesOperation( val tableTypesStr = if (tableTypes == null) "null" else tableTypes.asScala.mkString(",") val logMsg = s"Listing tables '$cmdStr, tableTypes : $tableTypesStr, tableName : $tableName'" logInfo(s"$logMsg with $statementId") - setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader From 7e56c1499b3bb3f04cca447110459314a0641b65 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Mon, 2 Sep 2019 17:53:40 +0800 Subject: [PATCH 18/30] fix scala style --- .../spark/sql/hive/thriftserver/HiveThriftServer2.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index a12da421518d..6774a3c9bbde 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -247,9 +247,7 @@ object HiveThriftServer2 extends Logging { } - def onStatementError(id: String, - errorMessage: String, - errorTrace: String): Unit = synchronized { + def onStatementError(id: String, errorMessage: String, 
errorTrace: String): Unit = synchronized { executionList(id).finishTimestamp = System.currentTimeMillis executionList(id).detail = errorMessage executionList(id).state = ExecutionState.FAILED From 8b25006e703128149cfdca0c430f15338111b0f6 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Mon, 2 Sep 2019 18:00:28 +0800 Subject: [PATCH 19/30] remove sync operation judge terminal --- .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index c3320a282c1c..5618af2adcf0 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -169,9 +169,6 @@ private[hive] class SparkExecuteStatementOperation( parentSession.getUsername) if (!runInBackground) { - if (getStatus.getState.isTerminal) { - return - } execute() } else { val sparkServiceUGI = Utils.getUGI() From ccd7de914ad51f42624e01a48d422178f5769b32 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Mon, 2 Sep 2019 19:14:13 +0800 Subject: [PATCH 20/30] add empty line --- .../spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala index 17a0d563609b..05c49fbce0b7 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala @@ 
-71,6 +71,7 @@ private[hive] class SparkGetColumnsOperation( val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName" val logMsg = s"Listing columns '$cmdStr, columnName : $columnName'" logInfo(s"$logMsg with $statementId") + setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader From c6651f1eadd36a2e94a5b724430ce23c97c2818f Mon Sep 17 00:00:00 2001 From: angerszhu Date: Mon, 2 Sep 2019 19:17:38 +0800 Subject: [PATCH 21/30] revert --- .../spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala index 05c49fbce0b7..89faff2f6f91 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala @@ -71,7 +71,7 @@ private[hive] class SparkGetColumnsOperation( val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName" val logMsg = s"Listing columns '$cmdStr, columnName : $columnName'" logInfo(s"$logMsg with $statementId") - + setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader From 00f355326289f4ede862d3180d7fa6bad1227bd9 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 3 Sep 2019 13:32:52 +0800 Subject: [PATCH 22/30] save code --- .../SparkExecuteStatementOperation.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 5618af2adcf0..c8538ff1d03e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -48,7 +48,7 @@ private[hive] class SparkExecuteStatementOperation( runInBackground: Boolean = true) (sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String]) extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground) - with Logging { + with Logging { private var result: DataFrame = _ @@ -161,6 +161,7 @@ private[hive] class SparkExecuteStatementOperation( setState(OperationState.PENDING) setHasResultSet(true) // avoid no resultset for async run statementId = UUID.randomUUID().toString + logInfo(s"Submitting query '$statement' with $statementId") HiveThriftServer2.listener.onStatementStart( statementId, parentSession.getSessionHandle.getSessionId.toString, @@ -262,7 +263,7 @@ private[hive] class SparkExecuteStatementOperation( logWarning(s"Ignore exception in terminal state with $statementId: $e") return } else { - logError(s"Error executing query, currentState $currentState, ", e) + logError(s"Error executing query with $statementId, currentState $currentState, ", e) setState(OperationState.ERROR) HiveThriftServer2.listener.onStatementError( statementId, e.getMessage, 
SparkUtils.exceptionString(e)) @@ -272,9 +273,8 @@ private[hive] class SparkExecuteStatementOperation( throw new HiveSQLException("Error running query: " + e.toString, e) } } - } - synchronized { - if (!getStatus.getState.isTerminal) { + } finally { + synchronized { setState(OperationState.FINISHED) HiveThriftServer2.listener.onStatementFinish(statementId) } @@ -284,7 +284,7 @@ private[hive] class SparkExecuteStatementOperation( override def cancel(): Unit = { synchronized { if (!getStatus.getState.isTerminal) { - logInfo(s"Cancel '$statement' with $statementId") + logInfo(s"Cancel query with $statementId") cleanup(OperationState.CANCELED) HiveThriftServer2.listener.onStatementCanceled(statementId) } From 8b84e04fe007e6d7901a10792ac73918434616a6 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 3 Sep 2019 17:34:54 +0800 Subject: [PATCH 23/30] fix scala style and concurence problem --- .../hive/thriftserver/HiveThriftServer2.scala | 26 ++++++++++++++++--- .../SparkExecuteStatementOperation.scala | 14 ++++++---- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 6774a3c9bbde..1b239e247d9f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -181,9 +181,26 @@ object HiveThriftServer2 extends Logging { private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT) private var totalRunning = 0 - def getOnlineSessionNum: Int = synchronized { onlineSessionNum } + def getOnlineSessionNum: Int = synchronized { + sessionList.count(_._2.finishTimestamp == 0) + } + + def isExecutionActive(execInfo: ExecutionInfo): Boolean = { + !(execInfo.state == ExecutionState.FAILED || + 
execInfo.state == ExecutionState.CANCELED || + execInfo.state == ExecutionState.CLOSED) + } - def getTotalRunning: Int = synchronized { totalRunning } + /** + * When an error or a cancellation occurs, we set the finishTimestamp of the statement. + * Therefore, when we count the number of running statements, we need to exclude errors and + * cancellations and count all statements that have not been closed so far. + */ + def getTotalRunning: Int = synchronized { + executionList.count { + case (_, v) => isExecutionActive(v) + } + } def getSessionList: Seq[SessionInfo] = synchronized { sessionList.values.toSeq } @@ -247,7 +264,10 @@ object HiveThriftServer2 extends Logging { } - def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = synchronized { + def onStatementError( + id: String, + errorMessage: String, + errorTrace: String): Unit = synchronized { executionList(id).finishTimestamp = System.currentTimeMillis executionList(id).detail = errorMessage executionList(id).state = ExecutionState.FAILED diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index c8538ff1d03e..dbadc2528a6b 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -183,9 +183,6 @@ private[hive] class SparkExecuteStatementOperation( override def run(): Unit = { registerCurrentOperationLog() try { - if (getStatus.getState.isTerminal) { - return - } execute() } catch { case e: HiveSQLException => @@ -225,8 +222,15 @@ private[hive] class SparkExecuteStatementOperation( private def execute(): Unit = withSchedulerPool { try { - logInfo(s"Running query '$statement' with $statementId") - 
setState(OperationState.RUNNING) + synchronized { + if (getStatus.getState.isTerminal) { + logInfo(s"Query with $statementId in terminal state before it started running") + return + } else { + logInfo(s"Running query with $statementId") + setState(OperationState.RUNNING) + } + } // Always use the latest class loader provided by executionHive's state. val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) From ff5ac961d04eff03826c88b3e97959fcc3b91055 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 3 Sep 2019 18:30:14 +0800 Subject: [PATCH 24/30] clear job group in same thread --- .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index dbadc2528a6b..81b6f511afd8 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -74,7 +74,6 @@ private[hive] class SparkExecuteStatementOperation( // RDDs will be cleaned automatically upon garbage collection. 
logDebug(s"CLOSING $statementId") cleanup(OperationState.CLOSED) - sqlContext.sparkContext.clearJobGroup() HiveThriftServer2.listener.onOperationClosed(statementId) } @@ -282,6 +281,7 @@ private[hive] class SparkExecuteStatementOperation( setState(OperationState.FINISHED) HiveThriftServer2.listener.onStatementFinish(statementId) } + sqlContext.sparkContext.clearJobGroup() } } From 28174bdbb658ace891a6d47f476bcbe3dc57ad82 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 3 Sep 2019 18:33:22 +0800 Subject: [PATCH 25/30] remove all the totalRunning and onlineSessionNum vars --- .../sql/hive/thriftserver/HiveThriftServer2.scala | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 1b239e247d9f..d5960ecda14e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -174,12 +174,10 @@ object HiveThriftServer2 extends Logging { override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { server.stop() } - private var onlineSessionNum: Int = 0 private val sessionList = new mutable.LinkedHashMap[String, SessionInfo] private val executionList = new mutable.LinkedHashMap[String, ExecutionInfo] private val retainedStatements = conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT) private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT) - private var totalRunning = 0 def getOnlineSessionNum: Int = synchronized { sessionList.count(_._2.finishTimestamp == 0) @@ -225,14 +223,12 @@ object HiveThriftServer2 extends Logging { synchronized { val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName) sessionList.put(sessionId, info) - 
onlineSessionNum += 1 trimSessionIfNecessary() } } def onSessionClosed(sessionId: String): Unit = synchronized { sessionList(sessionId).finishTimestamp = System.currentTimeMillis - onlineSessionNum -= 1 trimSessionIfNecessary() } @@ -248,7 +244,6 @@ object HiveThriftServer2 extends Logging { trimExecutionIfNecessary() sessionList(sessionId).totalExecution += 1 executionList(id).groupId = groupId - totalRunning += 1 } def onStatementParsed(id: String, executionPlan: String): Unit = synchronized { @@ -259,7 +254,6 @@ object HiveThriftServer2 extends Logging { def onStatementCanceled(id: String): Unit = synchronized { executionList(id).finishTimestamp = System.currentTimeMillis executionList(id).state = ExecutionState.CANCELED - totalRunning -= 1 trimExecutionIfNecessary() } @@ -271,24 +265,18 @@ object HiveThriftServer2 extends Logging { executionList(id).finishTimestamp = System.currentTimeMillis executionList(id).detail = errorMessage executionList(id).state = ExecutionState.FAILED - totalRunning -= 1 trimExecutionIfNecessary() } def onStatementFinish(id: String): Unit = synchronized { executionList(id).finishTimestamp = System.currentTimeMillis executionList(id).state = ExecutionState.FINISHED - totalRunning -= 1 trimExecutionIfNecessary() } def onOperationClosed(id: String): Unit = synchronized { executionList(id).closeTimestamp = System.currentTimeMillis - val lastState = executionList(id).state executionList(id).state = ExecutionState.CLOSED - if (lastState == ExecutionState.STARTED || lastState == ExecutionState.COMPILED) { - totalRunning -= 1 - } } private def trimExecutionIfNecessary() = { From 1cbf7cc7de2e4a0b4647e05c6c83ca12a7bf7d1e Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 3 Sep 2019 18:38:28 +0800 Subject: [PATCH 26/30] add onStatementError for background case --- .../hive/thriftserver/SparkExecuteStatementOperation.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 81b6f511afd8..11cd19abb756 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -208,12 +208,17 @@ private[hive] class SparkExecuteStatementOperation( setBackgroundHandle(backgroundHandle) } catch { case rejected: RejectedExecutionException => + logError("Error submit query in background, query rejected", rejected) setState(OperationState.ERROR) + HiveThriftServer2.listener.onStatementError( + statementId, rejected.getMessage, SparkUtils.exceptionString(rejected)) throw new HiveSQLException("The background threadpool cannot accept" + " new task for execution, please retry the operation", rejected) case NonFatal(e) => logError(s"Error executing query in background", e) setState(OperationState.ERROR) + HiveThriftServer2.listener.onStatementError( + statementId, e.getMessage, SparkUtils.exceptionString(e)) throw new HiveSQLException(e) } } From 61c9c733f4ca2fc0f7fa45ecbf36918d6cf73e5a Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 3 Sep 2019 21:54:02 +0800 Subject: [PATCH 27/30] fix code style --- .../sql/hive/thriftserver/HiveThriftServer2.scala | 7 ++----- .../SparkExecuteStatementOperation.scala | 12 ++++++------ 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index d5960ecda14e..5a81d5306fd2 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -258,12 +258,9 @@ object HiveThriftServer2 extends Logging { } - def onStatementError( - id: String, - errorMessage: String, - errorTrace: String): Unit = synchronized { + def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = synchronized { executionList(id).finishTimestamp = System.currentTimeMillis - executionList(id).detail = errorMessage + executionList(id).detail = errorMsg executionList(id).state = ExecutionState.FAILED trimExecutionIfNecessary() } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 11cd19abb756..0b817ffd7833 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -48,7 +48,7 @@ private[hive] class SparkExecuteStatementOperation( runInBackground: Boolean = true) (sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String]) extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground) - with Logging { + with Logging { private var result: DataFrame = _ @@ -208,7 +208,7 @@ private[hive] class SparkExecuteStatementOperation( setBackgroundHandle(backgroundHandle) } catch { case rejected: RejectedExecutionException => - logError("Error submit query in background, query rejected", rejected) + logError("Error submitting query in background, query rejected", rejected) setState(OperationState.ERROR) HiveThriftServer2.listener.onStatementError( statementId, rejected.getMessage, SparkUtils.exceptionString(rejected)) @@ -240,7 +240,6 @@ private[hive] class SparkExecuteStatementOperation( 
Thread.currentThread().setContextClassLoader(executionHiveClassLoader) sqlContext.sparkContext.setJobGroup(statementId, statement) - result = sqlContext.sql(statement) logDebug(result.queryExecution.toString()) result.queryExecution.logical match { @@ -269,7 +268,6 @@ private[hive] class SparkExecuteStatementOperation( if (currentState.isTerminal) { // This may happen if the execution was cancelled, and then closed from another thread. logWarning(s"Ignore exception in terminal state with $statementId: $e") - return } else { logError(s"Error executing query with $statementId, currentState $currentState, ", e) setState(OperationState.ERROR) @@ -283,8 +281,10 @@ private[hive] class SparkExecuteStatementOperation( } } finally { synchronized { - setState(OperationState.FINISHED) - HiveThriftServer2.listener.onStatementFinish(statementId) + if (!getStatus.getState.isTerminal) { + setState(OperationState.FINISHED) + HiveThriftServer2.listener.onStatementFinish(statementId) + } } sqlContext.sparkContext.clearJobGroup() } From f720963fded77661ad2bb912b2dc78ca1afe03f0 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 3 Sep 2019 21:56:13 +0800 Subject: [PATCH 28/30] to do all the initialization bookkeeping first. 
--- .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 0b817ffd7833..f3d02dcdb2aa 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -158,7 +158,6 @@ private[hive] class SparkExecuteStatementOperation( override def runInternal(): Unit = { setState(OperationState.PENDING) - setHasResultSet(true) // avoid no resultset for async run statementId = UUID.randomUUID().toString logInfo(s"Submitting query '$statement' with $statementId") HiveThriftServer2.listener.onStatementStart( @@ -167,6 +166,7 @@ private[hive] class SparkExecuteStatementOperation( statement, statementId, parentSession.getUsername) + setHasResultSet(true) // avoid no resultset for async run if (!runInBackground) { execute() From c8d2ffc244bea810f2abdd47ac5c5e59e72165ff Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 3 Sep 2019 22:10:39 +0800 Subject: [PATCH 29/30] fixlog --- .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index f3d02dcdb2aa..934a2fdf52d3 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ 
-72,7 +72,7 @@ private[hive] class SparkExecuteStatementOperation( override def close(): Unit = { // RDDs will be cleaned automatically upon garbage collection. - logDebug(s"CLOSING $statementId") + logInfo(s"Closed statement with $statementId") cleanup(OperationState.CLOSED) HiveThriftServer2.listener.onOperationClosed(statementId) } From 536756be89b77d37219b97bbf4358722a214f76d Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 4 Sep 2019 06:45:57 +0800 Subject: [PATCH 30/30] code style fix --- .../apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala | 1 - .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 5a81d5306fd2..36d4ac095e10 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -257,7 +257,6 @@ object HiveThriftServer2 extends Logging { trimExecutionIfNecessary() } - def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = synchronized { executionList(id).finishTimestamp = System.currentTimeMillis executionList(id).detail = errorMsg diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 934a2fdf52d3..69e85484ccf8 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -72,7 +72,7 @@ private[hive] class 
SparkExecuteStatementOperation( override def close(): Unit = { // RDDs will be cleaned automatically upon garbage collection. - logInfo(s"Closed statement with $statementId") + logInfo(s"Close statement with $statementId") cleanup(OperationState.CLOSED) HiveThriftServer2.listener.onOperationClosed(statementId) }