diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 53835d830486..c9ef9f918ffd 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -293,9 +293,8 @@ public void close() {
     }
     connectionPool.clear();
 
-    if (workerGroup != null) {
+    if (workerGroup != null && !workerGroup.isShuttingDown()) {
       workerGroup.shutdownGracefully();
-      workerGroup = null;
     }
   }
 }
diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
index 2aec4a33bbe4..9b76981c31c5 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
@@ -217,4 +217,11 @@ public Iterable<Map.Entry<String, String>> getAll() {
       assertFalse(c1.isActive());
     }
   }
+
+  @Test(expected = IOException.class)
+  public void closeFactoryBeforeCreateClient() throws IOException, InterruptedException {
+    TransportClientFactory factory = context.createClientFactory();
+    factory.close();
+    factory.createClient(TestUtils.getLocalHost(), server1.getPort());
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 419f0ab06515..14895b39b24b 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -70,7 +70,7 @@ class SparkEnv (
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {
 
-  private[spark] var isStopped = false
+  @volatile private[spark] var isStopped = false
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
   // A general, soft-reference map for metadata needed during HadoopRDD split computation
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 12bc227b341a..d2eb01d20e9a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -629,6 +629,11 @@ private[spark] class Executor(
           setTaskFinishedAndClearInterruptStatus()
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))
 
+        case t: Throwable if env.isStopped =>
+          // Log the expected exception after executor.stop without stack traces
+          // see: SPARK-19147
+          logError(s"Exception in $taskName (TID $taskId): ${t.getMessage}")
+
        case t: Throwable =>
           // Attempt to exit cleanly by informing the driver of our failure.
           // If anything goes wrong (or this was a fatal exception), we will delegate to
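
Note on the TransportClientFactory change: keeping workerGroup non-null after close() and guarding with isShuttingDown() makes close() idempotent while letting a later createClient fail fast instead of hitting a NullPointerException. A minimal sketch of the contract the new test exercises, assuming the suite's usual fixtures (context, server1, TestUtils):

    // Hypothetical standalone illustration; names mirror TransportClientFactorySuite.
    TransportClientFactory factory = context.createClientFactory();
    factory.close();   // shuts down workerGroup gracefully; the field stays non-null
    factory.close();   // safe: the isShuttingDown() guard skips a second shutdown
    try {
      factory.createClient(TestUtils.getLocalHost(), server1.getPort());
    } catch (IOException e) {
      // expected, per closeFactoryBeforeCreateClient: no new clients after close()
    }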
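
Note on the @volatile change: task threads now read env.isStopped while SparkEnv.stop() writes it from another thread, so the flag needs cross-thread visibility. A hypothetical, Spark-independent Java illustration of the pattern (all names here are illustrative only, not Spark code):

    // Without volatile, the task thread might never observe the stop flag
    // written by the shutdown thread.
    class StopFlagDemo {
      // volatile: a write by the stopping thread is visible to task threads
      private volatile boolean isStopped = false;

      void stop() { isStopped = true; }

      void onTaskFailure(Throwable t) {
        if (isStopped) {
          // mirrors the new Executor case: log the message only, no stack trace
          System.err.println("Exception after stop: " + t.getMessage());
        }
      }
    }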