36 commits
149ea3e
ShuffleWriters write to temp file, then go through
squito Oct 21, 2015
cf8118e
assorted cleanup
squito Oct 22, 2015
ea1ae07
style
squito Oct 22, 2015
9356c67
fix compilation in StoragePerfTester
squito Oct 22, 2015
2b42eb5
mima
squito Oct 22, 2015
32d4b3b
update UnsafeShuffleWriterSuite
squito Oct 22, 2015
550e198
fix imports
squito Oct 22, 2015
4ff98bf
should work now, but needs cleanup
squito Oct 23, 2015
4a19702
only consider tmp files that exist; only consider the dest pre-existi…
squito Oct 23, 2015
89063dd
cleanup
squito Oct 23, 2015
4145651
ShuffleOutputCoordinatorSuite
squito Oct 23, 2015
2089e12
cleanup
squito Oct 23, 2015
2e9bbaa
Merge branch 'master' into SPARK-8029_first_wins
squito Oct 26, 2015
4cd423e
write the winning mapStatus to disk, so subsequent tasks can respond …
squito Oct 26, 2015
dc4b7f6
fix imports
squito Oct 26, 2015
b7a0981
fixes
squito Oct 26, 2015
830a097
shuffle writers must always write all tmp files
squito Oct 27, 2015
5d11eca
more fixes for zero-sized blocks
squito Oct 27, 2015
3f5af9c
dont make ShuffleWriter return mapStatusFile
squito Oct 27, 2015
4b7c71a
rather than requiring all tmp files to exist, just write a zero-lengt…
squito Oct 27, 2015
eabf978
update test case
squito Oct 27, 2015
5bbeec3
minor cleanup
squito Oct 27, 2015
e141d82
test that shuffle output files are always the same
squito Oct 27, 2015
4df7955
fix compression settings of tmp files; minor cleanup
squito Oct 27, 2015
dc076b8
fix tests
squito Oct 27, 2015
cfdfd2c
review feedback
squito Nov 3, 2015
86f468a
Merge branch 'master' into SPARK-8029_first_wins
squito Nov 4, 2015
5c8b247
fix imports
squito Nov 4, 2015
4d66df1
fix more imports
squito Nov 4, 2015
e59df41
couple more nits ...
squito Nov 4, 2015
c206fc5
minor cleanup
squito Nov 5, 2015
c0edff1
style
squito Nov 5, 2015
da33519
Merge branch 'master' into SPARK-8029_first_wins
squito Nov 11, 2015
c0b93a5
create temporary files in same location as destination files
squito Nov 11, 2015
9d0d9d9
no more @VisibleForTesting
squito Nov 11, 2015
80e037d
unused import
squito Nov 11, 2015
BypassMergeSortShuffleWriter.java
@@ -21,6 +21,7 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import javax.annotation.Nullable;

import scala.None$;
@@ -41,12 +42,15 @@
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.shuffle.IndexShuffleBlockResolver$;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.storage.*;
import org.apache.spark.util.Utils;
import scala.collection.JavaConverters;
import scala.collection.Seq;

/**
* This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
@@ -121,13 +125,19 @@ public BypassMergeSortShuffleWriter(
}

@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
public Seq<Tuple2<File, File>> write(Iterator<Product2<K, V>> records) throws IOException {
Review comment (Contributor): Could you add a javadoc explaining what the return value is? It's particularly cryptic because it uses tuples; maybe it would be better to create a helper type where the fields have proper names.
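As a possible follow-up to the comment above (not something in this PR), a named pair type could replace the bare Tuple2<File, File>. A minimal sketch, with the class and field names invented here for illustration:

```scala
import java.io.File

// Hypothetical helper type (not in the PR): gives names to the two roles the
// bare Tuple2<File, File> plays in the ShuffleWriter return value.
case class TmpDestShuffleFile(tmpFile: File, destFile: File) {
  def destExists: Boolean = destFile.exists()        // used to detect an earlier winner
  def commit(): Boolean = tmpFile.renameTo(destFile) // move into the final location
}
```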

assert (partitionWriters == null);
final File indexFile = blockManager.diskBlockManager().getFile(new ShuffleIndexBlockId(
shuffleId, mapId, IndexShuffleBlockResolver$.MODULE$.NOOP_REDUCE_ID())
);
final File dataFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
File tmpIndexFile = shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return;
return JavaConverters.asScalaBufferConverter(Arrays.asList(
new Tuple2<>(tmpIndexFile, indexFile)
)).asScala();
}
final SerializerInstance serInstance = serializer.newInstance();
final long openStartTime = System.nanoTime();
Expand Down Expand Up @@ -155,10 +165,16 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
writer.commitAndClose();
}

partitionLengths =
writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId));
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
File tmpDataFile = blockManager.diskBlockManager().createTempShuffleBlock()._2();

partitionLengths = writePartitionedFile(tmpDataFile);
Review comment (PR author): The type of tmp block used here doesn't matter, since writePartitionedFile doesn't go through blockManager.getDiskWriter. However, I'm a bit confused how this worked before. The original partition files might be compressed, and those bytes just get copied to the final data file, though data files seem like they are never compressed?

Review comment (Contributor): If compression is employed, then each region of the single "partitioned" file is compressed separately. So the concatenation here should be correct.
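To make the point in the comment above concrete, here is a self-contained sketch of independently compressed per-partition regions concatenated into one file. It deliberately uses plain java.util.zip gzip instead of Spark's shuffle codecs, so the codec choice is an assumption for illustration only; the recorded lengths play the role that the index file plays in the real code.

```scala
import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object RegionConcatSketch {
  def main(args: Array[String]): Unit = {
    val partitions = Seq("partition-0 data", "partition-1 data", "partition-2 data")
    val dataFile = File.createTempFile("shuffle-data", ".bin")

    // Write: compress each partition as its own region and append it, recording
    // the compressed length (the lengths are what an index file would store).
    val out = new FileOutputStream(dataFile)
    val lengths = partitions.map { p =>
      val buf = new ByteArrayOutputStream()
      val gz = new GZIPOutputStream(buf)
      gz.write(p.getBytes("UTF-8"))
      gz.close()
      out.write(buf.toByteArray)
      buf.size().toLong
    }
    out.close()

    // Read: seek to one region's offset and decompress only that region.
    val offsets = lengths.scanLeft(0L)(_ + _)
    def readPartition(i: Int): String = {
      val in = new DataInputStream(new FileInputStream(dataFile))
      try {
        in.skipBytes(offsets(i).toInt)
        val region = new Array[Byte](lengths(i).toInt)
        in.readFully(region)
        val gz = new GZIPInputStream(new ByteArrayInputStream(region))
        scala.io.Source.fromInputStream(gz, "UTF-8").mkString
      } finally in.close()
    }
    assert(readPartition(1) == "partition-1 data")
    dataFile.delete()
  }
}
```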

File tmpIndexFile = shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return JavaConverters.asScalaBufferConverter(Arrays.asList(
new Tuple2<>(tmpIndexFile, indexFile),
new Tuple2<>(tmpDataFile, dataFile)
)).asScala();

}

@VisibleForTesting
UnsafeShuffleWriter.java
@@ -20,11 +20,14 @@
import javax.annotation.Nullable;
import java.io.*;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Iterator;

import scala.Option;
import scala.Product2;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
@@ -49,9 +52,11 @@
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.IndexShuffleBlockResolver$;
import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.ShuffleIndexBlockId;
import org.apache.spark.storage.TimeTrackingOutputStream;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
@@ -159,12 +164,12 @@ public long getPeakMemoryUsedBytes() {
* This convenience method should only be called in test code.
*/
@VisibleForTesting
public void write(Iterator<Product2<K, V>> records) throws IOException {
write(JavaConverters.asScalaIteratorConverter(records).asScala());
public Seq<Tuple2<File, File>> write(Iterator<Product2<K, V>> records) throws IOException {
return write(JavaConverters.asScalaIteratorConverter(records).asScala());
}

@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
public Seq<Tuple2<File, File>> write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
// Keep track of success so we know if we encountered an exception
// We do this rather than a standard try/catch/re-throw to handle
// generic throwables.
@@ -173,8 +178,9 @@ public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOEx
while (records.hasNext()) {
insertRecordIntoSorter(records.next());
}
closeAndWriteOutput();
Seq<Tuple2<File, File>> result = closeAndWriteOutput();
success = true;
return result;
} finally {
if (sorter != null) {
try {
@@ -209,25 +215,37 @@ private void open() throws IOException {
}

@VisibleForTesting
void closeAndWriteOutput() throws IOException {
Seq<Tuple2<File, File>> closeAndWriteOutput() throws IOException {
assert(sorter != null);
updatePeakMemoryUsed();
serBuffer = null;
serOutputStream = null;
final SpillInfo[] spills = sorter.closeAndGetSpills();
sorter = null;
final long[] partitionLengths;
final File tmpDataFile;
try {
partitionLengths = mergeSpills(spills);
Tuple2<File, long[]> t = mergeSpills(spills);
partitionLengths = t._2();
tmpDataFile = t._1();
} finally {
for (SpillInfo spill : spills) {
if (spill.file.exists() && ! spill.file.delete()) {
logger.error("Error while deleting spill file {}", spill.file.getPath());
}
}
}
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
File tmpIndexFile = shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
File dataFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
File indexFile = blockManager.diskBlockManager().getFile(
new ShuffleIndexBlockId(shuffleId, mapId, IndexShuffleBlockResolver$.MODULE$.NOOP_REDUCE_ID())
);

return JavaConverters.asScalaBufferConverter(Arrays.asList(
new Tuple2<>(tmpIndexFile, indexFile),
new Tuple2<>(tmpDataFile, dataFile)
)).asScala();
}

@VisibleForTesting
@@ -259,8 +277,12 @@ void forceSorterToSpill() throws IOException {
*
* @return the partition lengths in the merged file.
*/
private long[] mergeSpills(SpillInfo[] spills) throws IOException {
final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
private Tuple2<File, long[]> mergeSpills(SpillInfo[] spills) throws IOException {
final File outputFile = blockManager.diskBlockManager().createTempShuffleBlock()._2();
return new Tuple2<>(outputFile, mergeSpills(spills, outputFile));
}

private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
final boolean fastMergeEnabled =
ShuffleMapTask.scala
@@ -24,7 +24,7 @@ import scala.language.existentials
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.shuffle.ShuffleWriter
import org.apache.spark.shuffle.{ShuffleOutputCoordinator, ShuffleWriter}

/**
* A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
@@ -70,8 +70,12 @@ private[spark] class ShuffleMapTask(
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
val tmpToDestFiles = writer.write(
rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
val mapStatus = writer.stop(success = true).get
// SPARK-8029 make sure only one task on this executor writes the final shuffle files
ShuffleOutputCoordinator.commitOutputs(dep.shuffleId, partitionId, tmpToDestFiles)
mapStatus
} catch {
case e: Exception =>
try {
TaskSetManager.scala
@@ -615,18 +615,22 @@ private[spark] class TaskSetManager(
val index = info.index
info.markSuccessful()
removeRunningTask(tid)
val task = tasks(index)
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
// "deserialize" the value when holding a lock to avoid blocking other threads. So we call
// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
// Note: "result.value()" only deserializes the value when it's called at the first time, so
// here "result.value()" just returns the value and won't block other threads.
sched.dagScheduler.taskEnded(
tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
task, Success, result.value(), result.accumUpdates, info, result.metrics)
if (!successful(index)) {
tasksSuccessful += 1
logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
// include the partition here b/c on a stage retry, the partition is *not* necessarily
// the same as info.id
logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}}, " +
s"partition ${task.partitionId}) in ${info.duration} ms on executor ${info.executorId} " +
s"(${info.host}) ($tasksSuccessful/$numTasks)")
// Mark successful and stop if all the tasks have succeeded.
successful(index) = true
if (tasksSuccessful == numTasks) {
FileShuffleBlockResolver.scala
@@ -17,6 +17,7 @@

package org.apache.spark.shuffle

import java.io.File
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
@@ -31,7 +32,7 @@ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedH

/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
val writers: Array[DiskBlockObjectWriter]
val writers: Array[(DiskBlockObjectWriter, File)]

/** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
def releaseWriters(success: Boolean)
@@ -80,10 +81,11 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)

val openStartTime = System.nanoTime
val serializerInstance = serializer.newInstance()
val writers: Array[DiskBlockObjectWriter] = {
Array.tabulate[DiskBlockObjectWriter](numReducers) { bucketId =>
val writers: Array[(DiskBlockObjectWriter, File)] = {
Array.tabulate[(DiskBlockObjectWriter, File)](numReducers) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
val blockFile = blockManager.diskBlockManager.getFile(blockId)
val (_, tmpBlockFile) = blockManager.diskBlockManager.createTempLocalBlock()
// Because of previous failures, the shuffle file may already exist on this machine.
// If so, remove it.
if (blockFile.exists) {
@@ -93,8 +95,8 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
logWarning(s"Failed to remove existing shuffle file $blockFile")
}
}
blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize,
writeMetrics)
blockManager.getDiskWriter(blockId, tmpBlockFile, serializerInstance, bufferSize,
writeMetrics) -> blockFile
}
}
// Creating the file to write to and creating a disk writer both involve interacting with
IndexShuffleBlockResolver.scala
@@ -77,11 +77,11 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
/**
* Write an index file with the offsets of each block, plus a final offset at the end for the
* end of the output file. This will be used by getBlockData to figure out where each block
* begins and ends.
* begins and ends. Writes to a temp file, and returns that file.
* */
def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
val indexFile = getIndexFile(shuffleId, mapId)
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): File = {
val (_, tmpIndexFile) = blockManager.diskBlockManager.createTempShuffleBlock()
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(tmpIndexFile)))
Utils.tryWithSafeFinally {
// We take in lengths of each block, need to convert it to offsets.
var offset = 0L
@@ -90,6 +90,7 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
offset += length
out.writeLong(offset)
}
tmpIndexFile
} {
out.close()
}
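For context on how this index layout gets used on the read side, here is a small sketch. It assumes the layout implied by the loop above (numPartitions + 1 longs, so block i spans [offset(i), offset(i + 1)) in the data file); the method name is made up for illustration.

```scala
import java.io.{DataInputStream, File, FileInputStream}

// Sketch: locate block `reduceId` in the shuffle data file from two adjacent
// offsets stored in the index file written above.
def readBlockBounds(indexFile: File, reduceId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    in.skipBytes(reduceId * 8)   // each stored offset is an 8-byte long
    val start = in.readLong()
    val end = in.readLong()
    (start, end)
  } finally {
    in.close()
  }
}
```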
ShuffleOutputCoordinator.scala (new file)
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle

import java.io.File

import org.apache.spark.Logging

/**
* Ensures that on each executor, there are no conflicting writes to the same shuffle files. It
* implements "first write wins", by atomically moving all shuffle files into their final location,
* only if the files did not already exist. See SPARK-8029
*/
object ShuffleOutputCoordinator extends Logging {

/**
* if all of the destination files do not exist, then move all of the temporary files to their
* destinations. If any destination files exist, then simply delete all temporary files
*
Review comment (Contributor): nit: stray empty line.

* @param tmpToDest pairs of (temporary, destination) file pairs
* @return
*/
def commitOutputs(
shuffleId: Int,
partitionId: Int, tmpToDest: Seq[(File, File)]): Boolean = synchronized {
logInfo(s"renaming: $tmpToDest")

// HashShuffleWriter might not write any records to some of its files -- that's OK, we only
// move the files that do exist
val toMove = tmpToDest.filter{_._1.exists()}

val destAlreadyExists = toMove.forall(_._2.exists)
if (!destAlreadyExists) {
// if any of the renames fail, delete all the dest files. otherwise, future
// attempts have no hope of succeeding
val renamesSucceeded = toMove.map { case (tmp, dest) =>
if (dest.exists()) {
dest.delete()
}
val r = tmp.renameTo(dest)
if (!r) {
logInfo(s"failed to rename $tmp to $dest. ${tmp.exists()}; ${dest.exists()}")
}
r
}.forall{identity}
if (!renamesSucceeded) {
toMove.foreach { case (tmp, dest) => if (dest.exists()) dest.delete() }
false
} else {
true
}
} else {
logInfo(s"shuffle output for shuffle $shuffleId, partition $partitionId already exists, " +
s"not overwriting. Another task must have created this shuffle output.")
toMove.foreach{ case (tmp, _) => tmp.delete()}
false
}
}
}
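A minimal usage sketch of the coordinator above (the directory, file names, and payload are made up for illustration): the first commit for a given shuffle output renames the temp files into place, and a later attempt finds the destinations already present and only deletes its own temp files.

```scala
import java.io.File
import java.nio.file.Files

import org.apache.spark.shuffle.ShuffleOutputCoordinator

object CoordinatorUsageSketch {
  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("shuffle-test").toFile

    // Create a temp file with some payload, paired with its intended destination.
    def tmpAndDest(attempt: Int, name: String): (File, File) = {
      val tmp = new File(dir, s"$name.attempt$attempt.tmp")
      Files.write(tmp.toPath, "payload".getBytes("UTF-8"))
      (tmp, new File(dir, name))
    }

    val attempt1 = Seq(tmpAndDest(1, "shuffle_0_0.data"), tmpAndDest(1, "shuffle_0_0.index"))
    val attempt2 = Seq(tmpAndDest(2, "shuffle_0_0.data"), tmpAndDest(2, "shuffle_0_0.index"))

    val firstWon = ShuffleOutputCoordinator.commitOutputs(0, 0, attempt1)   // temp files renamed
    val secondWon = ShuffleOutputCoordinator.commitOutputs(0, 0, attempt2)  // destinations exist
    assert(firstWon && !secondWon)
  }
}
```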
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala (11 changes: 8 additions & 3 deletions)
@@ -17,17 +17,22 @@

package org.apache.spark.shuffle

import java.io.IOException
import java.io.{File, IOException}

import org.apache.spark.scheduler.MapStatus

/**
* Obtained inside a map task to write out records to the shuffle system.
*/
private[spark] abstract class ShuffleWriter[K, V] {
/** Write a sequence of records to this task's output */
/**
* Write a sequence of records to this task's output. This should write all data
* to temporary files, but return (temporaryFile, destinationFile) pairs for each
* file written. The temporary files will get moved to their destination or deleted
* by the [[ShuffleOutputCoordinator]]
*/
@throws[IOException]
def write(records: Iterator[Product2[K, V]]): Unit
def write(records: Iterator[Product2[K, V]]): Seq[(File, File)]

/** Close this writer, passing along whether the map completed */
def stop(success: Boolean): Option[MapStatus]
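To illustrate the contract described in the scaladoc above, a bare-bones writer that honors it might look like the following sketch. The class name and file handling are invented here, and a real writer would also populate write metrics and return a MapStatus from stop.

```scala
package org.apache.spark.shuffle

import java.io.File
import java.nio.file.Files

import org.apache.spark.scheduler.MapStatus

// Hypothetical skeleton (not part of the PR): every byte goes to a temp file next
// to the destination, and the (tmp, dest) pair is returned so the
// ShuffleOutputCoordinator can either rename it into place or discard it.
private[spark] class SingleFileShuffleWriter[K, V](destFile: File) extends ShuffleWriter[K, V] {

  override def write(records: Iterator[Product2[K, V]]): Seq[(File, File)] = {
    val tmp = File.createTempFile("shuffle", ".tmp", destFile.getParentFile)
    val lines = records.map(r => s"${r._1}\t${r._2}").mkString("\n")
    Files.write(tmp.toPath, lines.getBytes("UTF-8"))   // write only to the temp file
    Seq(tmp -> destFile)                               // committed (or deleted) by the coordinator
  }

  // A real implementation would return Some(mapStatus) describing block sizes on success.
  override def stop(success: Boolean): Option[MapStatus] = None
}
```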