27 commits
53f77ac
start thinking about cleanup timers
holdenk Sep 30, 2024
86a9f7f
Start a bit on putting in block tracking.
holdenk Oct 25, 2024
74c2608
Test compiles
holdenk Dec 2, 2024
94c290a
Add the Block TTL Integration suite
holdenk Dec 2, 2024
7bc2f7a
Increase logging a little bit.
holdenk Dec 3, 2024
cd30325
Make the TTL cleaners clean and add some simple tests
holdenk Dec 5, 2024
0f23de4
Switch to using MDC logging
holdenk Dec 5, 2024
ce72a3c
hmmm fails in CI maybe our TTL is too tight
holdenk Dec 14, 2024
0fd8744
Bump TTL a bit
holdenk Dec 15, 2024
5b6ae44
Use eventually instead of sleep
holdenk Dec 16, 2024
92ae9af
Back out un-needed change to ShuffledRDD.scala
sfc-gh-hkarau Nov 3, 2025
c7dbcdd
Add a note about why JHashMap
sfc-gh-hkarau Nov 3, 2025
feec34b
Add a version 4.1.0 to the new config options.
sfc-gh-hkarau Nov 3, 2025
eb4828c
CR feedback
sfc-gh-hkarau Nov 13, 2025
c5aa83c
Merge branch 'master' into SPARK-49788-Restore-Cleaner-TTL-Functionality
sfc-gh-hkarau Nov 13, 2025
0354749
Merge branch 'master' into SPARK-49788-Restore-Cleaner-TTL-Functionality
sfc-gh-hkarau Nov 14, 2025
f474dc0
Deal with the interruption exception from shutdownNow
sfc-gh-hkarau Nov 15, 2025
3a37726
Merge branch 'master' into SPARK-49788-Restore-Cleaner-TTL-Functionality
sfc-gh-hkarau Nov 27, 2025
53a65c6
Merge branch 'master' into SPARK-49788-Restore-Cleaner-TTL-Functionality
sfc-gh-hkarau Dec 18, 2025
4ac8a19
Merge branch 'master' into SPARK-49788-Restore-Cleaner-TTL-Functionality
sfc-gh-hkarau Dec 19, 2025
afc9a6a
Merge branch 'master' into SPARK-49788-Restore-Cleaner-TTL-Functionality
sfc-gh-hkarau Dec 22, 2025
f3a7531
Reduce the TTL in the tests so they don't take so long, move the time…
sfc-gh-hkarau Dec 22, 2025
34090b1
Increase TTL so the blocks don't get cleaned before the test has a ch…
sfc-gh-hkarau Dec 23, 2025
1ca2378
Merge branch 'master' into SPARK-49788-Restore-Cleaner-TTL-Functionality
sfc-gh-hkarau Dec 29, 2025
e04d2e1
Bump up the blockTTL for testing since CI seems to cleanup the blocks…
sfc-gh-hkarau Dec 29, 2025
f9f66e1
hmmm
sfc-gh-hkarau Dec 29, 2025
c7fc32d
Merge branch 'master' into SPARK-49788-Restore-Cleaner-TTL-Functionality
sfc-gh-hkarau Jan 12, 2026
82 changes: 82 additions & 0 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -19,6 +19,7 @@ package org.apache.spark

import java.io.{ByteArrayInputStream, InputStream, IOException, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer
import java.util.{HashMap => JHashMap}
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock

@@ -711,6 +712,10 @@ private[spark] class MapOutputTrackerMaster(
private[spark] val isLocal: Boolean)
extends MapOutputTracker(conf) {

// Keep track of last access times for shuffle-based TTL. Note: we don't use a
// concurrent map here because we don't care about overwriting times that are "close".
private[spark] val shuffleAccessTime = new JHashMap[Int, Long]

// The size at which we use Broadcast to send the map output statuses to the executors
private val minSizeForBroadcast = conf.get(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST).toInt

@@ -745,6 +750,16 @@ private[spark] class MapOutputTrackerMaster(

private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf, isDriver = true)

private[spark] val cleanerThreadpool: Option[ThreadPoolExecutor] = {
if (conf.get(SPARK_TTL_SHUFFLE_BLOCK_CLEANER).isDefined) {
val pool = ThreadUtils.newDaemonFixedThreadPool(1, "map-output-ttl-cleaner")
pool.execute(new TTLCleaner)
Some(pool)
} else {
None
}
}

// Thread pool used for handling map output status requests. This is a separate thread pool
// to ensure we don't block the normal dispatcher threads.
private val threadpool: ThreadPoolExecutor = {
@@ -758,6 +773,68 @@

private val availableProcessors = Runtime.getRuntime.availableProcessors()

def updateShuffleAtime(shuffleId: Int): Unit = {
if (conf.get(SPARK_TTL_SHUFFLE_BLOCK_CLEANER).isDefined) {
shuffleAccessTime.put(shuffleId, System.currentTimeMillis())
}
}

private class TTLCleaner extends Runnable {
override def run(): Unit = {
try {
// Poll the shuffle access times if we're configured for it.
conf.get(SPARK_TTL_SHUFFLE_BLOCK_CLEANER) match {
case Some(ttl) =>
while (true) {
val maxAge = System.currentTimeMillis() - ttl
// Find the elements to be removed & update oldest remaining time (if any)
var oldest = System.currentTimeMillis()
// Make a copy here to reduce the chance of CME
try {
val toBeRemoved = shuffleAccessTime.asScala.toList.flatMap {
case (shuffleId, atime) =>
if (atime < maxAge) {
Some(shuffleId)
} else {
if (atime < oldest) {
oldest = atime
}
None
}
}
toBeRemoved.foreach { shuffleId =>
try {
// Remove the shuffle access time regardless of whether
// the shuffle cleanup succeeds, since the shuffle may
// already have been cleaned up elsewhere.
shuffleAccessTime.remove(shuffleId)
unregisterAllMapAndMergeOutput(shuffleId)
} catch {
case NonFatal(e) =>
logDebug(
log"Error removing shuffle ${MDC(SHUFFLE_ID, shuffleId)}", e)
}
}
// Wait until the next possible element to be removed
val delay = math.max((oldest + ttl) - System.currentTimeMillis(), 100)
Thread.sleep(delay)
} catch {
case _: java.util.ConcurrentModificationException =>
// Just retry, blocks were stored while we were iterating
Thread.sleep(100)
}
}
case None =>
logDebug("Tried to start TTL cleaner when not configured.")
}
} catch {
case _: InterruptedException =>
logInfo("MapOutputTrackerMaster TTLCleaner thread interrupted, exiting.")
}
}
}

// Make sure that we aren't going to exceed the max RPC message size by making sure
// we use broadcast to send large map output statuses.
if (minSizeForBroadcast > maxRpcMessageSize) {
@@ -783,6 +860,7 @@
val shuffleStatus = shuffleStatuses.get(shuffleId).head
logDebug(s"Handling request to send ${if (needMergeOutput) "map/merge" else "map"}" +
s" output locations for shuffle $shuffleId to $hostPort")
updateShuffleAtime(shuffleId)
if (needMergeOutput) {
context.reply(
shuffleStatus.
@@ -834,6 +912,7 @@
}

def registerShuffle(shuffleId: Int, numMaps: Int, numReduces: Int): Unit = {
updateShuffleAtime(shuffleId)
if (pushBasedShuffleEnabled) {
if (shuffleStatuses.put(shuffleId, new ShuffleStatus(numMaps, numReduces)).isDefined) {
throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
@@ -880,6 +959,7 @@ private[spark] class MapOutputTrackerMaster(
shuffleStatus.removeOutputsByFilter(x => true)
shuffleStatus.removeMergeResultsByFilter(x => true)
shuffleStatus.removeShuffleMergerLocations()
shuffleAccessTime.remove(shuffleId)
incrementEpoch()
}

@@ -1257,12 +1337,14 @@ private[spark] class MapOutputTrackerMaster(

// This method is only called in local-mode.
override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
updateShuffleAtime(shuffleId)
shuffleStatuses.get(shuffleId).map(_.getShufflePushMergerLocations).getOrElse(Seq.empty)
}

override def stop(): Unit = {
mapOutputTrackerMasterMessages.offer(PoisonPill)
threadpool.shutdown()
cleanerThreadpool.foreach(_.shutdownNow())
try {
sendTracker(StopMapOutputTracker)
} catch {
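For readers skimming the diff, the cleaner's core loop reduces to: record an access time per shuffle, periodically sweep entries older than the TTL, and sleep until the oldest survivor could next expire (with a 100 ms floor, as above). Below is a minimal, self-contained sketch of that strategy; `TtlSweeper`, `touch`, and the `unregister` callback are hypothetical names for illustration, not Spark API.

import java.util.{HashMap => JHashMap}
import scala.jdk.CollectionConverters._

// Simplified model of the PR's sweep loop; illustrative only, not Spark code.
class TtlSweeper(ttlMs: Long, unregister: Int => Unit) {
  // Plain HashMap, as in the PR: racy overwrites of "close" timestamps are acceptable.
  private val accessTime = new JHashMap[Int, Long]

  def touch(id: Int): Unit = accessTime.put(id, System.currentTimeMillis())

  // One sweep pass; returns how long to sleep before the next pass.
  def sweepOnce(): Long = {
    val now = System.currentTimeMillis()
    val maxAge = now - ttlMs
    // Copy the entries first so removing keys can't disturb the iteration.
    val (expired, live) =
      accessTime.asScala.toList.partition { case (_, atime) => atime < maxAge }
    expired.foreach { case (id, _) =>
      accessTime.remove(id) // drop the timestamp even if unregistration fails elsewhere
      unregister(id)
    }
    // Sleep until the oldest surviving entry could expire, with a 100 ms floor.
    val oldest = if (live.isEmpty) now else live.map(_._2).min
    math.max((oldest + ttlMs) - System.currentTimeMillis(), 100L)
  }
}

// Example: shuffle 1 expires, shuffle 2 was touched recently and survives.
object TtlSweeperDemo extends App {
  val sweeper = new TtlSweeper(ttlMs = 200, id => println(s"unregistered shuffle $id"))
  sweeper.touch(1)
  Thread.sleep(250)
  sweeper.touch(2)
  println(s"sleep for ${sweeper.sweepOnce()} ms") // prints "unregistered shuffle 1" first
}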
20 changes: 20 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2919,4 +2919,24 @@ package object config {
.checkValue(v => v.forall(Set("stdout", "stderr").contains),
"The value only can be one or more of 'stdout, stderr'.")
.createWithDefault(Seq("stdout", "stderr"))

private[spark] val SPARK_TTL_BLOCK_CLEANER =
ConfigBuilder("spark.cleaner.ttl.all")
.doc("Add a TTL for all blocks tracked in Spark. By default blocks are only removed after " +
" GC on driver which with DataFrames or RDDs at the global scope will not occur. " +
"This must be configured before starting the SparkContext (e.g. can not be added to " +
"a running Spark instance.)")
.version("4.1.0")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val SPARK_TTL_SHUFFLE_BLOCK_CLEANER =
ConfigBuilder("spark.cleaner.ttl.shuffle")
.doc("Add a TTL for shuffle blocks tracked in Spark. By default blocks are only removed " +
"after GC on driver which with DataFrames or RDDs at the global scope will not occur." +
"This must be configured before starting the SparkContext (e.g. can not be added to " +
"a running Spark instance.)")
.version("4.1.0")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
}
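Since both options are optional time configs, a user opts in with a time string before the SparkContext starts. A minimal sketch of how that might look, assuming the config keys introduced in this diff (the 30-minute values are illustrative, not recommended defaults):

import org.apache.spark.{SparkConf, SparkContext}

// Both TTLs must be set before the SparkContext exists; they cannot be
// added to a running instance.
val conf = new SparkConf()
  .setAppName("ttl-cleaner-demo")
  .setMaster("local[2]")
  .set("spark.cleaner.ttl.all", "30min")
  .set("spark.cleaner.ttl.shuffle", "30min")

val sc = new SparkContext(conf)
// ... run jobs; shuffles idle longer than the TTL become eligible for cleanup ...
sc.stop()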
29 changes: 19 additions & 10 deletions core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -45,12 +45,18 @@ sealed abstract class BlockId {
(isInstanceOf[ShuffleBlockId] || isInstanceOf[ShuffleBlockBatchId] ||
isInstanceOf[ShuffleDataBlockId] || isInstanceOf[ShuffleIndexBlockId])
}
def asShuffleId: Option[ShuffleId] = if (isShuffle) Some(asInstanceOf[ShuffleId]) else None
def isShuffleChunk: Boolean = isInstanceOf[ShuffleBlockChunkId]
def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]

override def toString: String = name
}

@DeveloperApi
trait ShuffleId {
def shuffleId: Int
}

@DeveloperApi
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
override def name: String = "rdd_" + rddId + "_" + splitIndex
@@ -59,7 +65,8 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
// Format of the shuffle block ids (including data and index) should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
with ShuffleId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}

@@ -69,7 +76,7 @@ case class ShuffleBlockBatchId(
shuffleId: Int,
mapId: Long,
startReduceId: Int,
endReduceId: Int) extends BlockId {
endReduceId: Int) extends BlockId with ShuffleId {
override def name: String = {
"shuffle_" + shuffleId + "_" + mapId + "_" + startReduceId + "_" + endReduceId
}
@@ -81,18 +88,20 @@ case class ShuffleBlockChunkId(
shuffleId: Int,
shuffleMergeId: Int,
reduceId: Int,
chunkId: Int) extends BlockId {
chunkId: Int) extends BlockId with ShuffleId {
override def name: String =
"shuffleChunk_" + shuffleId + "_" + shuffleMergeId + "_" + reduceId + "_" + chunkId
}

@DeveloperApi
case class ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
case class ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
with ShuffleId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
}

@DeveloperApi
case class ShuffleIndexBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
case class ShuffleIndexBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
with ShuffleId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"
}

@@ -108,7 +117,7 @@ case class ShufflePushBlockId(
shuffleId: Int,
shuffleMergeId: Int,
mapIndex: Int,
reduceId: Int) extends BlockId {
reduceId: Int) extends BlockId with ShuffleId {
override def name: String = "shufflePush_" + shuffleId + "_" +
shuffleMergeId + "_" + mapIndex + "_" + reduceId + ""
}
@@ -118,7 +127,7 @@ case class ShufflePushBlockId(
case class ShuffleMergedBlockId(
shuffleId: Int,
shuffleMergeId: Int,
reduceId: Int) extends BlockId {
reduceId: Int) extends BlockId with ShuffleId {
override def name: String = "shuffleMerged_" + shuffleId + "_" +
shuffleMergeId + "_" + reduceId
}
@@ -129,7 +138,7 @@ case class ShuffleMergedDataBlockId(
appId: String,
shuffleId: Int,
shuffleMergeId: Int,
reduceId: Int) extends BlockId {
reduceId: Int) extends BlockId with ShuffleId {
override def name: String = RemoteBlockPushResolver.MERGED_SHUFFLE_FILE_NAME_PREFIX + "_" +
appId + "_" + shuffleId + "_" + shuffleMergeId + "_" + reduceId + ".data"
}
@@ -140,7 +149,7 @@ case class ShuffleMergedIndexBlockId(
appId: String,
shuffleId: Int,
shuffleMergeId: Int,
reduceId: Int) extends BlockId {
reduceId: Int) extends BlockId with ShuffleId {
override def name: String = RemoteBlockPushResolver.MERGED_SHUFFLE_FILE_NAME_PREFIX + "_" +
appId + "_" + shuffleId + "_" + shuffleMergeId + "_" + reduceId + ".index"
}
@@ -151,7 +160,7 @@ case class ShuffleMergedMetaBlockId(
appId: String,
shuffleId: Int,
shuffleMergeId: Int,
reduceId: Int) extends BlockId {
reduceId: Int) extends BlockId with ShuffleId {
override def name: String = RemoteBlockPushResolver.MERGED_SHUFFLE_FILE_NAME_PREFIX + "_" +
appId + "_" + shuffleId + "_" + shuffleMergeId + "_" + reduceId + ".meta"
}
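With the new ShuffleId trait and the asShuffleId helper introduced above, callers can recover the shuffle ID from any shuffle-related block without matching each case class. A small usage sketch against this PR's additions (the block ID values are arbitrary examples):

import org.apache.spark.storage._

val blocks: Seq[BlockId] = Seq(
  ShuffleBlockId(shuffleId = 3, mapId = 0L, reduceId = 1),
  ShuffleIndexBlockId(shuffleId = 3, mapId = 0L, reduceId = 1),
  RDDBlockId(rddId = 7, splitIndex = 0))

// asShuffleId is Some(...) for the map/batch/data/index shuffle block types
// (the ones isShuffle covers) and None otherwise, so TTL bookkeeping can key
// on the shuffle ID alone.
val shuffleIds: Seq[Int] = blocks.flatMap(_.asShuffleId).map(_.shuffleId).distinct
// shuffleIds == Seq(3)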