
Conversation

grundprinzip (Contributor) commented Feb 17, 2024

What changes were proposed in this pull request?

This patch adds a new mechanism to push query execution progress for batch queries. We add a new response message type and periodically push query progress to the client. The client can consume this data to, for example, display a progress bar.

This patch adds support for displaying a progress bar in the PySpark shell when started with Spark Connect.

The proto message is defined as follows:

// This message is used to communicate the progress of the query during execution.
message ExecutionProgress {
  // Captures the progress of each individual stage.
  repeated StageInfo stages = 1;

  // Captures the number of currently in-flight tasks.
  int64 num_inflight_tasks = 2;

  message StageInfo {
    int64 stage_id = 1;
    int64 num_tasks = 2;
    int64 num_completed_tasks = 3;
    int64 input_bytes_read = 4;
    bool done = 5;
  }
}

Clients can consume these messages or simply ignore them. On top of that, this patch adds the ability to register a callback for progress tracking on the SparkSession.

handler = lambda **kwargs: print(kwargs)
spark.registerProgressHandler(handler)
spark.range(100).collect()
spark.removeProgressHandler(handler)
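
For illustration, a handler could fold the per-stage counters into a single completion percentage. This is only a sketch: it assumes the handler is invoked with keyword arguments stages (a list of StageInfo), inflight_tasks, operation_id, and done, matching the example output shown later in this thread.

# Sketch only: aggregate StageInfo counters into an overall percentage.
def progress_handler(stages, inflight_tasks, operation_id, done, **kwargs):
    total = sum(s.num_tasks for s in stages)
    completed = sum(s.num_completed_tasks for s in stages)
    pct = 100.0 * completed / total if total > 0 else 0.0
    print(f"[{operation_id}] {completed}/{total} tasks ({pct:.0f}%), "
          f"{inflight_tasks} in flight, done={done}")

spark.registerProgressHandler(progress_handler)
spark.range(100).collect()
spark.removeProgressHandler(progress_handler)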

Example 1 (video attachment: progress_medium_query_multi_stage.mp4)

Example 2 (video attachment: progress_bar.mp4)

Why are the changes needed?

Usability and Experience

Does this PR introduce any user-facing change?

When the user starts the PySpark shell in Spark Connect mode, a progress bar is shown by default.

How was this patch tested?

Added new tests.

Was this patch authored or co-authored using generative AI tooling?

No

grundprinzip (Contributor Author)

Any chance to get some more feedback here, @HyukjinKwon or @hvanhovell?

dtenedor (Contributor)

cc @ueshin @cloud-fan we need help 🙏

// This message is used to communicate progress about the query progress during the execution.
message ExecutionProgress {
  int64 num_tasks = 1;
  int64 num_completed_tasks = 2;
Contributor

Is this for the currently running stage, or across all stages?

Contributor Author

Across all stages. It can always be extended later.

Contributor

I'm wondering how this can be accurate. With AQE we never know the number of partitions for the next stage, as re-optimization can happen.

Contributor Author

The goal of the progress metrics is not to be accurate into the future but only to represent a snapshot of the current state. This means that the number of tasks can be updated when new stages are added or AQE kicks in.

The point is that the number of remaining tasks will converge over time and become stable.
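
As a purely hypothetical illustration of that convergence (the numbers below are invented), a client might observe snapshots like this:

# Hypothetical snapshots for one query (illustrative numbers only): the task
# total grows when AQE adds a stage, then stabilizes once the plan is final.
snapshots = [
    {"num_tasks": 100, "num_completed_tasks": 40},   # initial plan
    {"num_tasks": 220, "num_completed_tasks": 150},  # AQE added a stage
    {"num_tasks": 220, "num_completed_tasks": 220},  # converged, query done
]
for s in snapshots:
    print(f"{s['num_completed_tasks']}/{s['num_tasks']} tasks completed")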

Contributor Author

progress

Contributor

(Just my 2c: I think having any progress bar is much better than none. The standard Spark progress bar has some ups and downs; having new progress bars appear isn't the most intuitive either, though it's probably net better than one progress bar that keeps getting longer. Either way, I would much prefer having some progress bar now that we can extend later, perhaps as we get a better sense of how to incorporate AQE and future stages into the UX.)

Contributor

On second thought, it's better to hide Spark internals (stages) from end users, and eventually we should only have one progress bar for the query. So the current PR is a good starting point.

However, this server-client protocol needs to be stable and we don't want to change the client frequently to improve the progress reporting. Can we define a minimum set of information we need to send to the client side to display the progress bar? I feel it's better to calculate the percentage at the server side.

Contributor Author

So I refactored the code to avoid closing any doors. I did not change the way the progress bar is displayed. However, I extended the progress message to capture the stage-wise information so other clients can decide independently how to present the information to the end user.

Contributor

+1 @cloud-fan what do you think about that? Capture stage-level info in the proto, but keep the display simple for now?

Contributor

yea this is more flexible. The proto message contains all the information and clients can do whatever they want.

github-actions bot added the DOCS label Apr 1, 2024
@grundprinzip
Copy link
Contributor Author

@zhengruifeng @HyukjinKwon addressed your comments, please take another look.

override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
  trackedTags.foreach({ case (_, tracker) =>
    if (tracker.jobs.contains(jobEnd.jobId)) {
      tracker.dirty.set(true)
Contributor

why do we set the dirty flag when nothing is updated?

Contributor Author

This is mostly to make sure that all progress is reported and an update is sent to the client. If you're tracking time between progress messages, every message itself is progress.

grundprinzip (Contributor Author)

@HyukjinKwon @zhengruifeng @cloud-fan I addressed the comments; is there additional feedback?

HyukjinKwon (Member) left a comment

The logic seems fine. For the output shape and information, it would be great if someone like @cloud-fan or @hvanhovell reviews it.

cloud-fan (Contributor)

thanks, merging to master!

cloud-fan closed this in f6999df Apr 4, 2024
HyukjinKwon added a commit that referenced this pull request Apr 5, 2024
…ckage

### What changes were proposed in this pull request?

This PR is a followup of #45150 that adds the new `shell` module to the PyPI package.

### Why are the changes needed?

So the PyPI package contains the `shell` module.

### Does this PR introduce _any_ user-facing change?

No, the main change has not been released yet.

### How was this patch tested?

The test case will be added in #45870. It was found while working on that PR.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45882 from HyukjinKwon/SPARK-47081-followup.

Lead-authored-by: Hyukjin Kwon <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
>>> spark.registerProgressHandler(progress_handler)
>>> res = spark.range(10).repartition(1).collect()
3 Stages known, Done: False
3 Stages known, Done: True
Member

This test is flaky:

File "/__w/spark/spark/python/pyspark/sql/connect/session.py", line 346, in pyspark.sql.connect.session.SparkSession.registerProgressHandler
Failed example:
    res = spark.range(10).repartition(1).collect()
Expected:
    3 Stages known, Done: False
    3 Stages known, Done: True
Got:
    0 Stages known, Done: True

https://github.com/apache/spark/actions/runs/8564043093/job/23470007059.

Let me skip it for now.

Contributor Author

I'll unflake it. Thanks!

HyukjinKwon added a commit that referenced this pull request Apr 5, 2024
### What changes were proposed in this pull request?

This PR is a followup of #45150 that skips flaky doctests.

### Why are the changes needed?

In order to make the build stable.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

CI in this PR should verify it.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45889 from HyukjinKwon/SPARK-47081-followup2.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
Comment on lines -204 to +248
-            val timeout = Math.max(1, deadlineTimeMillis - System.currentTimeMillis())
+            // Wake up more frequently to send the progress updates.
+            val progressTimeout =
+              executeHolder.sessionHolder.session.conf.get(CONNECT_PROGRESS_REPORT_INTERVAL)
+            // If the progress feature is disabled, wait for the deadline.
+            val timeout = if (progressTimeout > 0) {
+              progressTimeout
+            } else {
+              Math.max(1, deadlineTimeMillis - System.currentTimeMillis())
+            }
Contributor

nit:

            var timeout = Math.max(1, deadlineTimeMillis - System.currentTimeMillis())
            // Wake up more frequently to send the progress updates.
            val progressTimeout =
              executeHolder.sessionHolder.session.conf.get(CONNECT_PROGRESS_REPORT_INTERVAL)
            if (progressTimeout > 0) {
              timeout = Math.min(progressTimeout, timeout)
            }

otherwise, progressTimeout may make us wait beyond the deadline.

HyukjinKwon added a commit that referenced this pull request Jan 14, 2025
…ortInterval` over timeout

### What changes were proposed in this pull request?

This PR is a followup that addresses #45150 (comment)

### Why are the changes needed?

To respect `spark.connect.progress.reportInterval`

### Does this PR introduce _any_ user-facing change?

Virtually no. In a corner case, the progress update might take longer than `spark.connect.progress.reportInterval`.

### How was this patch tested?

Manually tested.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49474 from HyukjinKwon/SPARK-47081-followup3.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
virrrat commented Apr 14, 2025

Is there a plan to backport this feature to Spark 3.5? Not sure if that depends on other features that aren't in Spark 3.5.

grundprinzip (Contributor Author)

@virrrat this is not planned and, given the upcoming Spark 4 release, not practical.

Comment on lines +2008 to +2010
Register a progress handler to be called when a progress update is received from the server.
.. versionadded:: 4.0
Contributor

Potentially silly question, but: When you look at the docs for this, it's not obvious that Spark Connect supports this method. Should this be explicitly noted in the docstring somehow? Or are users supposed to assume that everything supports Spark Connect unless explicitly noted otherwise?

Contributor

Another related question: Should there be narrative documentation of ProgressHandler on the monitoring page, or are we happy with it just being tucked away in the API docs?

Contributor Author

Creating a PR with documentation updates would be very much appreciated!

Comment on lines +39 to +45
@dataclass
class StageInfo:
    stage_id: int
    num_tasks: int
    num_completed_tasks: int
    num_bytes_read: int
    done: bool
Contributor

I'm just learning about this new feature and am potentially interested in expanding on the documentation for it, as it's very useful for people building applications on top of Spark.

One thing I noticed is that a job can be marked as "done" even though the number of completed tasks is less than the number of tasks for one or more stages. I assume this is because the stage was skipped or something else, but this information is not captured in this class, so the resulting progress communicated to the user ends up being a bit misleading and/or noisy.

Is it possible to enhance this somehow with that information (in which case I'm happy to file a ticket), or have I misunderstood this data?

Here's an example from some testing I did. This is the last update I got from my progress handler:

{'stages': [StageInfo(stage_id=37,
                      num_tasks=1,
                      num_completed_tasks=1,
                      num_bytes_read=0,
                      done=True),
            StageInfo(stage_id=29,
                      num_tasks=1,
                      num_completed_tasks=1,
                      num_bytes_read=0,
                      done=True),
            StageInfo(stage_id=33,
                      num_tasks=183,
                      num_completed_tasks=183,
                      num_bytes_read=0,
                      done=True),
            StageInfo(stage_id=35,
                      num_tasks=120,
                      num_completed_tasks=0,
                      num_bytes_read=0,
                      done=False),
            StageInfo(stage_id=31,
                      num_tasks=1,
                      num_completed_tasks=0,
                      num_bytes_read=0,
                      done=False),
            StageInfo(stage_id=32,
                      num_tasks=120,
                      num_completed_tasks=0,
                      num_bytes_read=0,
                      done=False),
            StageInfo(stage_id=34,
                      num_tasks=1,
                      num_completed_tasks=0,
                      num_bytes_read=0,
                      done=False),
            StageInfo(stage_id=36,
                      num_tasks=183,
                      num_completed_tasks=0,
                      num_bytes_read=0,
                      done=False),
            StageInfo(stage_id=30,
                      num_tasks=120,
                      num_completed_tasks=120,
                      num_bytes_read=0,
                      done=True)],
 'inflight_tasks': 0,
 'operation_id': '1a9fbf1d-4a38-4c6b-b730-6c8b49179694',
 'done': True}

Note that the overall status is "done", even though many stages are not themselves done.

Contributor Author

The progress reporting is meant to provide current information about the query, with progress messages continuing until the query is observed as done from the client side.

Due to the way that the Spark event listeners work, it is not guaranteed that all events have been processed until the query is marked as done by the Spark Connect query execution.

This means there are two potential ways the data can be "off":

  1. Skipped / Canceled stages
  2. Events not yet processed.

The goal for the progress report is not to be 100% accurate, but to indicate to the user what kind of progress the operation is making. For that reason, the completed and current task counts might not be a perfect measure, but they provide an approximation that converges to a reasonable progress report.
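
In practice that means a consumer should treat the query-level done flag as authoritative rather than waiting for every stage's counters to fill up. A minimal sketch, assuming the StageInfo shape shown above:

# Sketch only: trust the query-level "done" flag over per-stage counters,
# which may lag behind (skipped stages, unprocessed listener events).
def fraction_complete(stages, done):
    if done:
        return 1.0
    total = sum(s.num_tasks for s in stages)
    completed = sum(min(s.num_completed_tasks, s.num_tasks) for s in stages)
    return completed / total if total > 0 else 0.0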

Comment

Would it make sense to add a stage_status field to StageInfo to better handle the skipped/cancelled stages scenario? Users could consume the data based on status.

Contributor

The Spark UI does document skipped stages, but I'm not sure if that information is available before the job is done. I think that's what @grundprinzip is saying.
