Merge remote-tracking branch 'origin/master' into remove-ttl-based-cleaning
JoshRosen committed Jan 2, 2016
commit 5ffe30f99f4213457cc6c84032932319905f4420
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -544,7 +544,8 @@ private[spark] object SparkConf extends Logging {
DeprecatedConfig("spark.kryoserializer.buffer.mb", "1.4",
"Please use spark.kryoserializer.buffer instead. The default value for " +
"spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
"are no longer accepted. To specify the equivalent now, one may use '64k'.")
"are no longer accepted. To specify the equivalent now, one may use '64k'."),
DeprecatedConfig("spark.rpc", "2.0", "Not used any more.")
)

Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
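The SparkConf hunk above adds `spark.rpc` to the deprecated-config list and collapses the sequence into a lookup map keyed by configuration name. A minimal standalone sketch of that map-building pattern (the names `DeprecatedConfigExample` and `logDeprecationWarning` are illustrative, not Spark's internals):

```scala
// Simplified stand-in for SparkConf's deprecated-config bookkeeping.
case class DeprecatedConfig(key: String, version: String, deprecationMessage: String)

object DeprecatedConfigExample {
  private val configs = Seq(
    DeprecatedConfig("spark.kryoserializer.buffer.mb", "1.4",
      "Please use spark.kryoserializer.buffer instead."),
    DeprecatedConfig("spark.rpc", "2.0", "Not used any more."))

  // Same construction as the diff: expand the Seq of (key -> config) pairs
  // into Map's varargs apply method.
  private val deprecatedConfigs: Map[String, DeprecatedConfig] =
    Map(configs.map { cfg => cfg.key -> cfg }: _*)

  def logDeprecationWarning(key: String): Unit = {
    deprecatedConfigs.get(key).foreach { cfg =>
      println(s"${cfg.key} is deprecated as of ${cfg.version}: ${cfg.deprecationMessage}")
    }
  }
}
```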
16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -871,11 +871,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
assertNotStopped()
val job = new NewHadoopJob(hadoopConfiguration)
val job = NewHadoopJob.getInstance(hadoopConfiguration)
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
val updateConf = job.getConfiguration
new WholeTextFileRDD(
this,
classOf[WholeTextFileInputFormat],
@@ -920,11 +920,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope {
assertNotStopped()
val job = new NewHadoopJob(hadoopConfiguration)
val job = NewHadoopJob.getInstance(hadoopConfiguration)
// Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
val updateConf = job.getConfiguration
new BinaryFileRDD(
this,
classOf[StreamInputFormat],
@@ -1097,13 +1097,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
vClass: Class[V],
conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope {
assertNotStopped()
// The call to new NewHadoopJob automatically adds security credentials to conf,
// The call to NewHadoopJob automatically adds security credentials to conf,
// so we don't need to explicitly add them ourselves
val job = new NewHadoopJob(conf)
val job = NewHadoopJob.getInstance(conf)
// Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updatedConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
val updatedConf = job.getConfiguration
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
}

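All three hunks above make the same two substitutions: the new-API Hadoop `Job` is obtained via the `Job.getInstance` factory rather than the deprecated constructor, and the credential-carrying configuration is read back with `job.getConfiguration` instead of going through `SparkHadoopUtil`. A hedged sketch of that usage against plain Hadoop APIs (object and method names are illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

object NewApiJobSketch {
  // Returns a Configuration carrying the input paths (comma-separated paths
  // are accepted, matching the SPARK-7155 behaviour noted in the comments).
  def configuredConf(hadoopConf: Configuration, path: String): Configuration = {
    val job = Job.getInstance(hadoopConf) // replaces `new Job(conf)`
    FileInputFormat.setInputPaths(job, path)
    job.getConfiguration                  // replaces getConfigurationFromJobContext(job)
  }
}
```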
@@ -1366,7 +1366,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
if (!fs.exists(hadoopPath)) {
throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
}
val isDir = fs.getFileStatus(hadoopPath).isDir
val isDir = fs.getFileStatus(hadoopPath).isDirectory
if (!isLocal && scheme == "file" && isDir) {
throw new SparkException(s"addFile does not support local directories when not running " +
"local mode.")
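The addFile hunk swaps the deprecated `FileStatus.isDir` for `FileStatus.isDirectory`. A small illustrative check using only public Hadoop APIs (the helper name is made up):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object IsDirectorySketch {
  // Throws FileNotFoundException if the path does not exist, mirroring the
  // explicit existence check in the hunk above.
  def isDir(pathStr: String, conf: Configuration): Boolean = {
    val hadoopPath = new Path(pathStr)
    val fs = hadoopPath.getFileSystem(conf)
    fs.getFileStatus(hadoopPath).isDirectory // was: .isDir (deprecated)
  }
}
```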
12 changes: 3 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -34,7 +34,6 @@ import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemor
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv}
import org.apache.spark.rpc.akka.AkkaRpcEnv
import org.apache.spark.scheduler.{OutputCommitCoordinator, LiveListenerBus}
import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
import org.apache.spark.serializer.Serializer
@@ -97,9 +96,7 @@ class SparkEnv (
blockManager.master.stop()
metricsSystem.stop()
outputCommitCoordinator.stop()
if (!rpcEnv.isInstanceOf[AkkaRpcEnv]) {
actorSystem.shutdown()
}
actorSystem.shutdown()
rpcEnv.shutdown()

// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
@@ -248,14 +245,11 @@ object SparkEnv extends Logging {

val securityManager = new SecurityManager(conf)

// Create the ActorSystem for Akka and get the port it binds to.
val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
// Create the ActorSystem for Akka and get the port it binds to.
val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
clientMode = !isDriver)
val actorSystem: ActorSystem =
if (rpcEnv.isInstanceOf[AkkaRpcEnv]) {
rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
} else {
val actorSystem: ActorSystem = {
val actorSystemPort =
if (port == 0 || rpcEnv.address == null) {
port
20 changes: 12 additions & 8 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -25,6 +25,7 @@ import java.util.Date
import org.apache.hadoop.mapred._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskType

import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
@@ -37,10 +38,7 @@ import org.apache.spark.util.SerializableJobConf
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
private[spark]
class SparkHadoopWriter(jobConf: JobConf)
extends Logging
with SparkHadoopMapRedUtil
with Serializable {
class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable {

private val now = new Date()
private val conf = new SerializableJobConf(jobConf)
@@ -131,7 +129,7 @@ class SparkHadoopWriter(jobConf: JobConf)

private def getJobContext(): JobContext = {
if (jobContext == null) {
jobContext = newJobContext(conf.value, jID.value)
jobContext = new JobContextImpl(conf.value, jID.value)
}
jobContext
}
@@ -143,14 +141,20 @@ class SparkHadoopWriter(jobConf: JobConf)
taskContext
}

protected def newTaskAttemptContext(
conf: JobConf,
attemptId: TaskAttemptID): TaskAttemptContext = {
new TaskAttemptContextImpl(conf, attemptId)
}

private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
jobID = jobid
splitID = splitid
attemptID = attemptid

jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid))
taID = new SerializableWritable[TaskAttemptID](
new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
}
}

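With the `SparkHadoopMapRedUtil` mixin gone, the writer builds the old-API (`org.apache.hadoop.mapred`) contexts directly: `JobContextImpl` and `TaskAttemptContextImpl` are constructed by hand, and `TaskID` takes the `TaskType.MAP` enum rather than the deprecated boolean flag. A self-contained sketch of those constructions (the object name and date format are assumptions, not Spark's exact code):

```scala
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.mapred.{JobConf, JobContextImpl, JobID, TaskAttemptContextImpl, TaskAttemptID, TaskID}
import org.apache.hadoop.mapreduce.TaskType

object MapRedContextSketch {
  def contexts(conf: JobConf, jobId: Int, splitId: Int, attemptId: Int)
      : (JobContextImpl, TaskAttemptContextImpl) = {
    val jobtrackerId = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())
    val jID = new JobID(jobtrackerId, jobId)
    // TaskType.MAP replaces the deprecated `isMap = true` boolean argument.
    val taID = new TaskAttemptID(new TaskID(jID, TaskType.MAP, splitId), attemptId)
    (new JobContextImpl(conf, jID), new TaskAttemptContextImpl(conf, taID))
  }
}
```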
@@ -168,9 +172,9 @@ object SparkHadoopWriter {
}
val outputPath = new Path(path)
val fs = outputPath.getFileSystem(conf)
if (outputPath == null || fs == null) {
if (fs == null) {
throw new IllegalArgumentException("Incorrectly formatted output path")
}
outputPath.makeQualified(fs)
outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
}
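The last hunk also replaces the deprecated single-argument `Path.makeQualified(FileSystem)` with the overload taking the filesystem's URI and working directory. A minimal sketch, assuming an ordinary Hadoop `Configuration`:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object QualifiedPathSketch {
  def qualify(raw: String, conf: Configuration): Path = {
    val outputPath = new Path(raw)
    val fs = outputPath.getFileSystem(conf)
    // was: outputPath.makeQualified(fs)  (deprecated single-argument overload)
    outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
  }
}
```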
core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -24,14 +24,12 @@ import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* An interface for all the broadcast implementations in Spark (to allow
* multiple broadcast implementations). SparkContext uses a user-specified
* BroadcastFactory implementation to instantiate a particular broadcast for the
* entire Spark job.
*/
@DeveloperApi
trait BroadcastFactory {
private[spark] trait BroadcastFactory {

def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit

core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicLong

import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkConf, SecurityManager}


private[spark] class BroadcastManager(
val isDriver: Boolean,
@@ -39,15 +39,8 @@ private[spark] class BroadcastManager(
private def initialize() {
synchronized {
if (!initialized) {
val broadcastFactoryClass =
conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")

broadcastFactory =
Utils.classForName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]

// Initialize appropriate BroadcastFactory and BroadcastObject
broadcastFactory = new TorrentBroadcastFactory
broadcastFactory.initialize(isDriver, conf, securityManager)

initialized = true
}
}
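BroadcastManager previously instantiated its factory reflectively from the `spark.broadcast.factory` class name; after this change it constructs `TorrentBroadcastFactory` directly. A generic sketch of the two styles, using made-up names rather than Spark's private classes:

```scala
// Illustrative stand-ins; Spark's BroadcastFactory hierarchy is private[spark].
trait Factory { def initialize(): Unit }
class DefaultFactory extends Factory { def initialize(): Unit = () }

object FactoryLoading {
  // Old style: pluggable, driven by a class name from configuration
  // (e.g. the removed "spark.broadcast.factory" setting).
  def reflective(className: String): Factory =
    Class.forName(className).getDeclaredConstructor().newInstance().asInstanceOf[Factory]

  // New style: only one implementation is supported, so construct it directly.
  def direct(): Factory = new DefaultFactory
}
```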
You are viewing a condensed version of this merge commit.