From b0c4474e9719e0c0cdc85ceb65683e1181d3cc90 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Mon, 9 Jun 2025 17:59:53 +0800 Subject: [PATCH 01/11] [SPARK-52426][CORE] Support redirecting stdout/stderr to logging system --- .../spark/deploy/ConsoleRedirectPlugin.scala | 135 ++++++++++++++++++ .../spark/internal/config/package.scala | 36 +++++ 2 files changed, 171 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/deploy/ConsoleRedirectPlugin.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/ConsoleRedirectPlugin.scala b/core/src/main/scala/org/apache/spark/deploy/ConsoleRedirectPlugin.scala new file mode 100644 index 000000000000..d7257425b987 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/ConsoleRedirectPlugin.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +import java.io.{ByteArrayOutputStream, PrintStream} +import java.util.{Map => JMap} + +import scala.jdk.CollectionConverters._ + +import org.slf4j.LoggerFactory + +import org.apache.spark.SparkContext +import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +/** + * A built-in plugin to allow redirecting stdout/stderr to logging system (SLF4J). 
+ */ +class ConsoleRedirectPlugin extends SparkPlugin { + override def driverPlugin(): DriverPlugin = new DriverConsoleRedirectPlugin() + + // No-op + override def executorPlugin(): ExecutorPlugin = new ExecConsoleRedirectPlugin() +} + +class DriverConsoleRedirectPlugin extends DriverPlugin with Logging { + + override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = { + if (sc.conf.get(DRIVER_REDIRECT_STDOUT_TO_LOG_ENABLED)) { + logInfo("Redirect driver's stdout to logging system") + val stdoutLogger = LoggerFactory.getLogger("stdout") + System.setOut(new LoggingPrintStream(stdoutLogger.info)) + } + + if (sc.conf.get(DRIVER_REDIRECT_STDERR_TO_LOG_ENABLED)) { + logInfo("Redirect driver's stderr to logging system") + val stderrLogger = LoggerFactory.getLogger("stderr") + System.setErr(new LoggingPrintStream(stderrLogger.error)) + } + Map.empty[String, String].asJava + } +} + +class ExecConsoleRedirectPlugin extends ExecutorPlugin with Logging { + + override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = { + if (ctx.conf.get(EXEC_REDIRECT_STDOUT_TO_LOG_ENABLED)) { + logInfo("Redirect executor's stdout to logging system") + val stdoutLogger = LoggerFactory.getLogger("stdout") + System.setOut(new LoggingPrintStream(stdoutLogger.info)) + } + + if (ctx.conf.get(EXEC_REDIRECT_STDERR_TO_LOG_ENABLED)) { + logInfo("Redirect executor's stderr to logging system") + val stderrLogger = LoggerFactory.getLogger("stderr") + System.setErr(new LoggingPrintStream(stderrLogger.error)) + } + } +} + +private[spark] class LoggingPrintStream( + redirect: String => Unit, + lineMaxBytes: Long = 4 * 1024 * 1024) + extends PrintStream(new LineBuffer(lineMaxBytes)) { + + override def write(b: Int): Unit = { + super.write(b) + tryLogCurrentLine() + } + + override def write(buf: Array[Byte], off: Int, len: Int): Unit = { + super.write(buf, off, len) + tryLogCurrentLine() + } + + private def tryLogCurrentLine(): Unit = this.synchronized { + out.asInstanceOf[LineBuffer].tryGenerateContext.foreach { logContext => + redirect(logContext) + } + } +} + +/** + * Cache bytes before line ending. When current line is ended or the bytes size reaches the + * threshold, it can generate the line. 
+ */ +private[spark] object LineBuffer { + private val LF_BYTES = System.lineSeparator.getBytes + private val LF_LENGTH = LF_BYTES.length +} + +private[spark] class LineBuffer(lineMaxBytes: Long) extends ByteArrayOutputStream { + + import LineBuffer._ + + def tryGenerateContext: Option[String] = + if (isLineEnded) { + try Some(new String(buf, 0, count - LF_LENGTH)) finally reset() + } else if (count >= lineMaxBytes) { + try Some(new String(buf, 0, count)) finally reset() + } else { + None + } + + private def isLineEnded: Boolean = { + if (count < LF_LENGTH) return false + // fast return in UNIX-like OS when LF is single char '\n' + if (LF_LENGTH == 1) return LF_BYTES(0) == buf(count - 1) + + var i = 0 + do { + if (LF_BYTES(i) != buf(count - LF_LENGTH + i)) { + return false + } + i = i + 1 + } while (i < LF_LENGTH) + true + } +} diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7cb3d068b676..19cd39d05829 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2838,4 +2838,40 @@ package object config { .checkValues(Set("connect", "classic")) .createWithDefault( if (sys.env.get("SPARK_CONNECT_MODE").contains("1")) "connect" else "classic") + + private[spark] val DRIVER_REDIRECT_STDOUT_TO_LOG_ENABLED = + ConfigBuilder("spark.driver.log.redirectStdout.enabled") + .doc("Whether to redirect the driver's stdout to logging system. " + + s"It only takes affect when `${PLUGINS.key}` is configured with " + + "`org.apache.spark.deploy.ConsoleRedirectPlugin`.") + .version("4.1.0") + .booleanConf + .createWithDefault(false) + + private[spark] val DRIVER_REDIRECT_STDERR_TO_LOG_ENABLED = + ConfigBuilder("spark.driver.log.redirectStderr.enabled") + .doc("Whether to redirect the driver's stderr to logging system. " + + s"It only takes affect when `${PLUGINS.key}` is configured with " + + "`org.apache.spark.deploy.ConsoleRedirectPlugin`.") + .version("4.1.0") + .booleanConf + .createWithDefault(false) + + private[spark] val EXEC_REDIRECT_STDOUT_TO_LOG_ENABLED = + ConfigBuilder("spark.executor.log.redirectStdout.enabled") + .doc("Whether to redirect the executor's stdout to logging system. " + + s"It only takes affect when `${PLUGINS.key}` is configured with " + + "`org.apache.spark.deploy.ConsoleRedirectPlugin`.") + .version("4.1.0") + .booleanConf + .createWithDefault(false) + + private[spark] val EXEC_REDIRECT_STDERR_TO_LOG_ENABLED = + ConfigBuilder("spark.executor.log.redirectStderr.enabled") + .doc("Whether to redirect the executor's stderr to logging system. 
" + + s"It only takes affect when `${PLUGINS.key}` is configured with " + + "`org.apache.spark.deploy.ConsoleRedirectPlugin`.") + .version("4.1.0") + .booleanConf + .createWithDefault(false) } From 6cf48f7f3d1eeea107bcbf5ba42bafe21811067f Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Mon, 9 Jun 2025 18:14:57 +0800 Subject: [PATCH 02/11] nit --- .../scala/org/apache/spark/deploy/ConsoleRedirectPlugin.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ConsoleRedirectPlugin.scala b/core/src/main/scala/org/apache/spark/deploy/ConsoleRedirectPlugin.scala index d7257425b987..fc7aac24d968 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ConsoleRedirectPlugin.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ConsoleRedirectPlugin.scala @@ -35,7 +35,6 @@ import org.apache.spark.internal.config._ class ConsoleRedirectPlugin extends SparkPlugin { override def driverPlugin(): DriverPlugin = new DriverConsoleRedirectPlugin() - // No-op override def executorPlugin(): ExecutorPlugin = new ExecConsoleRedirectPlugin() } From c66f4d33cec35d09fd0e6477dc374b69b812ae4f Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 13 Jun 2025 17:29:40 +0800 Subject: [PATCH 03/11] address comments --- ...ugin.scala => RedirectConsolePlugin.scala} | 28 +++++--------- .../spark/internal/config/package.scala | 38 +++++-------------- 2 files changed, 20 insertions(+), 46 deletions(-) rename core/src/main/scala/org/apache/spark/deploy/{ConsoleRedirectPlugin.scala => RedirectConsolePlugin.scala} (81%) diff --git a/core/src/main/scala/org/apache/spark/deploy/ConsoleRedirectPlugin.scala b/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala similarity index 81% rename from core/src/main/scala/org/apache/spark/deploy/ConsoleRedirectPlugin.scala rename to core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala index fc7aac24d968..f51230cba722 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ConsoleRedirectPlugin.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala @@ -32,41 +32,33 @@ import org.apache.spark.internal.config._ /** * A built-in plugin to allow redirecting stdout/stderr to logging system (SLF4J). 
*/ -class ConsoleRedirectPlugin extends SparkPlugin { - override def driverPlugin(): DriverPlugin = new DriverConsoleRedirectPlugin() +class RedirectConsolePlugin extends SparkPlugin { + override def driverPlugin(): DriverPlugin = new DriverRedirectConsolePlugin() - override def executorPlugin(): ExecutorPlugin = new ExecConsoleRedirectPlugin() + override def executorPlugin(): ExecutorPlugin = new ExecRedirectConsolePlugin() } -class DriverConsoleRedirectPlugin extends DriverPlugin with Logging { +class DriverRedirectConsolePlugin extends DriverPlugin with Logging { override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = { - if (sc.conf.get(DRIVER_REDIRECT_STDOUT_TO_LOG_ENABLED)) { - logInfo("Redirect driver's stdout to logging system") + if (sc.conf.get(DRIVER_REDIRECT_CONSOLE_TO_LOG_ENABLED)) { + logInfo("Redirect driver's stdout/stderr to logging system") val stdoutLogger = LoggerFactory.getLogger("stdout") - System.setOut(new LoggingPrintStream(stdoutLogger.info)) - } - - if (sc.conf.get(DRIVER_REDIRECT_STDERR_TO_LOG_ENABLED)) { - logInfo("Redirect driver's stderr to logging system") val stderrLogger = LoggerFactory.getLogger("stderr") + System.setOut(new LoggingPrintStream(stdoutLogger.info)) System.setErr(new LoggingPrintStream(stderrLogger.error)) } Map.empty[String, String].asJava } } -class ExecConsoleRedirectPlugin extends ExecutorPlugin with Logging { +class ExecRedirectConsolePlugin extends ExecutorPlugin with Logging { override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = { - if (ctx.conf.get(EXEC_REDIRECT_STDOUT_TO_LOG_ENABLED)) { - logInfo("Redirect executor's stdout to logging system") + if (ctx.conf.get(EXEC_REDIRECT_CONSOLE_TO_LOG_ENABLED)) { + logInfo("Redirect executor's stdout/stdout to logging system") val stdoutLogger = LoggerFactory.getLogger("stdout") System.setOut(new LoggingPrintStream(stdoutLogger.info)) - } - - if (ctx.conf.get(EXEC_REDIRECT_STDERR_TO_LOG_ENABLED)) { - logInfo("Redirect executor's stderr to logging system") val stderrLogger = LoggerFactory.getLogger("stderr") System.setErr(new LoggingPrintStream(stderrLogger.error)) } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 19cd39d05829..e8b601fdb411 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2839,39 +2839,21 @@ package object config { .createWithDefault( if (sys.env.get("SPARK_CONNECT_MODE").contains("1")) "connect" else "classic") - private[spark] val DRIVER_REDIRECT_STDOUT_TO_LOG_ENABLED = - ConfigBuilder("spark.driver.log.redirectStdout.enabled") - .doc("Whether to redirect the driver's stdout to logging system. " + + private[spark] val DRIVER_REDIRECT_CONSOLE_TO_LOG_ENABLED = + ConfigBuilder("spark.driver.log.redirectConsole.enabled") + .doc("Whether to redirect the driver's stdout/stderr to logging system. " + s"It only takes affect when `${PLUGINS.key}` is configured with " + - "`org.apache.spark.deploy.ConsoleRedirectPlugin`.") + "`org.apache.spark.deploy.RedirectConsolePlugin`.") .version("4.1.0") .booleanConf - .createWithDefault(false) - - private[spark] val DRIVER_REDIRECT_STDERR_TO_LOG_ENABLED = - ConfigBuilder("spark.driver.log.redirectStderr.enabled") - .doc("Whether to redirect the driver's stderr to logging system. 
" + - s"It only takes affect when `${PLUGINS.key}` is configured with " + - "`org.apache.spark.deploy.ConsoleRedirectPlugin`.") - .version("4.1.0") - .booleanConf - .createWithDefault(false) - - private[spark] val EXEC_REDIRECT_STDOUT_TO_LOG_ENABLED = - ConfigBuilder("spark.executor.log.redirectStdout.enabled") - .doc("Whether to redirect the executor's stdout to logging system. " + - s"It only takes affect when `${PLUGINS.key}` is configured with " + - "`org.apache.spark.deploy.ConsoleRedirectPlugin`.") - .version("4.1.0") - .booleanConf - .createWithDefault(false) + .createWithDefault(true) - private[spark] val EXEC_REDIRECT_STDERR_TO_LOG_ENABLED = - ConfigBuilder("spark.executor.log.redirectStderr.enabled") - .doc("Whether to redirect the executor's stderr to logging system. " + + private[spark] val EXEC_REDIRECT_CONSOLE_TO_LOG_ENABLED = + ConfigBuilder("spark.executor.log.redirectConsole.enabled") + .doc("Whether to redirect the executor's stdout/stderr to logging system. " + s"It only takes affect when `${PLUGINS.key}` is configured with " + - "`org.apache.spark.deploy.ConsoleRedirectPlugin`.") + "`org.apache.spark.deploy.RedirectConsolePlugin`.") .version("4.1.0") .booleanConf - .createWithDefault(false) + .createWithDefault(true) } From 94616731a8aded817d499c35c4754f45507b5b8e Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 13 Jun 2025 17:30:52 +0800 Subject: [PATCH 04/11] nit --- .../scala/org/apache/spark/deploy/RedirectConsolePlugin.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala b/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala index f51230cba722..b520cac34cbc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala @@ -44,8 +44,8 @@ class DriverRedirectConsolePlugin extends DriverPlugin with Logging { if (sc.conf.get(DRIVER_REDIRECT_CONSOLE_TO_LOG_ENABLED)) { logInfo("Redirect driver's stdout/stderr to logging system") val stdoutLogger = LoggerFactory.getLogger("stdout") - val stderrLogger = LoggerFactory.getLogger("stderr") System.setOut(new LoggingPrintStream(stdoutLogger.info)) + val stderrLogger = LoggerFactory.getLogger("stderr") System.setErr(new LoggingPrintStream(stderrLogger.error)) } Map.empty[String, String].asJava From 1593faa695657fcc6a344134a3022123f5f20286 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 13 Jun 2025 17:39:51 +0800 Subject: [PATCH 05/11] extract common code --- .../spark/deploy/RedirectConsolePlugin.scala | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala b/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala index b520cac34cbc..41998aaaa540 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala @@ -38,15 +38,22 @@ class RedirectConsolePlugin extends SparkPlugin { override def executorPlugin(): ExecutorPlugin = new ExecRedirectConsolePlugin() } +object RedirectConsolePlugin { + + def redirectConsoleToLog(): Unit = { + val stdoutLogger = LoggerFactory.getLogger("stdout") + System.setOut(new LoggingPrintStream(stdoutLogger.info)) + val stderrLogger = LoggerFactory.getLogger("stderr") + System.setErr(new LoggingPrintStream(stderrLogger.error)) + } +} + class DriverRedirectConsolePlugin 
extends DriverPlugin with Logging { override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = { if (sc.conf.get(DRIVER_REDIRECT_CONSOLE_TO_LOG_ENABLED)) { logInfo("Redirect driver's stdout/stderr to logging system") - val stdoutLogger = LoggerFactory.getLogger("stdout") - System.setOut(new LoggingPrintStream(stdoutLogger.info)) - val stderrLogger = LoggerFactory.getLogger("stderr") - System.setErr(new LoggingPrintStream(stderrLogger.error)) + RedirectConsolePlugin.redirectConsoleToLog() } Map.empty[String, String].asJava } @@ -57,10 +64,7 @@ class ExecRedirectConsolePlugin extends ExecutorPlugin with Logging { override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = { if (ctx.conf.get(EXEC_REDIRECT_CONSOLE_TO_LOG_ENABLED)) { logInfo("Redirect executor's stdout/stdout to logging system") - val stdoutLogger = LoggerFactory.getLogger("stdout") - System.setOut(new LoggingPrintStream(stdoutLogger.info)) - val stderrLogger = LoggerFactory.getLogger("stderr") - System.setErr(new LoggingPrintStream(stderrLogger.error)) + RedirectConsolePlugin.redirectConsoleToLog() } } } From 110e7b1d39f49937959c583367700d1056f45655 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 13 Jun 2025 18:09:21 +0800 Subject: [PATCH 06/11] document affect shell console progress bar --- .../spark/deploy/RedirectConsolePlugin.scala | 16 +++++++++++----- .../apache/spark/internal/config/package.scala | 3 ++- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala b/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala index 41998aaaa540..84d81af1eb7e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala @@ -22,12 +22,12 @@ import java.util.{Map => JMap} import scala.jdk.CollectionConverters._ -import org.slf4j.LoggerFactory - import org.apache.spark.SparkContext import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC, SparkLoggerFactory} +import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.UI.UI_SHOW_CONSOLE_PROGRESS /** * A built-in plugin to allow redirecting stdout/stderr to logging system (SLF4J). 
@@ -41,9 +41,9 @@ class RedirectConsolePlugin extends SparkPlugin { object RedirectConsolePlugin { def redirectConsoleToLog(): Unit = { - val stdoutLogger = LoggerFactory.getLogger("stdout") + val stdoutLogger = SparkLoggerFactory.getLogger("stdout") System.setOut(new LoggingPrintStream(stdoutLogger.info)) - val stderrLogger = LoggerFactory.getLogger("stderr") + val stderrLogger = SparkLoggerFactory.getLogger("stderr") System.setErr(new LoggingPrintStream(stderrLogger.error)) } } @@ -53,6 +53,12 @@ class DriverRedirectConsolePlugin extends DriverPlugin with Logging { override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = { if (sc.conf.get(DRIVER_REDIRECT_CONSOLE_TO_LOG_ENABLED)) { logInfo("Redirect driver's stdout/stderr to logging system") + if (sc.conf.get(UI_SHOW_CONSOLE_PROGRESS)) { + logWarning(log"Redirect driver's stderr to logging system may affect " + + log"console progress bar, consider disabling either " + + log"${MDC(CONFIG, DRIVER_REDIRECT_CONSOLE_TO_LOG_ENABLED.key)} or " + + log"${MDC(CONFIG2, UI_SHOW_CONSOLE_PROGRESS.key)}.") + } RedirectConsolePlugin.redirectConsoleToLog() } Map.empty[String, String].asJava diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index e8b601fdb411..98faa6a920f8 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2843,7 +2843,8 @@ package object config { ConfigBuilder("spark.driver.log.redirectConsole.enabled") .doc("Whether to redirect the driver's stdout/stderr to logging system. " + s"It only takes affect when `${PLUGINS.key}` is configured with " + - "`org.apache.spark.deploy.RedirectConsolePlugin`.") + "`org.apache.spark.deploy.RedirectConsolePlugin`. 
Note, enabling this " + + "feature may affect the shell console progress bar.") .version("4.1.0") .booleanConf .createWithDefault(true) From ed764883b5051efc79b6cf8d9d4361a2e8dc9f03 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 13 Jun 2025 18:32:15 +0800 Subject: [PATCH 07/11] nit --- .../org/apache/spark/deploy/RedirectConsolePlugin.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala b/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala index 84d81af1eb7e..91c2488187c7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala @@ -18,9 +18,7 @@ package org.apache.spark.deploy import java.io.{ByteArrayOutputStream, PrintStream} -import java.util.{Map => JMap} - -import scala.jdk.CollectionConverters._ +import java.util.{Collections, Map => JMap} import org.apache.spark.SparkContext import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} @@ -61,7 +59,7 @@ class DriverRedirectConsolePlugin extends DriverPlugin with Logging { } RedirectConsolePlugin.redirectConsoleToLog() } - Map.empty[String, String].asJava + Collections.emptyMap } } From 8ccae63bac6e9df2531ef47f6fb3953a8c0aa0f5 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 13 Jun 2025 20:05:35 +0800 Subject: [PATCH 08/11] Ensure ConsoleProgressBar always print to console stderr --- .../main/scala/org/apache/spark/ui/ConsoleProgressBar.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala index e95eeddbdace..980cd6e541a2 100644 --- a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala +++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala @@ -38,6 +38,8 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging { private val updatePeriodMSec = sc.conf.get(UI_CONSOLE_PROGRESS_UPDATE_INTERVAL) // Delay to show up a progress bar, in milliseconds private val firstDelayMSec = 500L + // Get the stderr (which is console for spark-shell) before installing RedirectConsolePlugin + private val console = System.err // The width of terminal private val TerminalWidth = sys.env.getOrElse("COLUMNS", "80").toInt @@ -92,7 +94,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging { // only refresh if it's changed OR after 1 minute (or the ssh connection will be closed // after idle some time) if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) { - System.err.print(s"$CR$bar$CR") + console.print(s"$CR$bar$CR") lastUpdateTime = now } lastProgressBar = bar @@ -103,7 +105,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging { */ private def clear(): Unit = { if (!lastProgressBar.isEmpty) { - System.err.printf(s"$CR${" " * TerminalWidth}$CR") + console.printf(s"$CR${" " * TerminalWidth}$CR") lastProgressBar = "" } } From 28222ec6b39bc901a837aaa5808569ff5fce4a2b Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 13 Jun 2025 20:06:52 +0800 Subject: [PATCH 09/11] revert docs changes --- .../apache/spark/deploy/RedirectConsolePlugin.scala | 10 +--------- .../org/apache/spark/internal/config/package.scala | 3 +-- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git 
a/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala b/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala index 91c2488187c7..b8bc5e0e3173 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala @@ -22,10 +22,8 @@ import java.util.{Collections, Map => JMap} import org.apache.spark.SparkContext import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} -import org.apache.spark.internal.{Logging, MDC, SparkLoggerFactory} -import org.apache.spark.internal.LogKeys._ +import org.apache.spark.internal.{Logging, SparkLoggerFactory} import org.apache.spark.internal.config._ -import org.apache.spark.internal.config.UI.UI_SHOW_CONSOLE_PROGRESS /** * A built-in plugin to allow redirecting stdout/stderr to logging system (SLF4J). @@ -51,12 +49,6 @@ class DriverRedirectConsolePlugin extends DriverPlugin with Logging { override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = { if (sc.conf.get(DRIVER_REDIRECT_CONSOLE_TO_LOG_ENABLED)) { logInfo("Redirect driver's stdout/stderr to logging system") - if (sc.conf.get(UI_SHOW_CONSOLE_PROGRESS)) { - logWarning(log"Redirect driver's stderr to logging system may affect " + - log"console progress bar, consider disabling either " + - log"${MDC(CONFIG, DRIVER_REDIRECT_CONSOLE_TO_LOG_ENABLED.key)} or " + - log"${MDC(CONFIG2, UI_SHOW_CONSOLE_PROGRESS.key)}.") - } RedirectConsolePlugin.redirectConsoleToLog() } Collections.emptyMap diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 98faa6a920f8..e8b601fdb411 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2843,8 +2843,7 @@ package object config { ConfigBuilder("spark.driver.log.redirectConsole.enabled") .doc("Whether to redirect the driver's stdout/stderr to logging system. " + s"It only takes affect when `${PLUGINS.key}` is configured with " + - "`org.apache.spark.deploy.RedirectConsolePlugin`. 
Note, enabling this " + - "feature may affect the shell console progress bar.") + "`org.apache.spark.deploy.RedirectConsolePlugin`.") .version("4.1.0") .booleanConf .createWithDefault(true) From 5a3207852e20d60905c9dd47aac382c5ce5059ad Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 13 Jun 2025 20:10:23 +0800 Subject: [PATCH 10/11] nit --- .../org/apache/spark/deploy/RedirectConsolePlugin.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala b/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala index b8bc5e0e3173..7331fb76c920 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala @@ -65,10 +65,8 @@ class ExecRedirectConsolePlugin extends ExecutorPlugin with Logging { } } -private[spark] class LoggingPrintStream( - redirect: String => Unit, - lineMaxBytes: Long = 4 * 1024 * 1024) - extends PrintStream(new LineBuffer(lineMaxBytes)) { +private[spark] class LoggingPrintStream(redirect: String => Unit) + extends PrintStream(new LineBuffer(4 * 1024 * 1024)) { override def write(b: Int): Unit = { super.write(b) From a3326a6f4069b9ed8a5edd8793f4faabf8302f5f Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Tue, 17 Jun 2025 11:36:57 +0800 Subject: [PATCH 11/11] address comments --- .../spark/deploy/RedirectConsolePlugin.scala | 27 +++++++++---- .../spark/internal/config/package.scala | 38 +++++++++++-------- 2 files changed, 43 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala b/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala index 7331fb76c920..cc1995a264fe 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala @@ -36,9 +36,12 @@ class RedirectConsolePlugin extends SparkPlugin { object RedirectConsolePlugin { - def redirectConsoleToLog(): Unit = { + def redirectStdoutToLog(): Unit = { val stdoutLogger = SparkLoggerFactory.getLogger("stdout") System.setOut(new LoggingPrintStream(stdoutLogger.info)) + } + + def redirectStderrToLog(): Unit = { val stderrLogger = SparkLoggerFactory.getLogger("stderr") System.setErr(new LoggingPrintStream(stderrLogger.error)) } @@ -47,9 +50,14 @@ object RedirectConsolePlugin { class DriverRedirectConsolePlugin extends DriverPlugin with Logging { override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = { - if (sc.conf.get(DRIVER_REDIRECT_CONSOLE_TO_LOG_ENABLED)) { - logInfo("Redirect driver's stdout/stderr to logging system") - RedirectConsolePlugin.redirectConsoleToLog() + val outputs = sc.conf.get(DRIVER_REDIRECT_CONSOLE_OUTPUTS) + if (outputs.contains("stdout")) { + logInfo("Redirect driver's stdout to logging system.") + RedirectConsolePlugin.redirectStdoutToLog() + } + if (outputs.contains("stderr")) { + logInfo("Redirect driver's stderr to logging system.") + RedirectConsolePlugin.redirectStderrToLog() } Collections.emptyMap } @@ -58,9 +66,14 @@ class DriverRedirectConsolePlugin extends DriverPlugin with Logging { class ExecRedirectConsolePlugin extends ExecutorPlugin with Logging { override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = { - if (ctx.conf.get(EXEC_REDIRECT_CONSOLE_TO_LOG_ENABLED)) { - logInfo("Redirect executor's stdout/stdout to logging system") - 
RedirectConsolePlugin.redirectConsoleToLog() + val outputs = ctx.conf.get(EXEC_REDIRECT_CONSOLE_OUTPUTS) + if (outputs.contains("stdout")) { + logInfo("Redirect executor's stdout to logging system.") + RedirectConsolePlugin.redirectStdoutToLog() + } + if (outputs.contains("stderr")) { + logInfo("Redirect executor's stderr to logging system.") + RedirectConsolePlugin.redirectStderrToLog() } } } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index e8b601fdb411..2ff0a8cf3646 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2839,21 +2839,29 @@ package object config { .createWithDefault( if (sys.env.get("SPARK_CONNECT_MODE").contains("1")) "connect" else "classic") - private[spark] val DRIVER_REDIRECT_CONSOLE_TO_LOG_ENABLED = - ConfigBuilder("spark.driver.log.redirectConsole.enabled") - .doc("Whether to redirect the driver's stdout/stderr to logging system. " + - s"It only takes affect when `${PLUGINS.key}` is configured with " + - "`org.apache.spark.deploy.RedirectConsolePlugin`.") + private[spark] val DRIVER_REDIRECT_CONSOLE_OUTPUTS = + ConfigBuilder("spark.driver.log.redirectConsoleOutputs") + .doc("Comma-separated list of the console output kind for driver that needs to redirect " + + "to logging system. Supported values are `stdout`, `stderr`. It only takes affect when " + + s"`${PLUGINS.key}` is configured with `org.apache.spark.deploy.RedirectConsolePlugin`.") .version("4.1.0") - .booleanConf - .createWithDefault(true) - - private[spark] val EXEC_REDIRECT_CONSOLE_TO_LOG_ENABLED = - ConfigBuilder("spark.executor.log.redirectConsole.enabled") - .doc("Whether to redirect the executor's stdout/stderr to logging system. " + - s"It only takes affect when `${PLUGINS.key}` is configured with " + - "`org.apache.spark.deploy.RedirectConsolePlugin`.") + .stringConf + .transform(_.toLowerCase(Locale.ROOT)) + .toSequence + .checkValue(v => v.forall(Set("stdout", "stderr").contains), + "The value only can be one or more of 'stdout, stderr'.") + .createWithDefault(Seq("stdout", "stderr")) + + private[spark] val EXEC_REDIRECT_CONSOLE_OUTPUTS = + ConfigBuilder("spark.executor.log.redirectConsoleOutputs") + .doc("Comma-separated list of the console output kind for executor that needs to redirect " + + "to logging system. Supported values are `stdout`, `stderr`. It only takes affect when " + + s"`${PLUGINS.key}` is configured with `org.apache.spark.deploy.RedirectConsolePlugin`.") .version("4.1.0") - .booleanConf - .createWithDefault(true) + .stringConf + .transform(_.toLowerCase(Locale.ROOT)) + .toSequence + .checkValue(v => v.forall(Set("stdout", "stderr").contains), + "The value only can be one or more of 'stdout, stderr'.") + .createWithDefault(Seq("stdout", "stderr")) }
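
For reference, a minimal usage sketch against the final state of this series: the plugin class and the two redirectConsoleOutputs configuration keys are taken from patch 11 above, while the object name, app name, local master, and printed text are hypothetical and only show how the plugin is wired up.

    import org.apache.spark.sql.SparkSession

    object RedirectConsoleUsageExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("redirect-console-usage-example")   // hypothetical app name
          .master("local[*]")                          // for a quick local run
          // Register the built-in plugin introduced by this series.
          .config("spark.plugins", "org.apache.spark.deploy.RedirectConsolePlugin")
          // Redirect only stdout on the driver; redirect both streams on executors
          // (redirecting both is also the default of these configs).
          .config("spark.driver.log.redirectConsoleOutputs", "stdout")
          .config("spark.executor.log.redirectConsoleOutputs", "stdout,stderr")
          .getOrCreate()

        // Once the driver plugin has initialized, this line goes to the "stdout"
        // logger instead of the raw console stream.
        println("hello from the driver")

        spark.stop()
      }
    }

The same wiring works from spark-submit, e.g. --conf spark.plugins=org.apache.spark.deploy.RedirectConsolePlugin together with the two redirectConsoleOutputs settings.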
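
The buffering rule implemented by LoggingPrintStream/LineBuffer in patch 01 can also be restated independently of Spark: bytes accumulate until the platform line separator appears (the separator is dropped from the emitted line) or the buffer reaches the size cap (4 MiB in the patch), at which point one log call is made. The sketch below re-states that rule with hypothetical names and a tiny 16-byte cap for demonstration; it is not the plugin's push-style flow, where this check runs after every write.

    import java.io.ByteArrayOutputStream

    // Standalone restatement of the LineBuffer rule (illustrative names, small cap).
    class DemoLineBuffer(maxBytes: Int) extends ByteArrayOutputStream {
      private val sep = System.lineSeparator.getBytes

      // Returns a completed line when one is available, dropping the trailing separator.
      def poll(): Option[String] = {
        val lineEnded = count >= sep.length &&
          sep.indices.forall(i => buf(count - sep.length + i) == sep(i))
        if (lineEnded) { val s = new String(buf, 0, count - sep.length); reset(); Some(s) }
        else if (count >= maxBytes) { val s = new String(buf, 0, count); reset(); Some(s) }
        else None
      }
    }

    object DemoLineBufferExample extends App {
      val lb = new DemoLineBuffer(16)
      lb.write("hello".getBytes)
      assert(lb.poll().isEmpty)                 // no separator yet and under the cap
      lb.write(System.lineSeparator.getBytes)
      assert(lb.poll().contains("hello"))       // separator seen, emitted without it
      lb.write(("x" * 20).getBytes)
      assert(lb.poll().contains("x" * 20))      // cap exceeded, flushed as-is
    }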