@HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Jun 12, 2024

What changes were proposed in this pull request?

This PR proposes to introduce a marker for the isStreaming property in the text representation of a logical plan.

The marker will be ~, alongside the existing markers ! (invalid) and ' (unresolved).

This PR proposes to keep the prefix marker as a single character (as opposed to up to two characters). This should be OK in practice, since the streaming marker is most useful when looking into a plan that is already analyzed; it's unlikely that we would need to see both an existing marker and the streaming marker on the same node.
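
As a minimal sketch of the single-character policy described above: the existing markers take precedence, and ~ is used only when no other marker applies. The types here are simplified stand-ins, not Spark's actual LogicalPlan code; `NodeState` and `PrefixSketch` are hypothetical names used for illustration only.

```scala
// Simplified stand-in for the state of a plan node; NOT Spark's LogicalPlan.
final case class NodeState(resolved: Boolean, invalid: Boolean, isStreaming: Boolean)

object PrefixSketch {
  // Returns at most one marker character.
  // Assumed precedence for this sketch: unresolved, then invalid, then streaming.
  def statePrefix(s: NodeState): String =
    if (!s.resolved) "'"
    else if (s.invalid) "!"
    else if (s.isStreaming) "~"
    else ""
}
```

With this policy, a resolved, valid streaming node gets ~, while a streaming node that is also unresolved or invalid keeps only the existing marker.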

Why are the changes needed?

This would make it much easier to track down query optimizer (QO) issues in streaming queries. For example, here is the output for the rule that triggered SPARK-47305:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PruneFilters ===
 WriteToMicroBatchDataSource MemorySink, 49358516-bb57-4f10-badf-00c9f5e2484b, Append, 1   WriteToMicroBatchDataSource MemorySink, 49358516-bb57-4f10-badf-00c9f5e2484b, Append, 1
 +- Project [value#45]                                                                     +- Project [value#45]
    +- Join Inner                                                                             +- Join Inner
       :- Project [value#45]                                                                     :- Project [value#45]
       :  +- StreamingDataSourceV2ScanRelation[value#45] MemoryStreamDataSource                  :  +- StreamingDataSourceV2ScanRelation[value#45] MemoryStreamDataSource
       +- Project                                                                                +- Project
!         +- Filter false                                                                           +- LocalRelation <empty>, [id#54L]
!            +- Range (1, 5, step=1, splits=Some(2))          

The bug in SPARK-47305 was that the LocalRelation above was incorrectly marked as streaming=true when it should have been streaming=false. The text representation of LocalRelation has no notion of the isStreaming flag, so from the text plan we would never know that the rule had a bug. Even if we showed the value of isStreaming in LocalRelation, the subtree could be very deep in practice, and it is not friendly to go all the way down to the leaf node to figure out the isStreaming value of the entire subtree.

After this PR, the output for the same rule changes as follows:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PruneFilters ===
 ~WriteToMicroBatchDataSource MemorySink, dcf936d0-8580-47e9-b898-4fffffc21db5, Append, 1   ~WriteToMicroBatchDataSource MemorySink, dcf936d0-8580-47e9-b898-4fffffc21db5, Append, 1
 +- ~Project [value#45]                                                                     +- ~Project [value#45]
    +- ~Join Inner                                                                             +- ~Join Inner
       :- ~Project [value#45]                                                                     :- ~Project [value#45]
       :  +- StreamingDataSourceV2ScanRelation[value#45] MemoryStreamDataSource                   :  +- StreamingDataSourceV2ScanRelation[value#45] MemoryStreamDataSource
!      +- Project                                                                                 +- ~Project
!         +- Filter false                                                                            +- ~LocalRelation <empty>, [id#54L]
!            +- Range (1, 5, step=1, splits=Some(2))                     

Now it's obvious that the isStreaming flag of the leaf node has changed. Also, to check the isStreaming flag of each child of Join, we only need to look at the root node of that child's subtree, instead of going down to the leaf nodes.
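
To illustrate why looking at the top node of each child is enough, here is a small sketch of a plan tree in which a non-leaf node is streaming whenever any of its children is streaming. The classes (`Plan`, `Leaf`, `Node`, `TreeSketch`) are hypothetical miniatures, not Spark's API, and the derivation rule is stated as an assumption of the sketch.

```scala
// Hypothetical miniature plan tree; names are illustrative, not Spark's API.
sealed trait Plan {
  def name: String
  def children: Seq[Plan]
  // Assumption of this sketch: a non-leaf node is streaming if any child is streaming.
  def isStreaming: Boolean = children.exists(_.isStreaming)
  // Renders the tree one node per line, prefixing streaming nodes with "~".
  def render(indent: Int = 0): String = {
    val marker = if (isStreaming) "~" else ""
    val self = (" " * indent) + marker + name
    (self +: children.map(_.render(indent + 3))).mkString("\n")
  }
}

final case class Leaf(name: String, streaming: Boolean) extends Plan {
  val children: Seq[Plan] = Nil
  override def isStreaming: Boolean = streaming
}

final case class Node(name: String, children: Seq[Plan]) extends Plan

object TreeSketch {
  // Correct rewrite: the batch side of the join stays non-streaming.
  val fixedSide: Plan = Node("Project", Seq(Leaf("LocalRelation <empty>", streaming = false)))
  // Buggy rewrite: the replacement leaf is wrongly flagged as streaming.
  val buggySide: Plan = Node("Project", Seq(Leaf("LocalRelation <empty>", streaming = true)))
}
```

Because the flag propagates upward, the wrongly flagged leaf in `buggySide` already shows up as ~Project at the top of that subtree, while `fixedSide` renders without any marker.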

Does this PR introduce any user-facing change?

Yes, since the textual representation of the logical plan changes slightly. However, this only applies to streaming Datasets, and the textual representation of a logical plan is arguably not a public API. (Keeping backward compatibility of the text is technically very hard.)

How was this patch tested?

Existing UTs serve as regression tests for batch and streaming queries. For streaming queries, this PR updates the golden file to match the change.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Jun 12, 2024
@HeartSaVioR
Contributor Author

cc. @cloud-fan @viirya Please take a look. Thanks!

@HeartSaVioR
Contributor Author

HeartSaVioR commented Jun 12, 2024

DISCLAIMER: @cloud-fan and I had a discussion about how to address the lack of information. This PR is based on the agreement. Thanks @cloud-fan for the valuable input!

@HeartSaVioR
Contributor Author

HeartSaVioR commented Jun 12, 2024

One side note: I see that some logical nodes override the simpleString method, which removes the chance for the marker (statePrefix) to be added. I'm not sure whether this is intentional, but it may be better to think through a way to ensure statePrefix is always applied in simpleString, so that the marker always shows up in the logical plan for the plan change logger.

It would probably be even better if we could ensure the isStreaming value is also available in verboseString, simpleStringWithNodeId, and verboseStringWithOperatorId.
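
One possible direction, sketched here under assumptions and not reflecting how Spark is actually structured: attach the prefix in a single final method and let subclasses customize only the body. `SketchPlan`, `ScanSketch`, and `simpleStringBody` are hypothetical names introduced for this illustration.

```scala
// Hypothetical base class; method names echo the discussion, bodies are illustrative.
abstract class SketchPlan {
  def nodeName: String
  def isStreaming: Boolean

  protected def statePrefix: String = if (isStreaming) "~" else ""

  // Subclasses customize only the body of the string ...
  protected def simpleStringBody: String = nodeName

  // ... while the prefix is attached here, in a method that cannot be overridden.
  final def simpleString: String = statePrefix + simpleStringBody
}

// A node with a custom text form still gets the marker.
class ScanSketch extends SketchPlan {
  val nodeName = "StreamingScan"
  val isStreaming = true
  override protected def simpleStringBody: String = s"$nodeName[value#45]"
}
```

The trade-off is that every node that overrides simpleString today would need to move its customization into the body hook, so this is only a rough idea of what "always available" could look like.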

Comment on lines +112 to +115
// Ancestor class could mark something on the prefix, including 'invalid'. Add a marker for
// `streaming` only when there is no marker from ancestor class.
if (prefixFromSuper.isEmpty && isStreaming) {
"~"
Member

Should we append it instead of using prefixFromSuper?

Contributor Author

This PR proposes to keep the prefix marker as a single character (as opposed to up to two characters). This should be OK in practice, since the streaming marker is most useful when looking into a plan that is already analyzed; it's unlikely that we would need to see both an existing marker and the streaming marker on the same node.

But we could reconsider if more voices support allowing up to two characters so that no marker is overwritten. Maybe @cloud-fan?

Member

@viirya viirya left a comment

It looks like a useful feature. It should also be good for explaining the query plan to users.

@HeartSaVioR
Contributor Author

@viirya Thanks for reviewing and for the quick approval! What are your thoughts on the side note?
#46953 (comment)

@viirya
Member

viirya commented Jun 13, 2024

Yea, it sounds like a good direction to go. If the added statePrefix can be dropped by some logical nodes, it doesn't look like a stable way to represent a streaming node in the text representation.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in c09039a Jun 17, 2024