
Commit 4cd4dbe

Dong Wang authored and jeanlyn committed
[SPARK-6964] [SQL] Support Cancellation in the Thrift Server
Support runInBackground in SparkExecuteStatementOperation, and add cancellation.

Author: Dong Wang <[email protected]>

Closes apache#6207 from dongwang218/SPARK-6964-jdbc-cancel and squashes the following commits:

687c113 [Dong Wang] fix 100 characters
7bfa2a7 [Dong Wang] fix merge
380480f [Dong Wang] fix for liancheng's comments
eb3e385 [Dong Wang] small nit
341885b [Dong Wang] small fix
3d8ebf8 [Dong Wang] add spark.sql.hive.thriftServer.async flag
04142c3 [Dong Wang] set SQLSession for async execution
184ec35 [Dong Wang] keep hive conf
819ae03 [Dong Wang] [SPARK-6964][SQL][WIP] Support Cancellation in the Thrift Server
1 parent 3e5e3e3 commit 4cd4dbe

File tree: 5 files changed (+208, −16 lines)

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 5 additions & 0 deletions
@@ -916,6 +916,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
     tlSession.remove()
   }

+  protected[sql] def setSession(session: SQLSession): Unit = {
+    detachSession()
+    tlSession.set(session)
+  }
+
   protected[sql] class SQLSession {
     // Note that this is a lazy val so we can override the default value in subclasses.
     protected[sql] lazy val conf: SQLConf = new SQLConf
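
Why this hook is needed: SQLContext keeps the active SQLSession in a ThreadLocal (tlSession), which does not follow a statement onto a background thread. A minimal sketch of the handoff pattern the Thrift server relies on below (illustrative only; setSession and currentSession are package-private, so real code must live under org.apache.spark.sql):

// Sketch: capture the caller's session, then re-attach it on the worker thread,
// so the background statement sees the caller's SQLConf.
val session = sqlContext.currentSession   // read from the submitting thread's ThreadLocal
val task = new Runnable {
  override def run(): Unit = {
    sqlContext.setSession(session)        // detaches any stale session, then attaches ours
    // ... execute the statement here ...
  }
}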

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala

Lines changed: 152 additions & 12 deletions
@@ -17,11 +17,23 @@

 package org.apache.spark.sql.hive.thriftserver

+import java.security.PrivilegedExceptionAction
 import java.sql.{Date, Timestamp}
+import java.util.concurrent.RejectedExecutionException
 import java.util.{Map => JMap, UUID}

+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, Map => SMap}
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hive.service.cli._
+import org.apache.hadoop.hive.ql.metadata.Hive
+import org.apache.hadoop.hive.ql.metadata.HiveException
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession

@@ -31,26 +43,26 @@ import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}

-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, Map => SMap}

 private[hive] class SparkExecuteStatementOperation(
     parentSession: HiveSession,
     statement: String,
     confOverlay: JMap[String, String],
     runInBackground: Boolean = true)
   (hiveContext: HiveContext, sessionToActivePool: SMap[SessionHandle, String])
-  // NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
-  extends ExecuteStatementOperation(parentSession, statement, confOverlay, false)
+  extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
   with Logging {

   private var result: DataFrame = _
   private var iter: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
+  private var statementId: String = _

   def close(): Unit = {
     // RDDs will be cleaned automatically upon garbage collection.
-    logDebug("CLOSING")
+    hiveContext.sparkContext.clearJobGroup()
+    logDebug(s"CLOSING $statementId")
+    cleanup(OperationState.CLOSED)
   }

   def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
@@ -114,20 +126,84 @@ private[hive] class SparkExecuteStatementOperation(
   }

   def getResultSetSchema: TableSchema = {
-    logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
-    if (result.queryExecution.analyzed.output.size == 0) {
+    if (result == null || result.queryExecution.analyzed.output.size == 0) {
       new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
     } else {
+      logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
       val schema = result.queryExecution.analyzed.output.map { attr =>
         new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
       }
       new TableSchema(schema)
     }
   }

-  def run(): Unit = {
-    val statementId = UUID.randomUUID().toString
-    logInfo(s"Running query '$statement'")
+  override def run(): Unit = {
+    setState(OperationState.PENDING)
+    setHasResultSet(true) // avoid no resultset for async run
+
+    if (!runInBackground) {
+      runInternal()
+    } else {
+      val parentSessionState = SessionState.get()
+      val hiveConf = getConfigForOperation()
+      val sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
+      val sessionHive = getCurrentHive()
+      val currentSqlSession = hiveContext.currentSession
+
+      // Runnable impl to call runInternal asynchronously,
+      // from a different thread
+      val backgroundOperation = new Runnable() {
+
+        override def run(): Unit = {
+          val doAsAction = new PrivilegedExceptionAction[Object]() {
+            override def run(): Object = {
+
+              // User information is part of the metastore client member in Hive
+              hiveContext.setSession(currentSqlSession)
+              Hive.set(sessionHive)
+              SessionState.setCurrentSessionState(parentSessionState)
+              try {
+                runInternal()
+              } catch {
+                case e: HiveSQLException =>
+                  setOperationException(e)
+                  log.error("Error running hive query: ", e)
+              }
+              return null
+            }
+          }
+
+          try {
+            ShimLoader.getHadoopShims().doAs(sparkServiceUGI, doAsAction)
+          } catch {
+            case e: Exception =>
+              setOperationException(new HiveSQLException(e))
+              logError("Error running hive query as user : " +
+                sparkServiceUGI.getShortUserName(), e)
+          }
+        }
+      }
+      try {
+        // This submit blocks if no background threads are available to run this operation
+        val backgroundHandle =
+          getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation)
+        setBackgroundHandle(backgroundHandle)
+      } catch {
+        case rejected: RejectedExecutionException =>
+          setState(OperationState.ERROR)
+          throw new HiveSQLException("The background threadpool cannot accept" +
+            " new task for execution, please retry the operation", rejected)
+        case NonFatal(e) =>
+          logError(s"Error executing query in background", e)
+          setState(OperationState.ERROR)
+          throw e
+      }
+    }
+  }
+
+  private def runInternal(): Unit = {
+    statementId = UUID.randomUUID().toString
+    logInfo(s"Running query '$statement' with $statementId")
     setState(OperationState.RUNNING)
     HiveThriftServer2.listener.onStatementStart(
       statementId,
@@ -159,18 +235,82 @@ private[hive] class SparkExecuteStatementOperation(
         }
       }
       dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
-      setHasResultSet(true)
     } catch {
+      case e: HiveSQLException =>
+        if (getStatus().getState() == OperationState.CANCELED) {
+          return
+        } else {
+          setState(OperationState.ERROR);
+          throw e
+        }
       // Actually do need to catch Throwable as some failures don't inherit from Exception and
       // HiveServer will silently swallow them.
       case e: Throwable =>
+        val currentState = getStatus().getState()
+        logError(s"Error executing query, currentState $currentState, ", e)
         setState(OperationState.ERROR)
         HiveThriftServer2.listener.onStatementError(
           statementId, e.getMessage, e.getStackTraceString)
-        logError("Error executing query:", e)
         throw new HiveSQLException(e.toString)
     }
     setState(OperationState.FINISHED)
     HiveThriftServer2.listener.onStatementFinish(statementId)
   }
+
+  override def cancel(): Unit = {
+    logInfo(s"Cancel '$statement' with $statementId")
+    if (statementId != null) {
+      hiveContext.sparkContext.cancelJobGroup(statementId)
+    }
+    cleanup(OperationState.CANCELED)
+  }
+
+  private def cleanup(state: OperationState) {
+    setState(state)
+    if (runInBackground) {
+      val backgroundHandle = getBackgroundHandle()
+      if (backgroundHandle != null) {
+        backgroundHandle.cancel(true)
+      }
+    }
+  }
+
+  /**
+   * If there are query specific settings to overlay, then create a copy of the config.
+   * There are two cases where we need to clone the session config that's being passed
+   * to the Hive driver:
+   * 1. Async query -
+   *    If the client changes a config setting, that shouldn't reflect in the execution
+   *    already underway
+   * 2. confOverlay -
+   *    The query specific settings should only be applied to the query config and not session
+   * @return new configuration
+   * @throws HiveSQLException
+   */
+  private def getConfigForOperation(): HiveConf = {
+    var sqlOperationConf = getParentSession().getHiveConf()
+    if (!getConfOverlay().isEmpty() || runInBackground) {
+      // clone the parent session config for this query
+      sqlOperationConf = new HiveConf(sqlOperationConf)
+
+      // apply overlay query specific settings, if any
+      getConfOverlay().foreach { case (k, v) =>
+        try {
+          sqlOperationConf.verifyAndSet(k, v)
+        } catch {
+          case e: IllegalArgumentException =>
+            throw new HiveSQLException("Error applying statement specific settings", e)
+        }
+      }
+    }
+    return sqlOperationConf
+  }
+
+  private def getCurrentHive(): Hive = {
+    try {
+      return Hive.get()
+    } catch {
+      case e: HiveException =>
+        throw new HiveSQLException("Failed to get current Hive object", e);
+    }
+  }
 }
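
A note on how cancel() takes effect: the statement's Spark jobs are tagged with statementId as a job group (the setJobGroup call sits in an unchanged part of runInternal, which is why it does not appear in this diff), and cancelJobGroup(statementId) interrupts every job in that group. A minimal standalone sketch of the mechanism, using only the public SparkContext API; names and timings here are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch of Spark job-group cancellation, the mechanism cancel() relies on.
object JobGroupCancelSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cancel-sketch").setMaster("local[2]"))
    val statementId = java.util.UUID.randomUUID().toString

    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        // setJobGroup is thread-local: tag jobs in the thread that submits them.
        sc.setJobGroup(statementId, "long-running statement", interruptOnCancel = true)
        try {
          sc.parallelize(1 to 100, 4).map { i => Thread.sleep(60000); i }.count()
        } catch {
          case e: Exception => println(s"Job ended early: ${e.getMessage}")
        }
      }
    })
    worker.start()

    Thread.sleep(2000)
    sc.cancelJobGroup(statementId)  // what SparkExecuteStatementOperation.cancel() calls
    worker.join()
    sc.stop()
  }
}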

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala

Lines changed: 5 additions & 2 deletions
@@ -44,9 +44,12 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
       confOverlay: JMap[String, String],
       async: Boolean): ExecuteStatementOperation = synchronized {

-    val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay)(
-      hiveContext, sessionToActivePool)
+    val runInBackground = async && hiveContext.hiveThriftServerAsync
+    val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
+      runInBackground)(hiveContext, sessionToActivePool)
     handleToOperation.put(operation.getHandle, operation)
+    logDebug(s"Created Operation for $statement with session=$parentSession, " +
+      s"runInBackground=$runInBackground")
     operation
   }
 }
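
For orientation, a hedged sketch (not code from this patch) of why handleToOperation matters: when a client later sends a cancel request for the same handle, the service layer looks the operation back up and invokes its cancel(). Method and type names below are illustrative of Hive's OperationManager API, not copied from this commit:

// Illustrative only: roughly how a cancel RPC reaches
// SparkExecuteStatementOperation.cancel() via the handle registered above.
def cancelOperation(handle: OperationHandle): Unit = {
  val operation = handleToOperation.get(handle) // same map populated above
  if (operation != null) {
    operation.cancel()
  }
}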

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala

Lines changed: 40 additions & 2 deletions
@@ -19,11 +19,13 @@ package org.apache.spark.sql.hive.thriftserver

 import java.io.File
 import java.net.URL
-import java.sql.{Date, DriverManager, Statement}
+import java.nio.charset.StandardCharsets
+import java.sql.{Date, DriverManager, SQLException, Statement}

 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
-import scala.concurrent.{Await, Promise}
+import scala.concurrent.{Await, Promise, future}
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.sys.process.{Process, ProcessLogger}
 import scala.util.{Random, Try}

@@ -338,6 +340,42 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       }
     )
   }
+
+  test("test jdbc cancel") {
+    withJdbcStatement { statement =>
+      val queries = Seq(
+        "DROP TABLE IF EXISTS test_map",
+        "CREATE TABLE test_map(key INT, value STRING)",
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
+
+      queries.foreach(statement.execute)
+
+      val largeJoin = "SELECT COUNT(*) FROM test_map " +
+        List.fill(10)("join test_map").mkString(" ")
+      val f = future { Thread.sleep(100); statement.cancel(); }
+      val e = intercept[SQLException] {
+        statement.executeQuery(largeJoin)
+      }
+      assert(e.getMessage contains "cancelled")
+      Await.result(f, 3.minute)
+
+      // cancel is a noop
+      statement.executeQuery("SET spark.sql.hive.thriftServer.async=false")
+      val sf = future { Thread.sleep(100); statement.cancel(); }
+      val smallJoin = "SELECT COUNT(*) FROM test_map " +
+        List.fill(4)("join test_map").mkString(" ")
+      val rs1 = statement.executeQuery(smallJoin)
+      Await.result(sf, 3.minute)
+      rs1.next()
+      assert(rs1.getInt(1) === math.pow(5, 5))
+      rs1.close()
+
+      val rs2 = statement.executeQuery("SELECT COUNT(*) FROM test_map")
+      rs2.next()
+      assert(rs2.getInt(1) === 5)
+      rs2.close()
+    }
+  }
 }

 class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
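
From the client's side, this is plain JDBC: execute on one thread, call Statement.cancel() from another, and executeQuery fails with an SQLException whose message mentions the cancellation (as the assertion above checks). A hedged standalone sketch; the URL, table name, and sleep are placeholders, not values from this patch:

import java.sql.{DriverManager, SQLException}

// Client-side sketch of the behavior exercised by the test above.
// Requires the Hive JDBC driver on the classpath.
object JdbcCancelExample {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")
    val stmt = conn.createStatement()
    // Fire the cancel from a second thread shortly after the query starts.
    new Thread(new Runnable {
      override def run(): Unit = { Thread.sleep(100); stmt.cancel() }
    }).start()
    try {
      stmt.executeQuery("SELECT COUNT(*) FROM big_table t1 JOIN big_table t2")
    } catch {
      case e: SQLException => println(s"Statement cancelled: ${e.getMessage}")
    } finally {
      conn.close()
    }
  }
}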

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 6 additions & 0 deletions
@@ -144,6 +144,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     getConf("spark.sql.hive.metastore.barrierPrefixes", "")
       .split(",").filterNot(_ == "")

+  /*
+   * The Hive Thrift server uses a background Spark SQL thread pool to execute SQL queries.
+   */
+  protected[hive] def hiveThriftServerAsync: Boolean =
+    getConf("spark.sql.hive.thriftServer.async", "true").toBoolean
+
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()