Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
779c0f9
initial commit of sort-merge shuffle reader
jerryshao Sep 5, 2014
4f46dc0
Readability improvements to SortShuffleReader
sryza Oct 22, 2014
0861cf9
Clarify mergeWidth logic
sryza Oct 23, 2014
8f49b78
Add blocks remaining at level counter back in
sryza Oct 23, 2014
fcafa16
Small fix
sryza Oct 24, 2014
21dae69
Move merge to a separate class and use a priority queue instead of le…
sryza Oct 25, 2014
8e3766a
Rebase to the latest code and fix some conflicts
jerryshao Oct 30, 2014
98c039b
SortShuffleReader code improvement
jerryshao Nov 4, 2014
7d999ef
Changes to rebase to the latest master branch
jerryshao Nov 5, 2014
319e6d1
Don't spill more blocks than we need to
sryza Nov 5, 2014
96ef5c1
Fix bug: add to inMemoryBlocks
sryza Nov 5, 2014
d481c98
Fix another bug
sryza Nov 5, 2014
bf6a49d
Bug fix and revert ShuffleMemoryManager
jerryshao Nov 5, 2014
79dc823
Fix some bugs in spilling to disk
jerryshao Nov 7, 2014
2e04b85
Modify to use BlockObjectWriter to write data
jerryshao Nov 10, 2014
c1f97b6
Fix incorrect block size introduced bugs
jerryshao Nov 11, 2014
b5e472d
Address the comments
jerryshao Nov 12, 2014
40c59df
Fix some bugs
jerryshao Nov 12, 2014
42bf77d
Improve the failure process and expand ManagedBuffer
jerryshao Nov 14, 2014
a9eaef8
Copy the memory from off-heap to on-heap and some code style modifica…
jerryshao Nov 17, 2014
6f48c5c
Fix rebase introduced issue
jerryshao Nov 18, 2014
c2ddcce
Revert some unwanted changes
jerryshao Nov 18, 2014
f170db3
Clean up comments, break up large methods, spill based on actual bloc…
sryza Nov 24, 2014
123aea1
Log improve
jerryshao Nov 25, 2014
e035105
Fix scala style issue
jerryshao Nov 25, 2014
8b73701
Fix rebase issues
jerryshao Feb 22, 2015
d6c94da
Fix dead lock
jerryshao Apr 13, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Changes to rebase to the latest master branch
  • Loading branch information
jerryshao committed Apr 13, 2015
commit 7d999efe4eae6f07c82e15ed4f92328bf766e01d
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,4 @@ private[spark] class MixedShuffleReader[K, C](
}

override def read(): Iterator[Product2[K, C]] = shuffleReader.read()

override def stop(): Unit = shuffleReader.stop()
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import java.io.{BufferedOutputStream, FileOutputStream}
import java.util.Comparator

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.{Failure, Success, Try}

import org.apache.spark.{Logging, InterruptibleIterator, SparkEnv, TaskContext}
import org.apache.spark._
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleReader, BaseShuffleHandle}
import org.apache.spark.shuffle.{BaseShuffleHandle, FetchFailedException, ShuffleReader}
import org.apache.spark.storage._
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.{TieredDiskMerger, MergeUtil}
import org.apache.spark.util.{CompletionIterator, Utils}
import org.apache.spark.util.collection.{MergeUtil, TieredDiskMerger}

/**
* SortShuffleReader merges and aggregates shuffle data that has already been sorted within each
Expand Down Expand Up @@ -69,6 +70,9 @@ private[spark] class SortShuffleReader[K, C](
/** ArrayBuffer to store in-memory shuffle blocks */
private val inMemoryBlocks = new ArrayBuffer[MemoryShuffleBlock]()

/** Manage the BlockManagerId and related shuffle blocks */
private var statuses: Array[(BlockManagerId, Long)] = _

/** keyComparator for mergeSort, id keyOrdering is not available,
* using hashcode of key to compare */
private val keyComparator: Comparator[K] = dep.keyOrdering.getOrElse(new Comparator[K] {
Expand All @@ -85,17 +89,26 @@ private[spark] class SortShuffleReader[K, C](
override def read(): Iterator[Product2[K, C]] = {
tieredMerger.start()

for ((blockId, blockData) <- fetchRawBlocks()) {
if (blockData.isEmpty) {
throw new IllegalStateException(s"block $blockId is empty for unknown reason")
for ((blockId, blockOption) <- fetchRawBlocks()) {
val blockData = blockOption match {
case Success(block) => block
case Failure(e) =>
blockId match {
case ShuffleBlockId (shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
throw new FetchFailedException (address, shufId.toInt, mapId.toInt, startPartition,
Utils.exceptionString (e))
case _ =>
throw new SparkException (
s"Failed to get block $blockId, which is not a shuffle block", e)
}
}

inMemoryBlocks += MemoryShuffleBlock(blockId, blockData.get)
inMemoryBlocks += MemoryShuffleBlock(blockId, blockData)

// Try to fit block in memory. If this fails, merge in-memory blocks to disk.
val blockSize = blockData.get.size
val granted = shuffleMemoryManager.tryToAcquire(blockData.get.size)
logInfo(s"Granted $granted memory for shuffle block")
val blockSize = blockData.size
val granted = shuffleMemoryManager.tryToAcquire(blockSize)

if (granted < blockSize) {
logInfo(s"Granted $granted memory is not enough to store shuffle block ($blockSize), " +
Expand Down Expand Up @@ -124,7 +137,10 @@ private[spark] class SortShuffleReader[K, C](

tieredMerger.registerOnDiskBlock(tmpBlockId, file)

logInfo(s"Merge ${inMemoryBlocks.size} in-memory blocks into file ${file.getName}")

for (block <- inMemoryBlocks) {
block.blockData.release()
shuffleMemoryManager.release(block.blockData.size)
}
inMemoryBlocks.clear()
Expand All @@ -143,7 +159,10 @@ private[spark] class SortShuffleReader[K, C](
// Release the in-memory block when iteration is completed.
val completionItr = CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](
mergedItr, () => {
inMemoryBlocks.foreach(block => shuffleMemoryManager.release(block.blockData.size))
inMemoryBlocks.foreach { block =>
block.blockData.release()
shuffleMemoryManager.release(block.blockData.size)
}
inMemoryBlocks.clear()
})

Expand All @@ -157,10 +176,8 @@ private[spark] class SortShuffleReader[K, C](
}
}

override def stop(): Unit = ???

private def fetchRawBlocks(): Iterator[(BlockId, Option[ManagedBuffer])] = {
val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(handle.shuffleId, startPartition)
private def fetchRawBlocks(): Iterator[(BlockId, Try[ManagedBuffer])] = {
statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(handle.shuffleId, startPartition)

val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]()
for (((address, size), index) <- statuses.zipWithIndex) {
Expand All @@ -176,16 +193,16 @@ private[spark] class SortShuffleReader[K, C](

shuffleRawBlockFetcherItr = new ShuffleRawBlockFetcherIterator(
context,
SparkEnv.get.blockTransferService,
SparkEnv.get.blockManager.shuffleClient,
blockManager,
blocksByAddress,
conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)

val completionItr = CompletionIterator[
(BlockId, Option[ManagedBuffer]),
Iterator[(BlockId, Option[ManagedBuffer])]](shuffleRawBlockFetcherItr,
(BlockId, Try[ManagedBuffer]),
Iterator[(BlockId, Try[ManagedBuffer])]](shuffleRawBlockFetcherItr,
() => context.taskMetrics.updateShuffleReadMetrics())

new InterruptibleIterator[(BlockId, Option[ManagedBuffer])](context, completionItr)
new InterruptibleIterator[(BlockId, Try[ManagedBuffer])](context, completionItr)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.util.{CompletionIterator, Utils}
private[spark]
final class ShuffleBlockFetcherIterator(
context: TaskContext,
blockTransferService: BlockTransferService,
shuffleClient: ShuffleClient,
blockManager: BlockManager,
blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
serializer: Serializer,
Expand All @@ -42,7 +42,7 @@ final class ShuffleBlockFetcherIterator(

val shuffleRawBlockFetcherItr = new ShuffleRawBlockFetcherIterator(
context,
blockTransferService,
shuffleClient,
blockManager,
blocksByAddress,
maxBytesInFlight)
Expand Down