-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-7081] Faster sort-based shuffle path using binary processing cache-aware sort #5868
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
81d52c5
abf7bfe
57a4ea0
e900152
767d3ca
3db12de
4d2f5e1
8e3ec20
253f13e
9c6cf58
e267cee
e2d96ca
d3cc310
87e721b
0748458
026b497
1433b42
240864c
bfc12d3
b8a09fe
c2fca17
f17fa8f
8958584
595923a
5e100b2
2776aca
f156a8f
3490512
3aeaff7
7ee918e
69232fd
57f1ec0
f480fb2
133c8c9
4f70141
aaea17b
b674412
11feeb6
8a6fe52
cfe0ec4
e67f1ea
5e8cf75
1ce1300
b95e642
9883e30
722849b
7cd013b
9b7ebed
e8718dd
1929a74
01afc74
8f5061a
67d25ba
fd4bb9e
9d1ee7c
fcd9a3c
27b18b0
4a01c45
f780fb1
b57c17f
1ef56c7
b3b1924
0d4d199
ec6d626
ae538dc
ea4f85f
1e3ad52
39434f9
e1855e5
7c953f9
8531286
69d5899
d4e6d89
4f0b770
e58a6b4
e995d1a
56781a1
0ad34da
85da63f
fdcac08
2d4e4f4
57312c9
6276168
4a2c785
e3b8855
c2ce78e
d5779c6
5e189c6
df07699
de40b9d
4023fa4
51812a7
52a9981
d494ffe
7610f2f
ef0a86e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,6 +37,8 @@ final class PackedRecordPointer { | |
|
|
||
| static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27; // 128 megabytes | ||
|
|
||
| static final int MAXIMUM_PARTITION_ID = 1 << 24; // 16777216 | ||
|
|
||
| /** Bit mask for the lower 40 bits of a long. */ | ||
| private static final long MASK_LONG_LOWER_40_BITS = 0xFFFFFFFFFFL; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I prefer to use |
||
|
|
||
|
|
@@ -62,6 +64,7 @@ final class PackedRecordPointer { | |
| * @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class. | ||
| */ | ||
| public static long packPointer(long recordPointer, int partitionId) { | ||
| assert (partitionId <= MAXIMUM_PARTITION_ID); | ||
| // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page. | ||
| // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses. | ||
| final int pageNumber = (int) ((recordPointer & MASK_LONG_UPPER_13_BITS) >>> 51); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,8 +17,10 @@ | |
|
|
||
| package org.apache.spark.shuffle.unsafe; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.Comparator; | ||
|
|
||
| import org.apache.spark.unsafe.memory.MemoryBlock; | ||
| import org.apache.spark.util.collection.Sorter; | ||
|
|
||
| final class UnsafeShuffleSorter { | ||
|
||
|
|
@@ -59,8 +61,17 @@ public long getMemoryUsage() { | |
| return sortBuffer.length * 8L; | ||
| } | ||
|
|
||
| // TODO: clairify assumption that pointer points to record length. | ||
| public void insertRecord(long recordPointer, int partitionId) { | ||
| /** | ||
| * Inserts a record to be sorted. | ||
| * | ||
| * @param recordPointer a pointer to the record, encoded by the task memory manager. Due to | ||
| * certain pointer compression techniques used by the sorter, the sort can | ||
| * only operate on pointers that point to locations in the first | ||
| * {@link PackedRecordPointer#MAXIMUM_PAGE_SIZE_BYTES} bytes of a data page. | ||
| * @param partitionId the partition id, which must be less than or equal to | ||
| * {@link PackedRecordPointer#MAXIMUM_PARTITION_ID}. | ||
| */ | ||
| public void insertRecord(long recordPointer, int partitionId) throws IOException { | ||
| if (!hasSpaceForAnotherRecord()) { | ||
| expandSortBuffer(); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -100,6 +100,12 @@ public UnsafeShuffleWriter( | |
| int mapId, | ||
| TaskContext taskContext, | ||
| SparkConf sparkConf) { | ||
| final int numPartitions = handle.dependency().partitioner().numPartitions(); | ||
| if (numPartitions > PackedRecordPointer.MAXIMUM_PARTITION_ID) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Here should be updated to MAXIMUM_PARTITION_ID + 1 |
||
| throw new IllegalArgumentException( | ||
| "UnsafeShuffleWriter can only be used for shuffles with at most " + | ||
| PackedRecordPointer.MAXIMUM_PARTITION_ID + " reduce partitions"); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Here should be updated to MAXIMUM_PARTITION_ID + 1 |
||
| } | ||
| this.blockManager = blockManager; | ||
| this.shuffleBlockResolver = shuffleBlockResolver; | ||
| this.memoryManager = memoryManager; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MAXIMUM_PARTITION_ID should be "(1 << 24) - 1", since it's 24-bit. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, good catch.