4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -34,8 +34,8 @@ case class Aggregator[K, V, C] (
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {

// When spilling is enabled sorting will happen externally, but not necessarily with an
// ExternalSorter.
// When spilling is enabled sorting will happen externally, but not necessarily with an
// ExternalSorter.
private val isSpillEnabled = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)

@deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
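The comment in this hunk refers to the spark.shuffle.spill flag read just below it. A minimal sketch of how that flag would be set (illustrative only; true is the default):

```scala
import org.apache.spark.SparkConf

// Illustrative only: with spilling disabled, aggregation stays in memory and
// no external sorting path is taken.
val conf = new SparkConf().set("spark.shuffle.spill", "false")
```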
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -150,7 +150,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
}

override def isCompleted: Boolean = jobWaiter.jobFinished

override def isCancelled: Boolean = _cancelled

override def value: Option[Try[T]] = {
20 changes: 10 additions & 10 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -29,7 +29,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}

/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
* components to convey liveness or execution information for in-progress tasks. It will also
* components to convey liveness or execution information for in-progress tasks. It will also
* expire the hosts that have not heartbeated for more than spark.network.timeout.
*/
private[spark] case class Heartbeat(
@@ -43,8 +43,8 @@ private[spark] case class Heartbeat(
*/
private[spark] case object TaskSchedulerIsSet

private[spark] case object ExpireDeadHosts
private[spark] case object ExpireDeadHosts

private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)

/**
@@ -62,18 +62,18 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)

// "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
// "milliseconds"
private val slaveTimeoutMs =
private val slaveTimeoutMs =
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s")
private val executorTimeoutMs =
private val executorTimeoutMs =
sc.conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000

// "spark.network.timeoutInterval" uses "seconds", while
// "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
private val timeoutIntervalMs =
private val timeoutIntervalMs =
sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s")
private val checkTimeoutIntervalMs =
private val checkTimeoutIntervalMs =
sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000

private var timeoutCheckingTask: ScheduledFuture[_] = null

// "eventLoopThread" is used to run some pretty fast actions. The actions running in it should not
@@ -140,7 +140,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
}
}
}

override def onStop(): Unit = {
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel(true)
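The unit juggling described in the comments above can be summarized with a small sketch (assuming neither key is set, so the defaults apply):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
// blockManagerSlaveTimeoutMs is read in milliseconds; "120s" parses to 120000 ms.
val slaveTimeoutMs = conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s")
// spark.network.timeout is read in seconds, so the ms value is passed down as an
// "...ms" default and then scaled back to milliseconds.
val executorTimeoutMs =
  conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000  // 120000
```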
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -50,8 +50,8 @@ private[spark] class HttpFileServer(

def stop() {
httpServer.stop()
// If we only stop sc, but the driver process still run as a services then we need to delete

// If we only stop sc, but the driver process still run as a services then we need to delete
// the tmp dir, if not, it will create too many tmp dirs
try {
Utils.deleteRecursively(baseDir)
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -227,7 +227,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def getSizeAsBytes(key: String, defaultValue: String): Long = {
Utils.byteStringAsBytes(get(key, defaultValue))
}

/**
* Get a size parameter as Kibibytes; throws a NoSuchElementException if it's not set. If no
* suffix is provided then Kibibytes are assumed.
@@ -244,7 +244,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def getSizeAsKb(key: String, defaultValue: String): Long = {
Utils.byteStringAsKb(get(key, defaultValue))
}

/**
* Get a size parameter as Mebibytes; throws a NoSuchElementException if it's not set. If no
* suffix is provided then Mebibytes are assumed.
@@ -261,7 +261,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def getSizeAsMb(key: String, defaultValue: String): Long = {
Utils.byteStringAsMb(get(key, defaultValue))
}

/**
* Get a size parameter as Gibibytes; throws a NoSuchElementException if it's not set. If no
* suffix is provided then Gibibytes are assumed.
@@ -278,7 +278,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def getSizeAsGb(key: String, defaultValue: String): Long = {
Utils.byteStringAsGb(get(key, defaultValue))
}

/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
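As a usage sketch of the size getters in this hunk (the key names below are hypothetical): a bare number is interpreted in the unit implied by the method name, while an explicit suffix overrides it.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.my.buffer", "1g")  // hypothetical key, for illustration

conf.getSizeAsBytes("spark.my.buffer")         // 1073741824
conf.getSizeAsMb("spark.my.buffer")            // 1024
conf.getSizeAsKb("spark.other.buffer", "64k")  // 64 -- key unset, so the "64k" default applies
```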
@@ -480,7 +480,7 @@ private[spark] object SparkConf extends Logging {
"spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
"are no longer accepted. To specify the equivalent now, one may use '64k'.")
)

Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
}

@@ -508,7 +508,7 @@ private[spark] object SparkConf extends Logging {
"spark.reducer.maxSizeInFlight" -> Seq(
AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
"spark.kryoserializer.buffer" ->
Seq(AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
Seq(AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
translation = s => s"${(s.toDouble * 1000).toInt}k")),
"spark.kryoserializer.buffer.max" -> Seq(
AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
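The translation function in the spark.kryoserializer.buffer entry above rewrites a legacy fractional-megabyte value into the new suffixed form; a quick sketch of what it produces:

```scala
val translation: String => String = s => s"${(s.toDouble * 1000).toInt}k"

translation("0.064")  // "64k"  -- the example quoted in the deprecation message
translation("1.5")    // "1500k"
```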
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -51,7 +51,7 @@ private[spark] object TestUtils {
classpathUrls: Seq[URL] = Seq()): URL = {
val tempDir = Utils.createTempDir()
val files1 = for (name <- classNames) yield {
createCompiledClass(name, tempDir, toStringValue, classpathUrls = classpathUrls)
createCompiledClass(name, tempDir, toStringValue, classpathUrls = classpathUrls)
}
val files2 = for ((childName, baseName) <- classNamesWithBase) yield {
createCompiledClass(childName, tempDir, toStringValue, baseName, classpathUrls)
@@ -137,7 +137,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double])
*/
def sample(withReplacement: Boolean, fraction: JDouble): JavaDoubleRDD =
sample(withReplacement, fraction, Utils.random.nextLong)

/**
* Return a sampled subset of this RDD.
*/
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -101,18 +101,18 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])

/**
* Return a sampled subset of this RDD.
*
*
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be >= 0
*/
def sample(withReplacement: Boolean, fraction: Double): JavaRDD[T] =
sample(withReplacement, fraction, Utils.random.nextLong)

/**
* Return a sampled subset of this RDD.
*
*
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
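The fraction semantics documented above can be illustrated with the Scala RDD API, which JavaRDD delegates to (sketch only; assumes an existing SparkContext sc):

```scala
val rdd = sc.parallelize(1 to 1000)

// Without replacement: fraction is the per-element inclusion probability, in [0, 1].
val roughlyTenPercent = rdd.sample(withReplacement = false, fraction = 0.1)

// With replacement: fraction is the expected number of times each element is drawn (>= 0).
val roughlyDoubled = rdd.sample(withReplacement = true, fraction = 2.0, seed = 42L)
```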
@@ -797,10 +797,10 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:

val bufferSize = SparkEnv.get.conf.getInt("spark.buffer.size", 65536)

/**
/**
* We try to reuse a single Socket to transfer accumulator updates, as they are all added
* by the DAGScheduler's single-threaded actor anyway.
*/
*/
@transient var socket: Socket = _

def openSocket(): Socket = synchronized {
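The comment above explains why a single socket is reused for accumulator updates. A simplified sketch of that pattern (the wrapper class and names here are illustrative, not the exact ones in the file):

```scala
import java.net.Socket

class AccumulatorChannel(serverHost: String, serverPort: Int) {  // hypothetical wrapper
  @transient private var socket: Socket = _

  def openSocket(): Socket = synchronized {
    // Open lazily and reuse across updates; the scheduler's single thread is the only caller.
    if (socket == null || socket.isClosed) {
      socket = new Socket(serverHost, serverPort)
    }
    socket
  }
}
```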
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/api/r/RBackend.scala
@@ -44,11 +44,11 @@ private[spark] class RBackend {
bossGroup = new NioEventLoopGroup(2)
val workerGroup = bossGroup
val handler = new RBackendHandler(this)

bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(classOf[NioServerSocketChannel])

bootstrap.childHandler(new ChannelInitializer[SocketChannel]() {
def initChannel(ch: SocketChannel): Unit = {
ch.pipeline()
@@ -77,7 +77,7 @@ private[r] class RBackendHandler(server: RBackend)
val reply = bos.toByteArray
ctx.write(reply)
}

override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
ctx.flush()
}
@@ -869,7 +869,7 @@ private[spark] object SparkSubmitUtils {
md.addDependency(dd)
}
}

/** Add exclusion rules for dependencies already included in the spark-assembly */
def addExclusionRules(
ivySettings: IvySettings,
@@ -23,7 +23,7 @@ import org.apache.spark.util.Utils
/**
* Command-line parser for the master.
*/
private[history] class HistoryServerArguments(conf: SparkConf, args: Array[String])
private[history] class HistoryServerArguments(conf: SparkConf, args: Array[String])
extends Logging {
private var propertiesFile: String = null

@@ -32,7 +32,7 @@ import org.apache.spark.deploy.SparkCuratorUtil
private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)
extends PersistenceEngine
with Logging {

private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -43,22 +43,22 @@ class TaskMetrics extends Serializable {
private var _hostname: String = _
def hostname: String = _hostname
private[spark] def setHostname(value: String) = _hostname = value

/**
* Time taken on the executor to deserialize this task
*/
private var _executorDeserializeTime: Long = _
def executorDeserializeTime: Long = _executorDeserializeTime
private[spark] def setExecutorDeserializeTime(value: Long) = _executorDeserializeTime = value


/**
* Time the executor spends actually running the task (including fetching shuffle data)
*/
private var _executorRunTime: Long = _
def executorRunTime: Long = _executorRunTime
private[spark] def setExecutorRunTime(value: Long) = _executorRunTime = value

/**
* The number of bytes this task transmitted back to the driver as the TaskResult
*/
@@ -315,7 +315,7 @@ class ShuffleReadMetrics extends Serializable {
def remoteBlocksFetched: Int = _remoteBlocksFetched
private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value

/**
* Number of local blocks fetched in this shuffle by this task
*/
@@ -333,7 +333,7 @@
def fetchWaitTime: Long = _fetchWaitTime
private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value

/**
* Total number of remote bytes read from the shuffle by this task
*/
@@ -381,15 +381,15 @@ class ShuffleWriteMetrics extends Serializable {
def shuffleBytesWritten: Long = _shuffleBytesWritten
private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value

/**
* Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
*/
@volatile private var _shuffleWriteTime: Long = _
def shuffleWriteTime: Long = _shuffleWriteTime
private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value

/**
* Total number of records written to the shuffle by this task
*/
@@ -26,9 +26,9 @@ import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

private[spark] class Slf4jSink(
val property: Properties,
val property: Properties,
val registry: MetricRegistry,
securityMgr: SecurityManager)
securityMgr: SecurityManager)
extends Sink {
val SLF4J_DEFAULT_PERIOD = 10
val SLF4J_DEFAULT_UNIT = "SECONDS"
@@ -20,4 +20,4 @@ package org.apache.spark.metrics
/**
* Sinks used in Spark's metrics system.
*/
package object sink
package object sink
@@ -85,9 +85,9 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
numPartsToTry = partsScanned * 4
} else {
// the left side of max is >=1 whenever partsScanned >= 2
numPartsToTry = Math.max(1,
numPartsToTry = Math.max(1,
(1.5 * num * partsScanned / results.size).toInt - partsScanned)
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
}
}

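A worked example of the partition-growth rule in this hunk, with hypothetical numbers: suppose the asynchronous take wants num = 100 results, has scanned partsScanned = 4 partitions, and has collected results.size = 10 rows so far.

```scala
val num = 100
val partsScanned = 4
val resultsSize = 10

// Estimate how many more partitions are needed, with a 50% safety margin...
var numPartsToTry = Math.max(1, (1.5 * num * partsScanned / resultsSize).toInt - partsScanned)  // 56
// ...but never grow by more than 4x the partitions already scanned.
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)                                       // 16
```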
@@ -196,7 +196,7 @@ class NewHadoopRDD[K, V](
override def getPreferredLocations(hsplit: Partition): Seq[String] = {
val split = hsplit.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value
val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
case Some(c) =>
case Some(c) =>
try {
val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
Some(HadoopRDD.convertSplitLocationInfo(infos))
@@ -328,7 +328,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
reduceByKeyLocally(func)
}

/**
/**
* Count the number of elements for each key, collecting the results to a local Map.
*
* Note that this method should only be used if the resulting map is expected to be small, as
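The (truncated) doc comment above describes counting elements per key and collecting the result to a local map, which countByKey provides; a small usage sketch (assumes an existing SparkContext sc):

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// The result is materialized on the driver, so use this only when the set of
// distinct keys is small.
val counts: scala.collection.Map[String, Long] = pairs.countByKey()
// counts == Map("a" -> 2, "b" -> 1)
```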
@@ -41,7 +41,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
*
* @param logData Stream containing event log data.
* @param sourceName Filename (or other source identifier) from whence @logData is being read
* @param maybeTruncated Indicate whether log file might be truncated (some abnormal situations
* @param maybeTruncated Indicate whether log file might be truncated (some abnormal situations
* encountered, log file might not finished writing) or not
*/
def replay(
@@ -62,7 +62,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
if (!maybeTruncated || lines.hasNext) {
throw jpe
} else {
logWarning(s"Got JsonParseException from log file $sourceName" +
logWarning(s"Got JsonParseException from log file $sourceName" +
s" at line $lineNumber, the file might not have finished writing cleanly.")
}
}
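The error handling in this hunk tolerates a JSON parse failure only when the log may be truncated and the failure is on the last line. A sketch of that rule as a standalone predicate (hypothetical helper, for illustration):

```scala
def tolerateParseError(maybeTruncated: Boolean, hasMoreLines: Boolean): Boolean =
  maybeTruncated && !hasMoreLines

tolerateParseError(maybeTruncated = true,  hasMoreLines = false)  // true  -> log a warning and stop
tolerateParseError(maybeTruncated = true,  hasMoreLines = true)   // false -> rethrow the exception
tolerateParseError(maybeTruncated = false, hasMoreLines = false)  // false -> rethrow the exception
```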
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -125,7 +125,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
if (interruptThread && taskThread != null) {
taskThread.interrupt()
}
}
}
}

/**
@@ -861,9 +861,9 @@ private[spark] class TaskSetManager(
case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
case _ => null
}

if (localityWaitKey != null) {
conf.getTimeAsMs(localityWaitKey, defaultWait)
conf.getTimeAsMs(localityWaitKey, defaultWait)
} else {
0L
}
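The locality-wait lookup in this last hunk resolves a per-level key and falls back to a default; a minimal sketch of how the values would resolve (the config keys are real Spark settings, the durations are illustrative):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "3s")       // the usual source of defaultWait
  .set("spark.locality.wait.rack", "1s")

conf.getTimeAsMs("spark.locality.wait.rack", "3s")  // 1000 -- RACK_LOCAL has its own key set
conf.getTimeAsMs("spark.locality.wait.node", "3s")  // 3000 -- unset, so the default applies
```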