[SPARK-34892][SS] Introduce MergingSortWithSessionWindowStateIterator sorting input rows and rows in state efficiently #33077
|
Same here; I marked this as a draft because other PRs have to be reviewed and merged first. I'll rebase this PR once all the other PRs are merged. |
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
eb433d7 to
83cc78a
Compare
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Test build #140306 has finished for PR 33077 at commit
|
|
Test build #140311 has finished for PR 33077 at commit
|
83cc78a to
abab1e8
Compare
|
cc. @viirya @xuanyuanking Please take a look. Thanks! |
|
Thanks @HeartSaVioR. I will review this today. |
|
Test build #140981 has finished for PR 33077 at commit
|
|
Kubernetes integration test unable to build dist. exiting with code: 1 |
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
… sorting input rows and rows in state efficiently
b540632 to
e4a74a3
Compare
|
Kubernetes integration test unable to build dist. exiting with code: 1 |
|
Test build #140987 has finished for PR 33077 at commit
|
|
Test build #140989 has finished for PR 33077 at commit
|
| private var currentRow: SessionRowInformation = _ | ||
| private var currentStateRow: SessionRowInformation = _ | ||
| private var currentStateIter: Iterator[InternalRow] = _ | ||
| private var currentStateFetchedKey: UnsafeRow = _ |
A few suggestions:
currentRow -> currentRowFromInput
currentStateRow -> currentRowFromState
currentStateIter -> sessionIterFromState
currentStateFetchedKey -> currentSessionKey
And maybe add a few comments explaining these variables for readability.
| } else { | ||
| // compare | ||
| if (currentRow.keys != currentStateRow.keys) { | ||
| // state row cannot advance to row in input, so state row should be lower |
Does this case mean that the input iterator has advanced to a new key, different from the key of the current sessions from the state? So we should output the current sessions until they are exhausted, and then retrieve new sessions from the state?
Exactly. We retrieve state rows for a specific key only when a new key arrives from the input side, so the state side can never be ahead of the input side. If the keys differ, there are still rows to process on the state side; the opposite case is not possible.
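To make the branch discussed above concrete, here is a minimal sketch of the decision, not the code from this PR. `Row`, `Side`, and `nextSide` are hypothetical names, the key is simplified to a `String`, and the tie-breaking rule for equal start times (state first) is an assumption.

```scala
object MergeDecisionSketch {
  // Hypothetical stand-in for a row tagged with its grouping key and session start time.
  final case class Row(key: String, sessionStart: Long)

  sealed trait Side
  case object FromInput extends Side
  case object FromState extends Side

  // Decides which of the two pending rows should be emitted next.
  // Assumes both an input row and a state row are currently available.
  def nextSide(inputRow: Row, stateRow: Row): Side =
    if (inputRow.key != stateRow.key) {
      // State rows are loaded only for a key already seen on the input side,
      // so a mismatch means the input has moved on while sessions for the
      // previous key are still pending: emit the state row first.
      FromState
    } else if (stateRow.sessionStart <= inputRow.sessionStart) {
      // Same key: the earlier session start goes first (ties go to state; assumption).
      FromState
    } else {
      FromInput
    }
}
```

With differing keys the function always picks the state side, which mirrors the point that the opposite case cannot occur.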
| rowAttributes) | ||
|
|
||
| val actual = iter.map(_.copy()).toList | ||
| assert(actual.isEmpty) |
Hm? Why is it empty? If the input is empty, doesn't it output the sorted sessions in the state?
Oh, nvm. I see.
viirya
left a comment
Looks good. I have a few suggestions about variable names.
|
I will merge this tomorrow. Thanks @HeartSaVioR |
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Please bear with me on merging this one now to unblock the last PR, #33081. We can do post-review for any part of the session window changes even if it's merged during the QA period (and even the RC period, if someone catches a defect). |
… sorting input rows and rows in state efficiently Introduction: this PR is a part of SPARK-10816 (EventTime based sessionization (session window)). Please refer #31937 to see the overall view of the code change. (Note that code diff could be diverged a bit.) ### What changes were proposed in this pull request? This PR introduces MergingSortWithSessionWindowStateIterator, which does "merge sort" between input rows and sessions in state based on group key and session's start time. Note that the iterator does merge sort among input rows and sessions grouped by grouping key. The iterator doesn't provide sessions in state which keys don't exist in input rows. For input rows, the iterator will provide all rows regardless of the existence of matching sessions in state. MergingSortWithSessionWindowStateIterator works on the precondition that given iterator is sorted by "group keys + start time of session window", and the iterator still retains the characteristic of the sort. ### Why are the changes needed? This part is a one of required on implementing SPARK-10816. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New UT added. Closes #33077 from HeartSaVioR/SPARK-34892-SPARK-10816-PR-31570-part-4. Authored-by: Jungtaek Lim <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]> (cherry picked from commit 12a576f) Signed-off-by: Jungtaek Lim <[email protected]>
|
Test build #141006 has finished for PR 33077 at commit
|
Introduction: this PR is part of SPARK-10816 (EventTime based sessionization (session window)). Please refer to #31937 for an overall view of the code change. (Note that the code diff may have diverged a bit.)
What changes were proposed in this pull request?
This PR introduces MergingSortWithSessionWindowStateIterator, which performs a "merge sort" between input rows and sessions in state, based on the group key and the session's start time.
Note that the iterator performs the merge sort per grouping key, across the input rows and the sessions stored in state for that key. The iterator doesn't emit sessions in state whose keys don't exist in the input rows. For input rows, the iterator emits all rows regardless of whether matching sessions exist in state.
MergingSortWithSessionWindowStateIterator works on the precondition that the given iterator is sorted by "group keys + start time of session window", and its output retains that sort order.
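The behavior described above can be sketched with plain Scala collections. This is a simplified illustration under stated assumptions, not the implementation in this PR: `Row`, `mergeWithState`, and `stateByKey` are hypothetical names, keys are plain strings, the state store is modeled as a lookup function that returns sessions sorted by start time, and ties on start time go to the state side.

```scala
object SessionMergeSketch {
  // Hypothetical simplified row: grouping key, session start time, and an origin tag.
  final case class Row(key: String, sessionStart: Long, origin: String)

  // Merges input rows (assumed sorted by key, then session start) with the
  // sessions stored in state for the same keys. `stateByKey` stands in for a
  // state store lookup and is assumed to return sessions sorted by start time.
  def mergeWithState(
      input: Iterator[Row],
      stateByKey: String => Seq[Row]): Iterator[Row] = new Iterator[Row] {
    private val in = input.buffered
    private var stateRows: BufferedIterator[Row] = Seq.empty[Row].iterator.buffered
    private var loadedKey: Option[String] = None

    override def hasNext: Boolean = in.hasNext || stateRows.hasNext

    override def next(): Row = {
      if (!hasNext) throw new NoSuchElementException("no more rows")
      // Input is exhausted or has moved on to another key while sessions of
      // the previously loaded key remain: drain those sessions first.
      val drainState =
        stateRows.hasNext && (!in.hasNext || in.head.key != stateRows.head.key)
      if (drainState) {
        stateRows.next()
      } else {
        // A new key on the input side triggers the state lookup for that key.
        if (!loadedKey.contains(in.head.key)) {
          loadedKey = Some(in.head.key)
          stateRows = stateByKey(in.head.key).iterator.buffered
        }
        // Same key on both sides: emit the row with the earlier session start.
        if (stateRows.hasNext && stateRows.head.sessionStart <= in.head.sessionStart) {
          stateRows.next()
        } else {
          in.next()
        }
      }
    }
  }
}
```

In this sketch, state is consulted only when the input side presents a key, so sessions for keys absent from the input are never emitted, and an empty input yields an empty output even when state holds sessions.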
Why are the changes needed?
This is one of the parts required to implement SPARK-10816.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New UT added.