resolve the conflicts
JkSelf committed Oct 15, 2019
commit 9c1dc5538afce26c4e693e353d8d4ef4231bb78c
25 changes: 4 additions & 21 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -757,12 +757,7 @@ private[spark] class MapOutputTrackerMaster(
             startPartition,
             endPartition,
             statuses,
-<<<<<<< HEAD
-            mapId)
-=======
-            useOldFetchProtocol,
             Some(mapId))
->>>>>>> resolve the comments
         }
       case None =>
         Iterator.empty
@@ -823,12 +818,8 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
       s"partitions $startPartition-$endPartition")
     val statuses = getStatuses(shuffleId)
     try {
-<<<<<<< HEAD
-      MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses, mapId)
-=======
-      MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses,
-        useOldFetchProtocol, Some(mapId))
->>>>>>> resolve the comments
+      MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition,
+        statuses, Some(mapId))
     } catch {
       case e: MetadataFetchFailedException =>
         // We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
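For orientation (not part of this diff): when convertMapStatuses throws MetadataFetchFailedException, the worker-side tracker clears its cached statuses before rethrowing, so the next read refetches fresh metadata from the master. A minimal sketch of that pattern, assuming a mapStatuses cache field as in MapOutputTrackerWorker:

    // Sketch of the fetch-failure handling around the call above.
    // mapStatuses is assumed to be the worker-side cache
    // (shuffleId -> Array[MapStatus]).
    try {
      MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition,
        statuses, Some(mapId))
    } catch {
      case e: MetadataFetchFailedException =>
        mapStatuses.clear() // the cache is stale; drop it so the next call refetches
        throw e             // propagate so the scheduler sees the fetch failure
    }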
@@ -980,7 +971,6 @@ private[spark] object MapOutputTracker extends Logging {
       startPartition: Int,
       endPartition: Int,
       statuses: Array[MapStatus],
-      useOldFetchProtocol: Boolean,
       mapId : Option[Int] = None): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
     assert (statuses != null)
     val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
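With useOldFetchProtocol removed, convertMapStatuses is left with a single optional mapId parameter. A hedged sketch of the two call shapes this signature supports (variable names here are illustrative, not from the diff):

    // Fetch locations for all map outputs of the partition range
    // (mapId defaults to None):
    val allOutputs = MapOutputTracker.convertMapStatuses(
      shuffleId, startPartition, endPartition, statuses)

    // Restrict to the output of a single map task (the filtering logic
    // itself lies outside the lines shown in this hunk):
    val oneOutput = MapOutputTracker.convertMapStatuses(
      shuffleId, startPartition, endPartition, statuses, Some(mapId))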
@@ -994,15 +984,8 @@
         for (part <- startPartition until endPartition) {
           val size = status.getSizeForBlock(part)
           if (size != 0) {
-            if (useOldFetchProtocol) {
-              // While we use the old shuffle fetch protocol, we use mapIndex as mapId in the
-              // ShuffleBlockId.
-              splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
-                ((ShuffleBlockId(shuffleId, mapIndex, part), size, mapIndex))
-            } else {
-              splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
-                ((ShuffleBlockId(shuffleId, status.mapTaskId, part), size, mapIndex))
-            }
+            splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
+              ((ShuffleBlockId(shuffleId, status.mapId, part), size, mapIndex))
           }
         }
       }
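The net effect of this hunk: the protocol branch is gone, and shuffle blocks are always named with the map status's mapId rather than the map index. Assuming ShuffleBlockId's usual name format shuffle_<shuffleId>_<mapId>_<reduceId>, a small illustration (values made up):

    import org.apache.spark.storage.ShuffleBlockId

    // One block id per (map task, reduce partition) pair; the third tuple
    // element above (mapIndex) still travels alongside for bookkeeping.
    val blockId = ShuffleBlockId(0, 42, 3)
    assert(blockId.name == "shuffle_0_42_3")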
core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -44,7 +44,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
 
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
-    val blocksByAddress = (mapId) match {
+    val blocksByAddress = mapId match {
       case (Some(mapId)) => mapOutputTracker.getMapSizesByExecutorId(
         handle.shuffleId,
         startPartition,
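The hunk above is cut off by the page, so here is a hedged sketch of the complete dispatch it opens; the exact getMapSizesByExecutorId parameter lists are an assumption inferred from the MapOutputTracker changes above, not verbatim file contents:

    // Sketch: pick between fetching one mapper's blocks and all blocks
    // for the reduce partition range.
    val blocksByAddress = mapId match {
      case Some(mapId) =>
        // Only the shuffle output written by this particular map task.
        mapOutputTracker.getMapSizesByExecutorId(
          handle.shuffleId, startPartition, endPartition, mapId)
      case None =>
        // All map outputs feeding partitions [startPartition, endPartition).
        mapOutputTracker.getMapSizesByExecutorId(
          handle.shuffleId, startPartition, endPartition)
    }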