[SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files #27620
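To set the scene before the diff: the change caches the portion of a fetched file listing that exceeds `maxFilesPerTrigger` as "unread files", so that following micro-batches can be served without re-listing the input directory. Below is a minimal, hypothetical sketch of that idea only — the names are invented and this is not the actual `FileStreamSource` code:

```scala
// Minimal sketch of the caching idea only; not the actual FileStreamSource implementation.
// `listFiles` stands in for the (expensive) listing of the input directory.
class CachedListingSketch(listFiles: () => Seq[String], maxFilesPerTrigger: Int) {
  private var unreadFiles: Seq[String] = Seq.empty

  def filesForNextBatch(): Seq[String] = {
    val candidates =
      if (unreadFiles.nonEmpty) unreadFiles // serve from the cache, no listing call
      else listFiles()                      // cache exhausted: list the directory again
    val (batch, remaining) = candidates.splitAt(maxFilesPerTrigger)
    unreadFiles = remaining
    batch
  }
}
```

The tests in this diff verify exactly that behavior by counting `listStatus` calls through a custom `CountListingLocalFileSystem`.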
Changes from all commits: b417911, 07eed68, 57981cd, 8251b74, 0e972fc
@@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming

```scala
import java.io.File
import java.net.URI
import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable
import scala.util.Random
```

@@ -32,11 +33,11 @@ import org.scalatest.time.SpanSugar._

```scala
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.connector.read.streaming.ReadLimit
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap, SourceFileArchiver}
import org.apache.spark.sql.execution.streaming.sources.MemorySink
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
```
@@ -997,15 +998,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

```scala
  }

  test("when schema inference is turned on, should read partition data") {
    def createFile(content: String, src: File, tmp: File): Unit = {
      val tempFile = Utils.tempFileWith(new File(tmp, "text"))
      val finalFile = new File(src, tempFile.getName)
      require(!src.exists(), s"$src exists, dir: ${src.isDirectory}, file: ${src.isFile}")
      require(src.mkdirs(), s"Cannot create $src")
      require(src.isDirectory(), s"$src is not a directory")
      require(stringToFile(tempFile, content).renameTo(finalFile))
    }

    withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
      withTempDirs { case (dir, tmp) =>
        val partitionFooSubDir = new File(dir, "partition=foo")
```
@@ -1602,6 +1594,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

```scala
  }

  test("do not recheck that files exist during getBatch") {
    val scheme = ExistsThrowsExceptionFileSystem.scheme
    withTempDir { temp =>
      spark.conf.set(
        s"fs.$scheme.impl",
```
@@ -1935,6 +1928,129 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

```scala
    assert(expectedDir.exists())
    assert(expectedDir.list().exists(_.startsWith(filePrefix)))
  }

  private def withCountListingLocalFileSystemAsLocalFileSystem(body: => Unit): Unit = {
    val optionKey = s"fs.${CountListingLocalFileSystem.scheme}.impl"
    val originClassForLocalFileSystem = spark.conf.getOption(optionKey)
    try {
      spark.conf.set(optionKey, classOf[CountListingLocalFileSystem].getName)
      body
    } finally {
      originClassForLocalFileSystem match {
        case Some(fsClazz) => spark.conf.set(optionKey, fsClazz)
        case _ => spark.conf.unset(optionKey)
      }
    }
  }

  test("Caches and leverages unread files") {
    withCountListingLocalFileSystemAsLocalFileSystem {
      withThreeTempDirs { case (src, meta, tmp) =>
        val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "10")
        val scheme = CountListingLocalFileSystem.scheme
        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
        val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
        val metadataLog = source invokePrivate _metadataLog()

        def verifyBatch(
            offset: FileStreamSourceOffset,
            expectedBatchId: Long,
            inputFiles: Seq[File],
            expectedListingCount: Int): Unit = {
          val batchId = offset.logOffset
          assert(batchId === expectedBatchId)

          val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
          assert(files.forall(_.batchId == batchId))

          val actualInputFiles = files.map { p => new Path(p.path).toUri.getPath }
          val expectedInputFiles = inputFiles.slice(batchId.toInt * 10, batchId.toInt * 10 + 10)
            .map(_.getCanonicalPath)
          assert(actualInputFiles === expectedInputFiles)

          assert(expectedListingCount === CountListingLocalFileSystem.pathToNumListStatusCalled
            .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
        }

        CountListingLocalFileSystem.resetCount()

        // provide 41 files in src, with sequential "last modified" to guarantee ordering
        val inputFiles = (0 to 40).map { idx =>
          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
          f.setLastModified(idx * 10000)
          f
        }

        // 4 batches will be available for the first 40 input files
        (0 to 3).foreach { batchId =>
          val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
            .asInstanceOf[FileStreamSourceOffset]
          verifyBatch(offsetBatch, expectedBatchId = batchId, inputFiles, expectedListingCount = 1)
        }

        // batch 5 will trigger a list operation even though batch 4 left 1 unseen file:
        // 1 is smaller than the threshold (refer to FileStreamSource.DISCARD_UNSEEN_FILES_RATIO),
        // hence the unseen files left after batch 4 are discarded
        val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
          .asInstanceOf[FileStreamSourceOffset]
        assert(4 === offsetBatch.logOffset)
        assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))

        val offsetBatch2 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
          .asInstanceOf[FileStreamSourceOffset]
        // latestOffset returns the offset of the previous batch, which means no new batch is presented
        assert(4 === offsetBatch2.logOffset)
        // listing should be performed once the list of unread files is exhausted
        assert(3 === CountListingLocalFileSystem.pathToNumListStatusCalled
          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
      }
    }
  }

  test("Don't cache unread files when latestFirst is true") {
    withCountListingLocalFileSystemAsLocalFileSystem {
      withThreeTempDirs { case (src, meta, tmp) =>
        val options = Map("latestFirst" -> "true", "maxFilesPerTrigger" -> "5")
        val scheme = CountListingLocalFileSystem.scheme
        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)

        CountListingLocalFileSystem.resetCount()

        // provide 20 files in src, with sequential "last modified" to guarantee ordering
        (0 to 19).map { idx =>
          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
          f.setLastModified(idx * 10000)
          f
        }

        source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
          .asInstanceOf[FileStreamSourceOffset]
        assert(1 === CountListingLocalFileSystem.pathToNumListStatusCalled
          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
```

**Contributor:** Maybe it's worth checking that nothing irrelevant is inside. This probably indicates the need for some reset functionality for

**Contributor:** What I meant here is that the test should fail if some nasty code puts irrelevant data into the map. For example, when I put the following (just for the sake of representation), it would be good to fail.

**Contributor (Author):** Your example now fails because I added a check counting the elements of `pathToNumListStatusCalled`. Does that address your comment?

**Contributor (Author):** Sigh, I realized I didn't push the change. Sorry about that. Will push.

**Contributor (Author):** Sorry, I have to revert it. My bad. I remembered why I only checked the directory - this would require all input files to be verified, which is actually redundant, as we already verify that behavior in the UT "Caches and leverages unread files".

**Contributor:** Even if it's checked in the positive case, it still holds value in this negative case unless we find a pretty good reason why it's not possible. Negative-case code paths can list unnecessary dirs/files.

**Contributor (Author):** I'm not sure we want to verify the whole behavior of the file stream source in this PR. This test only makes sure the listing calls against the input directory (and input files as well) are as expected; other checks are redundant and error-prone. E.g. suppose the file stream source changes something on the read side; then this test would fail unintentionally. EDIT: it might be true for input files as well, but that may be one of the important things we want to watch. (And we checked it in the other test I've added.) Other paths are relatively less important.

**Contributor:** Making sure that the modified code doesn't introduce further unintended directory listings is also important, but I agree it's not worth the price of test failures whenever somebody modifies the stream source code. All in all, I agree not to add it, since we've double-checked that no further unintended directory listing is introduced.
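For reference, a hypothetical shape of the extra check discussed in this thread; `expectedListedDirCount` is a made-up placeholder (the real value depends on how many directories the glob pattern touches), and this snippet is not part of the PR:

```scala
// Illustrative only, not in the PR: fail the test if listStatus was called on more paths
// than expected, so irrelevant entries sneaking into pathToNumListStatusCalled are caught.
val expectedListedDirCount = 1 // placeholder; real value depends on the glob pattern
assert(CountListingLocalFileSystem.pathToNumListStatusCalled.size === expectedListedDirCount)
```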
|
|
```scala
        // Even though the first batch doesn't read all available files, since latestFirst is true
        // the file stream source will not leverage unread files - the next batch will also trigger
        // listing files
        source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
          .asInstanceOf[FileStreamSourceOffset]
        assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
      }
    }
  }

  private def createFile(content: String, src: File, tmp: File): File = {
    val tempFile = Utils.tempFileWith(new File(tmp, "text"))
    val finalFile = new File(src, tempFile.getName)
    require(!src.exists(), s"$src exists, dir: ${src.isDirectory}, file: ${src.isFile}")
    require(src.mkdirs(), s"Cannot create $src")
    require(src.isDirectory(), s"$src is not a directory")
    require(stringToFile(tempFile, content).renameTo(finalFile))
    finalFile
  }
}

class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
```
@@ -1961,6 +2077,8 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest {

```scala
 * `DataSource.resolveRelation`.
 */
class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
  import ExistsThrowsExceptionFileSystem._

  override def getUri: URI = {
    URI.create(s"$scheme:///")
  }
```
@@ -1980,3 +2098,24 @@ class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {

```scala
object ExistsThrowsExceptionFileSystem {
  val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
}

class CountListingLocalFileSystem extends RawLocalFileSystem {
  import CountListingLocalFileSystem._

  override def getUri: URI = {
    URI.create(s"$scheme:///")
  }

  override def listStatus(f: Path): Array[FileStatus] = {
    val curVal = pathToNumListStatusCalled.getOrElseUpdate(f.toUri.getPath, new AtomicLong(0))
    curVal.incrementAndGet()
    super.listStatus(f)
  }
}

object CountListingLocalFileSystem {
  val scheme = s"CountListingLocalFileSystem${math.abs(Random.nextInt)}fs"
  val pathToNumListStatusCalled = new mutable.HashMap[String, AtomicLong]

  def resetCount(): Unit = pathToNumListStatusCalled.clear()
}
```
**Review comment:** Any reason for keeping these 2 parameters hard-coded instead of making them configurable? Or is it too much of a detail to expose to the end user?

**Reply:** I just wanted to avoid the Spark configuration becoming an "airplane control panel" - end users already have a bunch of things to tune. It's completely OK to make them configurable if we find a case where the default values don't work.
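If configurability is ever needed, here is a sketch of how such a threshold could be registered in the style of existing `SQLConf` entries; the config key, default value, and field name below are invented for illustration and are not part of this PR:

```scala
// Hypothetical SQLConf entry (illustrative only; key, name, and default are made up).
// Definitions like this live inside the SQLConf object in org.apache.spark.sql.internal.
val FILE_STREAM_SOURCE_DISCARD_CACHED_INPUT_RATIO =
  buildConf("spark.sql.streaming.fileStreamSource.discardCachedInputRatio")
    .internal()
    .doc("Ratio of cached unread files below which the cache is discarded and the " +
      "input directory is listed again.")
    .doubleConf
    .createWithDefault(0.2)
```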