Closed
Changes from 1 commit (27 commits total)
779c0f9  initial commit of sort-merge shuffle reader (jerryshao, Sep 5, 2014)
4f46dc0  Readability improvements to SortShuffleReader (sryza, Oct 22, 2014)
0861cf9  Clarify mergeWidth logic (sryza, Oct 23, 2014)
8f49b78  Add blocks remaining at level counter back in (sryza, Oct 23, 2014)
fcafa16  Small fix (sryza, Oct 24, 2014)
21dae69  Move merge to a separate class and use a priority queue instead of le… (sryza, Oct 25, 2014)
8e3766a  Rebase to the latest code and fix some conflicts (jerryshao, Oct 30, 2014)
98c039b  SortShuffleReader code improvement (jerryshao, Nov 4, 2014)
7d999ef  Changes to rebase to the latest master branch (jerryshao, Nov 5, 2014)
319e6d1  Don't spill more blocks than we need to (sryza, Nov 5, 2014)
96ef5c1  Fix bug: add to inMemoryBlocks (sryza, Nov 5, 2014)
d481c98  Fix another bug (sryza, Nov 5, 2014)
bf6a49d  Bug fix and revert ShuffleMemoryManager (jerryshao, Nov 5, 2014)
79dc823  Fix some bugs in spilling to disk (jerryshao, Nov 7, 2014)
2e04b85  Modify to use BlockObjectWriter to write data (jerryshao, Nov 10, 2014)
c1f97b6  Fix incorrect block size introduced bugs (jerryshao, Nov 11, 2014)
b5e472d  Address the comments (jerryshao, Nov 12, 2014)
40c59df  Fix some bugs (jerryshao, Nov 12, 2014)
42bf77d  Improve the failure process and expand ManagedBuffer (jerryshao, Nov 14, 2014)
a9eaef8  Copy the memory from off-heap to on-heap and some code style modifica… (jerryshao, Nov 17, 2014)
6f48c5c  Fix rebase introduced issue (jerryshao, Nov 18, 2014)
c2ddcce  Revert some unwanted changes (jerryshao, Nov 18, 2014)
f170db3  Clean up comments, break up large methods, spill based on actual bloc… (sryza, Nov 24, 2014)
123aea1  Log improve (jerryshao, Nov 25, 2014)
e035105  Fix scala style issue (jerryshao, Nov 25, 2014)
8b73701  Fix rebase issues (jerryshao, Feb 22, 2015)
d6c94da  Fix dead lock (jerryshao, Apr 13, 2015)
Don't spill more blocks than we need to
sryza authored and jerryshao committed Apr 13, 2015
commit 319e6d10c9888cab7534ac7913fa2ad686b81619
@@ -20,7 +20,7 @@ package org.apache.spark.shuffle.sort
 import java.io.{BufferedOutputStream, FileOutputStream}
 import java.util.Comparator
 
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, Queue}
 import scala.util.{Failure, Success, Try}
 
 import org.apache.spark._
@@ -59,6 +59,9 @@ private[spark] class SortShuffleReader[K, C](
   /** Shuffle block fetcher iterator */
   private var shuffleRawBlockFetcherItr: ShuffleRawBlockFetcherIterator = _
 
+  /** Number of bytes left to fetch */
+  private var unfetchedBytes: Long = _
+
   private val dep = handle.dependency
   private val conf = SparkEnv.get.conf
   private val blockManager = SparkEnv.get.blockManager
@@ -68,7 +71,7 @@
   private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
 
   /** ArrayBuffer to store in-memory shuffle blocks */
-  private val inMemoryBlocks = new ArrayBuffer[MemoryShuffleBlock]()
+  private val inMemoryBlocks = new Queue[MemoryShuffleBlock]()
 
   /** Manage the BlockManagerId and related shuffle blocks */
   private var statuses: Array[(BlockManagerId, Long)] = _
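Why this field becomes a Queue: the spill path added further down in this commit repeatedly removes the oldest buffered block until enough bytes are freed, and scala.collection.mutable.Queue provides that FIFO access directly, where an ArrayBuffer would need explicit index juggling. A minimal sketch of that access pattern, using a simplified MemBlock stand-in for MemoryShuffleBlock:

import scala.collection.mutable.Queue

object QueueSketch {
  case class MemBlock(id: String, size: Long)

  def main(args: Array[String]): Unit = {
    val inMemoryBlocks = Queue(MemBlock("block-0", 64L), MemBlock("block-1", 32L))
    inMemoryBlocks += MemBlock("block-2", 16L)  // newly fetched blocks join at the tail
    val oldest = inMemoryBlocks.dequeue()       // spilling consumes from the head
    assert(oldest.id == "block-0")
  }
}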
@@ -104,55 +107,26 @@ private[spark] class SortShuffleReader[K, C](
         }
       }
 
-      inMemoryBlocks += MemoryShuffleBlock(blockId, blockData)
-
       // Try to fit block in memory. If this fails, merge in-memory blocks to disk.
       val blockSize = blockData.size
      val granted = shuffleMemoryManager.tryToAcquire(blockSize)
 
       if (granted < blockSize) {
         logInfo(s"Granted $granted memory is not enough to store shuffle block ($blockSize), " +
-          s"try to consolidate in-memory blocks to release the memory")
+          s"spilling in-memory blocks to release the memory")
 
         shuffleMemoryManager.release(granted)
-
-        // Write merged blocks to disk
-        val (tmpBlockId, file) = blockManager.diskBlockManager.createTempShuffleBlock()
-        val fos = new FileOutputStream(file)
-        val bos = new BufferedOutputStream(fos, fileBufferSize)
-
-        if (inMemoryBlocks.size > 1) {
-          val itrGroup = inMemoryBlocksToIterators()
-          val partialMergedItr =
-            MergeUtil.mergeSort(itrGroup, keyComparator, dep.keyOrdering, dep.aggregator)
-          blockManager.dataSerializeStream(tmpBlockId, bos, partialMergedItr, ser)
-        } else {
-          val buffer = inMemoryBlocks.map(_.blockData.nioByteBuffer()).head
-          val channel = fos.getChannel
-          while (buffer.hasRemaining) {
-            channel.write(buffer)
-          }
-          channel.close()
-        }
-
-        tieredMerger.registerOnDiskBlock(tmpBlockId, file)
-
-        logInfo(s"Merge ${inMemoryBlocks.size} in-memory blocks into file ${file.getName}")
-
-        for (block <- inMemoryBlocks) {
-          block.blockData.release()
-          shuffleMemoryManager.release(block.blockData.size)
-        }
-        inMemoryBlocks.clear()
+        spillInMemoryBlocks(MemoryShuffleBlock(blockId, blockData))
       }
-
+      unfetchedBytes -= blockData.size()
       shuffleRawBlockFetcherItr.currentResult = null
     }
+    assert(unfetchedBytes == 0)
 
    tieredMerger.doneRegisteringOnDiskBlocks()
 
    // Merge on-disk blocks with in-memory blocks to directly feed to the reducer.
-    val finalItrGroup = inMemoryBlocksToIterators() ++ Seq(tieredMerger.readMerged())
+    val finalItrGroup = inMemoryBlocksToIterators(inMemoryBlocks) ++ Seq(tieredMerger.readMerged())
     val mergedItr =
       MergeUtil.mergeSort(finalItrGroup, keyComparator, dep.keyOrdering, dep.aggregator)
 
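The loop above couples two pieces of bookkeeping: each fetched block must win a grant from the shuffle memory manager or trigger a spill, and unfetchedBytes counts down to zero by the final assert. A self-contained sketch of that accounting, with a toy memory manager standing in for Spark's ShuffleMemoryManager (the class and its fields here are illustrative, not Spark APIs):

object FetchAccountingSketch {
  final class ToyMemoryManager(private var available: Long) {
    def tryToAcquire(bytes: Long): Long = {
      val granted = math.min(bytes, available)
      available -= granted
      granted
    }
    def release(bytes: Long): Unit = available += bytes
  }

  def main(args: Array[String]): Unit = {
    val blockSizes = Seq(40L, 30L, 50L)  // sizes of the blocks still to fetch
    var unfetchedBytes = blockSizes.sum  // computed up front, as in this commit
    val memory = new ToyMemoryManager(64L)

    for (size <- blockSizes) {
      val granted = memory.tryToAcquire(size)
      if (granted < size) {
        // Mirrors the `granted < blockSize` branch: return the partial grant
        // and spill rather than buffering the block in memory.
        memory.release(granted)
        println(s"would spill: $size-byte block (only $granted bytes granted)")
      } else {
        println(s"kept in memory: $size-byte block")
      }
      unfetchedBytes -= size
    }
    assert(unfetchedBytes == 0)  // the same invariant the new assert checks
  }
}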
@@ -169,8 +143,53 @@
     new InterruptibleIterator(context, completionItr.map(p => (p._1, p._2)))
   }
 
-  private def inMemoryBlocksToIterators(): Seq[Iterator[Product2[K, C]]] = {
-    inMemoryBlocks.map{ case MemoryShuffleBlock(id, buf) =>
+  def spillInMemoryBlocks(tippingBlock: MemoryShuffleBlock): Unit = {
+    // Write merged blocks to disk
+    val (tmpBlockId, file) = blockManager.diskBlockManager.createTempShuffleBlock()
+    val fos = new FileOutputStream(file)
+    val bos = new BufferedOutputStream(fos, fileBufferSize)
+
+    // If the remaining unfetched data would fit inside our current allocation, we don't want to
+    // waste time spilling blocks beyond the space needed for it.
+    var bytesToSpill = unfetchedBytes
+    val blocksToSpill = new ArrayBuffer[MemoryShuffleBlock]()
+    blocksToSpill += tippingBlock
+    bytesToSpill -= tippingBlock.blockData.size
+    while (bytesToSpill > 0 && inMemoryBlocks.isEmpty) {
+      val block = inMemoryBlocks.dequeue()
+      blocksToSpill += block
+      bytesToSpill -= block.blockData.size
+    }
+
+    if (blocksToSpill.size > 1) {
+      val itrGroup = inMemoryBlocksToIterators(blocksToSpill)
+      val partialMergedItr =
+        MergeUtil.mergeSort(itrGroup, keyComparator, dep.keyOrdering, dep.aggregator)
+      blockManager.dataSerializeStream(tmpBlockId, bos, partialMergedItr, ser)
+    } else {
+      val buffer = blocksToSpill.map(_.blockData.nioByteBuffer()).head
+      val channel = fos.getChannel
+      while (buffer.hasRemaining) {
+        channel.write(buffer)
+      }
+      channel.close()
+    }
+
+    tieredMerger.registerOnDiskBlock(tmpBlockId, file)
+
+    logInfo(s"Merged ${blocksToSpill.size} in-memory blocks into file ${file.getName}")
+
+    for (block <- blocksToSpill) {
+      block.blockData.release()
+      if (block != tippingBlock) {
+        shuffleMemoryManager.release(block.blockData.size)
+      }
+    }
+  }
+
+  private def inMemoryBlocksToIterators(blocks: Seq[MemoryShuffleBlock])
+    : Seq[Iterator[Product2[K, C]]] = {
+    blocks.map{ case MemoryShuffleBlock(id, buf) =>
       blockManager.dataDeserialize(id, buf.nioByteBuffer(), ser)
         .asInstanceOf[Iterator[Product2[K, C]]]
     }
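The heart of this commit is the selection loop at the top of spillInMemoryBlocks: spill the tipping block plus just enough of the oldest queued blocks to make room for what is still unfetched, instead of flushing everything. A standalone sketch of that selection under simplified types (MemBlock stands in for MemoryShuffleBlock); the sketch guards the loop with nonEmpty on the assumption that this is the intent, since the diff's `inMemoryBlocks.isEmpty` condition would stop the loop whenever queued blocks remain (the commit history shows follow-up bug-fix commits the same day):

import scala.collection.mutable.{ArrayBuffer, Queue}

object SpillSelectionSketch {
  case class MemBlock(id: String, size: Long)

  def selectBlocksToSpill(
      tippingBlock: MemBlock,
      inMemoryBlocks: Queue[MemBlock],
      unfetchedBytes: Long): Seq[MemBlock] = {
    var bytesToSpill = unfetchedBytes
    val blocksToSpill = new ArrayBuffer[MemBlock]()
    blocksToSpill += tippingBlock
    bytesToSpill -= tippingBlock.size
    // Dequeue the oldest blocks until enough room is freed for the remaining fetch.
    while (bytesToSpill > 0 && inMemoryBlocks.nonEmpty) {
      val block = inMemoryBlocks.dequeue()
      blocksToSpill += block
      bytesToSpill -= block.size
    }
    blocksToSpill.toSeq
  }

  def main(args: Array[String]): Unit = {
    // 60 bytes left to fetch, a 30-byte tipping block, and queued blocks of 50 and
    // 40 bytes: the 50-byte block is spilled too, but the 40-byte block stays cached.
    val queued = Queue(MemBlock("a", 50L), MemBlock("b", 40L))
    val spilled = selectBlocksToSpill(MemBlock("tipping", 30L), queued, 60L)
    assert(spilled.map(_.id) == Seq("tipping", "a"))
    assert(queued.map(_.id) == Seq("b"))
  }
}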
@@ -190,6 +209,7 @@
       }
       (address, blocks.toSeq)
     }
+    unfetchedBytes = blocksByAddress.flatMap(a => a._2.map(b => b._2)).sum
 
    shuffleRawBlockFetcherItr = new ShuffleRawBlockFetcherIterator(
      context,
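For the unfetchedBytes initialization just above: blocksByAddress pairs each location with its (BlockId, size) tuples, so the total outstanding bytes is a flatMap over the size field. A toy check of that expression, with placeholder string IDs in place of the real BlockManagerId/BlockId types:

object UnfetchedBytesSketch {
  def main(args: Array[String]): Unit = {
    val blocksByAddress = Seq(
      ("executor-1", Seq(("shuffle_0_0_0", 100L), ("shuffle_0_1_0", 50L))),
      ("executor-2", Seq(("shuffle_0_2_0", 25L))))
    // Same shape as the expression in the diff: sum the size component of every block.
    val unfetchedBytes = blocksByAddress.flatMap(a => a._2.map(b => b._2)).sum
    assert(unfetchedBytes == 175L)
  }
}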
Expand Down