Skip to content
Prev Previous commit
Next Next commit
small rebase fix
  • Loading branch information
JkSelf committed Oct 15, 2019
commit d21c99fa6f7fd09dc52da8ad9b7c30c3276efcb4
19 changes: 9 additions & 10 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,13 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
*
* @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
* and the second item is a sequence of (shuffle block id, shuffle block size, map index)
*         tuples describing the shuffle blocks that are stored at that block manager.
*/
def getMapSizesByExecutorId(
shuffleId: Int,
startPartition: Int,
endPartition: Int,
mapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long)])]
mapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]

/**
* Deletes map output status information for the specified shuffle stage.
Expand Down Expand Up @@ -749,9 +749,8 @@ private[spark] class MapOutputTrackerMaster(
startPartition: Int,
endPartition: Int,
mapId: Int)
: Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, mapId $mapId" +

s"partitions $startPartition-$endPartition")
shuffleStatuses.get(shuffleId) match {
case Some (shuffleStatus) =>
Expand Down Expand Up @@ -817,7 +816,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
shuffleId: Int,
startPartition: Int,
endPartition: Int,
mapId: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
mapId: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, mapId $mapId" +
s"partitions $startPartition-$endPartition")
val statuses = getStatuses(shuffleId)
Expand Down Expand Up @@ -1016,9 +1015,9 @@ private[spark] object MapOutputTracker extends Logging {
startPartition: Int,
endPartition: Int,
statuses: Array[MapStatus],
mapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
mapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
assert (statuses != null && statuses.length >= mapId)
val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]]
val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
val status = statuses(mapId)
if (status == null) {
val errorMessage = s"Missing an output location for shuffle $shuffleId"
Expand All @@ -1028,11 +1027,11 @@ private[spark] object MapOutputTracker extends Logging {
for (part <- startPartition until endPartition) {
val size = status.getSizeForBlock(part)
if (size != 0) {
splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
((ShuffleBlockId(shuffleId, mapId, part), size))
splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
((ShuffleBlockId(shuffleId, mapId, part), size, mapId))
}
}
}
}
splitsByAddress.toIterator
}
}