FileStreamSource.scala
@@ -104,7 +104,7 @@ class FileStreamSource(
// Visible for testing and debugging in production.
val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)

- metadataLog.allFiles().foreach { entry =>
+ metadataLog.restore().foreach { entry =>
seenFiles.add(entry.path, entry.timestamp)
}
seenFiles.purge()
FileStreamSourceLog.scala
@@ -36,6 +36,7 @@ class FileStreamSourceLog(
extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {

import CompactibleFileStreamLog._
import FileStreamSourceLog._

// Configurations about metadata compaction
protected override val defaultCompactInterval: Int =
@@ -122,8 +123,34 @@ class FileStreamSourceLog(
}
batches
}

def restore(): Array[FileEntry] = {
Contributor Author (inline review comment): To avoid touching the existing semantics of allFiles(), I simply added a new method to cover the new semantics. I'll just override allFiles() instead if that's preferred.

val files = allFiles()

// When restarting the query, there is a case where the query starts from a compaction batch
// and needs to read that batch's source metadata file. For example, the previous run may have
// read its inputs successfully but failed to finalize the batch for various reasons.
// The code below finds the latest compaction batch and puts its entries into the file entry
// cache, which avoids reading the compact batch file twice.
// This method doesn't know about the offset/commit metadata in the checkpoint, so it cannot
// tell exactly which batch the query will restart from; in practice, however, only the couple
// of latest batches are candidates, and we leverage that fact to skip the work when possible.
files.lastOption.foreach { lastEntry =>
val latestBatchId = lastEntry.batchId
val latestCompactedBatchId = getAllValidBatches(latestBatchId, compactInterval)(0)
if ((latestBatchId - latestCompactedBatchId) < PREV_NUM_BATCHES_TO_READ_IN_RESTORE) {
val logsForLatestCompactedBatch = files.filter { entry =>
entry.batchId == latestCompactedBatchId
}
fileEntryCache.put(latestCompactedBatchId, logsForLatestCompactedBatch)
}
}

files
}
}

object FileStreamSourceLog {
val VERSION = 1
val PREV_NUM_BATCHES_TO_READ_IN_RESTORE = 2
}
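The guard in restore() is plain batch-id arithmetic: under the CompactibleFileStreamLog convention that batch N is a compaction batch when (N + 1) % compactInterval == 0 (so batch 4 is the compact batch for interval 5, as the test below relies on), the cache is warmed only when the head of the log is fewer than PREV_NUM_BATCHES_TO_READ_IN_RESTORE batches past the latest compaction batch. The following standalone sketch walks through that decision; the helper names are illustrative and not part of the patch.

object CompactionBatchArithmeticSketch {
  // Mirrors FileStreamSourceLog.PREV_NUM_BATCHES_TO_READ_IN_RESTORE.
  val prevNumBatchesToReadInRestore = 2

  // CompactibleFileStreamLog convention: batch N is a compaction batch when
  // (N + 1) % compactInterval == 0, i.e. batches 4, 9, 14, ... for interval 5.
  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
    (batchId + 1) % compactInterval == 0

  // Latest compaction batch id at or before latestBatchId (-1 if none exists yet).
  def latestCompactionBatchId(latestBatchId: Long, compactInterval: Int): Long =
    (latestBatchId + 1) / compactInterval * compactInterval - 1

  // Should restore() pre-populate the file entry cache with the latest compact batch?
  def shouldWarmCache(latestBatchId: Long, compactInterval: Int): Boolean = {
    val compacted = latestCompactionBatchId(latestBatchId, compactInterval)
    compacted >= 0 && (latestBatchId - compacted) < prevNumBatchesToReadInRestore
  }

  def main(args: Array[String]): Unit = {
    assert(isCompactionBatch(4, 5) && !isCompactionBatch(5, 5))
    assert(shouldWarmCache(latestBatchId = 4, compactInterval = 5))  // head is the compact batch itself
    assert(shouldWarmCache(latestBatchId = 5, compactInterval = 5))  // one batch past it: 5 - 4 = 1 < 2
    assert(!shouldWarmCache(latestBatchId = 7, compactInterval = 5)) // too far: 7 - 4 = 3 >= 2
  }
}

With interval 5, writing batches 0-4 leaves the head on the compact batch itself, so restore() warms the cache; adding batches 5-7 puts the head three batches past it, so restore() skips the cache, which is exactly what the suite below asserts.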
FileStreamSourceSuite.scala
@@ -1371,6 +1371,70 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

test("restore from file stream source log") {
def createEntries(batchId: Long, count: Int): Array[FileEntry] = {
(1 to count).map { idx =>
FileEntry(s"path_${batchId}_$idx", 10000 * batchId + count, batchId)
}.toArray
}

withSQLConf(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "5") {
def verifyBatchAvailabilityInCache(
fileEntryCache: java.util.LinkedHashMap[Long, Array[FileEntry]],
expectNotAvailable: Seq[Int],
expectAvailable: Seq[Int]): Unit = {
expectNotAvailable.foreach { batchId =>
assert(!fileEntryCache.containsKey(batchId.toLong))
}
expectAvailable.foreach { batchId =>
assert(fileEntryCache.containsKey(batchId.toLong))
}
}
withTempDir { chk =>
val _fileEntryCache = PrivateMethod[java.util.LinkedHashMap[Long, Array[FileEntry]]](
Symbol("fileEntryCache"))

val metadata = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
chk.getCanonicalPath)
val fileEntryCache = metadata invokePrivate _fileEntryCache()

(0 to 4).foreach { batchId =>
metadata.add(batchId, createEntries(batchId, 100))
}
val allFiles = metadata.allFiles()

// batch 4 is a compact batch, and its logs would be cached in fileEntryCache
verifyBatchAvailabilityInCache(fileEntryCache, Seq(0, 1, 2, 3), Seq(4))

val metadata2 = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
chk.getCanonicalPath)
val fileEntryCache2 = metadata2 invokePrivate _fileEntryCache()

// allFiles() doesn't restore the logs for the latest compact batch into the file entry cache
assert(metadata2.allFiles() === allFiles)
verifyBatchAvailabilityInCache(fileEntryCache2, Seq(0, 1, 2, 3, 4), Seq.empty)

// restore() will restore the logs for the latest compact batch into the file entry cache
assert(metadata2.restore() === allFiles)
verifyBatchAvailabilityInCache(fileEntryCache2, Seq(0, 1, 2, 3), Seq(4))

(5 to 5 + FileStreamSourceLog.PREV_NUM_BATCHES_TO_READ_IN_RESTORE).foreach { batchId =>
metadata2.add(batchId, createEntries(batchId, 100))
}

val metadata3 = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
chk.getCanonicalPath)
val fileEntryCache3 = metadata3 invokePrivate _fileEntryCache()

// restore() will not restore the logs for the latest compact batch into the file entry cache
// if the latest batch is too far from the latest compact batch, because Spark is unlikely to
// request that batch as the start point.
assert(metadata3.restore() === metadata2.allFiles())
verifyBatchAvailabilityInCache(fileEntryCache3, Seq(0, 1, 2, 3, 4), Seq.empty)
}
}
}
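As an aside on what the test is poking at: the suite uses PrivateMethodTester to grab fileEntryCache, which it declares as a plain java.util.LinkedHashMap[Long, Array[FileEntry]] keyed by batch id. The sketch below shows the usual insertion-ordered, size-bounded LinkedHashMap idiom (overriding removeEldestEntry); whether the real fileEntryCache bounds itself this way is an assumption here, not something this diff shows.

import java.util.{LinkedHashMap => JLinkedHashMap}

object FileEntryCacheSketch {
  final case class FileEntry(path: String, timestamp: Long, batchId: Long)

  // Keep at most cacheSize batches; LinkedHashMap evicts in insertion order
  // once removeEldestEntry returns true.
  val cacheSize = 5
  val fileEntryCache = new JLinkedHashMap[Long, Array[FileEntry]] {
    override protected def removeEldestEntry(
        eldest: java.util.Map.Entry[Long, Array[FileEntry]]): Boolean = size() > cacheSize
  }

  def main(args: Array[String]): Unit = {
    (0L to 6L).foreach { batchId =>
      fileEntryCache.put(batchId, Array(FileEntry(s"path_$batchId", 10000 * batchId, batchId)))
    }
    // Batches 0 and 1 fell out once the cache grew past cacheSize entries.
    assert(!fileEntryCache.containsKey(0L) && !fileEntryCache.containsKey(1L))
    assert(fileEntryCache.containsKey(6L))
  }
}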

test("get arbitrary batch from FileStreamSource") {
withTempDirs { case (src, tmp) =>
withSQLConf(