96 commits
81d52c5
WIP on UnsafeSorter
JoshRosen Apr 29, 2015
abf7bfe
Add basic test case.
JoshRosen Apr 29, 2015
57a4ea0
Make initialSize configurable in UnsafeSorter
JoshRosen Apr 30, 2015
e900152
Add test for empty iterator in UnsafeSorter
JoshRosen May 1, 2015
767d3ca
Fix invalid range in UnsafeSorter.
JoshRosen May 1, 2015
3db12de
Minor simplification and sanity checks in UnsafeSorter
JoshRosen May 1, 2015
4d2f5e1
WIP
JoshRosen May 1, 2015
8e3ec20
Begin code cleanup.
JoshRosen May 1, 2015
253f13e
More cleanup
JoshRosen May 1, 2015
9c6cf58
Refactor to use DiskBlockObjectWriter.
JoshRosen May 1, 2015
e267cee
Fix compilation of UnsafeSorterSuite
JoshRosen May 1, 2015
e2d96ca
Expand serializer API and use new function to help control when new U…
JoshRosen May 1, 2015
d3cc310
Flag that SparkSqlSerializer2 supports relocation
JoshRosen May 1, 2015
87e721b
Renaming and comments
JoshRosen May 1, 2015
0748458
Port UnsafeShuffleWriter to Java.
JoshRosen May 2, 2015
026b497
Re-use a buffer in UnsafeShuffleWriter
JoshRosen May 2, 2015
1433b42
Store record length as int instead of long.
JoshRosen May 2, 2015
240864c
Remove PrefixComputer and require prefix to be specified as part of i…
JoshRosen May 2, 2015
bfc12d3
Add tests for serializer relocation property.
JoshRosen May 3, 2015
b8a09fe
Back out accidental log4j.properties change
JoshRosen May 3, 2015
c2fca17
Small refactoring of SerializerPropertiesSuite to enable test re-use:
JoshRosen May 3, 2015
f17fa8f
Add missing newline
JoshRosen May 3, 2015
8958584
Fix bug in calculating free space in current page.
JoshRosen May 3, 2015
595923a
Remove some unused variables.
JoshRosen May 3, 2015
5e100b2
Super-messy WIP on external sort
JoshRosen May 4, 2015
2776aca
First passing test for ExternalSorter.
JoshRosen May 4, 2015
f156a8f
Hacky metrics integration; refactor some interfaces.
JoshRosen May 4, 2015
3490512
Misc. cleanup
JoshRosen May 4, 2015
3aeaff7
More refactoring and cleanup; begin cleaning iterator interfaces
JoshRosen May 4, 2015
7ee918e
Re-order imports in tests
JoshRosen May 5, 2015
69232fd
Enable compressible address encoding for off-heap mode.
JoshRosen May 5, 2015
57f1ec0
WIP towards packed record pointers for use in optimized shuffle sort.
JoshRosen May 5, 2015
f480fb2
WIP in mega-refactoring towards shuffle-specific sort.
JoshRosen May 5, 2015
133c8c9
WIP towards testing UnsafeShuffleWriter.
JoshRosen May 5, 2015
4f70141
Fix merging; now passes UnsafeShuffleSuite tests.
JoshRosen May 5, 2015
aaea17b
Add comments to UnsafeShuffleSpillWriter.
JoshRosen May 6, 2015
b674412
Merge remote-tracking branch 'origin/master' into unsafe-sort
JoshRosen May 6, 2015
11feeb6
Update TODOs related to shuffle write metrics.
JoshRosen May 7, 2015
8a6fe52
Rename UnsafeShuffleSpillWriter to UnsafeShuffleExternalSorter
JoshRosen May 7, 2015
cfe0ec4
Address a number of minor review comments:
JoshRosen May 7, 2015
e67f1ea
Remove upper type bound in ShuffleWriter interface.
JoshRosen May 7, 2015
5e8cf75
More minor cleanup
JoshRosen May 7, 2015
1ce1300
More minor cleanup
JoshRosen May 7, 2015
b95e642
Refactor and document logic that decides when to spill.
JoshRosen May 7, 2015
9883e30
Merge remote-tracking branch 'origin/master' into unsafe-sort
JoshRosen May 8, 2015
722849b
Add workaround for transferTo() bug in merging code; refactor tests.
JoshRosen May 8, 2015
7cd013b
Begin refactoring to enable proper tests for spilling.
JoshRosen May 9, 2015
9b7ebed
More defensive programming RE: cleaning up spill files and memory aft…
JoshRosen May 9, 2015
e8718dd
Merge remote-tracking branch 'origin/master' into unsafe-sort
JoshRosen May 9, 2015
1929a74
Update to reflect upstream ShuffleBlockManager -> ShuffleBlockResolve…
JoshRosen May 9, 2015
01afc74
Actually read data in UnsafeShuffleWriterSuite
JoshRosen May 10, 2015
8f5061a
Strengthen assertion to check partitioning
JoshRosen May 10, 2015
67d25ba
Update Exchange operator's copying logic to account for new shuffle m…
JoshRosen May 10, 2015
fd4bb9e
Use own ByteBufferOutputStream rather than Kryo's
JoshRosen May 10, 2015
9d1ee7c
Fix MiMa excludes for ShuffleWriter change
JoshRosen May 10, 2015
fcd9a3c
Add notes + tests for maximum record / page sizes.
JoshRosen May 10, 2015
27b18b0
Test for inserting records AT the max record size.
JoshRosen May 10, 2015
4a01c45
Remove unnecessary log message
JoshRosen May 10, 2015
f780fb1
Add test demonstrating which compression codecs support concatenation.
JoshRosen May 11, 2015
b57c17f
Disable some overly-verbose logs that rendered DEBUG useless.
JoshRosen May 11, 2015
1ef56c7
Revise compression codec support in merger; test cross product of con…
JoshRosen May 11, 2015
b3b1924
Properly implement close() and flush() in DummySerializerInstance.
JoshRosen May 11, 2015
0d4d199
Bump up shuffle.memoryFraction to make tests pass.
JoshRosen May 11, 2015
ec6d626
Add notes on maximum # of supported shuffle partitions.
JoshRosen May 11, 2015
ae538dc
Document UnsafeShuffleManager.
JoshRosen May 11, 2015
ea4f85f
Roll back an unnecessary change in Spillable.
JoshRosen May 11, 2015
1e3ad52
Delete unused ByteBufferOutputStream class.
JoshRosen May 11, 2015
39434f9
Avoid integer multiplication overflow in getMemoryUsage (thanks FindB…
JoshRosen May 11, 2015
e1855e5
Fix a handful of misc. IntelliJ inspections
JoshRosen May 11, 2015
7c953f9
Add test that covers UnsafeShuffleSortDataFormat.swap().
JoshRosen May 11, 2015
8531286
Add tests that automatically trigger spills.
JoshRosen May 11, 2015
69d5899
Remove some unnecessary override vals
JoshRosen May 11, 2015
d4e6d89
Update to bit shifting constants
JoshRosen May 11, 2015
4f0b770
Attempt to implement proper shuffle write metrics.
JoshRosen May 12, 2015
e58a6b4
Add more tests for PackedRecordPointer encoding.
JoshRosen May 12, 2015
e995d1a
Introduce MAX_SHUFFLE_OUTPUT_PARTITIONS.
JoshRosen May 12, 2015
56781a1
Rename UnsafeShuffleSorter to UnsafeShuffleInMemorySorter
JoshRosen May 12, 2015
0ad34da
Fix off-by-one in nextInt() call
JoshRosen May 12, 2015
85da63f
Cleanup in UnsafeShuffleSorterIterator.
JoshRosen May 12, 2015
fdcac08
Guard against overflow when expanding sort buffer.
JoshRosen May 12, 2015
2d4e4f4
Address some minor comments in UnsafeShuffleExternalSorter.
JoshRosen May 12, 2015
57312c9
Clarify fileBufferSize units
JoshRosen May 12, 2015
6276168
Remove ability to disable spilling in UnsafeShuffleExternalSorter.
JoshRosen May 12, 2015
4a2c785
rename 'sort buffer' to 'pointer array'
JoshRosen May 12, 2015
e3b8855
Cleanup in UnsafeShuffleWriter
JoshRosen May 12, 2015
c2ce78e
Fix a missed usage of MAX_PARTITION_ID
JoshRosen May 12, 2015
d5779c6
Merge remote-tracking branch 'origin/master' into unsafe-sort
JoshRosen May 12, 2015
5e189c6
Track time spent closing / flushing files; split TimeTrackingOutputSt…
JoshRosen May 12, 2015
df07699
Attempt to clarify confusing metrics update code
JoshRosen May 12, 2015
de40b9d
More comments to try to explain metrics code
JoshRosen May 12, 2015
4023fa4
Add @Private annotation to some Java classes.
JoshRosen May 12, 2015
51812a7
Change shuffle manager sort name to tungsten-sort
JoshRosen May 13, 2015
52a9981
Fix some bugs in the address packing code.
JoshRosen May 13, 2015
d494ffe
Fix deserialization of JavaSerializer instances.
JoshRosen May 13, 2015
7610f2f
Add tests for proper cleanup of shuffle data.
JoshRosen May 13, 2015
ef0a86e
Fix scalastyle errors
JoshRosen May 13, 2015
Add comments to UnsafeShuffleSpillWriter.
JoshRosen committed May 6, 2015
commit aaea17b5c07a1b3d1ebe99020eee31eb1d9d87e1
@@ -21,6 +21,9 @@

import java.io.File;
Review comment (Member): nit: wrong import order


/**
* Metadata for a block of data written by {@link UnsafeShuffleSpillWriter}.
*/
final class SpillInfo {
final long[] partitionLengths;
final File file;
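For orientation, here is a minimal self-contained sketch (not the actual Spark class) of the role SpillInfo plays: it pairs one spill file with the byte length of each partition's segment, which the merge step later uses to locate segments. The constructor shape mirrors the new SpillInfo(numPartitions, file, blockId) call that appears later in this diff; the String blockId and the segmentOffset helper are illustrative assumptions.

import java.io.File;

// Illustrative stand-in for SpillInfo: per-partition segment lengths for one
// spill file. The String blockId is a simplification; the real class uses a
// Spark BlockId.
final class SpillInfoSketch {
  final long[] partitionLengths; // partitionLengths[i] = bytes written for partition i
  final File file;               // the temp shuffle file holding the sorted spill
  final String blockId;

  SpillInfoSketch(int numPartitions, File file, String blockId) {
    this.partitionLengths = new long[numPartitions];
    this.file = file;
    this.blockId = blockId;
  }

  // Partitions are written in order, so a segment's start offset is the sum
  // of all earlier partitions' lengths.
  long segmentOffset(int partition) {
    long offset = 0;
    for (int i = 0; i < partition; i++) {
      offset += partitionLengths[i];
    }
    return offset;
  }

  public static void main(String[] args) {
    SpillInfoSketch info = new SpillInfoSketch(3, new File("spill.tmp"), "temp_shuffle_0");
    info.partitionLengths[0] = 100;
    info.partitionLengths[1] = 250;
    System.out.println(info.segmentOffset(2)); // prints 350
  }
}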
@@ -17,30 +17,41 @@

package org.apache.spark.shuffle.unsafe;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;

import org.apache.spark.storage.*;
import scala.Tuple2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BlockObjectWriter;
import org.apache.spark.storage.TempLocalBlockId;
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;

/**
* External sorter based on {@link UnsafeShuffleSorter}.
* An external sorter that is specialized for sort-based shuffle.
* <p>
* Incoming records are appended to data pages. When all records have been inserted (or when the
* current thread's shuffle memory limit is reached), the in-memory records are sorted according to
* their partition ids (using a {@link UnsafeShuffleSorter}). The sorted records are then written
* to a single output file (or multiple files, if we've spilled). The format of the output files is
* the same as the format of the final output file written by
* {@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output partition's records are
* written as a single serialized, compressed stream that can be read with a new decompression and
* deserialization stream.
* <p>
* Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter does not merge its
* spill files. Instead, this merging is performed in {@link UnsafeShuffleWriter}, which uses a
* specialized merge procedure that avoids extra serialization/deserialization.
*/
public final class UnsafeShuffleSpillWriter {
Review comment (Contributor): I'd name this UnsafeShuffleSorter.

Reply (Contributor, Author): The actual underlying sorter is already named UnsafeShuffleSorter, so I'll rename this to UnsafeShuffleExternalSorter to clarify.


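To make the javadoc's description concrete, here is a small self-contained sketch of the core idea: tag records with partition ids, sort by partition id alone (intra-partition order does not matter for shuffle), and emit each partition's records as one contiguous run. All names here are invented for illustration; the real sorter sorts packed 8-byte record pointers rather than the records themselves.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy model of the sort-based shuffle spill: records are tagged with a
// partition id, sorted by partition id only, and emitted as contiguous
// per-partition runs.
public class PartitionSortSketch {
  record Rec(int partitionId, byte[] payload) {}

  public static void main(String[] args) {
    List<Rec> inMemory = new ArrayList<>();
    inMemory.add(new Rec(2, "c".getBytes()));
    inMemory.add(new Rec(0, "a".getBytes()));
    inMemory.add(new Rec(1, "b".getBytes()));
    inMemory.add(new Rec(0, "a2".getBytes()));

    // Sorting by partition id alone groups each partition's records together.
    inMemory.sort(Comparator.comparingInt(Rec::partitionId));

    int currentPartition = -1;
    for (Rec r : inMemory) {
      if (r.partitionId() != currentPartition) {
        // In the real code this is where one per-partition segment ends and
        // the next disk writer segment begins.
        currentPartition = r.partitionId();
        System.out.println("--- begin segment for partition " + currentPartition);
      }
      System.out.println("  record of " + r.payload().length + " bytes");
    }
  }
}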
@@ -51,23 +62,31 @@ public final class UnsafeShuffleSpillWriter {

private final int initialSize;
private final int numPartitions;
private UnsafeShuffleSorter sorter;

private final TaskMemoryManager memoryManager;
private final ShuffleMemoryManager shuffleMemoryManager;
private final BlockManager blockManager;
private final TaskContext taskContext;
private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<MemoryBlock>();
private final boolean spillingEnabled;
private final int fileBufferSize;
private ShuffleWriteMetrics writeMetrics;

/** The buffer size to use when writing spills using DiskBlockObjectWriter */
private final int fileBufferSize;

private MemoryBlock currentPage = null;
private long currentPagePosition = -1;
/**
* Memory pages that hold the records being sorted. The pages in this list are freed when
* spilling, although in principle we could recycle these pages across spills (on the other hand,
* this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
* itself).
*/
private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<MemoryBlock>();

private final LinkedList<SpillInfo> spills = new LinkedList<SpillInfo>();

// All three of these variables are reset after spilling:
private UnsafeShuffleSorter sorter;
private MemoryBlock currentPage = null;
private long currentPagePosition = -1;

public UnsafeShuffleSpillWriter(
TaskMemoryManager memoryManager,
ShuffleMemoryManager shuffleMemoryManager,
@@ -90,6 +109,10 @@ public UnsafeShuffleSpillWriter(

// TODO: metrics tracking + integration with shuffle write metrics

/**
* Allocates a new sorter. Called when opening the spill writer for the first time and after
* each spill.
*/
private void openSorter() throws IOException {
this.writeMetrics = new ShuffleWriteMetrics();
// TODO: connect write metrics to task metrics?
@@ -106,22 +129,41 @@ private void openSorter() throws IOException {
this.sorter = new UnsafeShuffleSorter(initialSize);
}

/**
* Sorts the in-memory records, writes the sorted records to a spill file, and frees the in-memory
Review comment (Contributor, Author): This comment is inaccurate; the resources are freed in a separate call. I'll update the comment to reflect this.

* data structures associated with this sort. New data structures are not automatically allocated.
*/
private SpillInfo writeSpillFile() throws IOException {
final UnsafeShuffleSorter.UnsafeShuffleSorterIterator sortedRecords = sorter.getSortedIterator();
// This call performs the actual sort.
final UnsafeShuffleSorter.UnsafeShuffleSorterIterator sortedRecords =
sorter.getSortedIterator();

int currentPartition = -1;
// Currently, we need to open a new DiskBlockObjectWriter for each partition; we can avoid this
// after SPARK-5581 is fixed.
BlockObjectWriter writer = null;

// Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
// be an API to directly transfer bytes from managed memory to the disk writer, we buffer
// records in a byte array. This array only needs to be big enough to hold a single record.
final byte[] arr = new byte[SER_BUFFER_SIZE];

final Tuple2<TempLocalBlockId, File> spilledFileInfo =
blockManager.diskBlockManager().createTempLocalBlock();
// Because this output will be read during shuffle, its compression codec must be controlled by
// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
// createTempShuffleBlock here; see SPARK-3426 for more details.
final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
blockManager.diskBlockManager().createTempShuffleBlock();
final File file = spilledFileInfo._2();
final BlockId blockId = spilledFileInfo._1();
final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);

// Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
// Our write path doesn't actually use this serializer (since we end up calling the `write()`
// OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
// around this, we pass a dummy no-op serializer.
final SerializerInstance ser = new DummySerializerInstance();
writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetrics);

int currentPartition = -1;
while (sortedRecords.hasNext()) {
sortedRecords.loadNext();
final int partition = sortedRecords.packedRecordPointer.getPartitionId();
@@ -153,7 +195,9 @@ private SpillInfo writeSpillFile() throws IOException {

if (writer != null) {
writer.commitAndClose();
// TODO: comment and explain why our handling of empty spills, etc.
// If `writeSpillFile()` was called from `closeAndGetSpills()` and no records were inserted,
// then the spill file might be empty. Note that it might be better to avoid calling
// writeSpillFile() in that case.
if (currentPartition != -1) {
spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
spills.add(spillInfo);
Expand All @@ -162,24 +206,30 @@ private SpillInfo writeSpillFile() throws IOException {
return spillInfo;
}

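The buffering comment in writeSpillFile above can be illustrated with an on-heap analogue: each record is staged through one reusable array before being handed to the output stream, so the disk writer sees one bulk write per record instead of many tiny ones. The real code copies out of task-managed memory via PlatformDependent; everything below (the page layout, offsets, and names) is an illustrative assumption.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// On-heap analogue of the spill write path: stage each record through one
// reusable buffer sized for the largest record.
public class RecordCopySketch {
  public static void main(String[] args) throws IOException {
    byte[] page = new byte[64];        // stand-in for a memory page
    byte[] writeBuffer = new byte[16]; // reusable, sized for the largest record

    // Pretend two records of 5 and 3 bytes sit back to back in the page.
    int[] offsets = {0, 5};
    int[] lengths = {5, 3};

    OutputStream out = new ByteArrayOutputStream();
    for (int i = 0; i < offsets.length; i++) {
      // One bulk copy per record; the real code uses
      // PlatformDependent.copyMemory for off-heap sources.
      System.arraycopy(page, offsets[i], writeBuffer, 0, lengths[i]);
      out.write(writeBuffer, 0, lengths[i]);
    }
    out.close();
  }
}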
@VisibleForTesting
public void spill() throws IOException {
final SpillInfo spillInfo = writeSpillFile();
/**
* Sort and spill the current records in response to memory pressure.
*/
private void spill() throws IOException {
final long threadId = Thread.currentThread().getId();
logger.info("Thread " + threadId + " spilling sort data of " +
Review comment (Contributor): debug level

org.apache.spark.util.Utils.bytesToString(getMemoryUsage()) + " to disk (" +
(spills.size() + (spills.size() > 1 ? " times" : " time")) + " so far)");

final SpillInfo spillInfo = writeSpillFile();
final long sorterMemoryUsage = sorter.getMemoryUsage();
sorter = null;
shuffleMemoryManager.release(sorterMemoryUsage);
final long spillSize = freeMemory();
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
taskContext.taskMetrics().incDiskBytesSpilled(spillInfo.file.length());
final long threadId = Thread.currentThread().getId();
// TODO: messy; log _before_ spill
logger.info("Thread " + threadId + " spilling in-memory map of " +
org.apache.spark.util.Utils.bytesToString(spillSize) + " to disk (" +
(spills.size() + ((spills.size() > 1) ? " times" : " time")) + " so far)");

openSorter();
}

private long getMemoryUsage() {
return sorter.getMemoryUsage() + (allocatedPages.size() * PAGE_SIZE);
}

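One hazard worth noting in getMemoryUsage as written: if PAGE_SIZE is an int, allocatedPages.size() * PAGE_SIZE multiplies two ints before the result is widened to long, which can overflow. A later commit in this PR (39434f9, "Avoid integer multiplication overflow in getMemoryUsage") addresses exactly this. A minimal demonstration of the hazard and the fix:

public class OverflowSketch {
  public static void main(String[] args) {
    final int PAGE_SIZE = 1 << 27; // 128 MB, an illustrative page size
    int numPages = 32;

    // int * int overflows before the result is widened to long:
    long wrong = numPages * PAGE_SIZE;        // 32 * 2^27 = 2^32 wraps to 0
    long right = numPages * (long) PAGE_SIZE; // widen first, then multiply

    System.out.println("wrong = " + wrong);   // 0
    System.out.println("right = " + right);   // 4294967296
  }
}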
private long freeMemory() {
long memoryFreed = 0;
final Iterator<MemoryBlock> iter = allocatedPages.iterator();
@@ -194,7 +244,15 @@ private long freeMemory() {
return memoryFreed;
}

private void ensureSpaceInDataPage(int requiredSpace) throws Exception {
/**
* Checks whether there is enough space to insert a new record into the sorter. If there is
* insufficient space, either allocate more memory or spill the current sort data (if spilling
* is enabled), then insert the record.
*/
private void ensureSpaceInDataPage(int requiredSpace) throws IOException {
// TODO: we should re-order the `if` cases in this function so that the most common case (there
// is enough space) appears first.

// TODO: merge these steps to first calculate total memory requirements for this insert,
// then try to acquire; no point in acquiring sort buffer only to spill due to no space in the
// data page.
Expand All @@ -219,7 +277,7 @@ private void ensureSpaceInDataPage(int requiredSpace) throws Exception {
}
if (requiredSpace > PAGE_SIZE) {
// TODO: throw a more specific exception?
throw new Exception("Required space " + requiredSpace + " is greater than page size (" +
throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
PAGE_SIZE + ")");
} else if (requiredSpace > spaceInCurrentPage) {
if (spillingEnabled) {
Review comment (Contributor): I'm somewhat confused; maybe more comments would help. Do we always spill as soon as we reach the size of a page?

@@ -230,7 +288,7 @@ private void ensureSpaceInDataPage(int requiredSpace) throws Exception {
final long memoryAcquiredAfterSpill = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
if (memoryAcquiredAfterSpill != PAGE_SIZE) {
shuffleMemoryManager.release(memoryAcquiredAfterSpill);
throw new Exception("Can't allocate memory!");
throw new IOException("Can't allocate memory!");
}
}
}
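The reviewer's question can be answered with a condensed model of the acquire-or-spill decision as this diff reads: a fresh page is requested only when the current page cannot fit the record, and a spill is triggered only when the memory manager declines to grant a full page, not simply whenever a page fills up. The sketch below is an interpretation of the surrounding diff with invented stand-in names, not Spark source.

import java.io.IOException;

// Condensed model of ensureSpaceInDataPage: keep using the current page if
// possible; otherwise acquire a new page, spilling once if the grant is short.
public class AcquireOrSpillSketch {
  static final long PAGE_SIZE = 1 << 20;
  static long spaceInCurrentPage = 0;
  static long grantable = PAGE_SIZE / 2; // pretend memory is tight

  static long tryToAcquire(long bytes) { // stand-in for ShuffleMemoryManager
    long granted = Math.min(bytes, grantable);
    grantable -= granted;
    return granted;
  }

  static void spill() {
    System.out.println("spilling to free memory");
    grantable += PAGE_SIZE; // freed pages become grantable again
  }

  static void ensureSpace(long requiredSpace) throws IOException {
    if (requiredSpace > PAGE_SIZE) {
      throw new IOException("record larger than page size");
    }
    if (requiredSpace <= spaceInCurrentPage) {
      return; // common case: current page has room
    }
    long acquired = tryToAcquire(PAGE_SIZE);
    if (acquired < PAGE_SIZE) {
      // Partial grant: give it back, spill, and retry once.
      grantable += acquired;
      spill();
      acquired = tryToAcquire(PAGE_SIZE);
      if (acquired < PAGE_SIZE) {
        throw new IOException("can't allocate memory");
      }
    }
    spaceInCurrentPage = PAGE_SIZE; // "allocate" the new page
  }

  public static void main(String[] args) throws IOException {
    ensureSpace(4096); // forces the spill-then-retry path in this toy setup
    System.out.println("space available: " + spaceInCurrentPage);
  }
}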
@@ -241,11 +299,14 @@ private void ensureSpaceInDataPage(int requiredSpace) throws Exception {
}
}

/**
* Write a record to the shuffle sorter.
*/
public void insertRecord(
Object recordBaseObject,
long recordBaseOffset,
int lengthInBytes,
int prefix) throws Exception {
int partitionId) throws IOException {
// Need 4 bytes to store the record length.
ensureSpaceInDataPage(lengthInBytes + 4);

@@ -262,12 +323,20 @@ public void insertRecord(
lengthInBytes);
currentPagePosition += lengthInBytes;

sorter.insertRecord(recordAddress, prefix);
sorter.insertRecord(recordAddress, partitionId);
}

/**
* Close the sorter, causing any buffered data to be sorted and written out to disk.
*
* @return metadata for the spill files written by this sorter. If no records were ever inserted
* into this sorter, then this will return an empty array.
* @throws IOException
*/
public SpillInfo[] closeAndGetSpills() throws IOException {
if (sorter != null) {
writeSpillFile();
Review comment (Contributor): We should look at the potential exception paths and handle them to make sure we can clean up the tmp files in the case of exceptions.

freeMemory();
}
return spills.toArray(new SpillInfo[0]);
}
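Finally, the lengthInBytes + 4 in insertRecord reflects a simple framing scheme: each record in a data page is prefixed by its 4-byte length, so a reader knows how many payload bytes follow. A minimal sketch of writing and reading one frame, using an on-heap ByteBuffer as a stand-in for a task-managed page:

import java.nio.ByteBuffer;

// Each record in a data page is stored as [int length][record bytes], which
// is why inserting a record needs lengthInBytes + 4 bytes of space.
public class RecordFrameSketch {
  public static void main(String[] args) {
    ByteBuffer page = ByteBuffer.allocate(1024); // stand-in for a data page

    byte[] record = "hello".getBytes();
    page.putInt(record.length); // 4-byte length prefix
    page.put(record);           // record payload

    // Reading it back: length first, then exactly that many payload bytes.
    page.flip();
    int len = page.getInt();
    byte[] out = new byte[len];
    page.get(out);
    System.out.println(new String(out)); // prints "hello"
  }
}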