Conversation


@pan3793 pan3793 commented Jun 9, 2025

What changes were proposed in this pull request?

Although directly printing messages to stdout/stderr is discouraged, Spark, as a framework, does not forbid users from doing so.

This PR adds a RedirectConsolePlugin to allow Spark to redirect stdout/stderr to the logging system (Log4J2 via Slf4J API).

Apache Flink has the same capability; see FLINK-31234.
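The underlying technique can be sketched as follows. This is a minimal standalone illustration of redirecting `System.out` into a log sink, not Spark's actual implementation; the class names here are made up for the example, and the real plugin forwards to Slf4j loggers rather than an in-memory list.

```java
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// A line-buffering OutputStream that forwards each completed line to a
// sink (here a Consumer<String>; Spark's plugin targets Slf4j loggers
// named "stdout"/"stderr" instead).
class LineForwardingStream extends OutputStream {
    private final StringBuilder buf = new StringBuilder();
    private final Consumer<String> sink;

    LineForwardingStream(Consumer<String> sink) {
        this.sink = sink;
    }

    @Override
    public synchronized void write(int b) {
        if (b == '\n') {
            sink.accept(buf.toString());
            buf.setLength(0);
        } else {
            buf.append((char) b);
        }
    }
}

public class RedirectDemo {
    public static void main(String[] args) {
        List<String> captured = new ArrayList<>();
        PrintStream original = System.out;
        // Swap System.out for a stream that forwards complete lines.
        System.setOut(new PrintStream(new LineForwardingStream(captured::add), true));
        System.out.println("hello from stdout");
        System.setOut(original); // restore the real console
        System.out.println("captured: " + captured);
    }
}
```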

Why are the changes needed?

This is especially useful for integrating with external logging services; for example, one can use the Kafka Log4j appender to send logs to Kafka and then drain them into Elasticsearch.
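As a sketch of that integration, a Log4j2 Kafka appender can be attached to the `stdout` logger used by the redirection. The broker address, topic, and layout pattern below are placeholders, and the Kafka client libraries must be on the classpath:

```properties
# Sketch only: route redirected stdout logs to Kafka.
# bootstrap.servers and topic are placeholders.
appender.kafka.type = Kafka
appender.kafka.name = kafka
appender.kafka.topic = spark-console-logs
appender.kafka.layout.type = PatternLayout
appender.kafka.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c: %m%n
appender.kafka.property.type = Property
appender.kafka.property.name = bootstrap.servers
appender.kafka.property.value = kafka-broker:9092

logger.stdout.name = stdout
logger.stdout.level = info
logger.stdout.appenderRef.kafka.ref = kafka
```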

Does this PR introduce any user-facing change?

Yes, new feature.

How was this patch tested?

I didn't write a unit test because redirecting the console would affect other test cases running in the same JVM; I tested it manually.

$ cp conf/log4j2.properties.template conf/log4j2.properties
$ cat >> conf/log4j2.properties <<EOF
logger.stdout.name = stdout
logger.stdout.level = info
EOF
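The snippet above only declares a logger for `stdout`; redirected stderr output is logged at ERROR level, so it already passes the default WARN threshold. If you want to control its level separately, an analogous logger can be added (an assumption following the naming above, not something the PR requires):

```properties
logger.stderr.name = stderr
logger.stderr.level = error
```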

$ bin/spark-shell --conf spark.plugins=org.apache.spark.deploy.RedirectConsolePlugin
WARNING: Using incubator modules: jdk.incubator.vector
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
      /_/
         
Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 17.0.13)
Type in expressions to have them evaluated.
Type :help for more information.
25/06/13 17:28:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://10.242.159.140:4040
Spark context available as 'sc' (master = local[*], app id = local-1749806903960).
Spark session available as 'spark'.

scala> System.out.println("driver stdout")
     | System.err.println("driver stderr")
     | sc.parallelize(Array(1, 2, 3)).foreach { x =>
     |   System.out.println(s"executor stdout: $x")
     |   System.err.println(s"executor stderr: $x")
     | }
warning: 1 deprecation (since 2.13.0); for details, enable `:setting -deprecation` or `:replay -deprecation`
25/06/13 17:28:50 INFO stdout: driver stdout
25/06/13 17:28:50 ERROR stderr: driver stderr
25/06/13 17:28:50 INFO stdout: executor stdout: 2
25/06/13 17:28:50 ERROR stderr: executor stderr: 2
25/06/13 17:28:50 INFO stdout: executor stdout: 1
25/06/13 17:28:50 ERROR stderr: executor stderr: 1
25/06/13 17:28:50 INFO stdout: executor stdout: 3
25/06/13 17:28:50 ERROR stderr: executor stderr: 3

scala>

Console progress bar (without console redirection)

$ bin/spark-shell
WARNING: Using incubator modules: jdk.incubator.vector
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
      /_/
         
Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 17.0.13)
Type in expressions to have them evaluated.
Type :help for more information.
25/06/13 19:49:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://10.242.159.140:4040
Spark context available as 'sc' (master = local[*], app id = local-1749815345805).
Spark session available as 'spark'.

scala> import org.apache.spark._
     | 
     | System.out.println("driver stdout")
     | System.err.println("driver stderr")
     | sc.parallelize(Array(1, 2, 3)).foreach { x =>
     |   Thread.sleep(TaskContext.getPartitionId() * 10000)
     |   System.out.println(s"executor stdout: $x")
     |   System.err.println(s"executor stderr: $x")
     | }
warning: 1 deprecation (since 2.13.0); for details, enable `:setting -deprecation` or `:replay -deprecation`
driver stdout
driver stderr
executor stdout: 1===============================>                 (7 + 3) / 10]
executor stderr: 1
executor stdout: 2=====================================>           (8 + 2) / 10]
executor stderr: 2
executor stdout: 3===========================================>     (9 + 1) / 10]
executor stderr: 3
import org.apache.spark._                                                       

scala>

Console progress bar (with console redirection)

$ bin/spark-shell --conf spark.plugins=org.apache.spark.deploy.RedirectConsolePlugin
WARNING: Using incubator modules: jdk.incubator.vector
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
      /_/
         
Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 17.0.13)
Type in expressions to have them evaluated.
Type :help for more information.
25/06/13 19:54:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/06/13 19:54:14 WARN DriverRedirectConsolePlugin: Redirect driver's stderr to logging system may affect console progress bar, consider disabling either spark.driver.log.redirectConsole.enabled or spark.ui.showConsoleProgress.
Spark context Web UI available at http://10.242.159.140:4040
Spark context available as 'sc' (master = local[*], app id = local-1749815654458).
Spark session available as 'spark'.

scala> import org.apache.spark._
     | 
     | System.out.println("driver stdout")
     | System.err.println("driver stderr")
     | sc.parallelize(Array(1, 2, 3)).foreach { x =>
     |   Thread.sleep(TaskContext.getPartitionId() * 10000)
     |   System.out.println(s"executor stdout: $x")
     |   System.err.println(s"executor stderr: $x")
     | }
warning: 1 deprecation (since 2.13.0); for details, enable `:setting -deprecation` or `:replay -deprecation`
25/06/13 19:54:22 INFO stdout: driver stdout
25/06/13 19:54:22 ERROR stderr: driver stderr
25/06/13 19:54:52 INFO stdout: executor stdout: 1>                 (7 + 3) / 10]
25/06/13 19:54:52 ERROR stderr: executor stderr: 1
25/06/13 19:55:22 INFO stdout: executor stdout: 2======>           (8 + 2) / 10]
25/06/13 19:55:22 ERROR stderr: executor stderr: 2
25/06/13 19:55:52 INFO stdout: executor stdout: 3============>     (9 + 1) / 10]
25/06/13 19:55:52 ERROR stderr: executor stderr: 3
import org.apache.spark._                                                       

scala> 

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the CORE label Jun 9, 2025
@pan3793 pan3793 marked this pull request as ready for review June 9, 2025 12:10

pan3793 commented Jun 9, 2025

cc @gengliangwang @LuciferYang

"`org.apache.spark.deploy.ConsoleRedirectPlugin`.")
.version("4.1.0")
.booleanConf
.createWithDefault(false)
Suggested change
.createWithDefault(false)
.createWithDefault(true)

@yaooqinn

Shall we simplify the implementation by:

  • Enabling it by default in the configurations, since the feature is already gated by adding the plugin
  • Reducing the number of configurations

@pan3793

pan3793 commented Jun 13, 2025

@yaooqinn thanks for reviewing; I made the following changes based on your suggestions:

  1. simplified the configurations from 4 to 2: spark.[driver|executor].log.redirectConsole.enabled;
  2. changed the default value of the above configs to true;
  3. renamed ConsoleRedirectPlugin to RedirectConsolePlugin for consistency.

@yaooqinn

Can we also have a unit test or manual test with the console progress bar, which supports inline updates for stage progress instead of printing a wall of logs?

// Delay to show up a progress bar, in milliseconds
private val firstDelayMSec = 500L
// Get the stderr (which is console for spark-shell) before installing RedirectConsolePlugin
private val console = System.err

@pan3793 pan3793 Jun 13, 2025

@yaooqinn With this, it does not affect the shell's console progress bar. I also pasted the manual test results in the PR description.
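The snippet above captures `System.err` before the plugin is installed, which is what keeps the progress bar working: the saved reference still points at the real console after redirection replaces `System.err`. A minimal standalone sketch of that pattern (the class name and the in-memory buffer standing in for the logging system are illustrative, not Spark's code):

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class ProgressBarDemo {
    public static void main(String[] args) {
        // Capture the real stderr *before* any redirection, as the
        // progress bar does.
        PrintStream console = System.err;

        // Simulate redirection: System.err now writes into a buffer
        // (standing in for the logging system).
        ByteArrayOutputStream redirected = new ByteArrayOutputStream();
        System.setErr(new PrintStream(redirected, true));

        System.err.println("application stderr"); // intercepted
        // `console` still targets the real terminal, so a progress bar
        // writing through it bypasses the redirection.

        System.setErr(console); // restore
        System.out.println("intercepted: " + redirected.toString().trim());
    }
}
```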

if (sys.env.get("SPARK_CONNECT_MODE").contains("1")) "connect" else "classic")

private[spark] val DRIVER_REDIRECT_CONSOLE_TO_LOG_ENABLED =
ConfigBuilder("spark.driver.log.redirectConsole.enabled")

I mean, we can still configure stdout/stderr separately if we have a config taking a subset of {stdout, stderr}.


I see; I refactored the config to accept a list of console output kinds: `spark.driver.log.redirectConsoleOutputs`.

.transform(_.toLowerCase(Locale.ROOT))
.toSequence
.checkValue(v => v.forall(Set("stdout", "stderr").contains),
"The value only can be one or more of 'stdout, stderr'.")

The user-facing error message looks like:

org.apache.spark.SparkIllegalArgumentException: [INVALID_CONF_VALUE.REQUIREMENT] The value 'stdin' in the config "spark.driver.log.redirectConsoleOutputs" is invalid. The value only can be one or more of 'stdout, stderr'. SQLSTATE: 22022
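The check can be mirrored in a standalone sketch (not Spark's `ConfigBuilder` API; the method name and exception type here are illustrative): lowercase the comma-separated values, then require each entry to be `stdout` or `stderr`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

public class ConfigCheckDemo {
    private static final Set<String> ALLOWED = Set.of("stdout", "stderr");

    // Lowercase the comma-separated values, then validate each entry,
    // mirroring the transform/checkValue pair above.
    static List<String> parseRedirectOutputs(String raw) {
        List<String> values = Arrays.stream(raw.split(","))
            .map(s -> s.trim().toLowerCase(Locale.ROOT))
            .collect(Collectors.toList());
        for (String v : values) {
            if (!ALLOWED.contains(v)) {
                throw new IllegalArgumentException(
                    "The value only can be one or more of 'stdout, stderr'.");
            }
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(parseRedirectOutputs("stdout,STDERR"));
        try {
            parseRedirectOutputs("stdin");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```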

@yaooqinn yaooqinn closed this in e8d6f54 Jun 19, 2025
@yaooqinn

Merged to master. Thank you, @pan3793.

dongjoon-hyun added a commit that referenced this pull request Oct 15, 2025
…oleOutputs`

### What changes were proposed in this pull request?

This PR aims to rename `spark.executor.log.redirectConsoleOutputs` to `spark.executor.logs.redirectConsoleOutputs` to be consistent with the existing executor log configurations like the following.

```
spark.executor.logs.rolling.strategy
spark.executor.logs.rolling.time.interval
spark.executor.logs.rolling.maxSize
spark.executor.logs.rolling.maxRetainedFiles
spark.executor.logs.rolling.enableCompression
spark.executor.logs.redirectConsoleOutputs
```

### Why are the changes needed?

Although SPARK-52426 introduced two configurations consistently in the PR, Apache Spark has different namespaces for Driver and Executor which are `spark.driver.log.*` and `spark.executor.logs.*` respectively. Instead of introducing a new config namespace, `spark.executor.log`, for this single new configuration, we had better follow the existing naming rule.
- #51130

```
spark.driver.log.redirectConsoleOutputs
spark.executor.log.redirectConsoleOutputs
```

### Does this PR introduce _any_ user-facing change?

No, this is not released yet.

### How was this patch tested?

Pass the CIs. Manual review.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52624 from dongjoon-hyun/SPARK-53923.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun added a commit that referenced this pull request Oct 16, 2025
### What changes were proposed in this pull request?

This PR aims to document newly added `core` module configurations as a part of Apache Spark 4.1.0 preparation.

### Why are the changes needed?

To help the users use new features easily.

- #47856
- #51130
- #51163
- #51604
- #51630
- #51708
- #51885
- #52091
- #52382

### Does this PR introduce _any_ user-facing change?

No behavior change because this is a documentation update.

### How was this patch tested?

Manual review.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52626 from dongjoon-hyun/SPARK-53926.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025