Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
adding purge methods
  • Loading branch information
ericm-db committed Jun 10, 2024
commit c058c76c4b7fe39500b29de2714895edc412c6ce
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ trait AsyncLogPurge extends Logging {

protected def purge(threshold: Long): Unit

protected def purgeOldest(): Unit

protected lazy val useAsyncPurge: Boolean = sparkSession.conf.get(SQLConf.ASYNC_LOG_PURGE)

protected def purgeAsync(batchId: Long): Unit = {
if (purgeRunning.compareAndSet(false, true)) {
asyncPurgeExecutorService.execute(() => {
try {
purge(batchId - minLogEntriesToMaintain)
purgeOldest()
} catch {
case throwable: Throwable =>
logError("Encountered error while performing async log purge", throwable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,19 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](
}
}

def purgeOldest(minEntriesToMaintain: Int): Unit = {
val batchIds = listBatches.sorted
if (batchIds.length > minEntriesToMaintain) {
val filesToDelete = batchIds.take(batchIds.length - minEntriesToMaintain)
filesToDelete.foreach { batchId =>
val path = batchIdToPath(batchId)
fileManager.delete(path)
if (metadataCacheEnabled) batchCache.remove(batchId)
logTrace(s"Removed metadata log file: $path")
}
}
}

/** List the available batches on file system. */
protected def listBatches: Array[Long] = {
val batchIds = fileManager.list(metadataPath, batchFilesFilter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ class MicroBatchExecution(
purgeAsync(execCtx.batchId)
} else {
purge(execCtx.batchId - minLogEntriesToMaintain)
purgeOldest()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,11 @@ abstract class StreamExecution(
logDebug(s"Purging metadata at threshold=$threshold")
offsetLog.purge(threshold)
commitLog.purge(threshold)
operatorStateMetadataLogs.foreach(_._2.purge(threshold))
}

protected def purgeOldest(): Unit = {
operatorStateMetadataLogs.foreach(
_._2.purgeOldest(minLogEntriesToMaintain))
}
}

Expand Down