diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 396cdd1247e0..ed52643a2b93 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -543,7 +543,7 @@ class SparkContext(config: SparkConf) extends Logging {
       valueClass: Class[V],
       minPartitions: Int = defaultMinPartitions
       ): RDD[(K, V)] = {
-    // Add necessary security credentials to the JobConf before broadcasting it.
+    // Add necessary security credentials to the JobConf.
     SparkHadoopUtil.get.addCredentials(conf)
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
   }
@@ -562,17 +562,15 @@ class SparkContext(config: SparkConf) extends Logging {
       valueClass: Class[V],
       minPartitions: Int = defaultMinPartitions
       ): RDD[(K, V)] = {
-    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
-    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
     val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
     new HadoopRDD(
       this,
-      confBroadcast,
-      Some(setInputPathsFunc),
+      hadoopConfiguration,
       inputFormatClass,
       keyClass,
       valueClass,
-      minPartitions).setName(path)
+      minPartitions,
+      Some(setInputPathsFunc)).setName(path)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 6b63eb23e9ee..7866daa24de0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -17,34 +17,24 @@
 
 package org.apache.spark.rdd
 
+import java.io.EOFException
 import java.text.SimpleDateFormat
 import java.util.Date
-import java.io.EOFException
-
-import scala.collection.immutable.Map
-import scala.reflect.ClassTag
-import scala.collection.mutable.ListBuffer
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.mapred.FileSplit
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.InputSplit
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.RecordReader
-import org.apache.hadoop.mapred.Reporter
-import org.apache.hadoop.mapred.JobID
-import org.apache.hadoop.mapred.TaskAttemptID
-import org.apache.hadoop.mapred.TaskID
+import org.apache.hadoop.mapred.{FileSplit, InputFormat, InputSplit, JobConf, JobID, RecordReader, Reporter, TaskAttemptID, TaskID}
 import org.apache.hadoop.util.ReflectionUtils
-
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.{DataReadMethod, InputMetrics}
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
+import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
 import org.apache.spark.util.{NextIterator, Utils}
-import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
+
+import scala.collection.immutable.Map
+import scala.collection.mutable.ListBuffer
+import scala.reflect.ClassTag
 
 
 /**
@@ -86,44 +76,27 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * [[org.apache.spark.SparkContext.hadoopRDD()]]
  *
  * @param sc The SparkContext to associate the RDD with.
- * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
- *   variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job.
- *   Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
- * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
- *   creates.
+ * @param conf A general Hadoop Configuration, or a subclass of it. If conf is an instance of
+ *   JobConf, then that JobConf will be used for the Hadoop job. Otherwise, a new JobConf will be
+ *   created from the given Configuration.
  * @param inputFormatClass Storage format of the data to be read.
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param minPartitions Minimum number of HadoopRDD partitions (Hadoop Splits) to generate.
+ * @param initLocalJobConfFuncOpt Optional closure used to initialize a JobConf.
  */
 @DeveloperApi
 class HadoopRDD[K, V](
     sc: SparkContext,
-    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
-    initLocalJobConfFuncOpt: Option[JobConf => Unit],
+    @transient conf: Configuration,
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    minPartitions: Int)
+    minPartitions: Int,
+    initLocalJobConfFuncOpt: Option[JobConf => Unit] = None)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
-  def this(
-      sc: SparkContext,
-      conf: JobConf,
-      inputFormatClass: Class[_ <: InputFormat[K, V]],
-      keyClass: Class[K],
-      valueClass: Class[V],
-      minPartitions: Int) = {
-    this(
-      sc,
-      sc.broadcast(new SerializableWritable(conf))
-        .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
-      None /* initLocalJobConfFuncOpt */,
-      inputFormatClass,
-      keyClass,
-      valueClass,
-      minPartitions)
-  }
+  private val serializableConf = new SerializableWritable(conf)
 
   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
 
@@ -133,26 +106,17 @@ class HadoopRDD[K, V](
   private val createTime = new Date()
 
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
-  protected def getJobConf(): JobConf = {
-    val conf: Configuration = broadcastedConf.value.value
-    if (conf.isInstanceOf[JobConf]) {
-      // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
-      conf.asInstanceOf[JobConf]
-    } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
-      // getJobConf() has been called previously, so there is already a local cache of the JobConf
-      // needed by this RDD.
-      HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
-    } else {
-      // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
-      // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
-      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-      // Synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456).
-      HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
-        val newJobConf = new JobConf(conf)
-        initLocalJobConfFuncOpt.map(f => f(newJobConf))
-        HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-        newJobConf
-      }
+  protected def createJobConf(): JobConf = {
+    // Each task gets its own deserialized copy of the HadoopRDD, and therefore gets its own copy
+    // of the configuration.
+    val conf: Configuration = serializableConf.value
+    conf match {
+      case jobConf: JobConf =>
+        jobConf
+      case _: Configuration =>
+        val jobConf = new JobConf(conf)
+        initLocalJobConfFuncOpt.foreach(f => f(jobConf))
+        jobConf
     }
   }
 
@@ -172,7 +136,7 @@ class HadoopRDD[K, V](
   }
 
   override def getPartitions: Array[Partition] = {
-    val jobConf = getJobConf()
+    val jobConf = createJobConf()
     // add the credentials here as this can be called before SparkContext initialized
     SparkHadoopUtil.get.addCredentials(jobConf)
     val inputFormat = getInputFormat(jobConf)
@@ -193,7 +157,7 @@ class HadoopRDD[K, V](
       val split = theSplit.asInstanceOf[HadoopPartition]
       logInfo("Input split: " + split.inputSplit)
       var reader: RecordReader[K, V] = null
-      val jobConf = getJobConf()
+      val jobConf = createJobConf()
       val inputFormat = getInputFormat(jobConf)
       HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
         context.getStageId, theSplit.index, context.getAttemptId.toInt, jobConf)
@@ -271,13 +235,9 @@ class HadoopRDD[K, V](
   override def checkpoint() {
     // Do nothing. Hadoop RDD should not be checkpointed.
   }
-
-  def getConf: Configuration = getJobConf()
 }
 
 private[spark] object HadoopRDD extends Logging {
-  /** Constructing Configuration objects is not threadsafe, use this lock to serialize. */
-  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
 
   /**
    * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 0cccdefc5ee0..099accead656 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -73,9 +73,7 @@ class NewHadoopRDD[K, V](
   with SparkHadoopMapReduceUtil
   with Logging {
 
-  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
-  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
-  // private val serializableConf = new SerializableWritable(conf)
+  private val serializableConf = new SerializableWritable(conf)
 
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -104,7 +102,7 @@ class NewHadoopRDD[K, V](
     val iter = new Iterator[(K, V)] {
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = confBroadcast.value.value
+      val conf = serializableConf.value
       val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
       val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
       val format = inputFormatClass.newInstance
@@ -189,8 +187,6 @@ class NewHadoopRDD[K, V](
     }
     locs.getOrElse(split.getLocations.filter(_ != "localhost"))
   }
-
-  def getConf: Configuration = confBroadcast.value.value
 }
 
 private[spark] object NewHadoopRDD {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 84fafcde63d0..fd59656459ef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
 import org.apache.hadoop.hive.ql.exec.Utilities
@@ -30,7 +29,6 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
 
 import org.apache.spark.SerializableWritable
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions._
 
@@ -62,13 +60,6 @@ class HadoopTableReader(
 
   // TODO: set aws s3 credentials.
 
-  private val _broadcastedHiveConf =
-    sc.sparkContext.broadcast(new SerializableWritable(sc.hiveconf))
-
-  def broadcastedHiveConf = _broadcastedHiveConf
-
-  def hiveConf = _broadcastedHiveConf.value.value
-
   override def makeRDDForTable(hiveTable: HiveTable): RDD[Row] =
     makeRDDForTable(
       hiveTable,
@@ -95,7 +86,7 @@ class HadoopTableReader(
     // Create local references to member variables, so that the entire `this` object won't be
     // serialized in the closure below.
     val tableDesc = relation.tableDesc
-    val broadcastedHiveConf = _broadcastedHiveConf
+    val hiveconfWrapper = new SerializableWritable(sc.hiveconf)
     val tablePath = hiveTable.getPath
     val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
 
@@ -109,9 +100,8 @@ class HadoopTableReader(
     val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
 
     val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
-      val hconf = broadcastedHiveConf.value.value
       val deserializer = deserializerClass.newInstance()
-      deserializer.initialize(hconf, tableDesc.getProperties)
+      deserializer.initialize(hiveconfWrapper.value, tableDesc.getProperties)
 
       HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow)
     }
@@ -159,8 +149,8 @@ class HadoopTableReader(
     }
 
     // Create local references so that the outer object isn't serialized.
+    val hiveconfWrapper = new SerializableWritable(sc.hiveconf)
     val tableDesc = relation.tableDesc
-    val broadcastedHiveConf = _broadcastedHiveConf
     val localDeserializer = partDeserializer
     val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
 
@@ -181,10 +171,10 @@ class HadoopTableReader(
         // Fill all partition keys to the given MutableRow object
         fillPartitionKeys(partValues, mutableRow)
 
-        createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter =>
-          val hconf = broadcastedHiveConf.value.value
+        val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
+        hivePartitionRDD.mapPartitions { iter =>
           val deserializer = localDeserializer.newInstance()
-          deserializer.initialize(hconf, partProps)
+          deserializer.initialize(hiveconfWrapper.value, partProps)
 
           // fill the non partition key attributes
           HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, mutableRow)
@@ -226,12 +216,12 @@ class HadoopTableReader(
 
     val rdd = new HadoopRDD(
       sc.sparkContext,
-      _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
-      Some(initializeJobConfFunc),
+      sc.hiveconf,
      inputFormatClass,
       classOf[Writable],
       classOf[Writable],
-      _minSplitsPerRDD)
+      _minSplitsPerRDD,
+      Some(initializeJobConfFunc))
 
     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
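
For context, the net effect of this patch is that each HadoopRDD ships its Configuration through plain Java serialization instead of a broadcast, and every task builds a JobConf from its own deserialized copy via createJobConf(). The sketch below is illustrative only and not part of the patch: SerializableConfiguration and jobConfFor are hypothetical names that merely approximate what Spark's SerializableWritable does when it wraps a Hadoop Configuration.

import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

// Simplified stand-in for SerializableWritable: Configuration is a Hadoop Writable,
// so it can write itself into the Java serialization stream on the driver and be
// rebuilt from that stream when the task deserializes its copy of the RDD.
class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}

object HadoopConfSerializationSketch {
  // Mirrors the shape of createJobConf(): reuse a JobConf if one was supplied,
  // otherwise wrap the deserialized Configuration in a fresh JobConf for this task.
  def jobConfFor(conf: Configuration): JobConf = conf match {
    case jc: JobConf => jc
    case _ => new JobConf(conf)
  }
}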