# [SPARK-26140] Enable custom metrics implementation in shuffle reader #23105

## Conversation
```scala
 * shuffle dependency, and all temporary metrics will be merged into the [[ShuffleReadMetrics]] at
 * last.
 */
private[spark] class TempShuffleReadMetrics {
```
this was moved to TempShuffleReadMetrics
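For context, the pattern the doc comment above describes — one temporary metrics object per shuffle dependency, with all temporary values merged into the task-level `ShuffleReadMetrics` at the end — can be sketched roughly as follows. This is a simplified, hypothetical sketch, not Spark's actual internals; the field and method names here are assumptions for illustration only.

```scala
// Hypothetical sketch: each shuffle dependency accumulates into its own
// temporary metrics object, and all temporaries are merged into one
// final ShuffleReadMetrics when the task finishes.
class TempShuffleReadMetrics {
  var recordsRead: Long = 0L
  var bytesRead: Long = 0L
  def incRecordsRead(n: Long): Unit = recordsRead += n
  def incBytesRead(n: Long): Unit = bytesRead += n
}

class ShuffleReadMetrics {
  var recordsRead: Long = 0L
  var bytesRead: Long = 0L
  // Merge all per-dependency temporary metrics into the task-level totals.
  def setMergeValues(temps: Seq[TempShuffleReadMetrics]): Unit = {
    recordsRead = temps.map(_.recordsRead).sum
    bytesRead = temps.map(_.bytesRead).sum
  }
}
```

The point of the indirection is that a task may read several shuffle dependencies concurrently; merging once at the end avoids contended updates to the shared metrics object.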
Test build #99128 has finished for PR 23105 at commit

Test build #99131 has finished for PR 23105 at commit

Test build #99129 has finished for PR 23105 at commit
```scala
package org.apache.spark.shuffle

/**
 * An interface for reporting shuffle information, for each shuffle. This interface assumes
```
for each shuffle -> for each reducer of a shuffle?
```scala
 * All methods have additional Spark visibility modifier to allow public, concrete implementations
 * that still have these methods marked as private[spark].
 */
private[spark] trait ShuffleReadMetricsReporter {
```
how do we plan to use this interface later on? It's not used in this PR.
@xuanyuanking just submitted a PR on how to use it :)
#23128 :)
```diff
   endPartition: Int,
-  context: TaskContext): ShuffleReader[K, C]
+  context: TaskContext,
+  metrics: ShuffleMetricsReporter): ShuffleReader[K, C]
```
IIUC, we should pass a read metrics reporter here, as this method is getReader which is called by the reducers to read shuffle files.
It is actually a read metrics reporter here. In the write-side PR this is renamed to ShuffleReadMetricsReporter.
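To illustrate the signature change discussed in this thread, here is a hedged sketch of how a reader-side metrics reporter can be threaded through: the caller constructs a reporter and hands it to the reader, which reports each record and its size as it iterates. The trait, class, and method names below are simplified illustrations, not Spark's exact API.

```scala
// Illustrative sketch only: a pluggable reporter interface for the read side.
trait ShuffleReadMetricsReporter {
  def incRecordsRead(v: Long): Unit
  def incRemoteBytesRead(v: Long): Unit
}

// A trivial implementation that just counts, standing in for whatever
// a custom caller (e.g. the SQL layer) would plug in.
class CountingReporter extends ShuffleReadMetricsReporter {
  var records = 0L
  var bytes = 0L
  override def incRecordsRead(v: Long): Unit = records += v
  override def incRemoteBytesRead(v: Long): Unit = bytes += v
}

// A toy "shuffle reader" that reports through the injected reporter as it
// yields records, mimicking how a reader created by getReader would use
// the extra metrics parameter.
class ToyShuffleReader(data: Seq[String], metrics: ShuffleReadMetricsReporter) {
  def read(): Iterator[String] = data.iterator.map { rec =>
    metrics.incRecordsRead(1)
    metrics.incRemoteBytesRead(rec.length.toLong)
    rec
  }
}
```

Because the reporter is passed in rather than fetched from a thread-local, each caller decides where the numbers go, which is the whole point of the refactoring.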
gatorsmile left a comment:
LGTM
Thanks! Merged to master.
## What changes were proposed in this pull request?

In #23105, due to working on two parallel PRs at once, I made the mistake of committing the copy of the PR that used the name ShuffleMetricsReporter for the interface, rather than the appropriate one, ShuffleReadMetricsReporter. This patch fixes that.

## How was this patch tested?

This should be fine as long as compilation passes.

Closes #23147 from rxin/ShuffleReadMetricsReporter.

Authored-by: Reynold Xin <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
## What changes were proposed in this pull request?

This patch defines an internal Spark interface for reporting shuffle metrics and uses it in the shuffle reader. Before this patch, shuffle metrics were tied to a specific implementation (using a thread-local temporary data structure and accumulators). After this patch, callers that define their own shuffle RDDs can create a custom metrics implementation.

With this patch, we will be able to build better metrics for the SQL layer, e.g. reporting shuffle metrics in the SQL UI for each exchange operator.

Note that I'm separating the read-side and write-side implementations, as they are very different, to simplify code review. The write-side change is at apache#23106.

## How was this patch tested?

No behavior change expected, as it is a straightforward refactoring. Updated all existing test cases.

Closes apache#23105 from rxin/SPARK-26140.

Authored-by: Reynold Xin <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
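As one illustration of the "custom metrics implementation" the description mentions (e.g. SQL-UI metrics per exchange operator), a reporter that funnels updates into named counters might look like the sketch below. All names here are hypothetical; Spark's actual SQL metrics are accumulator-based, and this simplifies them to a plain mutable map purely for illustration.

```scala
import scala.collection.mutable

// Illustrative reporter interface, standing in for the internal
// private[spark] trait introduced by this PR.
trait ShuffleReadMetricsReporter {
  def incRecordsRead(v: Long): Unit
  def incLocalBytesRead(v: Long): Unit
}

// Hypothetical SQL-layer implementation: every update is routed into a
// named counter that a UI could display per exchange operator.
class SQLShuffleReadMetricsReporter(counters: mutable.Map[String, Long])
    extends ShuffleReadMetricsReporter {
  private def add(name: String, v: Long): Unit =
    counters(name) = counters.getOrElse(name, 0L) + v
  override def incRecordsRead(v: Long): Unit = add("records read", v)
  override def incLocalBytesRead(v: Long): Unit = add("local bytes read", v)
}
```

The reader-side code only ever sees the trait, so swapping the thread-local accumulator implementation for something like this requires no changes to the reader itself.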
## What changes were proposed in this pull request?

This is the write-side counterpart to apache#23105.

## How was this patch tested?

No behavior change expected, as it is a straightforward refactoring. Updated all existing test cases.

Closes apache#23106 from rxin/SPARK-26141.

Authored-by: Reynold Xin <[email protected]>
Signed-off-by: Reynold Xin <[email protected]>