[SPARK-6964][SQL][WIP] Support Cancellation in the Thrift Server
Dong Wang committed Jun 5, 2015
commit 819ae03cad57660f38fdee039ff1f953f7fa230a
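This commit lets the Thrift server run statements asynchronously and cancel them: `SparkExecuteStatementOperation` gains a `runInBackground` flag, a background thread-pool submission path, and a `cancel()` implementation that kills the statement's Spark jobs. From the client side, cancellation is the standard JDBC `Statement.cancel()` call. A minimal sketch, assuming a local Thrift server and the Hive JDBC driver on the classpath (URL, credentials, and query are placeholders):

```scala
import java.sql.{DriverManager, SQLException}

// Connect to the Thrift server (placeholder URL and credentials).
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
val stmt = conn.createStatement()

// Run a long query on a separate thread so the main thread can cancel it.
val runner = new Thread(new Runnable {
  override def run(): Unit = {
    try stmt.executeQuery("SELECT COUNT(*) FROM big_table")
    catch { case e: SQLException => println(s"Query ended: ${e.getMessage}") }
  }
})
runner.start()

Thread.sleep(1000) // let the query start
stmt.cancel()      // reaches SparkExecuteStatementOperation.cancel() on the server
runner.join()
conn.close()
```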
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -20,37 +20,53 @@ package org.apache.spark.sql.hive.thriftserver
import java.sql.{Date, Timestamp}
-import java.util.{Map => JMap, UUID}

+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.Executors
+import java.util.concurrent.Future
+import java.util.concurrent.RejectedExecutionException
+import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, UUID}

+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, Map => SMap}

+import org.apache.commons.logging.Log
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession
+import org.apache.hive.service.cli.thrift.TProtocolVersion

import org.apache.spark.Logging
import org.apache.spark.sql.execution.SetCommand
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}

-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, Map => SMap}

private[hive] class SparkExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
+runInBackground: Boolean = true)
(hiveContext: HiveContext, sessionToActivePool: SMap[SessionHandle, String])
-// NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
-extends ExecuteStatementOperation(parentSession, statement, confOverlay, false)
+extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
with Logging {

private var result: DataFrame = _
private var iter: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _
+private var statementId: String = _

def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
logDebug("CLOSING")
hiveContext.sparkContext.clearJobGroup()
logDebug(s"CLOSING $statementId")
cleanup(OperationState.CLOSED)
}

def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
@@ -114,20 +130,76 @@ private[hive] class SparkExecuteStatementOperation(
}

def getResultSetSchema: TableSchema = {
logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
if (result.queryExecution.analyzed.output.size == 0) {
if (result == null || result.queryExecution.analyzed.output.size == 0) {
new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
} else {
logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
val schema = result.queryExecution.analyzed.output.map { attr =>
new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
}
new TableSchema(schema)
}
}

-def run(): Unit = {
-val statementId = UUID.randomUUID().toString
-logInfo(s"Running query '$statement'")
+override def run(): Unit = {
+setState(OperationState.PENDING)
+setHasResultSet(true) // avoid no resultset for async run

+if (!runInBackground) {
+runInternal()
+} else {
+val parentSessionState = SessionState.get()
+val hiveConf = new HiveConf(getParentSession().getHiveConf())
+val sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)

+// Runnable impl to call runInternal asynchronously,
+// from a different thread
+val backgroundOperation = new Runnable() {

+override def run(): Unit = {
+val doAsAction = new PrivilegedExceptionAction[Object]() {
+override def run(): Object = {

+// User information is part of the metastore client member in Hive
+SessionState.setCurrentSessionState(parentSessionState)
+try {
+runInternal()
+} catch {
+case e: HiveSQLException =>
+setOperationException(e)
+log.error("Error running hive query: ", e)
+}
+return null
+}
+}

+try {
+ShimLoader.getHadoopShims().doAs(sparkServiceUGI, doAsAction)
+} catch {
+case e: Exception =>
+setOperationException(new HiveSQLException(e))
+logError("Error running hive query as user : " +
+sparkServiceUGI.getShortUserName(), e)
+}
+}
+}
+try {
+// This submit blocks if no background threads are available to run this operation
+val backgroundHandle =
+getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation)
+setBackgroundHandle(backgroundHandle)
+} catch {
+case rejected: RejectedExecutionException =>
+setState(OperationState.ERROR);
+throw new HiveSQLException("The background threadpool cannot accept" +
+" new task for execution, please retry the operation", rejected)
+}
+}
+}

+private def runInternal(): Unit = {
+statementId = UUID.randomUUID().toString
+logInfo(s"Running query '$statement' with $statementId")
setState(OperationState.RUNNING)
HiveThriftServer2.listener.onStatementStart(
statementId,
@@ -159,18 +231,43 @@ }
}
}
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
-setHasResultSet(true)
} catch {
+case e: HiveSQLException =>
+if (getStatus().getState() == OperationState.CANCELED) {
+return
+} else {
+setState(OperationState.ERROR);
+throw e
+}
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
+val currentState = getStatus().getState()
+logError(s"Error executing query, currentState $currentState, :", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, e.getStackTraceString)
-logError("Error executing query:", e)
throw new HiveSQLException(e.toString)
throw new HiveSQLException(e.toString)
}
setState(OperationState.FINISHED)
HiveThriftServer2.listener.onStatementFinish(statementId)
}

+override def cancel(): Unit = {
+logInfo(s"Cancel '$statement' with $statementId")
+if (statementId != null) {
+hiveContext.sparkContext.cancelJobGroup(statementId)
+}
+cleanup(OperationState.CANCELED)
+}

+private def cleanup(state: OperationState) {
+setState(state)
+if (runInBackground) {
+val backgroundHandle = getBackgroundHandle()
+if (backgroundHandle != null) {
+backgroundHandle.cancel(true)
+}
+}
+}
}
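The cancellation above leans on Spark's job-group API: `runInternal()` (its body is partly elided in this diff) tags the statement's jobs with the statement UUID, and `cancel()` aborts exactly those jobs with `cancelJobGroup`. A standalone sketch of that pattern, independent of the Thrift server (`interruptOnCancel = true` is an illustrative choice here, not necessarily what the operation uses):

```scala
import java.util.UUID
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("cancel-demo").setMaster("local[2]"))
val statementId = UUID.randomUUID().toString

val worker = new Thread(new Runnable {
  override def run(): Unit = {
    // Jobs started from this thread are tagged with statementId.
    sc.setJobGroup(statementId, "long-running demo query", interruptOnCancel = true)
    try {
      sc.parallelize(1 to 1000000, 8).map { i => Thread.sleep(1); i }.count()
    } catch {
      case e: Exception => println(s"Jobs aborted: ${e.getMessage}")
    }
  }
})
worker.start()

Thread.sleep(2000)
sc.cancelJobGroup(statementId) // same call cancel() makes via hiveContext.sparkContext
worker.join()
sc.stop()
```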
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -44,9 +44,10 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {

-val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay)(
-hiveContext, sessionToActivePool)
+val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
+async)(hiveContext, sessionToActivePool)
handleToOperation.put(operation.getHandle, operation)
+logDebug(s"Created Operation for $statement with session=$parentSession, async=$async")
operation
}
}
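`SparkSQLOperationManager` now forwards the session's `async` flag into `runInBackground`, choosing between the synchronous path and `submitBackgroundOperation`. The `Future` returned by that submission is the background handle that `cleanup()` interrupts. The mechanics are plain `java.util.concurrent`; a sketch of the same submit/cancel/reject lifecycle (the bounded pool is illustrative, just to make the rejection path reachable):

```scala
import java.util.concurrent.{ArrayBlockingQueue, Future, RejectedExecutionException, ThreadPoolExecutor, TimeUnit}

// Stand-in for the session manager's background pool.
val pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1))

val backgroundOperation = new Runnable {
  override def run(): Unit =
    try Thread.sleep(60000) // stands in for runInternal()
    catch { case _: InterruptedException => println("operation interrupted") }
}

try {
  val handle: Future[_] = pool.submit(backgroundOperation) // cf. setBackgroundHandle(...)
  Thread.sleep(100)
  handle.cancel(true) // what cleanup(OperationState.CANCELED) does; `true` interrupts the thread
} catch {
  case _: RejectedExecutionException =>
    // run() wraps this case in a HiveSQLException asking the client to retry
    println("background pool cannot accept new task")
}
pool.shutdown()
```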