-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-21934][CORE] Expose Shuffle Netty memory usage to MetricsSystem #19160
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #81540 has finished for PR 19160 at commit
|
|
Jenkins, retest this please. |
|
Test build #81548 has finished for PR 19160 at commit
|
|
Jenkins, retest this please. |
|
Test build #81567 has finished for PR 19160 at commit
|
|
@squito would you please help to review this PR, thanks a lot. |
|
@zsxwing @jiangxb1987 would you please help to review this PR when you have time, thanks a lot. |
squito
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small comments about naming etc. mostly looks good
| if (externalShuffleServiceEnabled) { | ||
| new ShuffleMetricsSource("ExternalShuffle", shuffleClient.shuffleMetrics()) | ||
| } else { | ||
| new ShuffleMetricsSource("NettyBlockTransfer", shuffleClient.shuffleMetrics()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you think we really need to distinguish these two cases? whether or not you have the external shuffle service, this memory is still owned by the executor JVM (its really only external on the remote end).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the external shuffle, we only have Transport client in the executor side, while for NettyBlockTransfer each executor will both server as transport client as well as server. So from my thought I explicitly distinguish those two cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I guess I haven't seen enough setups to know how users subscribe to these and whether they'd actually want that distinction. (especially since it seems like some of the distinction probably shouldn't be there, eg. the openBlockLatency metric.) But I don't think there is any clear right answer here, if you think this is the best naming, that is fine with me.
| package org.apache.spark.network.netty | ||
|
|
||
| import java.nio.ByteBuffer | ||
| import java.util |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you are trying to avoid confusion w/ scala's hashmaps, I think our convention is to rename w/ "J" prefix
import java.util.{HashMap => JHashMap}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I will change it.
| @ThreadSafe | ||
| private class ExternalShuffleServiceSource | ||
| (blockHandler: ExternalShuffleBlockHandler) extends Source { | ||
| private class ExternalShuffleServiceSource extends Source { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just for my own understanding, not directly related to this change --
I hadn't realized that the ExternalShuffleBlockHandler had its own ShuffleMetrics already. Some of those metrics really seem like they should be part of regular shuffle server, in the executor. Eg., openBlockRequestLatencyMillis. Do you know why its separate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure, maybe it should a part of regular shuffle server.
| if (!isLocal) { | ||
| env.metricsSystem.registerSource(executorSource) | ||
| env.blockManager.initialize(conf.getAppId) | ||
| env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nit: can you put the reigsterSource() calls next to each other?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, let me update it.
jiangxb1987
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM overall except some nits.
| TempShuffleFileManager tempShuffleFileManager); | ||
|
|
||
| /** | ||
| * Get the shuffle MetricsSet from ShuffleClient, this will be used used in MetricsSystem to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this will be used used -> this will be used
|
|
||
| @Override | ||
| public MetricSet shuffleMetrics() { | ||
| checkInit(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not related to the change here -- but should we also checkInit in the close() function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems it should be, but looks like we never touch this issue before.
|
Test build #81962 has finished for PR 19160 at commit
|
|
lgtm @jerryshao I didn't merge yet in case you want to keep discussing the naming, but I'm fine with this as is, so feel free to merge. |
WeichenXu123
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
|
Thanks all for your review, let me merge to master. |
What changes were proposed in this pull request?
This is a followup work of SPARK-9104 to expose the Netty memory usage to MetricsSystem. Current the shuffle Netty memory usage of
NettyBlockTransferServicewill be exposed, if using external shuffle, then the Netty memory usage ofExternalShuffleClientandExternalShuffleServicewill be exposed instead. Currently I don't expose Netty memory usage ofYarnShuffleService, becauseYarnShuffleServicedoesn't haveMetricsSystemitself, and is better to connect to Hadoop's MetricsSystem.How was this patch tested?
Manually verified in local cluster.