
Conversation

@yhuai
Contributor

yhuai commented Aug 18, 2015

https://issues.apache.org/jira/browse/SPARK-10087

In some cases, when spark.shuffle.reduceLocality.enabled is enabled, we end up scheduling all reducers to the same executor even though the cluster has plenty of free resources. Changing spark.shuffle.reduceLocality.enabled to false resolves the problem.

Here is a little bit more information. For one of my queries, all 200 reducers were scheduled to the same executor, and every reducer had about 800 KB of input.
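
For reference, the workaround described above is a plain configuration change; a minimal spark-shell-style sketch of setting it programmatically (the application name is made up) is:

    import org.apache.spark.{SparkConf, SparkContext}

    // Workaround: turn off reduce-task locality preferences so reduce tasks are
    // spread across executors instead of piling onto a single host.
    val conf = new SparkConf()
      .setAppName("reduce-locality-workaround") // illustrative name
      .set("spark.shuffle.reduceLocality.enabled", "false")
    val sc = new SparkContext(conf)

The same setting can also be passed to spark-submit via --conf spark.shuffle.reduceLocality.enabled=false.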

yhuai changed the title from "[CORE] Disable spark.shuffle.reduceLocality.enabled by default." to "[SPARK-10087] [CORE] Disable spark.shuffle.reduceLocality.enabled by default." Aug 18, 2015
@rxin
Contributor

rxin commented Aug 18, 2015

cc @shivaram

@shivaram
Contributor

Could you provide some more information about the map output? The reducer locality should not kick in unless a certain map output location has more than 20% of the output data. How many map tasks were run, and what were their output sizes?
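
For context, the 20% rule mentioned above amounts to roughly the following check over per-location map output sizes (a simplified, standalone Scala sketch, not the actual MapOutputTracker code; host names and sizes are made up):

    // Bytes that each map output location would feed into one particular reducer.
    val bytesByLocation: Map[String, Long] = Map(
      "host-a" -> 800L * 1024,
      "host-b" -> 600L * 1024,
      "host-c" -> 200L * 1024)

    val totalBytes = bytesByLocation.values.sum
    val fractionThreshold = 0.2 // corresponds to REDUCER_PREF_LOCS_FRACTION

    // A host becomes a preferred location for the reducer only if it already holds
    // more than the threshold fraction of that reducer's total shuffle input.
    val preferredLocs = bytesByLocation.collect {
      case (host, bytes) if bytes.toDouble / totalBytes > fractionThreshold => host
    }.toSeq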

@shivaram
Contributor

cc @mateiz who has also been looking at this code recently

@yhuai
Contributor Author

yhuai commented Aug 18, 2015

The reduce stage had a 2-way join in it. The two map stages had 30 tasks and 1 task, respectively. Here is the task info screenshot for the 30-task stage:

[screenshot: task info for the 30-task map stage]

And here is the task info screenshot for the single-task stage:

[screenshot: task info for the single-task map stage]

@shivaram
Contributor

Thanks for the info. And just to confirm, is everything getting assigned to executor ID 23 (10.0.145.27) in the reduce stage?

@yhuai
Contributor Author

yhuai commented Aug 18, 2015

Ah, sorry, I missed the reduce stage's screenshot. Yes, executor 23 was the one that got all the reduce tasks.

[screenshot: task info for the reduce stage; all tasks assigned to executor 23]

@shivaram
Contributor

So my hypothesis right now is that the RDD in the reduce stage has two shuffle dependencies, and the first shuffle dependency happens to be the single-map-task stage -- so the locality preference ends up giving all the tasks to that single host.

Hmm, so my guess is that ideally we need to be able to differentiate among the different shuffle dependencies. Here is another suggestion: can we turn this off if we have more than one shuffle dependency? It should be pretty cheap to count that.

@shivaram
Contributor

The diff I'm proposing is something like

+    val numShuffleDeps = rdd.dependencies.filter(_.isInstanceOf[ShuffleDependency[_, _, _]]).length
+
     // If the RDD has shuffle dependencies and shuffle locality is enabled, pick locations that
     // have at least REDUCER_PREF_LOCS_FRACTION of data as preferred locations
-    if (shuffleLocalityEnabled && rdd.partitions.length < SHUFFLE_PREF_REDUCE_THRESHOLD) {
+    if (numShuffleDeps == 1 && shuffleLocalityEnabled &&
+        rdd.partitions.length < SHUFFLE_PREF_REDUCE_THRESHOLD) {

@SparkQA

SparkQA commented Aug 18, 2015

Test build #41149 has finished for PR 8280 at commit f77e574.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mateiz
Contributor

mateiz commented Aug 19, 2015

It does sound good to turn it off if there are multiple dependencies. However, an even better solution may be to move this into ShuffledRDD, so that we control where exactly it occurs.

BTW, to make this robust, I'd also make it affect locality only if the amount of data sent to that task is substantial (say over 100 MB). Otherwise, scheduling for locality based on 1-2 MB is unnecessary.
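
A rough sketch of that extra guard (standalone Scala, not a patch against DAGScheduler; the 100 MB constant and the helper name are illustrative):

    // Only honor reduce-side locality when the reducer's input is large enough that
    // avoiding a network transfer actually matters.
    val minBytesForLocality: Long = 100L * 1024 * 1024 // ~100 MB, illustrative

    def shouldPreferLocality(reducerInputBytes: Long,
                             largestLocationBytes: Long,
                             fractionThreshold: Double = 0.2): Boolean = {
      reducerInputBytes >= minBytesForLocality &&
        largestLocationBytes.toDouble / reducerInputBytes > fractionThreshold
    }

    // Example: the ~800 KB per reducer reported above would not qualify, so the
    // scheduler would fall back to spreading reduce tasks across executors.
    val usesLocality = shouldPreferLocality(800L * 1024, 800L * 1024) // false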

@mateiz
Contributor

mateiz commented Aug 19, 2015

BTW it may also be fine to turn it off by default for 1.5, but in general, with these things, there's not much point having them in the code if they're off by default. We get very little feedback on them and they increase the number of configurations we have to worry about. I'm generally more inclined to turn these on and expand their scope gradually.

@rxin
Contributor

rxin commented Aug 19, 2015

Why don't we turn it on in master but off in 1.5? At this point in the 1.5 cycle, I'm worried about the potential bugs this could cause after more fixes.

@shivaram
Contributor

But to Matei's point, we don't get feedback if it's only on in the master branch, since I guess many more people use a release. I think turning it off for the multiple-dependency case makes it strictly narrower than what we have now, so I'm not sure it will cause new bugs.

@rxin
Contributor

rxin commented Aug 19, 2015

Sorry, it's just too risky right now for 1.5.

@yhuai
Contributor Author

yhuai commented Aug 19, 2015

I created #8296 to change the default setting to false for branch 1.5.

@rxin
Contributor

rxin commented Aug 19, 2015

Let's close this one.

@shivaram can you submit a proper fix for master?

@shivaram
Contributor

Ok - let's leave it on in master, and I'll work with @mateiz on changes to move this into ShuffledRDD and capture more use cases. @yhuai, could you put the query you ran in the JIRA so we can test / track this?

yhuai closed this Aug 19, 2015
@yhuai
Contributor Author

yhuai commented Aug 19, 2015

@shivaram Sure. Just updated the JIRA description.

asfgit pushed a commit that referenced this pull request Aug 19, 2015
[SPARK-10087] [CORE] Disable spark.shuffle.reduceLocality.enabled by default.

https://issues.apache.org/jira/browse/SPARK-10087

In some cases, when spark.shuffle.reduceLocality.enabled is enabled, we end up scheduling all reducers to the same executor even though the cluster has plenty of free resources. Changing spark.shuffle.reduceLocality.enabled to false resolves the problem.

The comments on #8280 provide more details about the symptom of this issue.

This PR changes the default setting of `spark.shuffle.reduceLocality.enabled` to `false` for branch 1.5.

Author: Yin Huai <[email protected]>

Closes #8296 from yhuai/setNumPartitionsCorrectly-branch1.5.
@mateiz
Contributor

mateiz commented Aug 19, 2015

@shivaram did you create a JIRA for making this affect only ShuffledRDD? I might do it as part of https://issues.apache.org/jira/browse/SPARK-9852, which I'm working on a patch for (I just haven't sent it yet because it depends on another in-review PR).

@shivaram
Contributor

Not yet - I was hoping to keep SPARK-10087 open, but I guess that's closed now. Doing it as part of SPARK-9852 sounds good to me. Let me know if you want me to review the other PR and unblock this, etc.
