Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Accumulators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import org.apache.spark.util.Utils
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] private[spark] (
@transient initialValue: R,
initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String],
internal: Boolean)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Dependency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
*/
@DeveloperApi
class ShuffleDependency[K, V, C](
@transient _rdd: RDD[_ <: Product2[K, V]],
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ class HashPartitioner(partitions: Int) extends Partitioner {
* the value of `partitions`.
*/
class RangePartitioner[K : Ordering : ClassTag, V](
@transient partitions: Int,
@transient rdd: RDD[_ <: Product2[K, V]],
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.spark.util.SerializableJobConf
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
private[spark]
class SparkHadoopWriter(@transient jobConf: JobConf)
class SparkHadoopWriter(jobConf: JobConf)
extends Logging
with SparkHadoopMapRedUtil
with Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
import scala.util.control.NonFatal

private[spark] class PythonRDD(
@transient parent: RDD[_],
parent: RDD[_],
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
Expand Down Expand Up @@ -785,7 +785,7 @@ class BytesToString extends org.apache.spark.api.java.function.Function[Array[By
* Internal class that acts as an `AccumulatorParam` for Python accumulators. Inside, it
* collects a list of pickled strings that we pass to Python through a socket.
*/
private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
private class PythonAccumulatorParam(@transient private val serverHost: String, serverPort: Int)
extends AccumulatorParam[JList[Array[Byte]]] {

Utils.checkHost(serverHost, "Expected hostname")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDat
*/
@Experimental
class PortableDataStream(
@transient isplit: CombineFileSplit,
@transient context: TaskAttemptContext,
isplit: CombineFileSplit,
context: TaskAttemptContext,
index: Integer)
extends Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
new RpcResponseCallback {
override def onSuccess(response: Array[Byte]): Unit = {
logTrace(s"Successfully uploaded block $blockId")
result.success()
result.success((): Unit)
}
override def onFailure(e: Throwable): Unit = {
logError(s"Error while uploading block $blockId", e)
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ private[spark] class BinaryFileRDD[T](
inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
@transient conf: Configuration,
conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {

override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
configurable.setConf(conf)
configurable.setConf(getConf)
case _ =>
}
val jobContext = newJobContext(conf, jobId)
val jobContext = newJobContext(getConf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends P
}

private[spark]
class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
extends RDD[T](sc, Nil) {

@transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
Expand Down Expand Up @@ -64,7 +64,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
*/
private[spark] def removeBlocks() {
blockIds.foreach { blockId =>
sc.env.blockManager.master.removeBlock(blockId)
sparkContext.env.blockManager.master.removeBlock(blockId)
}
_isValid = false
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import org.apache.spark.util.Utils
private[spark]
class CartesianPartition(
idx: Int,
@transient rdd1: RDD[_],
@transient rdd2: RDD[_],
@transient private val rdd1: RDD[_],
@transient private val rdd2: RDD[_],
s1Index: Int,
s2Index: Int
) extends Partition {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ private[spark] class CheckpointRDDPartition(val index: Int) extends Partition
/**
* An RDD that recovers checkpointed data from storage.
*/
private[spark] abstract class CheckpointRDD[T: ClassTag](@transient sc: SparkContext)
private[spark] abstract class CheckpointRDD[T: ClassTag](sc: SparkContext)
extends RDD[T](sc, Nil) {

// CheckpointRDD should not be checkpointed again
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import org.apache.spark.storage.StorageLevel
/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit)
private[spark] class HadoopPartition(rddId: Int, idx: Int, s: InputSplit)
extends Partition {

val inputSplit = new SerializableWritable[InputSplit](s)
Expand Down Expand Up @@ -99,7 +99,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
*/
@DeveloperApi
class HadoopRDD[K, V](
@transient sc: SparkContext,
sc: SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
Expand All @@ -109,7 +109,7 @@ class HadoopRDD[K, V](
extends RDD[(K, V)](sc, Nil) with Logging {

if (initLocalJobConfFuncOpt.isDefined) {
sc.clean(initLocalJobConfFuncOpt.get)
sparkContext.clean(initLocalJobConfFuncOpt.get)
}

def this(
Expand Down Expand Up @@ -137,7 +137,7 @@ class HadoopRDD[K, V](
// used to build JobTracker ID
private val createTime = new Date()

private val shouldCloneJobConf = sc.conf.getBoolean("spark.hadoop.cloneConf", false)
private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.storage.RDDBlockId
* @param numPartitions the number of partitions in the checkpointed RDD
*/
private[spark] class LocalCheckpointRDD[T: ClassTag](
@transient sc: SparkContext,
sc: SparkContext,
rddId: Int,
numPartitions: Int)
extends CheckpointRDD[T](sc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
* is written to the local, ephemeral block storage that lives in each executor. This is useful
* for use cases where RDDs build up long lineages that need to be truncated often (e.g. GraphX).
*/
private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends RDDCheckpointData[T](rdd) with Logging {

/**
Expand Down
18 changes: 9 additions & 9 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.spark.storage.StorageLevel
private[spark] class NewHadoopPartition(
rddId: Int,
val index: Int,
@transient rawSplit: InputSplit with Writable)
rawSplit: InputSplit with Writable)
extends Partition {

val serializableHadoopSplit = new SerializableWritable(rawSplit)
Expand Down Expand Up @@ -68,14 +68,14 @@ class NewHadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
@transient conf: Configuration)
@transient private val _conf: Configuration)
extends RDD[(K, V)](sc, Nil)
with SparkHadoopMapReduceUtil
with Logging {

// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = sc.broadcast(new SerializableConfiguration(conf))
// private val serializableConf = new SerializableWritable(conf)
private val confBroadcast = sc.broadcast(new SerializableConfiguration(_conf))
// private val serializableConf = new SerializableWritable(_conf)

private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
Expand All @@ -88,10 +88,10 @@ class NewHadoopRDD[K, V](
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
configurable.setConf(conf)
configurable.setConf(_conf)
case _ =>
}
val jobContext = newJobContext(conf, jobId)
val jobContext = newJobContext(_conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
Expand Down Expand Up @@ -262,18 +262,18 @@ private[spark] class WholeTextFileRDD(
inputFormatClass: Class[_ <: WholeTextFileInputFormat],
keyClass: Class[String],
valueClass: Class[String],
@transient conf: Configuration,
conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {

override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
configurable.setConf(conf)
configurable.setConf(getConf)
case _ =>
}
val jobContext = newJobContext(conf, jobId)
val jobContext = newJobContext(getConf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
}

private[spark] class ParallelCollectionRDD[T: ClassTag](
@transient sc: SparkContext,
@transient data: Seq[T],
sc: SparkContext,
@transient private val data: Seq[T],
numSlices: Int,
locationPrefs: Map[Int, Seq[String]])
extends RDD[T](sc, Nil) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private[spark] class PartitionPruningRDDPartition(idx: Int, val parentSplit: Par
* Represents a dependency between the PartitionPruningRDD and its parent. In this
* case, the child RDD contains a subset of partitions of the parents'.
*/
private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
private[spark] class PruneDependency[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean)
extends NarrowDependency[T](rdd) {

@transient
Expand All @@ -55,8 +55,8 @@ private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterF
*/
@DeveloperApi
class PartitionPruningRDD[T: ClassTag](
@transient prev: RDD[T],
@transient partitionFilterFunc: Int => Boolean)
prev: RDD[T],
partitionFilterFunc: Int => Boolean)
extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {

override def compute(split: Partition, context: TaskContext): Iterator[T] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
prev: RDD[T],
sampler: RandomSampler[T, U],
@transient preservesPartitioning: Boolean,
@transient seed: Long = Utils.random.nextLong)
preservesPartitioning: Boolean,
@transient private val seed: Long = Utils.random.nextLong)
extends RDD[U](prev) {

@transient override val partitioner = if (preservesPartitioning) prev.partitioner else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private[spark] object CheckpointState extends Enumeration {
* as well as, manages the post-checkpoint state by providing the updated partitions,
* iterator and preferred locations of the checkpointed RDD.
*/
private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends Serializable {

import CheckpointState._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
* An RDD that reads from checkpoint files previously written to reliable storage.
*/
private[spark] class ReliableCheckpointRDD[T: ClassTag](
@transient sc: SparkContext,
sc: SparkContext,
val checkpointPath: String)
extends CheckpointRDD[T](sc) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.util.SerializableConfiguration
* An implementation of checkpointing that writes the RDD data to reliable storage.
* This allows drivers to be restarted on failure with previously computed state.
*/
private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends RDDCheckpointData[T](rdd) with Logging {

// The directory to which the associated RDD has been checkpointed to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Ut
private[spark] class SqlNewHadoopPartition(
rddId: Int,
val index: Int,
@transient rawSplit: InputSplit with Writable)
rawSplit: InputSplit with Writable)
extends SparkPartition {

val serializableHadoopSplit = new SerializableWritable(rawSplit)
Expand All @@ -61,9 +61,9 @@ private[spark] class SqlNewHadoopPartition(
* changes based on [[org.apache.spark.rdd.HadoopRDD]].
*/
private[spark] class SqlNewHadoopRDD[V: ClassTag](
@transient sc : SparkContext,
sc : SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
@transient initDriverSideJobFuncOpt: Option[Job => Unit],
@transient private val initDriverSideJobFuncOpt: Option[Job => Unit],
initLocalJobFuncOpt: Option[Job => Unit],
inputFormatClass: Class[_ <: InputFormat[Void, V]],
valueClass: Class[V])
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ import org.apache.spark.util.Utils
*/
private[spark] class UnionPartition[T: ClassTag](
idx: Int,
@transient rdd: RDD[T],
@transient private val rdd: RDD[T],
val parentRddIndex: Int,
@transient parentRddPartitionIndex: Int)
@transient private val parentRddPartitionIndex: Int)
extends Partition {

var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.util.Utils

private[spark] class ZippedPartitionsPartition(
idx: Int,
@transient rdds: Seq[RDD[_]],
@transient private val rdds: Seq[RDD[_]],
@transient val preferredLocations: Seq[String])
extends Partition {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
* @tparam T parent RDD item type
*/
private[spark]
class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, Long)](prev) {
class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {

/** The start index of each partition. */
@transient private val startIndices: Array[Long] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.{SparkException, Logging, SparkConf}
/**
* A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
*/
private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
private[spark] abstract class RpcEndpointRef(conf: SparkConf)
extends Serializable with Logging {

private[this] val maxRetries = RpcUtils.numRetries(conf)
Expand Down
Loading