Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ class DAGScheduler(

private[scheduler] var eventProcessActor: ActorRef = _

/** If enabled, we may run certain actions like take() and first() locally. */
private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false)

private def initializeEventProcessActor() {
// blocking the thread until supervisor is started, which ensures eventProcessActor is
// not null before any job is submitted
Expand Down Expand Up @@ -732,7 +735,9 @@ class DAGScheduler(
logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
val shouldRunLocally =
localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
if (shouldRunLocally) {
// Compute very short actions like first() or take() with no parent stages locally.
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
runLocally(job)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}

before {
sc = new SparkContext("local", "DAGSchedulerSuite")
// Enable local execution for this test
val conf = new SparkConf().set("spark.localExecution.enabled", "true")
sc = new SparkContext("local", "DAGSchedulerSuite", conf)
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
failure = null
Expand Down
9 changes: 9 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,15 @@ Apart from these, the following properties are also available, and may be useful
(in milliseconds).
</td>
</tr>
<tr>
<td><code>spark.localExecution.enabled</code></td>
<td>false</td>
<td>
    Enables Spark to run certain jobs, such as first() or take(), on the driver, without sending
    tasks to the cluster. This can make certain jobs execute very quickly, but may require
    shipping a whole partition of data to the driver.
</td>
</tr>
</table>

#### Security
Expand Down