diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index e8ce8e148709..77a6b8b41472 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -111,6 +111,8 @@ class FileStreamSource(
 
   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
 
+  private var unreadFiles: Seq[(String, Long)] = _
+
   /**
    * Returns the maximum offset that can be retrieved from the source.
    *
@@ -118,15 +120,45 @@ class FileStreamSource(
    * there is no race here, so the cost of `synchronized` should be rare.
    */
   private def fetchMaxOffset(limit: ReadLimit): FileStreamSourceOffset = synchronized {
-    // All the new files found - ignore aged files and files that we have seen.
-    val newFiles = fetchAllFiles().filter {
-      case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
+    val newFiles = if (unreadFiles != null) {
+      logDebug(s"Reading from unread files - ${unreadFiles.size} files are available.")
+      unreadFiles
+    } else {
+      // All the new files found - ignore aged files and files that we have seen.
+      fetchAllFiles().filter {
+        case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
+      }
     }
 
     // Obey user's setting to limit the number of files in this batch trigger.
-    val batchFiles = limit match {
-      case files: ReadMaxFiles => newFiles.take(files.maxFiles())
-      case _: ReadAllAvailable => newFiles
+    val (batchFiles, unselectedFiles) = limit match {
+      case files: ReadMaxFiles if !sourceOptions.latestFirst =>
+        // we can cache and reuse remaining fetched list of files in further batches
+        val (bFiles, usFiles) = newFiles.splitAt(files.maxFiles())
+        if (usFiles.size < files.maxFiles() * DISCARD_UNSEEN_FILES_RATIO) {
+          // Discard unselected files if the number of files are smaller than threshold.
+          // This is to avoid the case when the next batch would have too few files to read
+          // whereas there're new files available.
+          logTrace(s"Discarding ${usFiles.length} unread files as it's smaller than threshold.")
+          (bFiles, null)
+        } else {
+          (bFiles, usFiles)
+        }
+
+      case files: ReadMaxFiles =>
+        // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
+        (newFiles.take(files.maxFiles()), null)
+
+      case _: ReadAllAvailable => (newFiles, null)
+    }
+
+    if (unselectedFiles != null && unselectedFiles.nonEmpty) {
+      logTrace(s"Taking first $MAX_CACHED_UNSEEN_FILES unread files.")
+      unreadFiles = unselectedFiles.take(MAX_CACHED_UNSEEN_FILES)
+      logTrace(s"${unreadFiles.size} unread files are available for further batches.")
+    } else {
+      unreadFiles = null
+      logTrace(s"No unread file is available for further batches.")
     }
 
     batchFiles.foreach { file =>
@@ -139,6 +171,7 @@ class FileStreamSource(
       s"""
         |Number of new files = ${newFiles.size}
        |Number of files selected for batch = ${batchFiles.size}
+        |Number of unread files = ${Option(unreadFiles).map(_.size).getOrElse(0)}
        |Number of seen files = ${seenFiles.size}
        |Number of files purged from tracking map = $numPurged
       """.stripMargin)
@@ -311,6 +344,9 @@ object FileStreamSource {
   /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
   type Timestamp = Long
 
+  val DISCARD_UNSEEN_FILES_RATIO = 0.2
+  val MAX_CACHED_UNSEEN_FILES = 10000
+
   case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index fa320333143e..27cf8235ff21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming
 
 import java.io.File
 import java.net.URI
+import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable
 import scala.util.Random
@@ -32,11 +33,11 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.connector.read.streaming.ReadLimit
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap, SourceFileArchiver}
 import org.apache.spark.sql.execution.streaming.sources.MemorySink
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -997,15 +998,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   }
 
   test("when schema inference is turned on, should read partition data") {
-    def createFile(content: String, src: File, tmp: File): Unit = {
-      val tempFile = Utils.tempFileWith(new File(tmp, "text"))
-      val finalFile = new File(src, tempFile.getName)
-      require(!src.exists(), s"$src exists, dir: ${src.isDirectory}, file: ${src.isFile}")
-      require(src.mkdirs(), s"Cannot create $src")
-      require(src.isDirectory(), s"$src is not a directory")
-      require(stringToFile(tempFile, content).renameTo(finalFile))
-    }
-
     withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
       withTempDirs { case (dir, tmp) =>
         val partitionFooSubDir = new File(dir, "partition=foo")
@@ -1602,6 +1594,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   }
 
   test("do not recheck that files exist during getBatch") {
+    val scheme = ExistsThrowsExceptionFileSystem.scheme
     withTempDir { temp =>
       spark.conf.set(
         s"fs.$scheme.impl",
@@ -1935,6 +1928,129 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(expectedDir.exists())
     assert(expectedDir.list().exists(_.startsWith(filePrefix)))
   }
+
+  private def withCountListingLocalFileSystemAsLocalFileSystem(body: => Unit): Unit = {
+    val optionKey = s"fs.${CountListingLocalFileSystem.scheme}.impl"
+    val originClassForLocalFileSystem = spark.conf.getOption(optionKey)
+    try {
+      spark.conf.set(optionKey, classOf[CountListingLocalFileSystem].getName)
+      body
+    } finally {
+      originClassForLocalFileSystem match {
+        case Some(fsClazz) => spark.conf.set(optionKey, fsClazz)
+        case _ => spark.conf.unset(optionKey)
+      }
+    }
+  }
+
+  test("Caches and leverages unread files") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "10")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+        val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
+        val metadataLog = source invokePrivate _metadataLog()
+
+        def verifyBatch(
+            offset: FileStreamSourceOffset,
+            expectedBatchId: Long,
+            inputFiles: Seq[File],
+            expectedListingCount: Int): Unit = {
+          val batchId = offset.logOffset
+          assert(batchId === expectedBatchId)
+
+          val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
+          assert(files.forall(_.batchId == batchId))
+
+          val actualInputFiles = files.map { p => new Path(p.path).toUri.getPath }
+          val expectedInputFiles = inputFiles.slice(batchId.toInt * 10, batchId.toInt * 10 + 10)
+            .map(_.getCanonicalPath)
+          assert(actualInputFiles === expectedInputFiles)
+
+          assert(expectedListingCount === CountListingLocalFileSystem.pathToNumListStatusCalled
+            .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+        }
+
+        CountListingLocalFileSystem.resetCount()
+
+        // provide 41 files in src, with sequential "last modified" to guarantee ordering
+        val inputFiles = (0 to 40).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(idx * 10000)
+          f
+        }
+
+        // 4 batches will be available for 40 input files
+        (0 to 3).foreach { batchId =>
+          val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
+            .asInstanceOf[FileStreamSourceOffset]
+          verifyBatch(offsetBatch, expectedBatchId = batchId, inputFiles, expectedListingCount = 1)
+        }
+
+        // batch 5 will trigger list operation though the batch 4 should have 1 unseen file:
+        // 1 is smaller than the threshold (refer FileStreamSource.DISCARD_UNSEEN_FILES_RATIO),
+        // hence unseen files for batch 4 will be discarded.
+        val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
+          .asInstanceOf[FileStreamSourceOffset]
+        assert(4 === offsetBatch.logOffset)
+        assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+
+        val offsetBatch2 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
+          .asInstanceOf[FileStreamSourceOffset]
+        // latestOffset returns the offset for previous batch which means no new batch is presented
+        assert(4 === offsetBatch2.logOffset)
+        // listing should be performed after the list of unread files are exhausted
+        assert(3 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+      }
+    }
+  }
+
+  test("Don't cache unread files when latestFirst is true") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "true", "maxFilesPerTrigger" -> "5")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+
+        CountListingLocalFileSystem.resetCount()
+
+        // provide 20 files in src, with sequential "last modified" to guarantee ordering
+        (0 to 19).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(idx * 10000)
+          f
+        }
+
+        source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+          .asInstanceOf[FileStreamSourceOffset]
+        assert(1 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+
+        // Even though the first batch doesn't read all available files, since latestFirst is true,
+        // file stream source will not leverage unread files - next batch will also trigger
+        // listing files
+        source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+          .asInstanceOf[FileStreamSourceOffset]
+        assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+      }
+    }
+  }
+
+  private def createFile(content: String, src: File, tmp: File): File = {
+    val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+    val finalFile = new File(src, tempFile.getName)
+    require(!src.exists(), s"$src exists, dir: ${src.isDirectory}, file: ${src.isFile}")
+    require(src.mkdirs(), s"Cannot create $src")
+    require(src.isDirectory(), s"$src is not a directory")
+    require(stringToFile(tempFile, content).renameTo(finalFile))
+    finalFile
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
@@ -1961,6 +2077,8 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
  * `DataSource.resolveRelation`.
  */
 class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
+  import ExistsThrowsExceptionFileSystem._
+
   override def getUri: URI = {
     URI.create(s"$scheme:///")
   }
@@ -1980,3 +2098,24 @@ class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
 object ExistsThrowsExceptionFileSystem {
   val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
 }
+
+class CountListingLocalFileSystem extends RawLocalFileSystem {
+  import CountListingLocalFileSystem._
+
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+
+  override def listStatus(f: Path): Array[FileStatus] = {
+    val curVal = pathToNumListStatusCalled.getOrElseUpdate(f.toUri.getPath, new AtomicLong(0))
+    curVal.incrementAndGet()
+    super.listStatus(f)
+  }
+}
+
+object CountListingLocalFileSystem {
+  val scheme = s"CountListingLocalFileSystem${math.abs(Random.nextInt)}fs"
+  val pathToNumListStatusCalled = new mutable.HashMap[String, AtomicLong]
+
+  def resetCount(): Unit = pathToNumListStatusCalled.clear()
+}
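
The heart of the patch above is the caching decision in fetchMaxOffset: when maxFilesPerTrigger is set and latestFirst is disabled, the files left over after splitAt(maxFiles) are kept for the next batch unless there are fewer than maxFiles * DISCARD_UNSEEN_FILES_RATIO of them, and at most MAX_CACHED_UNSEEN_FILES are retained. The following standalone Scala sketch is not part of the patch; UnreadFileCacheSketch, selectBatch, and its parameters are made-up names for illustration, and only the two constants mirror the patch.

// Minimal sketch of the unread-file caching decision, under the assumptions above.
// `allFiles` stands in for the freshly listed (or previously cached) (path, timestamp) pairs,
// `maxFiles` for ReadMaxFiles.maxFiles(), and `latestFirst` for sourceOptions.latestFirst.
object UnreadFileCacheSketch {
  val DISCARD_UNSEEN_FILES_RATIO = 0.2
  val MAX_CACHED_UNSEEN_FILES = 10000

  /** Returns (files for this batch, files to cache for the next batch, or null if none). */
  def selectBatch(
      allFiles: Seq[(String, Long)],
      maxFiles: Int,
      latestFirst: Boolean): (Seq[(String, Long)], Seq[(String, Long)]) = {
    if (latestFirst) {
      // latestFirst re-lists every batch, so leftovers are never cached.
      (allFiles.take(maxFiles), null)
    } else {
      val (batch, leftover) = allFiles.splitAt(maxFiles)
      if (leftover.size < maxFiles * DISCARD_UNSEEN_FILES_RATIO) {
        // Too few leftovers: discard them so the next batch re-lists instead of running small.
        (batch, null)
      } else {
        (batch, leftover.take(MAX_CACHED_UNSEEN_FILES))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the shape of the first test: 41 files, maxFilesPerTrigger = 10.
    val files = (0 until 41).map(i => (s"file-$i", i * 10000L))
    val (batch, cached) = selectBatch(files, maxFiles = 10, latestFirst = false)
    println(s"batch=${batch.size}, cached=${Option(cached).map(_.size).getOrElse(0)}")
    // prints: batch=10, cached=31
  }
}

Discarding small leftovers trades one extra listing for fuller batches, which is why the first test expects an additional listStatus call once only one unread file remains after batch 4.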