
Conversation

@aarondav
Contributor

@aarondav aarondav commented Jul 7, 2014

Currently, local execution of Spark jobs is only used by take(), and it can be problematic as it can load a significant amount of data onto the driver. The worst case scenarios occur if the RDD is cached (guaranteed to load whole partition), has very large elements, or the partition is just large and we apply a filter with high selectivity or computational overhead.

Additionally, jobs that run locally in this manner do not show up in the web UI, and are thus harder to track or understand what is occurring.

This PR adds a flag to disable local execution, which is turned OFF by default, with the intention of perhaps eventually removing this functionality altogether. Removing it now is a tougher proposition since it is part of the public runJob API. An alternative solution would be to limit the flag to take()/first() to avoid impacting any external users of this API, but such usage (or, at least, reliance upon the feature) is hopefully minimal.
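For illustration only (not part of the original discussion), here is a minimal sketch of the failure mode described above: under driver-local execution, take() on a cached RDD with a highly selective filter forces an entire partition's data to be processed on the driver. The object name, app name, and element counts are arbitrary.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TakeLocalExecutionExample {
  def main(args: Array[String]): Unit = {
    // local[2] is used only so the snippet runs standalone; the failure mode
    // matters on a real cluster, where the driver and executors are separate.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("take-local-exec-example"))
    try {
      // A cached RDD with fairly large partitions.
      val big = sc.parallelize(1 to 10000000, numSlices = 4).cache()
      big.count() // materialize the cache

      // A highly selective filter: to find even one match, take(1) must scan a
      // whole partition, and with local execution that partition (for a cached
      // RDD, the whole in-memory block) is handled on the driver itself.
      val few = big.filter(_ % 1000000 == 0).take(1)
      println(few.mkString(", "))
    } finally {
      sc.stop()
    }
  }
}
```

With the flag off (the new default), the same take() runs as a normal job on the cluster and shows up in the web UI like any other job.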

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16384/

@rxin
Contributor

rxin commented Jul 8, 2014

Maybe we should also fix the problem that local execution transfers the whole in-memory block (in fact, perhaps local execution should just bypass the in-memory data entirely)?
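A rough, self-contained sketch of the two behaviors being contrasted here; the names (LocalReadStrategy, fetchBlock, recompute) are illustrative stand-ins, not Spark's DAGScheduler or BlockManager APIs.

```scala
object LocalReadSketch {
  // Two ways a driver-local take() could obtain a partition's data when the
  // RDD is cached on a remote executor.
  sealed trait LocalReadStrategy
  case object FetchWholeCachedBlock extends LocalReadStrategy // ship the entire in-memory block to the driver
  case object RecomputeFromLineage extends LocalReadStrategy  // bypass the cached copy, recompute lazily

  def localIterator[T](
      strategy: LocalReadStrategy,
      fetchBlock: () => Seq[T],             // eager: whole block transferred before iteration starts
      recompute: () => Iterator[T]): Iterator[T] = strategy match {
    case FetchWholeCachedBlock => fetchBlock().iterator // worst case for driver memory
    case RecomputeFromLineage  => recompute()           // the suggested alternative
  }
}
```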

@pwendell
Contributor

@rxin is there a case where you think local execution will yield a relevant performance improvement? I don't see why shipping a task to the cluster for a few milliseconds is a big deal. The main use case I see for this is people running take() in a REPL; in that case the cluster scheduler is not backlogged, because they can't access the REPL at all until the prior command has finished anyway.

@rxin
Contributor

rxin commented Jul 15, 2014

When the cluster is busy and backlogged ...

@aarondav
Contributor Author

I think it makes more sense for a command to simply not run than for certain commands to happen to be runnable while there are no cluster resources. This sort of execution also puts more stress on the driver, and an OutOfMemoryError on the driver is far more serious than one on an executor (for example, this issue).

My hypothesis is that this feature is rarely useful, and often leads to more confusion for users and potentially less stability.

@rxin
Contributor

rxin commented Aug 14, 2014

Now that I think about it more, LGTM.

@mengxr
Contributor

mengxr commented Aug 14, 2014

:)

@mengxr
Contributor

mengxr commented Aug 14, 2014

Jenkins, retest this please.

@SparkQA

SparkQA commented Aug 14, 2014

QA tests have started for PR 1321. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18523/consoleFull

@aarondav aarondav changed the title [RFC] Disable local execution of Spark jobs by default [SPARK-3029] Disable local execution of Spark jobs by default Aug 14, 2014
@SparkQA

SparkQA commented Aug 14, 2014

QA results for PR 1321:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18523/consoleFull

@rxin
Contributor

rxin commented Aug 14, 2014

You need to update the test suites.
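For context, a hedged sketch of how a suite could opt back into the local-execution code path under the new default. The config key spark.localExecution.enabled is assumed to be the flag added by this PR; the suite name and test body are illustrative and are not the actual DAGSchedulerSuite fix.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.FunSuite

// Hypothetical suite; not the DAGSchedulerSuite change referenced in this PR.
class LocalExecutionOptInSuite extends FunSuite {
  test("take() with local execution explicitly re-enabled") {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("local-exec-opt-in")
      .set("spark.localExecution.enabled", "true") // assumed config key
    val sc = new SparkContext(conf)
    try {
      assert(sc.parallelize(1 to 100, 4).take(3).toSeq === Seq(1, 2, 3))
    } finally {
      sc.stop()
    }
  }
}
```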

@SparkQA

SparkQA commented Aug 14, 2014

QA tests have started for PR 1321. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18531/consoleFull

@SparkQA

SparkQA commented Aug 14, 2014

QA results for PR 1321:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18531/consoleFull

@rxin
Contributor

rxin commented Aug 14, 2014

Merging this in master and branch-1.1. Thanks!

@asfgit asfgit closed this in d069c5d Aug 14, 2014
asfgit pushed a commit that referenced this pull request Aug 14, 2014
Currently, local execution of Spark jobs is only used by take(), and it can be problematic as it can load a significant amount of data onto the driver. The worst case scenarios occur if the RDD is cached (guaranteed to load whole partition), has very large elements, or the partition is just large and we apply a filter with high selectivity or computational overhead.

Additionally, jobs that run locally in this manner do not show up in the web UI, and are thus harder to track or understand what is occurring.

This PR adds a flag to disable local execution, which is turned OFF by default, with the intention of perhaps eventually removing this functionality altogether. Removing it now is a tougher proposition since it is part of the public runJob API. An alternative solution would be to limit the flag to take()/first() to avoid impacting any external users of this API, but such usage (or, at least, reliance upon the feature) is hopefully minimal.

Author: Aaron Davidson <[email protected]>

Closes #1321 from aarondav/allowlocal and squashes the following commits:

136b253 [Aaron Davidson] Fix DAGSchedulerSuite
5599d55 [Aaron Davidson] [RFC] Disable local execution of Spark jobs by default

(cherry picked from commit d069c5d)
Signed-off-by: Reynold Xin <[email protected]>
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 10, 2022
…e being rolled (apache#1321)

### What changes were proposed in this pull request?

This PR aims to support a new configuration to support the minimum number of tasks per executor before being selected as the executor rolling target.

### Why are the changes needed?

Newly created executors might have a long initial setup time during its initial tasks.
In this case, some rolling policies like `AVERAGE_DURATION` might kill those newly created executors.
This PR aims to protect newly created executors until `totalTasks` reaches the minimum number of tasks.

### Does this PR introduce _any_ user-facing change?

No. The default value is 0.

### How was this patch tested?

Pass the CIs with the newly added test case.