27 commits
779c0f9
initial commit of sort-merge shuffle reader
jerryshao Sep 5, 2014
4f46dc0
Readability improvements to SortShuffleReader
sryza Oct 22, 2014
0861cf9
Clarify mergeWidth logic
sryza Oct 23, 2014
8f49b78
Add blocks remaining at level counter back in
sryza Oct 23, 2014
fcafa16
Small fix
sryza Oct 24, 2014
21dae69
Move merge to a separate class and use a priority queue instead of le…
sryza Oct 25, 2014
8e3766a
Rebase to the latest code and fix some conflicts
jerryshao Oct 30, 2014
98c039b
SortShuffleReader code improvement
jerryshao Nov 4, 2014
7d999ef
Changes to rebase to the latest master branch
jerryshao Nov 5, 2014
319e6d1
Don't spill more blocks than we need to
sryza Nov 5, 2014
96ef5c1
Fix bug: add to inMemoryBlocks
sryza Nov 5, 2014
d481c98
Fix another bug
sryza Nov 5, 2014
bf6a49d
Bug fix and revert ShuffleMemoryManager
jerryshao Nov 5, 2014
79dc823
Fix some bugs in spilling to disk
jerryshao Nov 7, 2014
2e04b85
Modify to use BlockObjectWriter to write data
jerryshao Nov 10, 2014
c1f97b6
Fix incorrect block size introduced bugs
jerryshao Nov 11, 2014
b5e472d
Address the comments
jerryshao Nov 12, 2014
40c59df
Fix some bugs
jerryshao Nov 12, 2014
42bf77d
Improve the failure process and expand ManagedBuffer
jerryshao Nov 14, 2014
a9eaef8
Copy the memory from off-heap to on-heap and some code style modifica…
jerryshao Nov 17, 2014
6f48c5c
Fix rebase introduced issue
jerryshao Nov 18, 2014
c2ddcce
Revert some unwanted changes
jerryshao Nov 18, 2014
f170db3
Clean up comments, break up large methods, spill based on actual bloc…
sryza Nov 24, 2014
123aea1
Log improve
jerryshao Nov 25, 2014
e035105
Fix scala style issue
jerryshao Nov 25, 2014
8b73701
Fix rebase issues
jerryshao Feb 22, 2015
d6c94da
Fix dead lock
jerryshao Apr 13, 2015
Fix incorrect block size introduced bugs
jerryshao committed Apr 13, 2015
commit c1f97b608fd8ab329f72cd4a8cc39841b5e64b3a
SortShuffleReader.scala
@@ -22,7 +22,7 @@ import java.util.Comparator

import org.apache.spark.executor.ShuffleWriteMetrics

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

import org.apache.spark._
@@ -73,11 +73,19 @@ private[spark] class SortShuffleReader[K, C](
private var _memoryBytesSpilled: Long = 0L
private var _diskBytesSpilled: Long = 0L

/** ArrayBuffer to store in-memory shuffle blocks */
private val inMemoryBlocks = new ArrayBuffer[MemoryShuffleBlock]()
/** Queue to store in-memory shuffle blocks */
private val inMemoryBlocks = new mutable.Queue[MemoryShuffleBlock]()

/** Manage the BlockManagerId and related shuffle blocks */
private var statuses: Array[(BlockManagerId, Long)] = _
/** number of bytes left to fetch */
private var unfetchedBytes: Long = 0L

/**
* Maintain the mapping from each shuffle block to its BlockManagerId and requested size.
* We need this because the requested block size (from the map output statuses) is not
* necessarily equal to the fetched result's size, due to size compression, so the requested
* sizes must be tracked separately to keep the bookkeeping in this algorithm correct.
*/
private val shuffleBlockMap = new mutable.HashMap[ShuffleBlockId, (BlockManagerId, Long)]()

/** keyComparator for mergeSort; if keyOrdering is not available,
* use the hashcode of the key to compare */
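
As a reading aid, here is a minimal, self-contained sketch of the bookkeeping this hunk introduces (shuffleBlockMap plus unfetchedBytes). The types below are hypothetical stand-ins, not Spark's classes: record each block's requested size next to its owner's address, and decrement a single counter by that requested size as blocks arrive.

import scala.collection.mutable

// Hypothetical stand-ins for Spark's ShuffleBlockId and BlockManagerId.
case class BlockKey(shuffleId: Int, mapId: Int, reduceId: Int)
case class ExecutorAddress(host: String, port: Int)

class FetchBookkeeping {
  // Requested (map-status) size per block; it may differ from the fetched buffer's size.
  private val blockMap = new mutable.HashMap[BlockKey, (ExecutorAddress, Long)]()
  private var unfetchedBytes: Long = 0L

  def register(block: BlockKey, where: ExecutorAddress, requestedSize: Long): Unit = {
    blockMap.put(block, (where, requestedSize))
    unfetchedBytes += requestedSize
  }

  // Always subtract the requested size, never the fetched buffer's size, so the counter
  // reaches exactly zero once every registered block has arrived.
  def markFetched(block: BlockKey): Unit = {
    unfetchedBytes -= blockMap(block)._2
  }

  def remaining: Long = unfetchedBytes
}
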
@@ -99,37 +107,45 @@ private[spark] class SortShuffleReader[K, C](
override def read(): Iterator[Product2[K, C]] = {
tieredMerger.start()

computeShuffleBlocks()

for ((blockId, blockOption) <- fetchRawBlocks()) {
val blockData = blockOption match {
case Success(block) => block
case Failure(e) =>
blockId match {
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, startPartition,
case b @ ShuffleBlockId(shuffleId, mapId, _) =>
val address = shuffleBlockMap(b)._1
throw new FetchFailedException(address, shuffleId.toInt, mapId.toInt, startPartition,
Utils.exceptionString(e))
case _ =>
throw new SparkException (
s"Failed to get block $blockId, which is not a shuffle block", e)
}
}

inMemoryBlocks += MemoryShuffleBlock(blockId, blockData)
shuffleRawBlockFetcherItr.currentResult = null

// Try to fit block in memory. If this fails, merge in-memory blocks to disk.
val blockSize = blockData.size
val granted = shuffleMemoryManager.tryToAcquire(blockSize)
val block = MemoryShuffleBlock(blockId, blockData)
if (granted < blockSize) {
if (granted >= blockSize) {
inMemoryBlocks += MemoryShuffleBlock(blockId, blockData)
} else {
logInfo(s"Granted $granted memory is not enough to store shuffle block id $blockId, " +
s"block size $blockSize, spilling in-memory blocks to release the memory")

shuffleMemoryManager.release(granted)
spillInMemoryBlocks(block)
}

unfetchedBytes -= shuffleBlockMap(blockId.asInstanceOf[ShuffleBlockId])._2
}

// Make sure all the blocks have been fetched.
assert(unfetchedBytes == 0L)

tieredMerger.doneRegisteringOnDiskBlocks()

// Merge on-disk blocks with in-memory blocks to directly feed to the reducer.
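
The loop above follows an acquire-or-spill pattern: try to reserve the block's full size from the shuffle memory manager, and if only a partial grant comes back, release it and spill instead of buffering. A rough sketch of that pattern with hypothetical names and a simplified memory-manager interface (not Spark's actual API):

import scala.collection.mutable

// Simplified, hypothetical memory-manager contract used only for illustration.
trait MemoryManager {
  def tryToAcquire(bytes: Long): Long
  def release(bytes: Long): Unit
}

case class FetchedBlock(id: String, size: Long)

object AcquireOrSpill {
  // Keep a block in memory only if its full size can be acquired; otherwise return the
  // partial grant and hand the block over to the spill path.
  def admit(block: FetchedBlock,
            inMemory: mutable.Queue[FetchedBlock],
            memory: MemoryManager,
            spill: FetchedBlock => Unit): Unit = {
    val granted = memory.tryToAcquire(block.size)
    if (granted >= block.size) {
      inMemory += block
    } else {
      memory.release(granted) // give back the partial grant before spilling
      spill(block)
    }
  }
}
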
@@ -158,10 +174,25 @@ private[spark] class SortShuffleReader[K, C](
// Write merged blocks to disk
val (tmpBlockId, file) = blockManager.diskBlockManager.createTempShuffleBlock()

_memoryBytesSpilled += inMemoryBlocks.map(_.blockData.size()).sum
// If the remaining unfetched data would fit inside our current allocation, we don't want to
// waste time spilling blocks beyond the space needed for it.
// We use the requested sizes to compute how much is left to spill, since the requested size
// can differ slightly from the fetched block size because of size compression.
var bytesToSpill = unfetchedBytes
val blocksToSpill = new mutable.ArrayBuffer[MemoryShuffleBlock]()
blocksToSpill += tippingBlock
bytesToSpill -= shuffleBlockMap(tippingBlock.blockId.asInstanceOf[ShuffleBlockId])._2
while (bytesToSpill > 0 && !inMemoryBlocks.isEmpty) {
val block = inMemoryBlocks.dequeue()
blocksToSpill += block
bytesToSpill -= shuffleBlockMap(block.blockId.asInstanceOf[ShuffleBlockId])._2
}

_memoryBytesSpilled += blocksToSpill.map(_.blockData.size()).sum

if (inMemoryBlocks.size > 1) {
val itrGroup = inMemoryBlocksToIterators(inMemoryBlocks)
val itrGroup = inMemoryBlocksToIterators(blocksToSpill)
val partialMergedItr =
MergeUtil.mergeSort(itrGroup, keyComparator, dep.keyOrdering, dep.aggregator)
val curWriteMetrics = new ShuffleWriteMetrics()
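
The spill-selection logic in this hunk can be restated in isolation: start from the block that tipped us over the memory limit, then keep dequeuing buffered blocks only until their summed requested sizes cover the bytes still unfetched. A sketch under those assumptions, again with hypothetical types:

import scala.collection.mutable

case class BufferedBlock(id: String, requestedSize: Long)

object SpillSelection {
  // Spill the tipping block first, then only as many queued blocks as needed to free
  // enough (requested) bytes for the data that still has to be fetched.
  def selectBlocksToSpill(tipping: BufferedBlock,
                          inMemory: mutable.Queue[BufferedBlock],
                          unfetchedBytes: Long): Seq[BufferedBlock] = {
    val toSpill = mutable.ArrayBuffer(tipping)
    var bytesToSpill = unfetchedBytes - tipping.requestedSize
    while (bytesToSpill > 0 && inMemory.nonEmpty) {
      val block = inMemory.dequeue()
      toSpill += block
      bytesToSpill -= block.requestedSize
    }
    toSpill.toSeq
  }
}
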
@@ -190,7 +221,7 @@ private[spark] class SortShuffleReader[K, C](

} else {
val fos = new FileOutputStream(file)
val buffer = inMemoryBlocks.map(_.blockData.nioByteBuffer()).head
val buffer = blocksToSpill.map(_.blockData.nioByteBuffer()).head
var channel = fos.getChannel
var success = false
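
The else branch above starts a single-block fast path: when only one block needs spilling, its buffer is written straight to the file through the stream's NIO channel. A rough, illustrative sketch of that pattern; the cleanup-on-failure details below are an assumption, not the PR's exact code:

import java.io.{File, FileOutputStream}
import java.nio.ByteBuffer

object SingleBlockWriter {
  // Write one in-memory buffer directly to a file via the stream's channel, deleting
  // the partial file if anything goes wrong before the write completes.
  def writeBufferToFile(buffer: ByteBuffer, file: File): Unit = {
    val fos = new FileOutputStream(file)
    val channel = fos.getChannel
    var success = false
    try {
      while (buffer.hasRemaining) {
        channel.write(buffer)
      }
      success = true
    } finally {
      channel.close()
      if (!success) {
        file.delete()
      }
    }
  }
}
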

@@ -236,26 +267,36 @@ private[spark] class SortShuffleReader[K, C](
}
}

private def fetchRawBlocks(): Iterator[(BlockId, Try[ManagedBuffer])] = {
statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(handle.shuffleId, startPartition)
private def computeShuffleBlocks(): Unit = {
val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(handle.shuffleId, startPartition)

val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]()
val splitsByAddress = new mutable.HashMap[BlockManagerId, mutable.ArrayBuffer[(Int, Long)]]()
for (((address, size), index) <- statuses.zipWithIndex) {
splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
splitsByAddress.getOrElseUpdate(address, mutable.ArrayBuffer()) += ((index, size))
}

val blocksByAddress = splitsByAddress.toSeq.map { case (address, splits) =>
val blocks = splits.map { s =>
(ShuffleBlockId(handle.shuffleId, s._1, startPartition), s._2)
splitsByAddress.foreach { case (id, blocks) =>
blocks.foreach { case (idx, len) =>
shuffleBlockMap.put(ShuffleBlockId(handle.shuffleId, idx, startPartition), (id, len))
unfetchedBytes += len
}
(address, blocks.toSeq)
}
}

private def fetchRawBlocks(): Iterator[(BlockId, Try[ManagedBuffer])] = {
val blocksByAddress = new mutable.HashMap[BlockManagerId,
mutable.ArrayBuffer[(ShuffleBlockId, Long)]]()

shuffleBlockMap.foreach { case (block, (id, len)) =>
blocksByAddress.getOrElseUpdate(id,
mutable.ArrayBuffer[(ShuffleBlockId, Long)]()) += ((block, len))
}

shuffleRawBlockFetcherItr = new ShuffleRawBlockFetcherIterator(
context,
SparkEnv.get.blockManager.shuffleClient,
blockManager,
blocksByAddress,
blocksByAddress.toSeq,
conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)

val completionItr = CompletionIterator[
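
Between computeShuffleBlocks() and fetchRawBlocks() above, the per-block metadata is regrouped by owning executor before being handed to the fetcher iterator. A standalone sketch of that regrouping step, written against generic block and address types rather than Spark's:

import scala.collection.mutable

object GroupByAddress {
  // Group (block, size) pairs by the executor that holds them, which is the shape the
  // raw-block fetcher expects: one request list per remote address.
  def groupByAddress[B, A](blockMap: Map[B, (A, Long)]): Seq[(A, Seq[(B, Long)])] = {
    val byAddress = new mutable.HashMap[A, mutable.ArrayBuffer[(B, Long)]]()
    blockMap.foreach { case (block, (address, length)) =>
      byAddress.getOrElseUpdate(address, mutable.ArrayBuffer()) += ((block, length))
    }
    byAddress.toSeq.map { case (address, blocks) => (address, blocks.toSeq) }
  }
}
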
TieredDiskMerger.scala
@@ -17,11 +17,11 @@

package org.apache.spark.util.collection

import java.io.{File, FileOutputStream, BufferedOutputStream}
import java.io.File
import java.util.Comparator
import java.util.concurrent.{PriorityBlockingQueue, CountDownLatch}

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable

import org.apache.spark._
import org.apache.spark.executor.ShuffleWriteMetrics
@@ -159,10 +159,11 @@ private[spark] class TieredDiskMerger[K, C](
* either receive the notification that no more blocks are coming in, or until maxMergeFactor
* merge is required no matter what.
*
* E.g. if maxMergeFactor is 10 and we have 19 or more on-disk blocks, a 10-block merge will put us
* at 10 or more blocks, so we might as well carry it out now.
* E.g. if maxMergeFactor is 10 and we have 19 or more on-disk blocks, a 10-block merge will put
* us at 10 or more blocks, so we might as well carry it out now.
*/
private def shouldMergeNow(): Boolean = doneRegistering || onDiskBlocks.size() >= maxMergeFactor * 2 - 1
private def shouldMergeNow(): Boolean = doneRegistering ||
onDiskBlocks.size() >= maxMergeFactor * 2 - 1

private final class DiskToDiskMerger extends Thread {
setName(s"tiered-merge-thread-${Thread.currentThread().getId}")
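
The shouldMergeNow threshold a few lines above can be read as a small pure function. The restatement below is only an illustration of the 2 * maxMergeFactor - 1 cutoff, not the class's actual structure:

object MergePolicy {
  // Once 2 * maxMergeFactor - 1 on-disk blocks have accumulated, a maxMergeFactor-way
  // merge is unavoidable: merging maxMergeFactor of them still leaves at least
  // maxMergeFactor blocks, so there is no benefit in waiting for more registrations.
  def shouldMergeNow(doneRegistering: Boolean, onDiskBlocks: Int, maxMergeFactor: Int): Boolean =
    doneRegistering || onDiskBlocks >= maxMergeFactor * 2 - 1
}
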
@@ -180,7 +181,7 @@ private[spark] class TieredDiskMerger[K, C](
}

if (onDiskBlocks.size() > maxMergeFactor) {
val blocksToMerge = new ArrayBuffer[DiskShuffleBlock]()
val blocksToMerge = new mutable.ArrayBuffer[DiskShuffleBlock]()
// Try to pick the smallest merge width that will result in the next merge being the final
// merge.
val mergeFactor = math.min(onDiskBlocks.size - maxMergeFactor + 1, maxMergeFactor)
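
The mergeFactor choice on the line above aims to make the following merge the final one. A standalone restatement of that arithmetic, for illustration only:

object MergeWidth {
  // Merging `mergeFactor` blocks into one drops the on-disk count to
  // onDiskBlocks - mergeFactor + 1, so picking the smallest width that lands the count
  // at maxMergeFactor leaves exactly one full-width merge for the final pass.
  def chooseMergeFactor(onDiskBlocks: Int, maxMergeFactor: Int): Int =
    math.min(onDiskBlocks - maxMergeFactor + 1, maxMergeFactor)
}

With maxMergeFactor = 10 and 13 on-disk blocks, for example, this picks a 4-way merge, leaving exactly 10 blocks for the final pass.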