Skip to content

Conversation

@szehon-ho
Copy link
Member

What changes were proposed in this pull request?

Introduce runtime partition filtering for SPJ. In planning, we have the list of partition values on both sides to plan the tasks. We can thus filter out partition values based on the join type.

Currently LEFT OUTER, RIGHT OUTER, INNER join types are supported as they are more common, we can optimize other join types in subsequent PR.

Why are the changes needed?

In some common join types (INNER, LEFT, RIGHT), we have an opportunity to greatly reduce the data scanned in SPJ. For example, a small table joining a larger table by partition key, can prune out most of the partitions of the large table.

There is some similarity with the concept of DPP, but that uses heuristics and this is more exact as SPJ planning requires us anyway to list out both sides partitioning.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New tests in KeyGroupedPartitioningSuite.

@github-actions github-actions bot added the SQL label Jul 19, 2024
@szehon-ho szehon-ho force-pushed the spj_partition_filter branch from 8ebd413 to b9d8855 Compare July 19, 2024 21:55
    ### What changes were proposed in this pull request?

    Introduce runtime partition filtering for SPJ.  In planning, we have the list of partition values on both sides to plan the tasks.  We can thus filter out partition values based on the join type.

    ### Why are the changes needed?

    In some common join types (INNER, LEFT, RIGHT), we have an opportunity to greatly reduce the data scanned in SPJ.  For example, a small table joining a larger table by partition key, can prune out most of the partitions of the large table.

    There is some similarity with the concept of DPP, but that uses heuristics and this is more exact as SPJ planning requires us anyway to list out both sides partitioning.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    New tests in KeyGroupedPartitioningSuite.
@szehon-ho szehon-ho force-pushed the spj_partition_filter branch from b9d8855 to 7fd0d08 Compare July 19, 2024 23:40
@szehon-ho
Copy link
Member Author

@sunchao can you take a look when you get a chance? Thanks

Copy link
Member

@sunchao sunchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - sorry for the late review @szehon-ho

leftPartitionSet.union(rightPartitionSet)
}
partitionsSet.map(_.row).toSeq.sorted(partitionOrdering)
result.toSeq
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need to sort the result partitions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I see it is sorted later in the other method now

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM. Thank you, @szehon-ho and @cloud-fan .
Merged to master.

@dongjoon-hyun
Copy link
Member

Also, cc @cloud-fan , @viirya , too.

@szehon-ho
Copy link
Member Author

Thank you @sunchao and @dongjoon-hyun for quick review!

leftReducers)
val rightReducers = rightSpec.reducers(leftSpec)
val rightParts = reducePartValues(rightSpec.partitioning.partitionValues,
partitionExprs,
Copy link
Member

@viirya viirya Aug 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partitionExprs are from left spec. As this goes to reduce on right spec. Though they are compatible, but does it guarantee that right spec's partition expressions have same data types as left spec?

For compatible partition expressions, it is r(t1(x)) = t2(x), or r(t2(x)) = t1(x) by definition. But t1 and t2 still can have different data types, isn't?

It just requires r must be same data type as other side, i.e., r(t1(x)) and t2(x), or r(t2(x)) and t1(x).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you may be right, let me double check this with a test and get back to you.

dongjoon-hyun added a commit that referenced this pull request Aug 13, 2024
### What changes were proposed in this pull request?

This PR aims to regenerate benchmark results (except `ExternalAppendOnlyUnsafeRowArrayBenchmark`) as a preparation for Apache Spark 4.0.0-preview2.

- During the testing, it's observed that `ExternalAppendOnlyUnsafeRowArrayBenchmark` hangs in both CI and local environment. SPARK-49228 is filed for its investigation.

- In addition, `Storage Partition Join`-related benchmark are generated for the following commits.
  - #46265
  - #47426

### Why are the changes needed?

To check the performance regression.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This is generated by
- https://github.com/dongjoon-hyun/spark/actions/runs/10364365815 (Java 17)
- https://github.com/dongjoon-hyun/spark/actions/runs/10364368441 (Java 21)

Manual review.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47743 from dongjoon-hyun/SPARK-49224.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025
Introduce runtime partition filtering for SPJ.  In planning, we have the list of partition values on both sides to plan the tasks.  We can thus filter out partition values based on the join type.

Currently LEFT OUTER, RIGHT OUTER, INNER join types are supported as they are more common, we can optimize other join types in subsequent PR.

  ### Why are the changes needed?

In some common join types (INNER, LEFT, RIGHT), we have an opportunity to greatly reduce the data scanned in SPJ.  For example, a small table joining a larger table by partition key, can prune out most of the partitions of the large table.

There is some similarity with the concept of DPP, but that uses heuristics and this is more exact as SPJ planning requires us anyway to list out both sides partitioning.

  ### Does this PR introduce _any_ user-facing change?

No.

  ### How was this patch tested?

New tests in KeyGroupedPartitioningSuite.

Closes apache#47426 from szehon-ho/spj_partition_filter.

Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants