
Commit 78af229

Merge remote-tracking branch 'upstream/master' into cache_memory_leak
2 parents: 26c9bb6 + 77eeb10

244 files changed (+4324 additions, -1244 deletions)


R/pkg/R/RDD.R

Lines changed: 5 additions & 13 deletions
@@ -85,7 +85,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
 
   if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
     # This transformation is the first in its stage:
-    .Object@func <- func
+    .Object@func <- cleanClosure(func)
     .Object@prev_jrdd <- getJRDD(prev)
     .Object@env$prev_serializedMode <- prev@env$serializedMode
     # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
@@ -94,7 +94,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
     pipelinedFunc <- function(split, iterator) {
       func(split, prev@func(split, iterator))
     }
-    .Object@func <- pipelinedFunc
+    .Object@func <- cleanClosure(pipelinedFunc)
     .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
     # Get the serialization mode of the parent RDD
     .Object@env$prev_serializedMode <- prev@env$prev_serializedMode
@@ -144,17 +144,13 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
               return(rdd@env$jrdd_val)
             }
 
-            computeFunc <- function(split, part) {
-              rdd@func(split, part)
-            }
-
             packageNamesArr <- serialize(.sparkREnv[[".packages"]],
                                          connection = NULL)
 
             broadcastArr <- lapply(ls(.broadcastNames),
                                    function(name) { get(name, .broadcastNames) })
 
-            serializedFuncArr <- serialize(computeFunc, connection = NULL)
+            serializedFuncArr <- serialize(rdd@func, connection = NULL)
 
             prev_jrdd <- rdd@prev_jrdd
 
@@ -279,7 +275,7 @@ setMethod("unpersist",
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
-#' setCheckpointDir(sc, "checkpoints")
+#' setCheckpointDir(sc, "checkpoint")
 #' rdd <- parallelize(sc, 1:10, 2L)
 #' checkpoint(rdd)
 #'}
@@ -551,11 +547,7 @@ setMethod("mapPartitions",
 setMethod("lapplyPartitionsWithIndex",
           signature(X = "RDD", FUN = "function"),
           function(X, FUN) {
-            FUN <- cleanClosure(FUN)
-            closureCapturingFunc <- function(split, part) {
-              FUN(split, part)
-            }
-            PipelinedRDD(X, closureCapturingFunc)
+            PipelinedRDD(X, FUN)
           })
 
 #' @rdname lapplyPartitionsWithIndex

R/pkg/R/context.R

Lines changed: 1 addition & 1 deletion
@@ -216,7 +216,7 @@ broadcast <- function(sc, object) {
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
-#' setCheckpointDir(sc, "~/checkpoints")
+#' setCheckpointDir(sc, "~/checkpoint")
 #' rdd <- parallelize(sc, 1:2, 2L)
 #' checkpoint(rdd)
 #'}

R/pkg/R/pairRDD.R

Lines changed: 0 additions & 4 deletions
@@ -694,10 +694,6 @@ setMethod("cogroup",
             for (i in 1:rddsLen) {
               rdds[[i]] <- lapply(rdds[[i]],
                                   function(x) { list(x[[1]], list(i, x[[2]])) })
-              # TODO(hao): As issue [SparkR-142] mentions, the right value of i
-              # will not be captured into UDF if getJRDD is not invoked.
-              # It should be resolved together with that issue.
-              getJRDD(rdds[[i]]) # Capture the closure.
             }
             union.rdd <- Reduce(unionRDD, rdds)
             group.func <- function(vlist) {

R/pkg/inst/tests/test_rdd.R

Lines changed: 3 additions & 2 deletions
@@ -141,7 +141,8 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
   unpersist(rdd2)
   expect_false(rdd2@env$isCached)
 
-  setCheckpointDir(sc, "checkpoints")
+  tempDir <- tempfile(pattern = "checkpoint")
+  setCheckpointDir(sc, tempDir)
   checkpoint(rdd2)
   expect_true(rdd2@env$isCheckpointed)
 
@@ -152,7 +153,7 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
   # make sure the data is collectable
   collect(rdd2)
 
-  unlink("checkpoints")
+  unlink(tempDir)
 })
 
 test_that("reduce on RDD", {

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 17 additions & 17 deletions
@@ -80,16 +80,16 @@ private[spark] class ExecutorAllocationManager(
     Integer.MAX_VALUE)
 
   // How long there must be backlogged tasks for before an addition is triggered (seconds)
-  private val schedulerBacklogTimeout = conf.getLong(
-    "spark.dynamicAllocation.schedulerBacklogTimeout", 5)
+  private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
+    "spark.dynamicAllocation.schedulerBacklogTimeout", "5s")
 
-  // Same as above, but used only after `schedulerBacklogTimeout` is exceeded
-  private val sustainedSchedulerBacklogTimeout = conf.getLong(
-    "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
+  // Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded
+  private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
+    "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", s"${schedulerBacklogTimeoutS}s")
 
   // How long an executor must be idle for before it is removed (seconds)
-  private val executorIdleTimeout = conf.getLong(
-    "spark.dynamicAllocation.executorIdleTimeout", 600)
+  private val executorIdleTimeoutS = conf.getTimeAsSeconds(
+    "spark.dynamicAllocation.executorIdleTimeout", "600s")
 
   // During testing, the methods to actually kill and add executors are mocked out
   private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
@@ -150,14 +150,14 @@ private[spark] class ExecutorAllocationManager(
       throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
         s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
     }
-    if (schedulerBacklogTimeout <= 0) {
+    if (schedulerBacklogTimeoutS <= 0) {
       throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
     }
-    if (sustainedSchedulerBacklogTimeout <= 0) {
+    if (sustainedSchedulerBacklogTimeoutS <= 0) {
       throw new SparkException(
        "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
     }
-    if (executorIdleTimeout <= 0) {
+    if (executorIdleTimeoutS <= 0) {
      throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
     }
     // Require external shuffle service for dynamic allocation
@@ -262,8 +262,8 @@ private[spark] class ExecutorAllocationManager(
     } else if (addTime != NOT_SET && now >= addTime) {
       val delta = addExecutors(maxNeeded)
       logDebug(s"Starting timer to add more executors (to " +
-        s"expire in $sustainedSchedulerBacklogTimeout seconds)")
-      addTime += sustainedSchedulerBacklogTimeout * 1000
+        s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
+      addTime += sustainedSchedulerBacklogTimeoutS * 1000
       delta
     } else {
       0
@@ -351,7 +351,7 @@ private[spark] class ExecutorAllocationManager(
     val removeRequestAcknowledged = testing || client.killExecutor(executorId)
     if (removeRequestAcknowledged) {
       logInfo(s"Removing executor $executorId because it has been idle for " +
-        s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
+        s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})")
       executorsPendingToRemove.add(executorId)
       true
     } else {
@@ -407,8 +407,8 @@ private[spark] class ExecutorAllocationManager(
   private def onSchedulerBacklogged(): Unit = synchronized {
     if (addTime == NOT_SET) {
       logDebug(s"Starting timer to add executors because pending tasks " +
-        s"are building up (to expire in $schedulerBacklogTimeout seconds)")
-      addTime = clock.getTimeMillis + schedulerBacklogTimeout * 1000
+        s"are building up (to expire in $schedulerBacklogTimeoutS seconds)")
+      addTime = clock.getTimeMillis + schedulerBacklogTimeoutS * 1000
     }
   }
 
@@ -431,8 +431,8 @@ private[spark] class ExecutorAllocationManager(
     if (executorIds.contains(executorId)) {
       if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
         logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
-          s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
-        removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
+          s"scheduled to run on the executor (to expire in $executorIdleTimeoutS seconds)")
+        removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeoutS * 1000
       }
     } else {
       logWarning(s"Attempted to mark unknown executor $executorId idle")

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 9 additions & 6 deletions
@@ -62,14 +62,17 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
 
   // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
   // "milliseconds"
-  private val executorTimeoutMs = sc.conf.getOption("spark.network.timeout").map(_.toLong * 1000).
-    getOrElse(sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120000))
-
+  private val slaveTimeoutMs =
+    sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s")
+  private val executorTimeoutMs =
+    sc.conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000
+
   // "spark.network.timeoutInterval" uses "seconds", while
   // "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
-  private val checkTimeoutIntervalMs =
-    sc.conf.getOption("spark.network.timeoutInterval").map(_.toLong * 1000).
-    getOrElse(sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000))
+  private val timeoutIntervalMs =
+    sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s")
+  private val checkTimeoutIntervalMs =
+    sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000
 
   private var timeoutCheckingTask: ScheduledFuture[_] = null
 
core/src/main/scala/org/apache/spark/HttpServer.scala

Lines changed: 1 addition & 1 deletion
@@ -160,7 +160,7 @@ private[spark] class HttpServer(
       throw new ServerStateException("Server is not started")
     } else {
       val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
-      s"$scheme://${Utils.localIpAddress}:$port"
+      s"$scheme://${Utils.localHostNameForURI()}:$port"
     }
   }
 }

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 36 additions & 0 deletions
@@ -174,6 +174,42 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     getOption(key).getOrElse(defaultValue)
   }
 
+  /**
+   * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
+   * suffix is provided then seconds are assumed.
+   * @throws NoSuchElementException
+   */
+  def getTimeAsSeconds(key: String): Long = {
+    Utils.timeStringAsSeconds(get(key))
+  }
+
+  /**
+   * Get a time parameter as seconds, falling back to a default if not set. If no
+   * suffix is provided then seconds are assumed.
+   *
+   */
+  def getTimeAsSeconds(key: String, defaultValue: String): Long = {
+    Utils.timeStringAsSeconds(get(key, defaultValue))
+  }
+
+  /**
+   * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
+   * suffix is provided then milliseconds are assumed.
+   * @throws NoSuchElementException
+   */
+  def getTimeAsMs(key: String): Long = {
+    Utils.timeStringAsMs(get(key))
+  }
+
+  /**
+   * Get a time parameter as milliseconds, falling back to a default if not set. If no
+   * suffix is provided then milliseconds are assumed.
+   */
+  def getTimeAsMs(key: String, defaultValue: String): Long = {
+    Utils.timeStringAsMs(get(key, defaultValue))
+  }
+
+
   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
     Option(settings.get(key))
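
As a quick reference, a hedged sketch of how the new accessors are meant to be used. Suffix parsing is delegated to Utils.timeStringAsSeconds / Utils.timeStringAsMs, so the exact set of accepted suffixes is assumed rather than shown by this diff, and spark.example.timeout is a placeholder key:

    import org.apache.spark.SparkConf

    val conf = new SparkConf().set("spark.network.timeout", "2m")

    conf.getTimeAsSeconds("spark.network.timeout")         // 120: suffix converted to seconds
    conf.getTimeAsMs("spark.network.timeout")              // 120000: same value in milliseconds
    conf.getTimeAsSeconds("spark.example.timeout", "30s")  // 30: unset key falls back to the default string
    // conf.getTimeAsSeconds("spark.example.timeout")      // with no default, this throws NoSuchElementException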

core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala

Lines changed: 1 addition & 1 deletion
@@ -53,7 +53,7 @@ class LocalSparkCluster(
     /* Start the Master */
     val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
     masterActorSystems += masterSystem
-    val masterUrl = "spark://" + localHostname + ":" + masterPort
+    val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort
     val masters = Array(masterUrl)
 
     /* Start the Workers */

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 34 additions & 4 deletions
@@ -24,11 +24,10 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.FileSystem.Statistics
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
-import org.apache.hadoop.security.Credentials
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
-import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException}
+import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
 
@@ -201,6 +200,37 @@ class SparkHadoopUtil extends Logging {
     val baseStatus = fs.getFileStatus(basePath)
     if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
   }
+
+  private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored
+
+  /**
+   * Substitute variables by looking them up in Hadoop configs. Only variables that match the
+   * ${hadoopconf- .. } pattern are substituted.
+   */
+  def substituteHadoopVariables(text: String, hadoopConf: Configuration): String = {
+    text match {
+      case HADOOP_CONF_PATTERN(matched) => {
+        logDebug(text + " matched " + HADOOP_CONF_PATTERN)
+        val key = matched.substring(13, matched.length() - 1) // remove ${hadoopconf- .. }
+        val eval = Option[String](hadoopConf.get(key))
+          .map { value =>
+            logDebug("Substituted " + matched + " with " + value)
+            text.replace(matched, value)
+          }
+        if (eval.isEmpty) {
+          // The variable was not found in Hadoop configs, so return text as is.
+          text
+        } else {
+          // Continue to substitute more variables.
+          substituteHadoopVariables(eval.get, hadoopConf)
+        }
+      }
+      case _ => {
+        logDebug(text + " didn't match " + HADOOP_CONF_PATTERN)
+        text
+      }
+    }
+  }
 }
 
 object SparkHadoopUtil {
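
A short usage sketch of the new substituteHadoopVariables helper; the Hadoop key and URL below are examples only, and SparkHadoopUtil.get is assumed to be the usual singleton accessor:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.deploy.SparkHadoopUtil

    val hadoopConf = new Configuration()
    hadoopConf.set("yarn.resourcemanager.hostname", "rm.example.com")  // example key/value

    val resolved = SparkHadoopUtil.get.substituteHadoopVariables(
      "http://${hadoopconf-yarn.resourcemanager.hostname}:8088", hadoopConf)
    // resolved == "http://rm.example.com:8088"; if the key is missing from hadoopConf,
    // or the text contains no ${hadoopconf-...} reference, the text is returned unchanged.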

0 commit comments
