Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
address comments
  • Loading branch information
pan3793 committed Jun 13, 2025
commit c66f4d33cec35d09fd0e6477dc374b69b812ae4f
Original file line number Diff line number Diff line change
Expand Up @@ -32,41 +32,33 @@ import org.apache.spark.internal.config._
/**
* A built-in plugin to allow redirecting stdout/stderr to logging system (SLF4J).
*/
class ConsoleRedirectPlugin extends SparkPlugin {
override def driverPlugin(): DriverPlugin = new DriverConsoleRedirectPlugin()
class RedirectConsolePlugin extends SparkPlugin {
override def driverPlugin(): DriverPlugin = new DriverRedirectConsolePlugin()

override def executorPlugin(): ExecutorPlugin = new ExecConsoleRedirectPlugin()
override def executorPlugin(): ExecutorPlugin = new ExecRedirectConsolePlugin()
}

class DriverConsoleRedirectPlugin extends DriverPlugin with Logging {
class DriverRedirectConsolePlugin extends DriverPlugin with Logging {

override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
if (sc.conf.get(DRIVER_REDIRECT_STDOUT_TO_LOG_ENABLED)) {
logInfo("Redirect driver's stdout to logging system")
if (sc.conf.get(DRIVER_REDIRECT_CONSOLE_TO_LOG_ENABLED)) {
logInfo("Redirect driver's stdout/stderr to logging system")
val stdoutLogger = LoggerFactory.getLogger("stdout")
System.setOut(new LoggingPrintStream(stdoutLogger.info))
}

if (sc.conf.get(DRIVER_REDIRECT_STDERR_TO_LOG_ENABLED)) {
logInfo("Redirect driver's stderr to logging system")
val stderrLogger = LoggerFactory.getLogger("stderr")
System.setOut(new LoggingPrintStream(stdoutLogger.info))
System.setErr(new LoggingPrintStream(stderrLogger.error))
}
Map.empty[String, String].asJava
}
}

class ExecConsoleRedirectPlugin extends ExecutorPlugin with Logging {
class ExecRedirectConsolePlugin extends ExecutorPlugin with Logging {

override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
if (ctx.conf.get(EXEC_REDIRECT_STDOUT_TO_LOG_ENABLED)) {
logInfo("Redirect executor's stdout to logging system")
if (ctx.conf.get(EXEC_REDIRECT_CONSOLE_TO_LOG_ENABLED)) {
logInfo("Redirect executor's stdout/stdout to logging system")
val stdoutLogger = LoggerFactory.getLogger("stdout")
System.setOut(new LoggingPrintStream(stdoutLogger.info))
}

if (ctx.conf.get(EXEC_REDIRECT_STDERR_TO_LOG_ENABLED)) {
logInfo("Redirect executor's stderr to logging system")
val stderrLogger = LoggerFactory.getLogger("stderr")
System.setErr(new LoggingPrintStream(stderrLogger.error))
}
Expand Down
38 changes: 10 additions & 28 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2839,39 +2839,21 @@ package object config {
.createWithDefault(
if (sys.env.get("SPARK_CONNECT_MODE").contains("1")) "connect" else "classic")

private[spark] val DRIVER_REDIRECT_STDOUT_TO_LOG_ENABLED =
ConfigBuilder("spark.driver.log.redirectStdout.enabled")
.doc("Whether to redirect the driver's stdout to logging system. " +
private[spark] val DRIVER_REDIRECT_CONSOLE_TO_LOG_ENABLED =
ConfigBuilder("spark.driver.log.redirectConsole.enabled")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, we can still configure separately for stderr/stdout if we have a config intaking a subset of {stderr, stdout}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, refactored the config to accept a list of console output kinds
spark.driver.log.redirectConsoleOutputs

.doc("Whether to redirect the driver's stdout/stderr to logging system. " +
s"It only takes affect when `${PLUGINS.key}` is configured with " +
"`org.apache.spark.deploy.ConsoleRedirectPlugin`.")
"`org.apache.spark.deploy.RedirectConsolePlugin`.")
.version("4.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val DRIVER_REDIRECT_STDERR_TO_LOG_ENABLED =
ConfigBuilder("spark.driver.log.redirectStderr.enabled")
.doc("Whether to redirect the driver's stderr to logging system. " +
s"It only takes affect when `${PLUGINS.key}` is configured with " +
"`org.apache.spark.deploy.ConsoleRedirectPlugin`.")
.version("4.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val EXEC_REDIRECT_STDOUT_TO_LOG_ENABLED =
ConfigBuilder("spark.executor.log.redirectStdout.enabled")
.doc("Whether to redirect the executor's stdout to logging system. " +
s"It only takes affect when `${PLUGINS.key}` is configured with " +
"`org.apache.spark.deploy.ConsoleRedirectPlugin`.")
.version("4.1.0")
.booleanConf
.createWithDefault(false)
.createWithDefault(true)

private[spark] val EXEC_REDIRECT_STDERR_TO_LOG_ENABLED =
ConfigBuilder("spark.executor.log.redirectStderr.enabled")
.doc("Whether to redirect the executor's stderr to logging system. " +
private[spark] val EXEC_REDIRECT_CONSOLE_TO_LOG_ENABLED =
ConfigBuilder("spark.executor.log.redirectConsole.enabled")
.doc("Whether to redirect the executor's stdout/stderr to logging system. " +
s"It only takes affect when `${PLUGINS.key}` is configured with " +
"`org.apache.spark.deploy.ConsoleRedirectPlugin`.")
"`org.apache.spark.deploy.RedirectConsolePlugin`.")
.version("4.1.0")
.booleanConf
.createWithDefault(false)
.createWithDefault(true)
}