From c3de557ee383f3bb96ab5401db146c4cf2a13124 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Tue, 10 Sep 2019 17:44:59 +0800
Subject: [PATCH 1/5] save change

---
 .../hive/execution/HiveTableScanExec.scala    | 33 +++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 5b00e2ebafa4..1b0e5a0c0bb2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -20,8 +20,11 @@ package org.apache.spark.sql.hive.execution
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.common.JavaUtils
+import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
 import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
@@ -120,6 +123,36 @@ case class HiveTableScanExec(
 
     HiveShim.appendReadColumns(hiveConf, neededColumnIDs, output.map(_.name))
 
+    logInfo(s"Test ADD JAR ${SessionState.get()}")
+    if (SessionState.get() != null) {
+      logInfo(s"Test ADD JAR ${SessionState.get().getConf.getClassLoader}")
+      logInfo("Test ADD JAR with SessionState.getConf.getClassLoader")
+      // scalastyle:off
+      try {
+        Class.forName(tableDesc.getSerdeClassName(), true, SessionState.get().getConf.getClassLoader)
+      } catch {
+        case e: Exception =>
+          logInfo("Failed Test ADD JAR with SessionState.getConf.getClassLoader")
+      }
+    }
+
+    logInfo(s"JavaUtils.getClassLoader => ${JavaUtils.getClassLoader}")
+    logInfo(s"SessionState.SharedState.jarClassLoader => ${sparkSession.sharedState.jarClassLoader}")
+    logInfo("Test ADD JAR with sharedState's JarClassloader")
+    // scalastyle:off
+    try {
+      Class.forName(tableDesc.getSerdeClassName(), true, sparkSession.sharedState.jarClassLoader)
+    } catch {
+      case e: Exception =>
+        logInfo("Failed Test ADD JAR with sharedState's JarClassloader")
+    }
+    logInfo("Test ADD JAR with JavaUtils.getClassLoader")
+    try {
+      Class.forName(tableDesc.getSerdeClassName(), true, JavaUtils.getClassLoader)
+    } catch {
+      case e: Exception =>
+        logInfo("Failed Test ADD JAR with JavaUtils.getClassLoader")
+    }
     val deserializer = tableDesc.getDeserializerClass.getConstructor().newInstance()
     deserializer.initialize(hiveConf, tableDesc.getProperties)
 
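Patch 1 is throwaway instrumentation: it probes which of three classloaders (the Hive SessionState conf loader, Spark's shared-state jarClassLoader, and JavaUtils.getClassLoader) can resolve the table's SerDe class after an ADD JAR. A minimal self-contained sketch of the same probing technique follows; SerdeLoaderProbe, probe, and the loader map are illustrative names, not part of the patch:

    object SerdeLoaderProbe {
      // Returns the names of the candidate loaders that can resolve `className`.
      // In the patch, the class is tableDesc.getSerdeClassName() and the
      // candidates are the three loaders logged above.
      def probe(className: String, loaders: Map[String, ClassLoader]): Seq[String] =
        loaders.collect { case (name, loader) if canLoad(className, loader) => name }.toSeq

      private def canLoad(className: String, loader: ClassLoader): Boolean =
        try {
          // initialize = false: test visibility only, without running static initializers
          Class.forName(className, false, loader)
          true
        } catch {
          case _: ClassNotFoundException | _: NoClassDefFoundError => false
        }
    }

Running this with the SerDe class name from the failing query shows at a glance which loader actually saw the added jar, which is what the logInfo calls in the patch establish by hand.
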
From 2cf3153f046a25e38173fad0cddc1c581ed42b93 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Tue, 10 Sep 2019 19:05:07 +0800
Subject: [PATCH 2/5] Revert "save change"

This reverts commit c3de557ee383f3bb96ab5401db146c4cf2a13124.
---
 .../hive/execution/HiveTableScanExec.scala    | 33 -------------------
 1 file changed, 33 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 1b0e5a0c0bb2..5b00e2ebafa4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -20,11 +20,8 @@ package org.apache.spark.sql.hive.execution
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hive.common.JavaUtils
-import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
 import org.apache.hadoop.hive.ql.plan.TableDesc
-import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
@@ -123,36 +120,6 @@ case class HiveTableScanExec(
 
     HiveShim.appendReadColumns(hiveConf, neededColumnIDs, output.map(_.name))
 
-    logInfo(s"Test ADD JAR ${SessionState.get()}")
-    if (SessionState.get() != null) {
-      logInfo(s"Test ADD JAR ${SessionState.get().getConf.getClassLoader}")
-      logInfo("Test ADD JAR with SessionState.getConf.getClassLoader")
-      // scalastyle:off
-      try {
-        Class.forName(tableDesc.getSerdeClassName(), true, SessionState.get().getConf.getClassLoader)
-      } catch {
-        case e: Exception =>
-          logInfo("Failed Test ADD JAR with SessionState.getConf.getClassLoader")
-      }
-    }
-
-    logInfo(s"JavaUtils.getClassLoader => ${JavaUtils.getClassLoader}")
-    logInfo(s"SessionState.SharedState.jarClassLoader => ${sparkSession.sharedState.jarClassLoader}")
-    logInfo("Test ADD JAR with sharedState's JarClassloader")
-    // scalastyle:off
-    try {
-      Class.forName(tableDesc.getSerdeClassName(), true, sparkSession.sharedState.jarClassLoader)
-    } catch {
-      case e: Exception =>
-        logInfo("Failed Test ADD JAR with sharedState's JarClassloader")
-    }
-    logInfo("Test ADD JAR with JavaUtils.getClassLoader")
-    try {
-      Class.forName(tableDesc.getSerdeClassName(), true, JavaUtils.getClassLoader)
-    } catch {
-      case e: Exception =>
-        logInfo("Failed Test ADD JAR with JavaUtils.getClassLoader")
-    }
     val deserializer = tableDesc.getDeserializerClass.getConstructor().newInstance()
     deserializer.initialize(hiveConf, tableDesc.getProperties)
 

From b047e76c64cfd88fd6275a753f99b73a13d38761 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Tue, 10 Sep 2019 19:21:19 +0800
Subject: [PATCH 3/5] cancel jobGroup after catch error

---
 .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index f246f43435c7..ecf41b94bd4e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -267,6 +267,9 @@ private[hive] class SparkExecuteStatementOperation(
       // Actually do need to catch Throwable as some failures don't inherit from Exception and
       // HiveServer will silently swallow them.
       case e: Throwable =>
+        if (statementId != null) {
+          sqlContext.sparkContext.cancelJobGroup(statementId)
+        }
         val currentState = getStatus().getState()
         if (currentState.isTerminal) {
           // This may happen if the execution was cancelled, and then closed from another thread.
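Patch 3 leans on Spark's job-group mechanism: the thrift server runs each statement's jobs under a group keyed by statementId, so cancelling that group kills any jobs the statement has already submitted. A minimal sketch of the pattern, assuming a local SparkSession; the object name, thread layout, and timings are illustrative, not the thrift server's actual code:

    import org.apache.spark.sql.SparkSession

    object JobGroupCancellation {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("cancel-demo").getOrCreate()
        val sc = spark.sparkContext
        val statementId = java.util.UUID.randomUUID().toString

        // Run the "statement" in a background thread under its own job group,
        // mirroring how the thrift server executes each query.
        val worker = new Thread(() => {
          sc.setJobGroup(statementId, "demo statement", interruptOnCancel = true)
          try {
            // A deliberately slow job standing in for the real query.
            sc.parallelize(1 to 100000, 8).map { i => Thread.sleep(1); i }.count()
          } catch {
            case _: Throwable =>
              // Mirrors the patch: jobs may already be running when the failure
              // or interrupt arrives, so cancel the whole group during cleanup.
              sc.cancelJobGroup(statementId)
          }
        })
        worker.start()

        Thread.sleep(500)              // let some tasks start
        sc.cancelJobGroup(statementId) // what cancel()/close() ultimately trigger
        worker.join()
        spark.stop()
      }
    }
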
From 202f5eef963820af574bcdfad62da4e00255d8ba Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Thu, 12 Sep 2019 23:19:31 +0800
Subject: [PATCH 4/5] add comment

---
 .../hive/thriftserver/SparkExecuteStatementOperation.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index ecf41b94bd4e..884965d87ed6 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -267,6 +267,10 @@ private[hive] class SparkExecuteStatementOperation(
       // Actually do need to catch Throwable as some failures don't inherit from Exception and
       // HiveServer will silently swallow them.
       case e: Throwable =>
+        // When cancel() and close() are called very quickly after the query is started,
+        // they may both call cleanup() before any Spark jobs have started. But the
+        // background task may have started some Spark jobs before it was interrupted,
+        // so cancel again here to make sure those jobs are cancelled.
         if (statementId != null) {
           sqlContext.sparkContext.cancelJobGroup(statementId)
         }

From 232ef816de0dba7690ff9959800ba94ec8f24a5e Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Mon, 16 Sep 2019 16:04:05 +0800
Subject: [PATCH 5/5] modify comment

---
 .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 884965d87ed6..ce3cbc3a2fc4 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -267,7 +267,7 @@ private[hive] class SparkExecuteStatementOperation(
       // Actually do need to catch Throwable as some failures don't inherit from Exception and
       // HiveServer will silently swallow them.
       case e: Throwable =>
-        // When cancel() and close() are called very quickly after the query is started,
+        // When cancel() or close() is called very quickly after the query is started,
         // they may both call cleanup() before any Spark jobs have started. But the
         // background task may have started some Spark jobs before it was interrupted,
         // so cancel again here to make sure those jobs are cancelled.
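The comment that patches 4 and 5 refine describes a genuine ordering hazard: cancelJobGroup() only affects jobs already submitted under the group, so a cancel that races ahead of job submission is a no-op, and the later cancel in the Throwable handler is what actually kills late-starting jobs. A compact illustration of the no-op case, assuming the cancelJobGroup semantics of the Spark versions this PR targets (cancellation does not apply to future jobs in the group) and using illustrative names:

    import org.apache.spark.sql.SparkSession

    object CancelBeforeStartIsNoOp {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("race-demo").getOrCreate()
        val sc = spark.sparkContext
        val groupId = "stmt-1"

        // cancel() wins the race: nothing has been submitted under the group yet,
        // so this call has no effect on jobs submitted afterwards.
        sc.cancelJobGroup(groupId)

        // The "background task" now submits its job; it runs to completion unless
        // the group is cancelled again after submission, which is exactly what
        // the patch adds in the Throwable handler.
        sc.setJobGroup(groupId, "late-starting statement")
        val n = sc.parallelize(1 to 100, 4).count()
        assert(n == 100) // the early cancel did not stop the job

        spark.stop()
      }
    }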