@@ -104,7 +104,7 @@ class FileStreamSource(
// Visible for testing and debugging in production.
val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)

metadataLog.allFiles().foreach { entry =>
metadataLog.restore().foreach { entry =>
seenFiles.add(entry.path, entry.timestamp)
}
seenFiles.purge()
@@ -36,6 +36,7 @@ class FileStreamSourceLog(
extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {

import CompactibleFileStreamLog._
import FileStreamSourceLog._

// Configurations about metadata compaction
protected override val defaultCompactInterval: Int =
@@ -122,8 +123,35 @@ class FileStreamSourceLog(
}
batches
}

def restore(): Array[FileEntry] = {
Contributor Author

To avoid touching the existing semantics of allFiles(), I simply added a new method to cover the new semantics. I'll just override allFiles() instead if that's preferred.

val files = allFiles()

// When restarting the query, there is a case where the query starts from a compaction batch
// and that batch has a source metadata file to read. One such case is that the previous run
// succeeded in reading the inputs but did not finalize the batch for various reasons.
// The code below finds the latest compaction batch and puts the entries for that batch into
// the file entry cache, which avoids reading the compact batch file twice.
// This method doesn't know about the offset / commit metadata in the checkpoint, so it
// doesn't know exactly which batch to start from; but in practice only a couple of the
// latest batches are candidates to start from. We leverage that fact to skip the work
// when possible.
files.lastOption.foreach { lastEntry =>
val latestBatchId = lastEntry.batchId
val latestCompactedBatchId = getAllValidBatches(latestBatchId, compactInterval)(0)
if (latestCompactedBatchId > 0 &&
Contributor

Seems like it's not working when one sets spark.sql.streaming.fileSource.log.compactInterval to 1.

Contributor Author

That's just to prune a case where it may not help much, but yeah, let's make it simple. It won't hurt either way.

(latestBatchId - latestCompactedBatchId) < PREV_NUM_BATCHES_TO_READ_IN_RESTORE) {
Contributor

Maybe a comment on why this heuristic is useful would be good.

Contributor Author

I thought I forgot to explain, but it looks like I explained it already:

// This method doesn't know about the offset / commit metadata in the checkpoint, so it
// doesn't know exactly which batch to start from; but in practice only a couple of the
// latest batches are candidates to start from. We leverage that fact to skip the work
// when possible.

"only a couple of the latest batches" is the threshold - I heuristically took 2 here.

val logsForLatestCompactedBatch = files.filter { entry =>
entry.batchId == latestCompactedBatchId
}
fileEntryCache.put(latestCompactedBatchId, logsForLatestCompactedBatch)
}
}

files
}
}

object FileStreamSourceLog {
val VERSION = 1
val PREV_NUM_BATCHES_TO_READ_IN_RESTORE = 2
}
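
To make the caching heuristic above concrete, here is a small, self-contained sketch (not the PR's actual implementation): latestCompactedBatchId below stands in for getAllValidBatches(latestBatchId, compactInterval)(0), assuming the usual convention that batch N is a compaction batch when (N + 1) % compactInterval == 0.

object RestoreHeuristicSketch {
  // Mirrors FileStreamSourceLog.PREV_NUM_BATCHES_TO_READ_IN_RESTORE from this PR.
  val PrevNumBatchesToRead = 2

  // Latest compaction batch at or before `latestBatchId`; returns -1 if none exists yet.
  // Assumption: batch N is a compaction batch when (N + 1) % compactInterval == 0,
  // so with a compact interval of 5 the compaction batches are 4, 9, 14, ...
  def latestCompactedBatchId(latestBatchId: Long, compactInterval: Int): Long =
    ((latestBatchId + 1) / compactInterval) * compactInterval - 1

  // The heuristic restore() applies: pre-warm the file entry cache only when the query
  // is likely to restart at the compaction batch, i.e. the latest batch is within a
  // couple of batches of it. The `> 0` check also skips batch 0, which is the case the
  // review discussion about compactInterval = 1 touches on.
  def shouldPrewarmCache(latestBatchId: Long, compactInterval: Int): Boolean = {
    val compacted = latestCompactedBatchId(latestBatchId, compactInterval)
    compacted > 0 && (latestBatchId - compacted) < PrevNumBatchesToRead
  }

  def main(args: Array[String]): Unit = {
    assert(shouldPrewarmCache(latestBatchId = 4, compactInterval = 5))  // gap 0: cache
    assert(shouldPrewarmCache(latestBatchId = 5, compactInterval = 5))  // gap 1: cache
    assert(!shouldPrewarmCache(latestBatchId = 7, compactInterval = 5)) // gap 3: skip
  }
}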
@@ -1371,6 +1371,60 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

test("restore from file stream source log") {
def createEntries(batchId: Long, count: Int): Array[FileEntry] = {
(1 to count).map { idx =>
FileEntry(s"path_${batchId}_$idx", 10000 * batchId + count, batchId)
}.toArray
}

withSQLConf(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "5") {
withTempDir { chk =>
val _fileEntryCache = PrivateMethod[java.util.LinkedHashMap[Long, Array[FileEntry]]](
Symbol("fileEntryCache"))

val metadata = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
chk.getCanonicalPath)
val fileEntryCache = metadata invokePrivate _fileEntryCache()

(0 to 4).foreach { batchId =>
metadata.add(batchId, createEntries(batchId, 100))
}
val allFiles = metadata.allFiles()

// batch 4 is a compact batch, whose logs should be cached in fileEntryCache
fileEntryCache.containsKey(4)
Contributor

Don't we need some kind of assertion here?

Contributor Author

lol, I have no idea why I missed the assert here. Nice find. Will add.


val metadata2 = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
chk.getCanonicalPath)
val fileEntryCache2 = metadata2 invokePrivate _fileEntryCache()

// allFiles() doesn't restore the logs for the latest compact batch into the file entry cache
assert(metadata2.allFiles() === allFiles)
assert(!fileEntryCache2.containsKey(4L))

// restore() restores the logs for the latest compact batch into the file entry cache
assert(metadata2.restore() === allFiles)
assert(fileEntryCache2.containsKey(4L))
Contributor

Similar here.


(5 to 5 + FileStreamSourceLog.PREV_NUM_BATCHES_TO_READ_IN_RESTORE).foreach { batchId =>
metadata2.add(batchId, createEntries(batchId, 100))
}
val allFiles2 = metadata2.allFiles()
Contributor

This can be inlined, right?

Contributor Author

Not sure I understand what "inline" means here.

Contributor

As far as I can see, this val is only used in one place.


val metadata3 = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
chk.getCanonicalPath)
val fileEntryCache3 = metadata3 invokePrivate _fileEntryCache()

// restore() will not restore the logs for the latest compact batch into the file entry cache
// if the latest batch is too far from the latest compact batch, because it's unlikely Spark
// will request that batch as the start point.
assert(metadata3.restore() === allFiles2)
assert(!fileEntryCache3.containsKey(4L))
Contributor

Similar here.

}
}
}
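
As a side note on the last assertion: with the test's constants, the gap between the latest batch and the latest compact batch is what disables pre-warming. A quick sketch of the arithmetic (the values mirror the test setup above, not new behavior):

// Compact interval is 5 and batches 0..4 were written first, so batch 4 is the compact batch.
val latestCompactedBatchId = 4L
// Batches 5 .. 5 + PREV_NUM_BATCHES_TO_READ_IN_RESTORE (i.e. 5, 6, 7) were added afterwards.
val latestBatchId = 5L + FileStreamSourceLog.PREV_NUM_BATCHES_TO_READ_IN_RESTORE  // 7
// restore() only pre-warms the cache when the gap is strictly smaller than the threshold:
// 7 - 4 = 3 >= 2, so the entries for batch 4 are not put back into fileEntryCache3.
assert(latestBatchId - latestCompactedBatchId >= FileStreamSourceLog.PREV_NUM_BATCHES_TO_READ_IN_RESTORE)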

test("get arbitrary batch from FileStreamSource") {
withTempDirs { case (src, tmp) =>
withSQLConf(