36 commits
149ea3e  ShuffleWriters write to temp file, then go through (squito, Oct 21, 2015)
cf8118e  assorted cleanup (squito, Oct 22, 2015)
ea1ae07  style (squito, Oct 22, 2015)
9356c67  fix compilation in StoragePerfTester (squito, Oct 22, 2015)
2b42eb5  mima (squito, Oct 22, 2015)
32d4b3b  update UnsafeShuffleWriterSuite (squito, Oct 22, 2015)
550e198  fix imports (squito, Oct 22, 2015)
4ff98bf  should work now, but needs cleanup (squito, Oct 23, 2015)
4a19702  only consider tmp files that exist; only consider the dest pre-existi… (squito, Oct 23, 2015)
89063dd  cleanup (squito, Oct 23, 2015)
4145651  ShuffleOutputCoordinatorSuite (squito, Oct 23, 2015)
2089e12  cleanup (squito, Oct 23, 2015)
2e9bbaa  Merge branch 'master' into SPARK-8029_first_wins (squito, Oct 26, 2015)
4cd423e  write the winning mapStatus to disk, so subsequent tasks can respond … (squito, Oct 26, 2015)
dc4b7f6  fix imports (squito, Oct 26, 2015)
b7a0981  fixes (squito, Oct 26, 2015)
830a097  shuffle writers must always write all tmp files (squito, Oct 27, 2015)
5d11eca  more fixes for zero-sized blocks (squito, Oct 27, 2015)
3f5af9c  dont make ShuffleWriter return mapStatusFile (squito, Oct 27, 2015)
4b7c71a  rather than requiring all tmp files to exist, just write a zero-lengt… (squito, Oct 27, 2015)
eabf978  update test case (squito, Oct 27, 2015)
5bbeec3  minor cleanup (squito, Oct 27, 2015)
e141d82  test that shuffle output files are always the same (squito, Oct 27, 2015)
4df7955  fix compression settings of tmp files; minor cleanup (squito, Oct 27, 2015)
dc076b8  fix tests (squito, Oct 27, 2015)
cfdfd2c  review feedback (squito, Nov 3, 2015)
86f468a  Merge branch 'master' into SPARK-8029_first_wins (squito, Nov 4, 2015)
5c8b247  fix imports (squito, Nov 4, 2015)
4d66df1  fix more imports (squito, Nov 4, 2015)
e59df41  couple more nits ... (squito, Nov 4, 2015)
c206fc5  minor cleanup (squito, Nov 5, 2015)
c0edff1  style (squito, Nov 5, 2015)
da33519  Merge branch 'master' into SPARK-8029_first_wins (squito, Nov 11, 2015)
c0b93a5  create temporary files in same location as destination files (squito, Nov 11, 2015)
9d0d9d9  no more @VisibleForTesting (squito, Nov 11, 2015)
80e037d  unused import (squito, Nov 11, 2015)
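The commit messages above trace a single design: shuffle writers always write their output to temporary files and return (tmpFile, destFile) pairs, and a ShuffleOutputCoordinator moves the first finished attempt's files into place, so later attempts see the existing output and discard their own. Below is a minimal sketch of that first-wins commit step. It is hypothetical, not the PR's actual ShuffleOutputCoordinator, which per the commit messages also persists the winning MapStatus to disk and handles zero-length outputs.

import java.io.File

// A minimal sketch, assuming writers hand back (tempFile, destinationFile)
// pairs as the diffs below show.
object FirstWinsCommitSketch {
  def commitOutputs(tmpToDest: Seq[(File, File)]): Boolean = {
    if (tmpToDest.nonEmpty && tmpToDest.forall { case (_, dest) => dest.exists() }) {
      // Every destination already exists: an earlier attempt committed,
      // so keep its output and discard this attempt's temp files.
      tmpToDest.foreach { case (tmp, _) => tmp.delete() }
      false
    } else {
      // This attempt wins: move each temp file into its final place.
      tmpToDest.foreach { case (tmp, dest) =>
        if (dest.exists()) dest.delete()
        tmp.renameTo(dest)
      }
      true
    }
  }
}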
fix compression settings of tmp files; minor cleanup
squito committed Oct 27, 2015
commit 4df7955db9c46b1549b3c0f4e238f5be7970c337
BypassMergeSortShuffleWriter.java
@@ -127,9 +127,7 @@ public BypassMergeSortShuffleWriter(
 @Override
 public Seq<Tuple2<File, File>> write(Iterator<Product2<K, V>> records) throws IOException {
Contributor: Could you add a javadoc explaining what the return value is? It's particularly cryptic because it uses tuples; maybe it would be better to create a helper type where the fields have proper names.
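For illustration, a named helper of the kind the comment asks for might look like the following. This is a hypothetical sketch, not code from this PR; the name ShuffleFilePair is invented.

import java.io.File

// Hypothetical replacement for Tuple2<File, File>: named fields make the
// temp-to-destination direction explicit for readers of write().
case class ShuffleFilePair(tmpFile: File, destFile: File)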
   assert (partitionWriters == null);
-  final File indexFile = blockManager.diskBlockManager().getFile(new ShuffleIndexBlockId(
-    shuffleId, mapId, IndexShuffleBlockResolver$.MODULE$.NOOP_REDUCE_ID())
-  );
+  final File indexFile = shuffleBlockResolver.getIndexFile(shuffleId, mapId);
   final File dataFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
   if (!records.hasNext()) {
     partitionLengths = new long[numPartitions];
UnsafeShuffleWriter.java
@@ -57,7 +57,6 @@
 import org.apache.spark.shuffle.ShuffleWriter;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.storage.ShuffleIndexBlockId;
-import org.apache.spark.storage.ShuffleMapStatusBlockId;
 import org.apache.spark.storage.TimeTrackingOutputStream;
 import org.apache.spark.unsafe.Platform;

@@ -234,9 +233,7 @@ Seq<Tuple2<File, File>> closeAndWriteOutput() throws IOException {
   final File tmpIndexFile = shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
   mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
   final File dataFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
-  final File indexFile = blockManager.diskBlockManager().getFile(
-    new ShuffleIndexBlockId(shuffleId, mapId, IndexShuffleBlockResolver$.MODULE$.NOOP_REDUCE_ID())
-  );
+  final File indexFile = shuffleBlockResolver.getIndexFile(shuffleId, mapId);

   return JavaConverters.asScalaBufferConverter(Arrays.asList(
     new Tuple2<>(tmpIndexFile, indexFile),
FileShuffleBlockResolver.scala
@@ -85,7 +85,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
   Array.tabulate[(DiskBlockObjectWriter, File)](numReducers) { bucketId =>
     val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
     val blockFile = blockManager.diskBlockManager.getFile(blockId)
-    val (_, tmpBlockFile) = blockManager.diskBlockManager.createTempLocalBlock()
+    val (_, tmpBlockFile) = blockManager.diskBlockManager.createTempShuffleBlock()
     // Because of previous failures, the shuffle file may already exist on this machine.
     // If so, remove it.
     if (blockFile.exists) {
IndexShuffleBlockResolver.scala
@@ -51,7 +51,7 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver
     blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
   }

-  private def getIndexFile(shuffleId: Int, mapId: Int): File = {
+  private[shuffle] def getIndexFile(shuffleId: Int, mapId: Int): File = {
     blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
   }
SortShuffleWriter.scala
@@ -67,13 +67,12 @@ private[spark] class SortShuffleWriter[K, V, C](
   // Don't bother including the time to open the merged output file in the shuffle write time,
   // because it just opens a single file, so is typically too fast to measure accurately
   // (see SPARK-3570).
-  val (_, tmpDataFile) = blockManager.diskBlockManager.createTempShuffleBlock()
+  val (_, tmpDataFile) = blockManager.diskBlockManager.createUncompressedTempShuffleBlock()
   val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
   val partitionLengths = sorter.writePartitionedFile(blockId, tmpDataFile)
   val tmpIndexFile = shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
   val dataFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
-  val indexFile = blockManager.diskBlockManager.getFile(
-    ShuffleIndexBlockId(handle.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID))
+  val indexFile = shuffleBlockResolver.getIndexFile(dep.shuffleId, mapId)

   mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
   Seq(
core/src/main/scala/org/apache/spark/storage/BlockId.scala (10 additions, 0 deletions)
@@ -105,6 +105,16 @@ private[spark] case class TempShuffleBlockId(id: UUID) extends BlockId {
   override def name: String = "temp_shuffle_" + id
 }

+/**
+ * Id associated with temporary shuffle data managed as blocks, which is not
+ * compressed, regardless of spark.shuffle.compress and spark.shuffle.spill.compress. Used
+ * for the temporary location of data files until they are moved into place by the
+ * [[org.apache.spark.shuffle.ShuffleOutputCoordinator]]. Not serializable.
+ */
+private[spark] case class TempUncompressedShuffleBlockId(id: UUID) extends BlockId {
+  override def name: String = "temp_uncompressed_shuffle_" + id
+}

 // Intended only for testing purposes
 private[spark] case class TestBlockId(id: String) extends BlockId {
   override def name: String = "test_" + id
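The new block id type matters because Spark decides whether to compress a block's streams from its block id. The sketch below shows the kind of check this enables; the exact wiring is not part of this diff and is assumed here, though the two config keys come straight from the doc comment above.

import org.apache.spark.SparkConf

// Sketch: compression keyed off the block id type, so the new
// TempUncompressedShuffleBlockId is always written raw. Assumes this
// lives inside org.apache.spark.storage alongside the BlockId types.
def shouldCompress(blockId: BlockId, conf: SparkConf): Boolean = blockId match {
  case _: ShuffleBlockId => conf.getBoolean("spark.shuffle.compress", defaultValue = true)
  case _: TempShuffleBlockId => conf.getBoolean("spark.shuffle.spill.compress", defaultValue = true)
  case _: TempUncompressedShuffleBlockId => false // the point of the new id type
  case _ => false // other block kinds elided from this sketch
}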
DiskBlockManager.scala
@@ -124,6 +124,20 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf)
     (blockId, getFile(blockId))
   }

+  /**
+   * Produces a unique block id and File suitable for storing shuffled data files, which are
+   * uncompressed, before they are moved to their final location by the
+   * [[org.apache.spark.shuffle.ShuffleOutputCoordinator]]
+   */
+  def createUncompressedTempShuffleBlock(): (TempUncompressedShuffleBlockId, File) = {
+    var blockId = new TempUncompressedShuffleBlockId(UUID.randomUUID())
+    while (getFile(blockId).exists()) {
+      blockId = new TempUncompressedShuffleBlockId(UUID.randomUUID())
+    }
+    (blockId, getFile(blockId))
+
Contributor: nit: stray empty line
+  }

   /**
    * Create local directories for storing block data. These directories are
    * located inside configured local directories and won't
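A hypothetical caller's view of the new method, following the SortShuffleWriter change above; names like sorter, dep, and shuffleBlockResolver come from that diff, and the surrounding writer code is assumed rather than shown here.

// Sketch: write partitioned output to a uniquely named, never-compressed
// temp file, then pair it with its final destination for the coordinator.
val (_, tmpDataFile) = blockManager.diskBlockManager.createUncompressedTempShuffleBlock()
val partitionLengths = sorter.writePartitionedFile(blockId, tmpDataFile)
val dataFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
Seq((tmpDataFile, dataFile)) // (tmp, dest): moved into place only if this attempt wins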