correct annotation and code clean
WinkerDu committed Aug 2, 2020
commit 269f09ba0348adfd5177ccdd9fe6487ee004c783
@@ -41,17 +41,23 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
   * @param jobId the job's or stage's id
   * @param path the job's output path, or null if committer acts as a noop
   * @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
-  *                                  dynamically, i.e., for speculative tasks, we first write files
-  *                                  to task attempt paths under a staging directory, e.g.
-  *                                  /path/to/staging/.spark-staging-{jobId}/_temporary/
+  *                                  dynamically, i.e., we first write files to task attempt paths
+  *                                  under a staging directory, e.g.
+  *                                  /path/to/outputPath/.spark-staging-{jobId}/_temporary/
   *                                  {appAttemptId}/_temporary/{taskAttemptId}/a=1/b=1/xxx.parquet.
-  *                                  When committing the job, we first move files from task attempt
+  *                                  1. When [[FileOutputCommitter]] algorithm version set to 1,
+  *                                  we firstly move files from task attempt
   *                                  paths to corresponding partition directories under the staging
-  *                                  directory, e.g.
-  *                                  /path/to/staging/.spark-staging-{jobId}/a=1/b=1.
+  *                                  directory during committing job, e.g.
+  *                                  /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1.
   *                                  Secondly, move the partition directories under staging
-  *                                  directory to partition directories under destination path,
-  *                                  e.g. /path/to/destination/a=1/b=1
+  *                                  directory to destination path, e.g. /path/to/outputPath/a=1/b=1
+  *                                  2. When [[FileOutputCommitter]] algorithm version set to 2,
Contributor:

so this isn't the normal behavior of the algorithm version 2, right? Normally it writes the task files directly to the final output location. The whole point of algorithm 2 is to prevent all of the extra moves on the driver at the end of the job. For large jobs this time can be huge. I'm not sure the benefit here of algorithm 2 because that is all happening distributed on each task?
Contributor:

v2 isn't safe in the presence of failures during task commit; at least here if the entire job fails then, provided job ids are unique, the output doesn't become visible. it is essentially a second attempt at the v1 rename algorithm with (hopefully) smaller output datasets.
+  *                                  committing tasks directly move files to staging directory,
+  *                                  e.g. /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1.
+  *                                  Then move this partition directories under staging directory
+  *                                  to destination path during job committing, e.g.
+  *                                  /path/to/outputPath/a=1/b=1
Member @Ngone51 (Aug 4, 2020):

Thanks for your detailed explanation. But since these are underlying details rather than implemented by Spark, I think it's better to simplify it. e.g., we first..., then move ... from ... to... and move to ... at the end during the job committing.

WDYT?
Contributor Author:

OK.

   */
 class HadoopMapReduceCommitProtocol(
     jobId: String,
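The staging-path layout described in the updated Javadoc can be sketched as plain path arithmetic. This is an illustrative sketch only, not Spark source: the object and method names, `jobId`, attempt ids, and the `a=1/b=1` partition spec are all made up for the example; the real protocol performs filesystem renames between these locations.

```scala
// Sketch (assumed names, not Spark's API) of the three locations involved in
// dynamic partition overwrite:
//   1. task attempt path under the staging dir (where tasks first write),
//   2. staged partition dir (after task/job commit moves, per the algorithm version),
//   3. final partition dir under the destination path.
object StagingLayoutSketch {
  // Staging directory under the job's output path, keyed by job id.
  def stagingDir(outputPath: String, jobId: String): String =
    s"$outputPath/.spark-staging-$jobId"

  // Where a task attempt first writes its files.
  def taskAttemptPath(outputPath: String, jobId: String,
                      appAttemptId: Int, taskAttemptId: Int,
                      partition: String, file: String): String =
    s"${stagingDir(outputPath, jobId)}/_temporary/$appAttemptId" +
      s"/_temporary/$taskAttemptId/$partition/$file"

  // Partition directory under the staging dir (intermediate step).
  def stagedPartitionDir(outputPath: String, jobId: String, partition: String): String =
    s"${stagingDir(outputPath, jobId)}/$partition"

  // Partition directory under the destination path (final step).
  def finalPartitionDir(outputPath: String, partition: String): String =
    s"$outputPath/$partition"

  def main(args: Array[String]): Unit = {
    val out = "/path/to/outputPath"
    println(taskAttemptPath(out, "job-0", 0, 7, "a=1/b=1", "xxx.parquet"))
    println(stagedPartitionDir(out, "job-0", "a=1/b=1"))
    println(finalPartitionDir(out, "a=1/b=1"))
  }
}
```

Under algorithm version 1 the move from task attempt path to staged partition dir happens during job commit on the driver; under version 2 it happens during task commit, which is the distinction the reviewers debate above.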
@@ -169,7 +169,9 @@ case class InsertIntoHadoopFsRelationCommand(
     val committerOutputPath = if (dynamicPartitionOverwrite) {
       FileCommitProtocol.getStagingDir(outputPath.toString, jobId)
         .makeQualified(fs.getUri, fs.getWorkingDirectory)
-    } else qualifiedOutputPath
+    } else {
+      qualifiedOutputPath
+    }

     val updatedPartitionPaths =
       FileFormatWriter.write(
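The hunk above is purely a brace-style cleanup; the logic it formats can be sketched as follows. This is a simplified stand-in, not the Spark implementation: the object name and the local `getStagingDir` helper are made up here (the real `FileCommitProtocol.getStagingDir` also qualifies the path against the filesystem, which this sketch omits).

```scala
// Sketch of how the committer's output path is chosen: the staging dir when
// dynamic partition overwrite is on, the real output path otherwise.
object CommitterOutputPathSketch {
  // Stand-in for FileCommitProtocol.getStagingDir (assumed layout).
  def getStagingDir(outputPath: String, jobId: String): String =
    s"$outputPath/.spark-staging-$jobId"

  def committerOutputPath(outputPath: String, jobId: String,
                          dynamicPartitionOverwrite: Boolean): String =
    if (dynamicPartitionOverwrite) {
      getStagingDir(outputPath, jobId)
    } else {
      outputPath
    }

  def main(args: Array[String]): Unit = {
    println(committerOutputPath("/data/tbl", "job-1", dynamicPartitionOverwrite = true))
    println(committerOutputPath("/data/tbl", "job-1", dynamicPartitionOverwrite = false))
  }
}
```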