[SPARK-21243][Core] Limit no. of map outputs in a shuffle fetch #18487
Conversation
Test build #78984 has finished for PR 18487 at commit
@rxin @cloud-fan Can you review this PR?
```scala
    .createWithDefault(3)

  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
    ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
```
I'm not sure it's a good idea to do this on the reducer side, because there may be a lot of reducers fetching data from one shuffle service at the same time, and you wouldn't know that on the reducer side. cc @jinxing64
I agree this won't resolve all problems, but it is still good to add a limit to prevent fetching too many blocks from an address at a time.
When the shuffle service gets an OOM, there are always lots of (thousands of) reducers, maybe from different apps, fetching blocks. I'm not sure limiting in-flight blocks on the reducer side will help much.
Also, we already have maxReqsInFlight and maxBytesInFlight. Is it a little redundant to also have maxBlocksInFlightPerAddress?
Sorry for this comment.
@jinxing64 After your fix for lazily loading the open-blocks iterator, I am not seeing issues with the NM crashing on my end. However, requests made with a high no. of blocks, while still under the max constraints, caused increased load. This is an added layer of defense which can mitigate the issue.
I like the idea that we should prevent fetching too many blocks from an address at a time; I left a few comments.
```scala
  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
    ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
      .doc("This configuration limits the number of remote blocks being fetched from a given " +
        " host port at any given point. When external shuffle is enabled and a large number of " +
```
How does whether the external shuffle service is enabled or not affect the behavior? AFAIK this has little relation to the external shuffle service.
In this case it would take down either the NM or the executor serving the map output tasks, based on the shuffle mode. We emphasize the external shuffle case because crashing the NM is more severe than losing an executor. I am open to re-wording it so that it is easier to understand.
At least we should state that the configuration doesn't necessarily go with the external shuffle service.
okay.
| " could crash the Node Manager under increased load. You can mitigate this issue by " + | ||
| " setting it to a lower value.") | ||
| .intConf | ||
| .createWithDefault(Int.MaxValue) |
nit: Should add checkValue to ensure this is above zero.
okay.
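Putting the review feedback together (reworded doc text, an added checkValue, and the default kept at Int.MaxValue), the config entry ends up looking roughly like the sketch below. It is assembled from the diff snippets quoted later in this thread, not from the authoritative merged source:

```scala
  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
    ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
      .doc("This configuration limits the number of remote blocks being fetched per reduce task" +
        " from a given host port. When a large number of blocks are being requested from a given" +
        " address in a single fetch or simultaneously, this could crash the serving executor or" +
        " Node Manager. This is especially useful to reduce the load on the Node Manager when" +
        " external shuffle is enabled. You can mitigate the issue by setting it to a lower value.")
      .intConf
      .checkValue(_ > 0, "The max no. of blocks in flight cannot be non-positive.")
      .createWithDefault(Int.MaxValue)
```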
```scala
  // Checks if sending a new fetch request will exceed the max no. of blocks being fetched from a
  // given remote address.
  def isRemoteAddressMaxedOut(remoteHost: BlockManagerId, request: FetchRequest): Boolean = {
```
Is this remoteHost or remoteAddress?
This should be remoteAddress.
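For context, the check under discussion is a simple per-address counter comparison. A minimal sketch, assuming the numBlocksInFlightPerAddress map and maxBlocksInFlightPerAddress field introduced by this PR (the exact body in the merged code may differ):

```scala
  // Sketch only: returns true if dispatching this request would push the number of
  // in-flight blocks for the address past spark.reducer.maxBlocksInFlightPerAddress.
  def isRemoteAddressMaxedOut(remoteAddress: BlockManagerId, request: FetchRequest): Boolean = {
    numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) + request.blocks.size >
      maxBlocksInFlightPerAddress
  }
```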
```scala
    if (deferredFetchRequests.nonEmpty) {
      for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
        while (isRemoteBlockFetchable(defReqQueue) &&
            !isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
```
If request.blocks.size is above the config value, then isRemoteAddressMaxedOut() will always return true for that request, and it would never leave the deferred queue.
We check the no. of blocks being added to a fetch request. If it reaches the configured limit, we create a new request, so no single request can exceed the limit.
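A minimal sketch of that splitting logic, simplified from the splitLocalRemoteBlocks change quoted further down in this review; it assumes the surrounding method's iterator, address, targetRequestSize, and remoteRequests, and the real loop also skips zero-sized blocks:

```scala
    // Sketch only: group a host's blocks into FetchRequests, capping each request at
    // maxBlocksInFlightPerAddress blocks in addition to the existing size-based cap.
    var curBlocks = new ArrayBuffer[(BlockId, Long)]
    var curRequestSize = 0L
    while (iterator.hasNext) {
      val (blockId, size) = iterator.next()
      curBlocks += ((blockId, size))
      curRequestSize += size
      if (curRequestSize >= targetRequestSize ||
          curBlocks.size >= maxBlocksInFlightPerAddress) {
        remoteRequests += new FetchRequest(address, curBlocks)
        curBlocks = new ArrayBuffer[(BlockId, Long)]
        curRequestSize = 0
      }
    }
```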
Test build #79290 has finished for PR 18487 at commit
Test build #79292 has finished for PR 18487 at commit
@jiangxb1987 I have made the changes requested. Can you have a look? Thanks.
jiangxb1987
left a comment
Looks good overall, but we may need to add some test cases.
```scala
    // Process any outstanding deferred fetch requests if possible.
    if (deferredFetchRequests.nonEmpty) {
      for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
```
If the traffic is heavy, it may take some time to finish the iteration. Do you have any idea how to reduce that effort?
I didn't get you. We are just iterating to check if there are requests that can be scheduled, and they are handled asynchronously by the send calls. What effort are you referring to?
I was thinking maybe we can avoid adding the extra deferredFetchRequests structure to handle deferred fetch requests; instead we could iterate over fetchRequests and send the requests that are not maxed out. That way we might simplify the logic. Would you like to try?
Oh yes. That was the first choice, and I gave it a try to avoid adding any extra bookkeeping. There are issues with that approach. Say you have a request which has to be deferred: you just remove it, push it to the end of the queue, and continue.
- This is fine as long as you don't meet the deferred request again.
- If you do meet the deferred request again, it may or may not be schedulable, depending on whether the remote finished processing the earlier request. This could lead to going around in circles (wasted effort). To avoid that we have to know when to stop, so we would have to keep a marker for the request which was already deferred. But that marker would be only for a single request, which corresponds to one remote. In the meantime other remotes could have finished processing their earlier requests and we could schedule requests to them, so we can no longer stop at the first marker for a single address; we would have to check the requests again.
This makes it more complicated than scheduling everything that's possible in a single shot and deferring whatever it encounters on the way. On the next run, we first try to clear any backlog from the previous run and then proceed normally.
Ah, makes sense.
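For readers following along, here is a condensed sketch of the dispatch flow discussed above, stitched together from the diff snippets quoted in this review. The helper names (send, isRemoteBlockFetchable, isRemoteAddressMaxedOut) and log messages come from the PR; the exact merged method body may differ:

```scala
  // Sketch only: dispatch loop in ShuffleBlockFetcherIterator, per this PR's design.
  private def fetchUpToMaxBytes(): Unit = {
    // First, try to clear the backlog of deferred fetch requests from previous runs.
    if (deferredFetchRequests.nonEmpty) {
      for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
        while (isRemoteBlockFetchable(defReqQueue) &&
            !isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
          val request = defReqQueue.dequeue()
          logDebug(s"Processing deferred fetch request for $remoteAddress with "
            + s"${request.blocks.length} blocks")
          send(remoteAddress, request)
          if (defReqQueue.isEmpty) {
            deferredFetchRequests -= remoteAddress
          }
        }
      }
    }

    // Then process regular fetch requests, deferring any that would max out their address.
    while (isRemoteBlockFetchable(fetchRequests)) {
      val request = fetchRequests.dequeue()
      val remoteAddress = request.address
      if (isRemoteAddressMaxedOut(remoteAddress, request)) {
        logDebug(s"Deferring fetch request for $remoteAddress with ${request.blocks.size} blocks")
        val defReqQueue = deferredFetchRequests.getOrElse(remoteAddress, new Queue[FetchRequest]())
        defReqQueue.enqueue(request)
        deferredFetchRequests(remoteAddress) = defReqQueue
      } else {
        send(remoteAddress, request)
      }
    }
  }
```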
This LGTM. @dhruve could you rebase it with the master branch please?
@jiangxb1987 I have resolved the merge conflicts and reworded the config description to make it clearer.
Test build #79469 has finished for PR 18487 at commit
Jenkins, test this please
Test build #79477 has finished for PR 18487 at commit
cc @cloud-fan
Will this be covered by #18388? And another concern is how we expect users to tune this config. Can users just tune
@jiangxb1987 Thanks for the review. @cloud-fan #18388 will reject any open-block connections when the NM is under memory pressure. The changes proposed here try to limit the concurrency, but they require the user to understand how the setting affects fetch failures as well. If a NM is under severe load, chances are that your fetch attempts would be closed multiple times and you would need to bump up the # of fetch retries, or else your job could fail because of fetch failures. 'spark.reducer.maxReqsInFlight' can be used to control the overall number of requests being sent out, but all of them can still go out to a single host and max it out. If you reduce it, you lose out on throughput, as it would take more time to fetch the results.
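To make the tuning discussion concrete, here is a hypothetical way a user might combine the two knobs; the values are illustrative assumptions, not recommendations from this PR:

```scala
import org.apache.spark.SparkConf

// Illustrative values only: cap the blocks in flight per remote address (new in this PR)
// while still bounding the total number of outstanding requests with the existing config.
val conf = new SparkConf()
  .set("spark.reducer.maxBlocksInFlightPerAddress", "20")
  .set("spark.reducer.maxReqsInFlight", "256")
```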
| " from a given host port. When a large number of blocks are being requested from a given" + | ||
| " address in a single fetch or simultaneously, this could crash the serving executor or" + | ||
| " Node Manager. This is especially useful to reduce the load on the Node Manager when" + | ||
| "external shuffle is enabled. You can mitigate the issue by setting it to a lower value.") |
space before external
```scala
  private[this] val fetchRequests = new Queue[FetchRequest]

  /**
   * Queue of fetch requests which could not be issued the first time they were dequed. These
```
s/dequed/dequeued/
Test build #79614 has finished for PR 18487 at commit
@jinxing64 I performed a few runs to see if we were observing any performance issues with the change. I ran a simple word count job over a random set of text - 3TB. I couldn't get hundreds of executors hammering a single NM, however each NM ended up serving approximately a gig of shuffle data (this might be smaller in magnitude compared to the job that you are running). I ran it for different maxBlocks values - 10, 20 and 50 - keeping the number of executors roughly the same, and didn't notice any performance difference in terms of the running time of the job across the runs. For a proper value of this config, since the changes are on the reducer side, I would recommend running your job against the changes with a low value for a couple of runs, to see if you observe any performance hit with a large no. of connections hitting a node, as I couldn't reproduce it with the 3TB word count.
@jinxing64 We have the default set to Int.MaxValue so that by default there is no performance penalty for users. We have done some testing as Dhruve mentioned, but we don't regularly hit the issue. My plan was to set it to 20, just like MapReduce does, as a starting point. The exact value is deployment dependent though, as it depends on how much memory you give to your NMs, how many requests you expect in parallel, etc. We do plan on doing some more testing, but it probably won't be for a couple of days.
```diff
        }
-       if (curRequestSize >= targetRequestSize) {
+       if (curRequestSize >= targetRequestSize ||
+           curBlocks.size >= maxBlocksInFlightPerAddress) {
```
We may have a lot of adjacent fetch requests in the queue, shall we shuffle the request queue before fetching?
| " external shuffle is enabled. You can mitigate the issue by setting it to a lower value.") | ||
| .intConf | ||
| .checkValue(_ > 0, "The max no. of blocks in flight cannot be non-positive.") | ||
| .createWithDefault(Int.MaxValue) |
cc @tgravescs shall we change the default value to 20 or something?
I'm fine leaving it at Int.MaxValue for now so that we don't change current behavior, just like we have done with some of the other related configs. I would like to get more runtime on this in production and then we can set it later, perhaps in 2.3. It would be nice to pull this back into branch-2.2 as well as master.
retest this please
+1, pending Jenkins build. If there are no further comments, I'm going to commit this later today.
jiangxb1987
left a comment
LGTM
Test build #79760 has finished for PR 18487 at commit
The cherry-pick to 2.2 wasn't clean, so can you please put up a separate PR against branch-2.2?
Hm, is this a bug fix? If not, we shouldn't cherry-pick it.
@rxin It's kind of a stability fix (it makes the shuffle service more stable), so I'm OK to backport if the conflict is small.
| .doc("This configuration limits the number of remote blocks being fetched per reduce task" + | ||
| " from a given host port. When a large number of blocks are being requested from a given" + | ||
| " address in a single fetch or simultaneously, this could crash the serving executor or" + | ||
| " Node Manager. This is especially useful to reduce the load on the Node Manager when" + |
shall we say shuffle service instead of Node Manager?
If the shuffle service fails it can take down the Node Manager, which is more severe, and hence I have used it. And in the following sentence I have mentioned external shuffle. If it is not clear, I am okay with changing it.
I think Node Manager is for YARN only? Shuffle service is more general
```scala
      result match {
        case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) =>
          if (address != blockManager.blockManagerId) {
            numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
```
Can we do this earlier? E.g. right after the fetch result is enqueued to results.
That is a good point. In fact, we could also move the other bookkeeping right after the fetch result is enqueued.
I would also want to look at the initialization of the BlockFetchingListener to see the effects of this, as it would increase the size of the closure. Can we have a separate JIRA filed for this?
yea sure.
@cloud-fan filed a JIRA for this => https://issues.apache.org/jira/browse/SPARK-21500
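For what that follow-up might look like, here is a hypothetical sketch of moving the decrement into the fetch listener. This is not part of this PR (it is only the idea tracked in SPARK-21500), and the surrounding listener body, field names, and locking are assumptions based on the snippets in this thread:

```scala
      // Hypothetical sketch only (SPARK-21500 follow-up, not in this PR): decrement the
      // per-address counter as soon as the successful result is enqueued, instead of in next().
      override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
        results.put(SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf,
          remainingBlocks.isEmpty))
        // Assumed change: free the per-address slot earlier; this would need the iterator's
        // lock since the listener runs on a network thread.
        numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
      }
```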
| + s"${request.blocks.length} blocks") | ||
| send(remoteAddress, request) | ||
| if (defReqQueue.isEmpty) { | ||
| deferredFetchRequests -= remoteAddress |
We can leave the empty queue here, as we may still have fetch requests to put into this queue.
Then we would have to unnecessarily iterate through the map over all the block manager ids for which we deferred fetch requests at an earlier point, only to find that they have no pending fetch requests.
I see.
| logDebug(s"Deferring fetch request for $remoteAddress with ${request.blocks.size} blocks") | ||
| val defReqQueue = deferredFetchRequests.getOrElse(remoteAddress, new Queue[FetchRequest]()) | ||
| defReqQueue.enqueue(request) | ||
| deferredFetchRequests(remoteAddress) = defReqQueue |
The defReqQueue is mutable, so we don't need to do this.
If it is the first time that we defer a request, defReqQueue has to be associated with its corresponding remoteAddress.
You are right.
For configurations with external shuffle enabled, we have observed that if a very large no. of blocks are being fetched from a remote host, it puts the NM under extra pressure and can crash it. This change introduces a configuration `spark.reducer.maxBlocksInFlightPerAddress` to limit the no. of map outputs being fetched from a given remote address. The changes applied here are applicable for both scenarios - when external shuffle is enabled as well as disabled.

Ran the job with the default configuration, which does not change the existing behavior, and ran it with a few lower values - 10, 20, 50, 100. The job ran fine and there is no change in the output. (I will update the metrics related to NM in some time.)

Author: Dhruve Ashar <[email protected]>

Closes apache#18487 from dhruve/impr/SPARK-21243.
@cloud-fan replied to your comments.
@tgravescs Thanks for merging this. I have created a PR for 2.2: #18691. While resolving a merge conflict, I had to remove a couple of newer config entries that had landed in the meantime.
For configurations with external shuffle enabled, we have observed that if a very large no. of blocks are being fetched from a remote host, it puts the NM under extra pressure and can crash it. This change introduces a configuration `spark.reducer.maxBlocksInFlightPerAddress` to limit the no. of map outputs being fetched from a given remote address. The changes applied here are applicable for both scenarios - when external shuffle is enabled as well as disabled.

Ran the job with the default configuration, which does not change the existing behavior, and ran it with a few lower values - 10, 20, 50, 100. The job ran fine and there is no change in the output. (I will update the metrics related to NM in some time.)

Author: Dhruve Ashar <dhruveashargmail.com>

Closes #18487 from dhruve/impr/SPARK-21243.

Author: Dhruve Ashar <[email protected]>

Closes #18691 from dhruve/branch-2.2.
What changes were proposed in this pull request?

For configurations with external shuffle enabled, we have observed that if a very large no. of blocks are being fetched from a remote host, it puts the NM under extra pressure and can crash it. This change introduces a configuration `spark.reducer.maxBlocksInFlightPerAddress` to limit the no. of map outputs being fetched from a given remote address. The changes applied here are applicable for both scenarios - when external shuffle is enabled as well as disabled.

How was this patch tested?

Ran the job with the default configuration, which does not change the existing behavior, and ran it with a few lower values - 10, 20, 50, 100. The job ran fine and there is no change in the output. (I will update the metrics related to NM in some time.)