log warning in hive part
cloud-fan committed Sep 14, 2015
commit 69b7d6588dd2c33b2c8a643ba0efde7499266160
23 changes: 16 additions & 7 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1052,10 +1052,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     // When speculation is on and output committer class name contains "Direct", we should warn
     // users that they may lose data if they are using a direct output committer.
     val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
-    if (speculationEnabled &&
-        hadoopConf.get("mapred.output.committer.class", "").contains("Direct")) {
-      logWarning("We may loss data when use direct output committer with speculation enabled, " +
-        "please make sure your output committer doesn't write data directly.")
+    val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
+    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+      val warningMessage =
+        s"$outputCommitterClass may be an output committer that writes data directly to " +
+          "the final location. Because speculation is enabled, this output committer may " +
+          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
+          "committer that does not have this behavior (e.g. FileOutputCommitter)."
+      logWarning(warningMessage)
     }

     FileOutputFormat.setOutputPath(hadoopConf,
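The warning's reference to SPARK-10063 is about what speculation does to committers that skip the per-attempt temporary directory: a speculative duplicate of a task writes to the same final path as the attempt that ends up committed. A minimal, Spark-free sketch of that interference, using plain file I/O and a hypothetical /tmp path:

```scala
import java.nio.file.{Files, Paths, StandardOpenOption}

object DirectWriteUnderSpeculation {
  def main(args: Array[String]): Unit = {
    // The "final location" a direct committer writes to; the /tmp path is assumed writable.
    val finalPath = Paths.get("/tmp/part-00000")

    // Attempt 1 finishes and its output is treated as committed.
    Files.write(finalPath, "complete output of attempt 1\n".getBytes("UTF-8"))

    // A speculative attempt 2 of the same task starts writing to the same final path
    // before it is killed, clobbering what attempt 1 already produced.
    Files.write(finalPath, "partial ".getBytes("UTF-8"),
      StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)

    // The "committed" file no longer holds attempt 1's data.
    println(new String(Files.readAllBytes(finalPath), "UTF-8"))
  }
}
```

FileOutputCommitter, suggested in the warning message, avoids this by writing each attempt under a temporary directory and moving only the committed attempt's files to the final location.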
@@ -1135,9 +1139,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     // When speculation is on and output committer class name contains "Direct", we should warn
     // users that they may lose data if they are using a direct output committer.
     val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
-    if (speculationEnabled && jobCommitter.getClass.getSimpleName.contains("Direct")) {
-      logWarning("We may loss data when use direct output committer with speculation enabled, " +
-        "please make sure your output committer doesn't write data directly.")
+    val outputCommitterClass = jobCommitter.getClass.getSimpleName
+    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+      val warningMessage =
+        s"$outputCommitterClass may be an output committer that writes data directly to " +
+          "the final location. Because speculation is enabled, this output committer may " +
+          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
+          "committer that does not have this behavior (e.g. FileOutputCommitter)."
+      logWarning(warningMessage)
     }

     jobCommitter.setupJob(jobTaskContext)
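Both hunks above only log; whether the warning fires is driven entirely by job configuration. A minimal sketch of a driver setup that satisfies both halves of the check, with a hypothetical committer class name (only the name is inspected by the warning code):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TriggerDirectCommitterWarning {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("direct-committer-warning")
      .set("spark.speculation", "true") // first half of the condition

    val sc = new SparkContext(conf)

    // Second half: the configured committer class name contains "Direct".
    sc.hadoopConfiguration.set(
      "mapred.output.committer.class",
      "com.example.DirectOutputCommitter") // hypothetical class name

    // Any later saveAsHadoopFile / saveAsHadoopDataset call on a pair RDD would now log
    // the warning added above before setting up the output job.
    sc.stop()
  }
}
```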
@@ -178,11 +178,17 @@ case class InsertIntoHiveTable(
     val jobConf = new JobConf(sc.hiveconf)
     val jobConfSer = new SerializableJobConf(jobConf)

-    // When speculation is enabled, it's not safe to use customized output committer classes,
-    // especially direct output committees (e.g. `DirectParquetOutputCommitter`).
+    // When speculation is on and output committer class name contains "Direct", we should warn
+    // users that they may lose data if they are using a direct output committer.
     val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
-    if (speculationEnabled) {
-      jobConf.setOutputCommitter(classOf[FileOutputCommitter])
+    val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
+    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+      val warningMessage =
+        s"$outputCommitterClass may be an output committer that writes data directly to " +
+          "the final location. Because speculation is enabled, this output committer may " +
+          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
+          "committer that does not have this behavior (e.g. FileOutputCommitter)."
+      logWarning(warningMessage)
     }

     val writerContainer = if (numDynamicPartitions > 0) {
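Because this hunk drops the unconditional jobConf.setOutputCommitter(classOf[FileOutputCommitter]) override, a direct committer configured for a Hive insert is now honored and merely warned about. A user who prefers the previous, conservative behaviour can still pin the classic committer explicitly; a minimal sketch against a bare Hadoop JobConf (illustrative only, not the InsertIntoHiveTable code path):

```scala
import org.apache.hadoop.mapred.{FileOutputCommitter, JobConf}

object PinFileOutputCommitter {
  def main(args: Array[String]): Unit = {
    val jobConf = new JobConf()

    // Select the classic committer: each task attempt writes under a temporary
    // directory and only the committed attempt's files reach the final location.
    jobConf.setOutputCommitter(classOf[FileOutputCommitter])

    // The configured class name no longer contains "Direct", so the warning above
    // would not be logged even with spark.speculation enabled.
    println(jobConf.get("mapred.output.committer.class"))
  }
}
```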