diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index c325222b764b..76654406373b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -229,6 +229,23 @@ private[spark] class Executor(
     ManagementFactory.getGarbageCollectorMXBeans.asScala.map(_.getCollectionTime).sum
   }
 
+  /**
+   * In local mode only, we have to prevent the driver from setting the active SparkSession
+   * on the executor threads. See SPARK-23894.
+   */
+  private lazy val clearActiveSparkSessionMethod = if (Utils.isLocalMaster(conf)) {
+    try {
+      val cls = Utils.classForName("org.apache.spark.sql.SparkSession")
+      Some(cls.getMethod("clearActiveSession"))
+    } catch {
+      case _: ClassNotFoundException =>
+        // spark-sql is not on the classpath, so there is no active session to clear
+        None
+    }
+  } else {
+    None
+  }
+
   class TaskRunner(
       execBackend: ExecutorBackend,
       private val taskDescription: TaskDescription)
@@ -299,6 +316,9 @@ private[spark] class Executor(
       Thread.currentThread.setContextClassLoader(replClassLoader)
      val ser = env.closureSerializer.newInstance()
       logInfo(s"Running $taskName (TID $taskId)")
+      // In local mode, the driver's active session may have been set on this thread even
+      // though it never should be, so we defensively clear it. See SPARK-23894.
+      clearActiveSparkSessionMethod.foreach(_.invoke(null))
       execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
       var taskStart: Long = 0
       var taskStartCpu: Long = 0
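For reference, the reflective lookup in this patch exists so that spark-core avoids a compile-time dependency on the sql module: the method handle is resolved only when `org.apache.spark.sql.SparkSession` is actually on the classpath, and otherwise the cleanup degrades to a no-op. Below is a minimal standalone sketch of the same lookup-and-invoke pattern; the object and helper names (`ClearActiveSessionSketch`, `lookupStaticMethod`) are hypothetical, while the class and method names mirror the diff.

```scala
import java.lang.reflect.Method

// Hypothetical standalone sketch of the reflection pattern used in the patch:
// resolve a zero-argument static method only if its class can be loaded, so the
// calling module carries no compile-time dependency on that class.
object ClearActiveSessionSketch {

  /** Returns the method if the class is on the classpath, otherwise None. */
  def lookupStaticMethod(className: String, methodName: String): Option[Method] =
    try {
      Some(Class.forName(className).getMethod(methodName))
    } catch {
      case _: ClassNotFoundException => None // class absent: nothing to call
    }

  def main(args: Array[String]): Unit = {
    // With spark-sql on the classpath this resolves SparkSession.clearActiveSession;
    // without it, the Option is empty and the foreach below does nothing.
    val clearMethod =
      lookupStaticMethod("org.apache.spark.sql.SparkSession", "clearActiveSession")
    clearMethod.foreach(_.invoke(null)) // null receiver: the method is static
  }
}
```

Note on why `invoke(null)` works: `clearActiveSession` is defined on the `SparkSession` companion object, and Scala emits a static forwarder for it on the `SparkSession` class, so reflection can treat it as a static method, exactly as the patch does.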