@@ -17,6 +17,7 @@
 
 package org.apache.spark.shuffle.sort.io;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 
@@ -56,7 +57,10 @@ public void initializeExecutor(String appId, String execId, Map<String, String>
     if (blockManager == null) {
       throw new IllegalStateException("No blockManager available from the SparkEnv.");
     }
-    blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager);
+    blockResolver =
+      new IndexShuffleBlockResolver(
+        sparkConf, blockManager, Collections.emptyMap() /* Shouldn't be accessed */
+      );
   }
 
   @Override
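Note on the `Collections.emptyMap()` default above: the `/* Shouldn't be accessed */` comment signals that this resolver instance is never expected to register map task IDs, and `java.util.Collections.emptyMap()` is immutable, so an accidental registration would fail fast rather than being silently recorded where no cleanup code looks. A minimal Scala sketch of that behavior (illustrative only, not part of the patch; `OpenHashSet` is a Spark-internal class):

```scala
import java.util.Collections

import org.apache.spark.util.collection.OpenHashSet

// Illustration only: the immutable empty map rejects computeIfAbsent, so any
// unexpected registration through such a resolver surfaces immediately.
val unused: java.util.Map[Int, OpenHashSet[Long]] = Collections.emptyMap()
try {
  unused.computeIfAbsent(0, _ => new OpenHashSet[Long](8))
} catch {
  case _: UnsupportedOperationException =>
    println("emptyMap() is read-only, matching the 'Shouldn't be accessed' intent")
}
```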
@@ -21,6 +21,7 @@ import java.io._
 import java.nio.ByteBuffer
 import java.nio.channels.Channels
 import java.nio.file.Files
+import java.util.{Collections, Map => JMap}
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -37,6 +38,7 @@ import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
 import org.apache.spark.storage._
 import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.OpenHashSet
 
 /**
  * Create and maintain the shuffle blocks' mapping between logic block and physical file location.
@@ -52,7 +54,8 @@ import org.apache.spark.util.Utils
 private[spark] class IndexShuffleBlockResolver(
     conf: SparkConf,
     // var for testing
-    var _blockManager: BlockManager = null)
+    var _blockManager: BlockManager = null,
+    val taskIdMapsForShuffle: JMap[Int, OpenHashSet[Long]] = Collections.emptyMap())
   extends ShuffleBlockResolver
   with Logging with MigratableResolver {
 
@@ -270,6 +273,21 @@ private[spark] class IndexShuffleBlockResolver(
             throw SparkCoreErrors.failedRenameTempFileError(fileTmp, file)
           }
         }
+        blockId match {
+          case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+            val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
+              shuffleId, _ => new OpenHashSet[Long](8)
+            )
+            mapTaskIds.add(mapId)
+
+          case ShuffleDataBlockId(shuffleId, mapId, _) =>
+            val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
+              shuffleId, _ => new OpenHashSet[Long](8)
+            )
+            mapTaskIds.add(mapId)
+
+          case _ => // Unreachable
+        }
         blockManager.reportBlockStatus(blockId, BlockStatus(StorageLevel.DISK_ONLY, 0, diskSize))
       }
 
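For context, the new `blockId match` above sits on the path that receives migrated shuffle blocks during executor decommissioning (the resolver's `putShuffleBlockAsStream` callback): each incoming index or data block is reduced to its `(shuffleId, mapId)` pair and remembered in `taskIdMapsForShuffle`, so the receiving executor can later delete migrated files the same way it deletes files it wrote itself. Because one migrated map output arrives as both a data block and an index block, the two case arms record the same id and the `OpenHashSet` deduplicates it. A small sketch of that bookkeeping, using illustrative values and Spark-internal classes:

```scala
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.storage.{BlockId, ShuffleDataBlockId, ShuffleIndexBlockId}
import org.apache.spark.util.collection.OpenHashSet

// Illustration only: both block flavours of one migrated map output
// (shuffle 3, map task 7) collapse into a single recorded map task id.
val recorded = new ConcurrentHashMap[Int, OpenHashSet[Long]]()
val migrated: Seq[BlockId] =
  Seq(ShuffleIndexBlockId(3, 7L, 0), ShuffleDataBlockId(3, 7L, 0))
migrated.foreach {
  case ShuffleIndexBlockId(shuffleId, mapId, _) =>
    recorded.computeIfAbsent(shuffleId, _ => new OpenHashSet[Long](8)).add(mapId)
  case ShuffleDataBlockId(shuffleId, mapId, _) =>
    recorded.computeIfAbsent(shuffleId, _ => new OpenHashSet[Long](8)).add(mapId)
  case _ =>
}
assert(recorded.get(3).size == 1 && recorded.get(3).contains(7L))
```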
@@ -87,7 +87,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
 
   private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)
 
-  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
+  override val shuffleBlockResolver =
+    new IndexShuffleBlockResolver(conf, taskIdMapsForShuffle = taskIdMapsForShuffle)
 
   /**
    * Obtains a [[ShuffleHandle]] to pass to tasks.
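`SortShuffleManager` already maintains `taskIdMapsForShuffle` for the shuffle output its own writers register; handing that same map to the resolver above means blocks that arrived through decommission migration are removed by the same cleanup path when the shuffle is unregistered. Roughly, the consuming side looks like the sketch below (paraphrasing `SortShuffleManager.unregisterShuffle`, which is not part of this diff; `cleanupShuffle` is an illustrative standalone name):

```scala
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.util.collection.OpenHashSet

// Sketch of the cleanup side: every map task id recorded for the shuffle,
// whether its output was written locally or received through migration,
// has its data and index files deleted via the shared resolver.
def cleanupShuffle(
    shuffleId: Int,
    taskIdMapsForShuffle: ConcurrentHashMap[Int, OpenHashSet[Long]],
    resolver: IndexShuffleBlockResolver): Unit = {
  Option(taskIdMapsForShuffle.remove(shuffleId)).foreach { mapTaskIds =>
    mapTaskIds.iterator.foreach { mapTaskId =>
      resolver.removeDataByMap(shuffleId, mapTaskId)
    }
  }
}
```

Because migrated map task ids now land in the same map, migrated files are no longer orphaned on the receiving executor, which is what the new integration test at the end of this diff verifies.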
@@ -314,7 +314,8 @@ public void writeWithoutSpilling() throws Exception {
 
   @Test
   public void writeChecksumFileWithoutSpill() throws Exception {
-    IndexShuffleBlockResolver blockResolver = new IndexShuffleBlockResolver(conf, blockManager);
+    IndexShuffleBlockResolver blockResolver =
+      new IndexShuffleBlockResolver(conf, blockManager, Collections.emptyMap());
     ShuffleChecksumBlockId checksumBlockId =
       new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
     String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
@@ -344,7 +345,8 @@ public void writeChecksumFileWithoutSpill() throws Exception {
 
   @Test
   public void writeChecksumFileWithSpill() throws Exception {
-    IndexShuffleBlockResolver blockResolver = new IndexShuffleBlockResolver(conf, blockManager);
+    IndexShuffleBlockResolver blockResolver =
+      new IndexShuffleBlockResolver(conf, blockManager, Collections.emptyMap());
     ShuffleChecksumBlockId checksumBlockId =
       new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
     String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
@@ -17,12 +17,14 @@
 
 package org.apache.spark.storage
 
+import java.io.File
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 
+import org.apache.commons.io.FileUtils
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark._
@@ -352,4 +354,78 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     import scala.language.reflectiveCalls
     assert(listener.removeReasonValidated)
   }
+
+  test("SPARK-46957: Migrated shuffle files should be able to cleanup from executor") {
+
+    val sparkTempDir = System.getProperty("java.io.tmpdir")
+
+    def shuffleFiles: Seq[File] = {
+      FileUtils
+        .listFiles(new File(sparkTempDir), Array("data", "index"), true)
+        .asScala
+        .toSeq
+    }
+
+    val existingShuffleFiles = shuffleFiles
+
+    val conf = new SparkConf()
+      .setAppName("SPARK-46957")
+      .setMaster("local-cluster[2,1,1024]")
+      .set(config.DECOMMISSION_ENABLED, true)
+      .set(config.STORAGE_DECOMMISSION_ENABLED, true)
+      .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+    sc = new SparkContext(conf)
+    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+    val shuffleBlockUpdates = new ArrayBuffer[BlockId]()
+    var isDecommissionedExecutorRemoved = false
+    val execToDecommission = sc.getExecutorIds().head
+    sc.addSparkListener(new SparkListener {
+      override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+        if (blockUpdated.blockUpdatedInfo.blockId.isShuffle) {
+          shuffleBlockUpdates += blockUpdated.blockUpdatedInfo.blockId
+        }
+      }
+
+      override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
+        assert(execToDecommission === executorRemoved.executorId)
+        isDecommissionedExecutorRemoved = true
+      }
+    })
+
+    // Run a job to create shuffle data
+    val result = sc.parallelize(1 to 1000, 10)
+      .map { i => (i % 2, i) }
+      .reduceByKey(_ + _).collect()
+
+    assert(result.head === (0, 250500))
+    assert(result.tail.head === (1, 250000))
+    sc.schedulerBackend
+      .asInstanceOf[StandaloneSchedulerBackend]
+      .decommissionExecutor(
+        execToDecommission,
+        ExecutorDecommissionInfo("test", None),
+        adjustTargetNumExecutors = true
+      )
+
+    eventually(timeout(1.minute), interval(10.milliseconds)) {
+      assert(isDecommissionedExecutorRemoved)
+      // Ensure some shuffle data has been migrated
+      assert(shuffleBlockUpdates.size >= 2)
+    }
+
+    val shuffleId = shuffleBlockUpdates
+      .find(_.isInstanceOf[ShuffleIndexBlockId])
+      .map(_.asInstanceOf[ShuffleIndexBlockId].shuffleId)
+      .get
+
+    val newShuffleFiles = shuffleFiles.diff(existingShuffleFiles)
+    assert(newShuffleFiles.size >= shuffleBlockUpdates.size)
+
+    // Remove the shuffle data
+    sc.shuffleDriverComponents.removeShuffle(shuffleId, true)
+
+    eventually(timeout(1.minute), interval(10.milliseconds)) {
+      assert(newShuffleFiles.intersect(shuffleFiles).isEmpty)
+    }
+  }
 }