
Conversation

@cloud-fan
Contributor

This is a follow-up of #8317.

When speculation is enabled, there may be multiple tasks writing to the same path. Generally this is OK, because each task writes to a temporary directory first and only one task can commit that temporary directory to the target path.

However, when we use a direct output committer, tasks write data to the target path directly, without a temporary directory. This causes problems like corrupted data. Please see the PR comments for more details.

Unfortunately, we don't have a simple flag that tells whether an output committer writes to a temporary directory or not, so for safety we have to disable any customized output committer when speculation is enabled.
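
For illustration, here is a minimal sketch of what a direct committer looks like against the old mapred API (the class name and no-op bodies are hypothetical, not from this PR): every hook does nothing, so task output goes straight to the final path and speculative attempts can clobber each other.

import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

// Hypothetical direct committer: every hook is a no-op, so tasks write
// straight to the final location. With speculation, two attempts of the
// same task may then write the same files concurrently and corrupt the output.
class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  // No staging directory, so there is never anything to promote on commit.
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
}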

@SparkQA

SparkQA commented Sep 10, 2015

Test build #42248 has finished for PR 8687 at commit 9987c86.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please.

Contributor

I'm a little bit confused here. jobCommitter was retrieved via jobFormat.getOutputCommitter(), which should return the default output committer bundled with the output format. Does this mean saveAsNewAPIHadoopDataset() never respects user-defined output committers (I mean the version before this change)? If that's true, we should just leave this method as is. Restricting the output committer class to be FileOutputCommitter is too strong a limitation.

cc @yhuai

Contributor Author

I agree this is too strong a limitation, but my concern is that a user-defined output format may provide a direct output committer, which we should reject. However, there is no simple way to tell whether an output committer is direct or not, so I restricted it to FileOutputCommitter only here.

Contributor

jobFormat.getOutputCommitter() will return the output committer associated with the output format. Basically, in normal cases, you cannot specify an output committer for the mapreduce API (the new API). So I think we should not make a change here. Also, jobCommitter.getClass != classOf[org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter] is too strong a condition, and it does not always hold: every output format implemented with the mapreduce API can have its own output committer (e.g. org.apache.parquet.hadoop.ParquetOutputCommitter).

Let's leave this part unchanged.

@liancheng
Contributor

As commented above, I feel that there probably isn't a reliable way to prevent users from combining customized output committer classes with speculation at the Spark Core level. Maybe we should just leave a big red alert in the documentation.

@SparkQA

SparkQA commented Sep 10, 2015

Test build #42262 has finished for PR 8687 at commit 9987c86.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Sep 10, 2015

Yeah, I think a warning message in the log, plus a mention in the documentation, is good.

@cloud-fan Can you also take a look at the place where we save data to Hive?

cloud-fan changed the title from "[SPARK-9899][SQL][WIP] Disables customized output committer when speculation is enabled" to "[SPARK-9899][SQL] Disables customized output committer when speculation is enabled" on Sep 11, 2015
@SparkQA

SparkQA commented Sep 11, 2015

Test build #42331 has finished for PR 8687 at commit e2bda91.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

We should just log a warning here, right?

@yhuai
Contributor

yhuai commented Sep 11, 2015

Let's also update the title to reflect the change.

cloud-fan changed the title from "[SPARK-9899][SQL] Disables customized output committer when speculation is enabled" to "[SPARK-9899][SQL] log warning for direct output committer with speculation enabled" on Sep 12, 2015
@SparkQA

SparkQA commented Sep 12, 2015

Test build #42368 has finished for PR 8687 at commit db59c25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

How about:

val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
  val warningMessage =
    s"$outputCommitterClass may be an output committer that writes data directly to the final location. " +
      "Because speculation is enabled, this output committer may cause data loss (see the case in SPARK-10063). " +
      "If possible, please use an output committer that does not have this behavior (e.g. FileOutputCommitter)."
  logWarning(warningMessage)
}

@yhuai
Contributor

yhuai commented Sep 14, 2015

LGTM. Will merge to master once it passes Jenkins.

@SparkQA

SparkQA commented Sep 14, 2015

Test build #42429 has finished for PR 8687 at commit 69b7d65.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Sep 14, 2015

Thanks. Merging to master.

asfgit closed this in 32407bf on Sep 14, 2015
@cloud-fan
Contributor Author

I tested it locally with this code:

sparkContext.conf.set("spark.speculation", "true")

sparkContext.hadoopConfiguration.set("mapred.output.committer.class",
  "org.apache.spark.sql.hive.execution.DirectDummyOutputCommitter")
sparkContext.makeRDD(Seq(1, 2)).saveAsTextFile("tmp")


sparkContext.hadoopConfiguration.set("mapreduce.job.outputformat.class",
  "org.apache.spark.sql.hive.execution.DummyOutputFormatter")
sparkContext.hadoopConfiguration.set("mapred.output.dir", "tmp")
sparkContext.makeRDD(Seq(1 -> "a", 2 -> "b")).saveAsNewAPIHadoopDataset(sparkContext.hadoopConfiguration)

DummyOutputFormatter is a subclass of FileOutputFormat but overrides the getOutputCommitter method to return a customized OutputCommitter with "Direct" in its name.

And the warning message does get logged. The Hive write path should be similar.
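
The test classes themselves aren't shown in this thread; below is a minimal sketch of what the new-API pair could look like (the bodies are assumptions, only the class names appear in the test above):

import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// Assumed shape of the committer: all no-ops, with "Direct" in the class
// name so the new warning check can spot it.
class DirectDummyOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
}

// Assumed shape of the format: a FileOutputFormat subclass whose only
// override hands back the direct committer.
class DummyOutputFormatter extends TextOutputFormat[Int, String] {
  override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter =
    new DirectDummyOutputCommitter
}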

cloud-fan deleted the direct-committer branch on September 15, 2015 17:46