Commits (36)
149ea3e
ShuffleWriters write to temp file, then go through
squito Oct 21, 2015
cf8118e
assorted cleanup
squito Oct 22, 2015
ea1ae07
style
squito Oct 22, 2015
9356c67
fix compilation in StoragePerfTester
squito Oct 22, 2015
2b42eb5
mima
squito Oct 22, 2015
32d4b3b
update UnsafeShuffleWriterSuite
squito Oct 22, 2015
550e198
fix imports
squito Oct 22, 2015
4ff98bf
should work now, but needs cleanup
squito Oct 23, 2015
4a19702
only consider tmp files that exist; only consider the dest pre-existi…
squito Oct 23, 2015
89063dd
cleanup
squito Oct 23, 2015
4145651
ShuffleOutputCoordinatorSuite
squito Oct 23, 2015
2089e12
cleanup
squito Oct 23, 2015
2e9bbaa
Merge branch 'master' into SPARK-8029_first_wins
squito Oct 26, 2015
4cd423e
write the winning mapStatus to disk, so subsequent tasks can respond …
squito Oct 26, 2015
dc4b7f6
fix imports
squito Oct 26, 2015
b7a0981
fixes
squito Oct 26, 2015
830a097
shuffle writers must always write all tmp files
squito Oct 27, 2015
5d11eca
more fixes for zero-sized blocks
squito Oct 27, 2015
3f5af9c
don't make ShuffleWriter return mapStatusFile
squito Oct 27, 2015
4b7c71a
rather than requiring all tmp files to exist, just write a zero-lengt…
squito Oct 27, 2015
eabf978
update test case
squito Oct 27, 2015
5bbeec3
minor cleanup
squito Oct 27, 2015
e141d82
test that shuffle output files are always the same
squito Oct 27, 2015
4df7955
fix compression settings of tmp files; minor cleanup
squito Oct 27, 2015
dc076b8
fix tests
squito Oct 27, 2015
cfdfd2c
review feedback
squito Nov 3, 2015
86f468a
Merge branch 'master' into SPARK-8029_first_wins
squito Nov 4, 2015
5c8b247
fix imports
squito Nov 4, 2015
4d66df1
fix more imports
squito Nov 4, 2015
e59df41
couple more nits ...
squito Nov 4, 2015
c206fc5
minor cleanup
squito Nov 5, 2015
c0edff1
style
squito Nov 5, 2015
da33519
Merge branch 'master' into SPARK-8029_first_wins
squito Nov 11, 2015
c0b93a5
create temporary files in same location as destination files
squito Nov 11, 2015
9d0d9d9
no more @VisibleForTesting
squito Nov 11, 2015
80e037d
unused import
squito Nov 11, 2015
update UnsafeShuffleWriterSuite
squito committed Oct 22, 2015
commit 32d4b3b69651263922b317db54bef992d67a3a3c
@@ -166,8 +166,8 @@ public long getPeakMemoryUsedBytes() {
* This convenience method should only be called in test code.
*/
@VisibleForTesting
- public void write(Iterator<Product2<K, V>> records) throws IOException {
-   write(JavaConverters.asScalaIteratorConverter(records).asScala());
+ public Seq<Tuple2<File, File>> write(Iterator<Product2<K, V>> records) throws IOException {
+   return write(JavaConverters.asScalaIteratorConverter(records).asScala());
}

@Override
@@ -23,6 +23,7 @@

import scala.*;
import scala.collection.Iterator;
+ import scala.collection.Seq;
import scala.runtime.AbstractFunction1;

import com.google.common.collect.Iterators;
@@ -48,6 +49,7 @@
import org.apache.spark.io.LZ4CompressionCodec;
import org.apache.spark.io.LZFCompressionCodec;
import org.apache.spark.io.SnappyCompressionCodec;
+ import org.apache.spark.shuffle.ShuffleOutputCoordinator;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.network.util.LimitedInputStream;
@@ -71,7 +73,7 @@ public class UnsafeShuffleWriterSuite {
File mergedOutputFile;
File tempDir;
long[] partitionSizesInMergedFile;
- final LinkedList<File> spillFilesCreated = new LinkedList<File>();
+ final LinkedList<File> tmpShuffleFilesCreated = new LinkedList<File>();
SparkConf conf;
final Serializer serializer = new KryoSerializer(new SparkConf());
TaskMetrics taskMetrics;
@@ -109,8 +111,10 @@ public void setUp() throws IOException {
MockitoAnnotations.initMocks(this);
tempDir = Utils.createTempDir("test", "test");
mergedOutputFile = File.createTempFile("mergedoutput", "", tempDir);
+ // the ShuffleOutputCoordinator requires that this file does not exist
+ mergedOutputFile.delete();
partitionSizesInMergedFile = null;
- spillFilesCreated.clear();
+ tmpShuffleFilesCreated.clear();
conf = new SparkConf().set("spark.buffer.pageSize", "128m");
taskMetrics = new TaskMetrics();

@@ -169,11 +173,11 @@ public OutputStream answer(InvocationOnMock invocation) throws Throwable {
);

when(shuffleBlockResolver.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile);
- doAnswer(new Answer<Void>() {
+ doAnswer(new Answer<File>() {
@Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+ public File answer(InvocationOnMock invocationOnMock) throws Throwable {
partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2];
- return null;
+ return diskBlockManager.createTempShuffleBlock()._2();
}
}).when(shuffleBlockResolver).writeIndexFile(anyInt(), anyInt(), any(long[].class));

@@ -184,11 +188,23 @@ public Tuple2<TempShuffleBlockId, File> answer(
InvocationOnMock invocationOnMock) throws Throwable {
TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID());
File file = File.createTempFile("spillFile", ".spill", tempDir);
- spillFilesCreated.add(file);
+ tmpShuffleFilesCreated.add(file);
return Tuple2$.MODULE$.apply(blockId, file);
}
});

+ when(diskBlockManager.getFile(any(BlockId.class))).thenAnswer(
+   new Answer<File>() {
+     @Override
+     public File answer(InvocationOnMock invocationOnMock) throws Throwable {
+       File f = File.createTempFile("shuffleFile",".index", tempDir);
+       // the ShuffleOutputCoordinator requires that this file does not exist
+       f.delete();
+       return f;
+     }
+   }
+ );

when(taskContext.taskMetrics()).thenReturn(taskMetrics);
when(taskContext.internalMetricsToAccumulators()).thenReturn(null);

@@ -212,7 +228,7 @@ private UnsafeShuffleWriter<Object, Object> createWriter(
}

private void assertSpillFilesWereCleanedUp() {
- for (File spillFile : spillFilesCreated) {
+ for (File spillFile : tmpShuffleFilesCreated) {
assertFalse("Spill file " + spillFile.getPath() + " was not cleaned up",
spillFile.exists());
}
@@ -274,8 +290,9 @@ class BadRecords extends scala.collection.AbstractIterator<Product2<Object, Obje
@Test
public void writeEmptyIterator() throws Exception {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
- writer.write(Iterators.<Product2<Object, Object>>emptyIterator());
+ Seq<Tuple2<File, File>> files = writer.write(Iterators.<Product2<Object, Object>>emptyIterator());
final Option<MapStatus> mapStatus = writer.stop(true);
+ ShuffleOutputCoordinator.commitOutputs(0, 0, files);
assertTrue(mapStatus.isDefined());
assertTrue(mergedOutputFile.exists());
assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile);
@@ -294,8 +311,9 @@ public void writeWithoutSpilling() throws Exception {
dataToWrite.add(new Tuple2<Object, Object>(i, i));
}
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
- writer.write(dataToWrite.iterator());
+ Seq<Tuple2<File, File>> files = writer.write(dataToWrite.iterator());
final Option<MapStatus> mapStatus = writer.stop(true);
+ ShuffleOutputCoordinator.commitOutputs(0, 0, files);
assertTrue(mapStatus.isDefined());
assertTrue(mergedOutputFile.exists());

@@ -339,11 +357,13 @@ private void testMergingSpills(
writer.forceSorterToSpill();
writer.insertRecordIntoSorter(dataToWrite.get(4));
writer.insertRecordIntoSorter(dataToWrite.get(5));
- writer.closeAndWriteOutput();
+ Seq<Tuple2<File, File>> files = writer.closeAndWriteOutput();
final Option<MapStatus> mapStatus = writer.stop(true);
+ ShuffleOutputCoordinator.commitOutputs(0, 0, files);
assertTrue(mapStatus.isDefined());
assertTrue(mergedOutputFile.exists());
- assertEquals(2, spillFilesCreated.size());
+ // this includes the tmp index & data files, before the output is committed
+ assertEquals(4, tmpShuffleFilesCreated.size());

long sumOfPartitionSizes = 0;
for (long size: partitionSizesInMergedFile) {
@@ -416,10 +436,12 @@ public void writeEnoughDataToTriggerSpill() throws Exception {
for (int i = 0; i < 128 + 1; i++) {
dataToWrite.add(new Tuple2<Object, Object>(i, bigByteArray));
}
- writer.write(dataToWrite.iterator());
+ Seq<Tuple2<File, File>> files = writer.write(dataToWrite.iterator());
verify(shuffleMemoryManager, times(5)).tryToAcquire(anyLong());
- assertEquals(2, spillFilesCreated.size());
+ // this includes the tmp index & data files, before the output is committed
+ assertEquals(4, tmpShuffleFilesCreated.size());
writer.stop(true);
+ ShuffleOutputCoordinator.commitOutputs(0, 0, files);
readRecordsFromFile();
assertSpillFilesWereCleanedUp();
ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
@@ -442,10 +464,12 @@ public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exce
for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE; i++) {
dataToWrite.add(new Tuple2<Object, Object>(i, i));
}
- writer.write(dataToWrite.iterator());
+ Seq<Tuple2<File, File>> files = writer.write(dataToWrite.iterator());
verify(shuffleMemoryManager, times(5)).tryToAcquire(anyLong());
- assertEquals(2, spillFilesCreated.size());
+ // this includes the tmp index & data files, before the output is committed
+ assertEquals(4, tmpShuffleFilesCreated.size());
writer.stop(true);
+ ShuffleOutputCoordinator.commitOutputs(0, 0, files);
readRecordsFromFile();
assertSpillFilesWereCleanedUp();
ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
@@ -464,8 +488,9 @@ public void writeRecordsThatAreBiggerThanDiskWriteBufferSize() throws Exception
final byte[] bytes = new byte[(int) (ShuffleExternalSorter.DISK_WRITE_BUFFER_SIZE * 2.5)];
new Random(42).nextBytes(bytes);
dataToWrite.add(new Tuple2<Object, Object>(1, ByteBuffer.wrap(bytes)));
- writer.write(dataToWrite.iterator());
+ Seq<Tuple2<File, File>> files = writer.write(dataToWrite.iterator());
writer.stop(true);
+ ShuffleOutputCoordinator.commitOutputs(0, 0, files);
assertEquals(
HashMultiset.create(dataToWrite),
HashMultiset.create(readRecordsFromFile()));
@@ -485,8 +510,9 @@ public void writeRecordsThatAreBiggerThanMaxRecordSize() throws Exception {
final byte[] exceedsMaxRecordSize = new byte[writer.maxRecordSizeBytes() + 1];
new Random(42).nextBytes(exceedsMaxRecordSize);
dataToWrite.add(new Tuple2<Object, Object>(3, ByteBuffer.wrap(exceedsMaxRecordSize)));
- writer.write(dataToWrite.iterator());
+ Seq<Tuple2<File, File>> files = writer.write(dataToWrite.iterator());
writer.stop(true);
+ ShuffleOutputCoordinator.commitOutputs(0, 0, files);
assertEquals(
HashMultiset.create(dataToWrite),
HashMultiset.create(readRecordsFromFile()));
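Taken together, the changes above all follow one pattern: every write path now returns a Seq<Tuple2<File, File>> of (temporary file, destination file) pairs, and the tests commit them through ShuffleOutputCoordinator.commitOutputs before asserting on the merged output. The sketch below illustrates the "first wins" commit idea those calls exercise. It is a simplified stand-in with hypothetical names (FirstWinsCommitSketch, a commitOutputs over plain java.io.File pairs), not the actual ShuffleOutputCoordinator, which per the commit messages also handles missing or zero-length temp files and persists the winning MapStatus.

import java.io.File;
import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified illustration of a "first wins" commit of shuffle output:
// temp files are promoted to their destinations only if no earlier attempt has
// already committed them.
public class FirstWinsCommitSketch {

  // Each entry maps a temporary file to its final destination file.
  static boolean commitOutputs(List<SimpleEntry<File, File>> tmpToDest) throws IOException {
    // If any destination already exists, an earlier attempt won; discard our temp files.
    for (SimpleEntry<File, File> pair : tmpToDest) {
      if (pair.getValue().exists()) {
        for (SimpleEntry<File, File> p : tmpToDest) {
          p.getKey().delete();
        }
        return false;
      }
    }
    // Otherwise promote every temp file to its destination.
    for (SimpleEntry<File, File> pair : tmpToDest) {
      if (!pair.getKey().renameTo(pair.getValue())) {
        throw new IOException("could not rename " + pair.getKey() + " to " + pair.getValue());
      }
    }
    return true;
  }

  public static void main(String[] args) throws IOException {
    File tmp = File.createTempFile("shuffle", ".data.tmp");
    File dest = new File(tmp.getParentFile(), "shuffle_0_0_0.data");
    boolean committed = commitOutputs(Arrays.asList(new SimpleEntry<File, File>(tmp, dest)));
    System.out.println("committed: " + committed + ", destination exists: " + dest.exists());
  }
}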