Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
149ea3e
ShuffleWriters write to temp file, then go through
squito Oct 21, 2015
cf8118e
assorted cleanup
squito Oct 22, 2015
ea1ae07
style
squito Oct 22, 2015
9356c67
fix compilation in StoragePerfTester
squito Oct 22, 2015
2b42eb5
mima
squito Oct 22, 2015
32d4b3b
update UnsafeShuffleWriterSuite
squito Oct 22, 2015
550e198
fix imports
squito Oct 22, 2015
4ff98bf
should work now, but needs cleanup
squito Oct 23, 2015
4a19702
only consider tmp files that exist; only consider the dest pre-existi…
squito Oct 23, 2015
89063dd
cleanup
squito Oct 23, 2015
4145651
ShuffleOutputCoordinatorSuite
squito Oct 23, 2015
2089e12
cleanup
squito Oct 23, 2015
2e9bbaa
Merge branch 'master' into SPARK-8029_first_wins
squito Oct 26, 2015
4cd423e
write the winning mapStatus to disk, so subsequent tasks can respond …
squito Oct 26, 2015
dc4b7f6
fix imports
squito Oct 26, 2015
b7a0981
fixes
squito Oct 26, 2015
830a097
shuffle writers must write always write all tmp files
squito Oct 27, 2015
5d11eca
more fixes for zero-sized blocks
squito Oct 27, 2015
3f5af9c
dont make ShuffleWriter return mapStatusFile
squito Oct 27, 2015
4b7c71a
rather than requiring all tmp files to exist, just write a zero-lengt…
squito Oct 27, 2015
eabf978
update test case
squito Oct 27, 2015
5bbeec3
minor cleanup
squito Oct 27, 2015
e141d82
test that shuffle output files are always the same
squito Oct 27, 2015
4df7955
fix compression settings of tmp files; minor cleanup
squito Oct 27, 2015
dc076b8
fix tests
squito Oct 27, 2015
cfdfd2c
review feedback
squito Nov 3, 2015
86f468a
Merge branch 'master' into SPARK-8029_first_wins
squito Nov 4, 2015
5c8b247
fix imports
squito Nov 4, 2015
4d66df1
fix more imports
squito Nov 4, 2015
e59df41
couple more nits ...
squito Nov 4, 2015
c206fc5
minor cleanup
squito Nov 5, 2015
c0edff1
style
squito Nov 5, 2015
da33519
Merge branch 'master' into SPARK-8029_first_wins
squito Nov 11, 2015
c0b93a5
create temporary files in same location as destination files
squito Nov 11, 2015
9d0d9d9
no more @VisibleForTesting
squito Nov 11, 2015
80e037d
unused import
squito Nov 11, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix tests
  • Loading branch information
squito committed Oct 27, 2015
commit dc076b83f892723fe614e80b42da562b5ebf8d9f
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class UnsafeShuffleWriterSuite {
TaskMemoryManager taskMemoryManager;
final HashPartitioner hashPartitioner = new HashPartitioner(NUM_PARTITITONS);
File mergedOutputFile;
File indexFile;
File mapStatusFile;
File tempDir;
long[] partitionSizesInMergedFile;
Expand Down Expand Up @@ -106,9 +107,11 @@ public void setUp() throws IOException {
MockitoAnnotations.initMocks(this);
tempDir = Utils.createTempDir("test", "test");
mergedOutputFile = File.createTempFile("mergedoutput", "", tempDir);
indexFile = File.createTempFile("shuffle",".index", tempDir);
mapStatusFile = File.createTempFile("shuffle", ".mapstatus", tempDir);
// the ShuffleOutputCoordinator requires that this file does not exist
mergedOutputFile.delete();
indexFile.delete();
mapStatusFile.delete();
partitionSizesInMergedFile = null;
tmpShuffleFilesCreated.clear();
Expand Down Expand Up @@ -171,6 +174,7 @@ public OutputStream answer(InvocationOnMock invocation) throws Throwable {
when(blockManager.shuffleServerId()).thenReturn(BlockManagerId$.MODULE$.apply("1", "a.b.c", 1));

when(shuffleBlockResolver.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile);
when(shuffleBlockResolver.getIndexFile(anyInt(), anyInt())).thenReturn(indexFile);
doAnswer(new Answer<File>() {
@Override
public File answer(InvocationOnMock invocationOnMock) throws Throwable {
Expand All @@ -191,18 +195,6 @@ public Tuple2<TempShuffleBlockId, File> answer(
}
});

when(diskBlockManager.getFile(any(BlockId.class))).thenAnswer(
new Answer<File>() {
@Override
public File answer(InvocationOnMock invocationOnMock) throws Throwable {
File f = File.createTempFile("shuffleFile",".index", tempDir);
// the ShuffleOutputCoordinator requires that this file does not exist
f.delete();
return f;
}
}
);

when(taskContext.taskMetrics()).thenReturn(taskMetrics);
when(taskContext.internalMetricsToAccumulators()).thenReturn(null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
when(dependency.serializer).thenReturn(Some(new JavaSerializer(conf)))
when(taskContext.taskMetrics()).thenReturn(taskMetrics)
when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile)
when(blockResolver.getIndexFile(0, 0)).thenReturn(indexFile)
// the index file will be empty, but that is fine for these tests
when(blockResolver.writeIndexFile(anyInt(), anyInt(), any())).thenAnswer(new Answer[File] {
override def answer(invocationOnMock: InvocationOnMock): File = {
Expand Down Expand Up @@ -120,12 +121,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
when(diskBlockManager.getFile(any[BlockId])).thenAnswer(
new Answer[File] {
override def answer(invocation: InvocationOnMock): File = {
val blk = invocation.getArguments.head.asInstanceOf[BlockId]
if (blk == new ShuffleIndexBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID)) {
indexFile
} else {
blockIdToFileMap.get(invocation.getArguments.head.asInstanceOf[BlockId]).get
}
blockIdToFileMap.get(invocation.getArguments.head.asInstanceOf[BlockId]).get
}
})
}
Expand Down