36 commits:
149ea3e  ShuffleWriters write to temp file, then go through  (squito, Oct 21, 2015)
cf8118e  assorted cleanup  (squito, Oct 22, 2015)
ea1ae07  style  (squito, Oct 22, 2015)
9356c67  fix compilation in StoragePerfTester  (squito, Oct 22, 2015)
2b42eb5  mima  (squito, Oct 22, 2015)
32d4b3b  update UnsafeShuffleWriterSuite  (squito, Oct 22, 2015)
550e198  fix imports  (squito, Oct 22, 2015)
4ff98bf  should work now, but needs cleanup  (squito, Oct 23, 2015)
4a19702  only consider tmp files that exist; only consider the dest pre-existi…  (squito, Oct 23, 2015)
89063dd  cleanup  (squito, Oct 23, 2015)
4145651  ShuffleOutputCoordinatorSuite  (squito, Oct 23, 2015)
2089e12  cleanup  (squito, Oct 23, 2015)
2e9bbaa  Merge branch 'master' into SPARK-8029_first_wins  (squito, Oct 26, 2015)
4cd423e  write the winning mapStatus to disk, so subsequent tasks can respond …  (squito, Oct 26, 2015)
dc4b7f6  fix imports  (squito, Oct 26, 2015)
b7a0981  fixes  (squito, Oct 26, 2015)
830a097  shuffle writers must always write all tmp files  (squito, Oct 27, 2015)
5d11eca  more fixes for zero-sized blocks  (squito, Oct 27, 2015)
3f5af9c  don't make ShuffleWriter return mapStatusFile  (squito, Oct 27, 2015)
4b7c71a  rather than requiring all tmp files to exist, just write a zero-lengt…  (squito, Oct 27, 2015)
eabf978  update test case  (squito, Oct 27, 2015)
5bbeec3  minor cleanup  (squito, Oct 27, 2015)
e141d82  test that shuffle output files are always the same  (squito, Oct 27, 2015)
4df7955  fix compression settings of tmp files; minor cleanup  (squito, Oct 27, 2015)
dc076b8  fix tests  (squito, Oct 27, 2015)
cfdfd2c  review feedback  (squito, Nov 3, 2015)
86f468a  Merge branch 'master' into SPARK-8029_first_wins  (squito, Nov 4, 2015)
5c8b247  fix imports  (squito, Nov 4, 2015)
4d66df1  fix more imports  (squito, Nov 4, 2015)
e59df41  couple more nits ...  (squito, Nov 4, 2015)
c206fc5  minor cleanup  (squito, Nov 5, 2015)
c0edff1  style  (squito, Nov 5, 2015)
da33519  Merge branch 'master' into SPARK-8029_first_wins  (squito, Nov 11, 2015)
c0b93a5  create temporary files in same location as destination files  (squito, Nov 11, 2015)
9d0d9d9  no more @VisibleForTesting  (squito, Nov 11, 2015)
80e037d  unused import  (squito, Nov 11, 2015)
Commit 4b7c71a938d69be93baecb8ce320a2151b7a4658 (squito, Oct 27, 2015):
rather than requiring all tmp files to exist, just write a zero-length dest file either way
@@ -37,13 +37,16 @@ private[spark] object ShuffleOutputCoordinator extends Logging {
    * destinations, and return (true, the given MapStatus). If all destination files exist, then
    * delete all temporary files, and return (false, the MapStatus from previously committed shuffle
    * output).
-   *
-   * @param shuffleId
-   * @param partitionId
+   *
+   * Note that this will write to all destination files. If a tmp file is missing, then a
+   * zero-length destination file will be created. This is so the ShuffleOutputCoordinator can
+   * work even when the data is non-deterministic, where the output exists in one attempt, but
+   * is empty in another attempt.
    *
    * @param tmpToDest Seq of (temporary, destination) file pairs
    * @param mapStatus the [[MapStatus]] for the output already written to the temporary files
-   * @return pair of (true iff the set of temporary files was moved to the destination, the
-   *         MapStatus of the winn
+   * @return pair of: (1) true iff the set of temporary files was moved to the destination and (2)
+   *         the MapStatus of the committed attempt.
+   *
    */

Contributor review comment (on the added "*" line above): nit: stray empty line.
  def commitOutputs(
@@ -66,8 +69,6 @@ private[spark] object ShuffleOutputCoordinator extends Logging {
      mapStatus: MapStatus,
      mapStatusFile: File,
      serializer: SerializerInstance): (Boolean, MapStatus) = synchronized {
-   tmpToDest.foreach { case (tmp, _) => require(tmp.exists(), s"Cannot commit non-existent " +
-     s"shuffle output $tmp -- must be at least a zero-length file.")}
    val destAlreadyExists = tmpToDest.forall{_._2.exists()} && mapStatusFile.exists()
    if (!destAlreadyExists) {
      tmpToDest.foreach { case (tmp, dest) =>
@@ -78,7 +79,14 @@ private[spark] object ShuffleOutputCoordinator extends Logging {
        if (dest.exists()) {
          dest.delete()
        }
-       tmp.renameTo(dest)
+       if (tmp.exists()) {
+         tmp.renameTo(dest)
+       } else {
+         // we always create the destination files, so this works correctly even when the
+         // input data is non-deterministic (potentially empty in one iteration, and non-empty
+         // in another)
+         dest.createNewFile()
+       }
      }
      val out = serializer.serializeStream(new FileOutputStream(mapStatusFile))
      out.writeObject(mapStatus)
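A subtlety the new branch relies on: java.io.File.renameTo reports failure by returning false rather than throwing, so without the explicit tmp.exists() check a missing tmp file would silently leave no destination file at all. A minimal standalone sketch of the rename-or-create fallback (the object and method names are illustrative, not from the PR):

import java.io.File

object MoveOrCreateSketch {
  // Move each tmp file onto its destination; where a tmp file was never
  // written (e.g. the data happened to be empty in this attempt), create a
  // zero-length destination instead, so every attempt commits the same set
  // of destination files.
  def moveOrCreate(tmpToDest: Seq[(File, File)]): Unit = {
    tmpToDest.foreach { case (tmp, dest) =>
      if (dest.exists()) {
        dest.delete()
      }
      if (tmp.exists()) {
        // renameTo signals failure via its return value; the PR ignores it,
        // but a sketch can surface it explicitly
        require(tmp.renameTo(dest), s"failed to rename $tmp to $dest")
      } else {
        dest.createNewFile() // zero-length placeholder
      }
    }
  }
}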
@@ -133,11 +133,6 @@ private[spark] class DiskBlockObjectWriter(
      // In certain compression codecs, more bytes are written after close() is called
      writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
    } else {
-     if (!file.exists()) {
-       // SPARK-8029 -- we need to write a zero-length file so we can commit the same set of files
-       // on all attempts (even if the data is non-deterministic)
-       file.createNewFile()
-     }
      finalPosition = file.length()
    }
    commitAndCloseHasBeenCalled = true
@@ -679,12 +679,6 @@ private[spark] class ExternalSorter[K, V, C](
      }
    }

-   // SPARK-8029 the ShuffleOutputCoordinator requires all shuffle output files to always exist,
-   // even if they are zero-length
-   if (!outputFile.exists()) {
-     outputFile.createNewFile()
-   }
-
    context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
    context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
    context.internalMetricsToAccumulators(
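The removals in DiskBlockObjectWriter and ExternalSorter are the flip side of the coordinator change: rather than every writer defensively creating empty files, the invariant that each attempt commits the same set of destination files is now enforced in one place. A hypothetical assertion a test might make about two attempts' outputs (helper name assumed, not from the PR):

import java.io.File

object SameFilesSketch {
  // Hypothetical check: two attempts of the same shuffle task should commit
  // exactly the same destination file names, even if some are zero-length.
  def sameCommittedFiles(attempt1: Seq[File], attempt2: Seq[File]): Boolean =
    attempt1.map(_.getName).toSet == attempt2.map(_.getName).toSet
}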
@@ -31,7 +31,7 @@ class ShuffleOutputCoordinatorSuite extends SparkFunSuite with BeforeAndAfterEach {
    var tempDir: File = _
    var mapStatusFile: File = _
    // use the "port" as a way to distinguish mapstatuses, just for the test
-   def mapStatus(id: Int) = MapStatus(BlockManagerId("1", "a.b.c", id), Array(0L, 1L))
+   def mapStatus(id: Int): MapStatus = MapStatus(BlockManagerId("1", "a.b.c", id), Array(0L, 1L))
    def ser: SerializerInstance = new JavaSerializer(new SparkConf()).newInstance()

    override def beforeEach(): Unit = {
@@ -114,11 +114,23 @@ class ShuffleOutputCoordinatorSuite extends SparkFunSuite with BeforeAndAfterEach {
      secondAttempt.foreach{ case (t, d) => assert(!t.exists())}
    }

-   test("no missing tmp files") {
+   test("missing tmp files become zero-length destination files") {
+     val extraDestFile = new File(tempDir, "blah")
      val firstAttempt = generateAttempt(0) ++
-       Seq(new File(tempDir, "bogus") -> new File(tempDir, "blah"))
-     val ex = intercept[IllegalArgumentException] {commit(firstAttempt, 1)}
-     assert(ex.getMessage.contains("Cannot commit non-existent shuffle output"))
+       Seq(new File(tempDir, "bogus") -> extraDestFile)
+     assert(commit(firstAttempt, 1)._1)
+     verifyFiles(0)
+     assert(extraDestFile.exists())
+     assert(extraDestFile.length() === 0)
+
+     // if we commit again and *only* the dest file backed by a missing tmp file is absent,
+     // we still do the move
+     extraDestFile.delete()
+     val secondAttempt = generateAttempt(1) ++
+       Seq(new File(tempDir, "flippy") -> extraDestFile)
+     assert(commit(secondAttempt, 2)._1)
+     verifyFiles(1)
+     assert(extraDestFile.exists())
+     assert(extraDestFile.length() === 0)
    }

}
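To summarize the contract the suite exercises, here is a condensed, self-contained restatement of the first-wins commit. It is not the real ShuffleOutputCoordinator (whose full parameter list is collapsed in this diff); the marker file stands in for mapStatusFile, in which the real coordinator serializes the winning MapStatus so later attempts can return it.

import java.io.{File, FileOutputStream}

object FirstWinsSketch {
  // Simplified first-wins commit: the first attempt moves its files into
  // place and wins; any later attempt finds the destinations and marker
  // already present, keeps the committed output, and deletes its own tmp
  // files. Returns true iff this attempt's output was committed.
  def firstWins(tmpToDest: Seq[(File, File)], marker: File): Boolean = synchronized {
    val alreadyCommitted = tmpToDest.forall { case (_, dest) => dest.exists() } && marker.exists()
    if (!alreadyCommitted) {
      tmpToDest.foreach { case (tmp, dest) =>
        if (dest.exists()) dest.delete()
        if (tmp.exists()) tmp.renameTo(dest) else dest.createNewFile()
      }
      new FileOutputStream(marker).close() // real code writes the serialized MapStatus here
      true
    } else {
      tmpToDest.foreach { case (tmp, _) => tmp.delete() } // losing attempt cleans up
      false // a previous attempt already won
    }
  }
}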