44 changes: 38 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1018,6 +1018,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD.
*
* Note that we should make sure our tasks are idempotent when speculation is enabled, i.e. do
* not use an output committer that writes data directly.
* There is an example in https://issues.apache.org/jira/browse/SPARK-10063 showing the bad
* result of using a direct output committer with speculation enabled.
*/
def saveAsHadoopFile(
path: String,
@@ -1030,10 +1035,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val hadoopConf = conf
hadoopConf.setOutputKeyClass(keyClass)
hadoopConf.setOutputValueClass(valueClass)
// Doesn't work in Scala 2.9 due to what may be a generics bug
// TODO: Should we uncomment this for Scala 2.10?
// conf.setOutputFormat(outputFormatClass)
hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
conf.setOutputFormat(outputFormatClass)
for (c <- codec) {
hadoopConf.setCompressMapOutput(true)
hadoopConf.set("mapred.output.compress", "true")
@@ -1047,6 +1049,19 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
}

// When speculation is on and the output committer class name contains "Direct", we should warn
// users that they may lose data if they are using a direct output committer.
val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
val warningMessage =
s"$outputCommitterClass may be an output committer that writes data directly to " +
"the final location. Because speculation is enabled, this output committer may " +
"cause data loss (see the case in SPARK-10063). If possible, please use a output " +
"committer that does not have this behavior (e.g. FileOutputCommitter)."
logWarning(warningMessage)
}

FileOutputFormat.setOutputPath(hadoopConf,
SparkHadoopWriter.createPathFromString(path, hadoopConf))
saveAsHadoopDataset(hadoopConf)
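
For reference, a minimal usage sketch of this code path with speculation turned on (the master, application name, output path, and sample data are placeholders, not part of this change). With the default FileOutputCommitter, output is staged in a temporary attempt directory and renamed on commit, so duplicate speculative attempts are harmless and the warning above is not logged.

```scala
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative setup only; names and paths are placeholders.
val sparkConf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("direct-committer-warning-example")
  .set("spark.speculation", "true") // speculative attempts may write a partition twice

val sc = new SparkContext(sparkConf)
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))

// The default FileOutputCommitter writes to a temporary attempt directory and
// renames it on commit, so it is safe under speculation and no warning is logged.
pairs.saveAsHadoopFile(
  "/tmp/warning-example-output",
  classOf[String],
  classOf[Int],
  classOf[TextOutputFormat[String, Int]])

sc.stop()
```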
@@ -1057,6 +1072,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Configuration object for that storage system. The Conf should set an OutputFormat and any
* output paths required (e.g. a table name to write to) in the same way as it would be
* configured for a Hadoop MapReduce job.
*
* Note that we should make sure our tasks are idempotent when speculation is enabled, i.e. do
* not use an output committer that writes data directly.
* There is an example in https://issues.apache.org/jira/browse/SPARK-10063 showing the bad
* result of using a direct output committer with speculation enabled.
*/
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
@@ -1115,6 +1135,20 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)

// When speculation is on and the output committer class name contains "Direct", we should warn
// users that they may lose data if they are using a direct output committer.
val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
val outputCommitterClass = jobCommitter.getClass.getSimpleName
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
val warningMessage =
s"$outputCommitterClass may be an output committer that writes data directly to " +
"the final location. Because speculation is enabled, this output committer may " +
"cause data loss (see the case in SPARK-10063). If possible, please use a output " +
"committer that does not have this behavior (e.g. FileOutputCommitter)."
logWarning(warningMessage)
}

jobCommitter.setupJob(jobTaskContext)
self.context.runJob(self, writeShard)
jobCommitter.commitJob(jobTaskContext)
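
Unlike the old-API path above, this branch cannot read the committer from the mapred.output.committer.class property, because a new-API committer is obtained from the OutputFormat instance; that is why the check inspects jobCommitter.getClass.getSimpleName. A rough standalone sketch of that lookup, assuming Hadoop 2.x class names and a placeholder output directory:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.TaskAttemptID
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

val conf = new Configuration()
// Placeholder output directory; FileOutputFormat-based committers expect one.
conf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp/new-api-example")

val attemptContext = new TaskAttemptContextImpl(conf, new TaskAttemptID())
val outputFormat = new TextOutputFormat[String, String]()
val committer = outputFormat.getOutputCommitter(attemptContext)

// For TextOutputFormat this is a FileOutputCommitter, so the simple name does not
// contain "Direct" and the warning would not be triggered.
println(committer.getClass.getSimpleName)
```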
@@ -1129,7 +1163,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
val wrappedConf = new SerializableConfiguration(hadoopConf)
val outputFormatInstance = hadoopConf.getOutputFormat
val keyClass = hadoopConf.getOutputKeyClass
val valueClass = hadoopConf.getOutputValueClass
@@ -1157,7 +1190,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.preSetup()

val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
val config = wrappedConf.value
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
@@ -62,7 +62,7 @@ case class InsertIntoHiveTable(

def output: Seq[Attribute] = Seq.empty

def saveAsHiveFile(
private def saveAsHiveFile(
rdd: RDD[InternalRow],
valueClass: Class[_],
fileSinkConf: FileSinkDesc,
@@ -178,6 +178,19 @@ case class InsertIntoHiveTable(
val jobConf = new JobConf(sc.hiveconf)
val jobConfSer = new SerializableJobConf(jobConf)

// When speculation is on and the output committer class name contains "Direct", we should warn
// users that they may lose data if they are using a direct output committer.
val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
val warningMessage =
s"$outputCommitterClass may be an output committer that writes data directly to " +
"the final location. Because speculation is enabled, this output committer may " +
"cause data loss (see the case in SPARK-10063). If possible, please use a output " +
"committer that does not have this behavior (e.g. FileOutputCommitter)."
logWarning(warningMessage)
}

val writerContainer = if (numDynamicPartitions > 0) {
val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
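
As an illustration of the property this Hive-path check reads (a standalone sketch, independent of the JobConf built from sc.hiveconf above): a rename-based committer such as FileOutputCommitter never matches the "Direct" test, while a committer whose class name contains "Direct" would.

```scala
import org.apache.hadoop.mapred.{FileOutputCommitter, JobConf}

// Standalone sketch: the check reads "mapred.output.committer.class" from the
// JobConf, so pinning a rename-based committer keeps the warning silent.
val jobConf = new JobConf()
jobConf.set("mapred.output.committer.class", classOf[FileOutputCommitter].getName)
assert(!jobConf.get("mapred.output.committer.class", "").contains("Direct"))
```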
sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -32,7 +32,6 @@ import org.apache.hadoop.mapred._
import org.apache.hadoop.hive.common.FileUtils

import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.sql.Row
import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils