array -> seq
yifeih committed May 14, 2019
commit 09eeb13aa29a658ee1d39e1a032b60cd152c02f3
@@ -20,6 +20,7 @@
import org.apache.spark.api.java.Optional;

import java.io.Serializable;
+import java.util.List;

/**
* Represents metadata about where shuffle blocks were written in a single map task.
@@ -36,7 +37,7 @@ public interface MapShuffleLocations extends Serializable {
/**
* Get the location for a given shuffle block written by this map task.
*/
-ShuffleLocation[] getLocationsForBlock(int reduceId);
+List<ShuffleLocation> getLocationsForBlock(int reduceId);
A review comment on this change:
I mentioned this on the doc, but I'm skeptical about supporting different locations for each (map, reduce) block, instead of just replicating the entire output of one map task to the same places. I don't think I properly understood that part even before this change ... I'll need to look through this more carefully to figure out what the effect of that would be, in particular how much bookkeeping is required on the driver.


/**
* Deletes a host or a host/port combination from this MapShuffleLocations.
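To make the API change above concrete: getLocationsForBlock now hands back a java.util.List of locations instead of an array, and, as the review comment notes, an alternative design is to replicate a map task's entire output to one fixed set of locations so that the returned list does not depend on reduceId at all (DefaultMapShuffleLocations below already behaves this way for the single-location case). A minimal Scala sketch of that per-map replication idea; the class name is hypothetical, not part of this PR, and the interface's host-removal method is left abstract because its signature is cut off in this hunk:

import java.util.{List => JList}
import scala.collection.JavaConverters._
import org.apache.spark.api.shuffle.{MapShuffleLocations, ShuffleLocation}

// Hypothetical sketch: every reduce block of the map task resolves to the same
// replicated location list, so the driver only needs to track one list per map
// output rather than one per (map, reduce) pair.
abstract class ReplicatedMapShuffleLocations(replicas: Seq[ShuffleLocation])
  extends MapShuffleLocations {

  private val replicasAsJava: JList[ShuffleLocation] = replicas.asJava

  override def getLocationsForBlock(reduceId: Int): JList[ShuffleLocation] =
    replicasAsJava  // same locations regardless of reduceId
}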
@@ -21,11 +21,14 @@
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

+import com.google.common.collect.ImmutableList;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.shuffle.MapShuffleLocations;
import org.apache.spark.api.shuffle.ShuffleLocation;
import org.apache.spark.storage.BlockManagerId;

+import java.util.ArrayList;
+import java.util.List;
import java.util.Objects;

public class DefaultMapShuffleLocations extends ShuffleLocation implements MapShuffleLocations {
@@ -46,19 +49,19 @@ public DefaultMapShuffleLocations load(BlockManagerId blockManagerId) {
});

private final BlockManagerId location;
-private final ShuffleLocation[] locationsArray;
+private final List<ShuffleLocation> locationsArray;

public DefaultMapShuffleLocations(BlockManagerId blockManagerId) {
this.location = blockManagerId;
-this.locationsArray = new ShuffleLocation[] {this};
+this.locationsArray = ImmutableList.of(this);
}

public static DefaultMapShuffleLocations get(BlockManagerId blockManagerId) {
return DEFAULT_SHUFFLE_LOCATIONS_CACHE.getUnchecked(blockManagerId);
}

@Override
-public ShuffleLocation[] getLocationsForBlock(int reduceId) {
+public List<ShuffleLocation> getLocationsForBlock(int reduceId) {
return locationsArray;
}
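A short usage sketch in Scala (the BlockManagerId values are borrowed from test code later in this PR, and the import path for DefaultMapShuffleLocations is assumed since it is not shown in this hunk), illustrating that the cached instance now exposes its single location as an immutable List rather than a one-element array:

import org.apache.spark.storage.BlockManagerId

// DefaultMapShuffleLocations.get interns one instance per BlockManagerId via the
// Guava LoadingCache above, so repeated lookups return the same object.
val id = BlockManagerId("1", "hostA", 1234)
val locs = DefaultMapShuffleLocations.get(id)

// Every reduceId resolves to the same immutable single-element list.
assert(locs.getLocationsForBlock(0) eq locs.getLocationsForBlock(7))
assert(locs.getLocationsForBlock(0).size() == 1)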

6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -103,7 +103,7 @@ private class ShuffleStatus(numPartitions: Int) {
* This is a no-op if there is no registered map output or if the registered output is from a
* different block manager.
*/
-def removeMapOutput(mapId: Int, shuffleLocations: Array[ShuffleLocation]): Unit = synchronized {
+def removeMapOutput(mapId: Int, shuffleLocations: Seq[ShuffleLocation]): Unit = synchronized {
if (mapStatuses(mapId) != null) {
var shouldDelete = false
if (shuffleLocations == null) {
@@ -444,7 +444,7 @@ private[spark] class MapOutputTrackerMaster(
}

/** Unregister map output information of the given shuffle, mapper and block manager */
-def unregisterMapOutput(shuffleId: Int, mapId: Int, shuffleLocations: Array[ShuffleLocation]) {
+def unregisterMapOutput(shuffleId: Int, mapId: Int, shuffleLocations: Seq[ShuffleLocation]) {
shuffleStatuses.get(shuffleId) match {
case Some(shuffleStatus) =>
shuffleStatus.removeMapOutput(mapId, shuffleLocations)
@@ -911,7 +911,7 @@ private[spark] object MapOutputTracker extends Logging {
((ShuffleBlockId(shuffleId, mapId, part), size))
} else {
val shuffleLocations = status.mapShuffleLocations.getLocationsForBlock(part)
-splitsByAddress.getOrElseUpdate(shuffleLocations.toSeq, ListBuffer()) +=
+splitsByAddress.getOrElseUpdate(shuffleLocations.asScala, ListBuffer()) +=
((ShuffleBlockId(shuffleId, mapId, part), size))
}
}
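Because getLocationsForBlock now returns a java.util.List rather than an Array, the grouping key above is built with asScala instead of toSeq. A standalone Scala sketch of what that conversion does (toy strings stand in for ShuffleLocation; all values here are illustrative only):

import java.util.Arrays
import scala.collection.JavaConverters._
import scala.collection.mutable

// asScala wraps the Java list in a Buffer view without copying. Two wrappers
// over lists with equal elements compare equal, which is what
// splitsByAddress.getOrElseUpdate relies on when grouping blocks by location.
val javaLocations = Arrays.asList("hostA:7337", "hostB:7337")
val scalaView: mutable.Buffer[String] = javaLocations.asScala

assert(scalaView == Arrays.asList("hostA:7337", "hostB:7337").asScala)
assert(scalaView.head == "hostA:7337")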
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -82,7 +82,7 @@ case object Resubmitted extends TaskFailedReason {
*/
@DeveloperApi
case class FetchFailed(
-shuffleLocation: Array[ShuffleLocation], // Note that shuffleLocation can be null
+shuffleLocation: Seq[ShuffleLocation], // Note that shuffleLocation can be null
shuffleId: Int,
mapId: Int,
reduceId: Int,
@@ -854,8 +854,8 @@ private[spark] class TaskSetManager(
// implementation (DefaultMapShuffleLocations) of fetching from executor disk should result
// in blacklistable executors.
if (fetchFailed.shuffleLocation != null && fetchFailed.shuffleLocation.nonEmpty
-&& fetchFailed.shuffleLocation(0).isInstanceOf[DefaultMapShuffleLocations]) {
-val bmAddress = fetchFailed.shuffleLocation(0)
+&& fetchFailed.shuffleLocation.head.isInstanceOf[DefaultMapShuffleLocations]) {
+val bmAddress = fetchFailed.shuffleLocation.head
.asInstanceOf[DefaultMapShuffleLocations]
.getBlockManagerId
blacklistTracker.foreach(_.updateBlacklistForFetchFailure(
@@ -33,7 +33,7 @@ import org.apache.spark.util.Utils
* (or risk triggering any other exceptions). See SPARK-19276.
*/
private[spark] class FetchFailedException(
-shuffleLocations: Array[ShuffleLocation],
+shuffleLocations: Seq[ShuffleLocation],
shuffleId: Int,
mapId: Int,
reduceId: Int,
@@ -42,7 +42,7 @@
extends Exception(message, cause) {

def this(
-shuffleLocations: Array[ShuffleLocation],
+shuffleLocations: Seq[ShuffleLocation],
shuffleId: Int,
mapId: Int,
reduceId: Int,
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import java.util.concurrent.LinkedBlockingQueue
import javax.annotation.concurrent.GuardedBy

+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}

@@ -581,7 +582,7 @@
blockId match {
case ShuffleBlockId(shufId, mapId, reduceId) =>
throw new FetchFailedException(
-DefaultMapShuffleLocations.get(address).getLocationsForBlock(reduceId),
+DefaultMapShuffleLocations.get(address).getLocationsForBlock(reduceId).asScala,
shufId.toInt, mapId.toInt, reduceId, e)
case _ =>
throw new SparkException(
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -21,6 +21,7 @@ import java.util.{Properties, UUID}

import scala.collection.JavaConverters._
import scala.collection.Map

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.DefaultFormats
@@ -439,9 +440,9 @@
("Reason" -> reason) ~ json
}

-def shuffleLocationsToJson(shuffleLocations: Array[ShuffleLocation]): JValue = {
+def shuffleLocationsToJson(shuffleLocations: Seq[ShuffleLocation]): JValue = {
if (shuffleLocations != null && shuffleLocations.nonEmpty) {
-if (shuffleLocations(0).isInstanceOf[DefaultMapShuffleLocations]) {
+if (shuffleLocations.head.isInstanceOf[DefaultMapShuffleLocations]) {
val array = JArray(shuffleLocations.map(location => {
val blockManagerId = location.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId
blockManagerIdToJson(blockManagerId)
@@ -1015,13 +1016,13 @@
}
}

-def shuffleLocationsFromJson(json: JValue): Array[ShuffleLocation] = {
+def shuffleLocationsFromJson(json: JValue): Seq[ShuffleLocation] = {
val shuffleType = (json \ "type").extract[String]
if (shuffleType == "Default") {
(json \ "data").children.map(value => {
val block = blockManagerIdFromJson(value)
DefaultMapShuffleLocations.get(block)
-}).toArray
+})
} else {
null
}
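A round-trip sketch of the two helpers above, in Scala. The values are hypothetical, the import for DefaultMapShuffleLocations is assumed, and the top-level "type"/"data" layout of the JSON is inferred from the parsing side since the serializing side is cut off in this hunk:

import org.apache.spark.api.shuffle.ShuffleLocation
import org.apache.spark.storage.BlockManagerId
import org.json4s.JsonAST.JValue

// Serialize a single default location and parse it back. shuffleLocationsFromJson
// rebuilds each entry through DefaultMapShuffleLocations.get, so the interned
// instances come back equal to the originals.
val original: Seq[ShuffleLocation] =
  Seq(DefaultMapShuffleLocations.get(BlockManagerId("1", "hostA", 1234)))

val json: JValue = JsonProtocol.shuffleLocationsToJson(original)
val parsed: Seq[ShuffleLocation] = JsonProtocol.shuffleLocationsFromJson(json)

assert(parsed == original)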
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -692,8 +692,8 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
// Make the first task in the first stage attempt fail.
throw new FetchFailedException(
shuffleLocations =
-Array(DefaultMapShuffleLocations.get(SparkEnv.get.blockManager.blockManagerId)),
-shuffleId = 0, mapId = 0, reduceId = 0, cause = new java.io.IOException("fake"))
+Seq(DefaultMapShuffleLocations.get(SparkEnv.get.blockManager.blockManagerId)),
+0, 0, 0, new java.io.IOException("fake"))
} else {
// Make the second task in the first stage attempt sleep to generate a zombie task
Thread.sleep(60000)
@@ -439,7 +439,7 @@ class FetchFailureThrowingRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {
override def hasNext: Boolean = true
override def next(): Int = {
throw new FetchFailedException(
-Array(DefaultMapShuffleLocations.get(BlockManagerId("1", "hostA", 1234))),
+Seq(DefaultMapShuffleLocations.get(BlockManagerId("1", "hostA", 1234))),
shuffleId = 0,
mapId = 0,
reduceId = 0,