Conversation

@HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Feb 18, 2020

What changes were proposed in this pull request?

This patch caches the fetched list of files in FileStreamSource to avoid re-fetching whenever possible.

This improvement is effective when the source options are set as follows:

  • maxFilesPerTrigger is set
  • latestFirst is set to false (default)

because:

  • if maxFilesPerTrigger is unset, Spark processes all new files within a single batch
  • if latestFirst is set to true, Spark intends to process the "latest" files, so the listing has to be refreshed for every batch

The fetched list of files is filtered against SeenFilesMap before caching, so unnecessary files are dropped in this phase. Once a file is cached, we don't check it again with isNewFile: Spark processes files in timestamp order, so cached files have a timestamp equal to or later than latestTimestamp in SeenFilesMap.
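As a rough sketch of the filtering step described above (FileEntry and SeenFiles here are simplified, hypothetical stand-ins, not Spark's real classes):

```scala
import scala.collection.mutable

// Illustrative stand-ins for FileStreamSource internals.
case class FileEntry(path: String, timestamp: Long)

class SeenFiles {
  private val entries = mutable.HashMap.empty[String, Long]
  var latestTimestamp: Long = Long.MinValue

  def add(e: FileEntry): Unit = {
    entries(e.path) = e.timestamp
    if (e.timestamp > latestTimestamp) latestTimestamp = e.timestamp
  }

  def isNewFile(e: FileEntry): Boolean = !entries.contains(e.path)
}

// Filter once, before caching; cached entries are never re-checked, since
// files are processed in timestamp order and thus carry timestamps equal to
// or later than the latestTimestamp recorded in the seen-files map.
def filterBeforeCaching(fetched: Seq[FileEntry], seen: SeenFiles): Seq[FileEntry] =
  fetched.filter(seen.isNewFile)
```

A file already recorded in the seen-files map is dropped in this single pass; everything that survives can be cached as-is.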

The cache is kept only in memory to simplify the logic; if we supported restoring the cache when restarting a query, we would have to deal with changes to the source options.

To avoid a batch with a tiny set of inputs when only a few unread files remain (which can happen when the list operation returns slightly more than the max files), this patch employs a "lower bar" to decide whether retaining unread files is worthwhile: Spark discards the unread files and performs a new listing in the next batch if the number of unread files is below a specific ratio (20% for now) of the max files.
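The lower-bar rule above reduces to a single predicate (the 0.2 ratio is the value this patch hardcodes; the function name is illustrative):

```scala
val DISCARD_UNSEEN_FILES_RATIO = 0.2

// Retain the unread files for the next batch only if there are enough of
// them relative to maxFilesPerTrigger; otherwise discard them and perform
// a fresh listing in the next batch.
def shouldRetainUnreadFiles(numUnread: Int, maxFilesPerTrigger: Int): Boolean =
  numUnread >= maxFilesPerTrigger * DISCARD_UNSEEN_FILES_RATIO
```

With maxFilesPerTrigger = 100, for example, 19 leftover files would be discarded in favor of a fresh listing, while 20 or more would be retained.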

This patch has synergy with SPARK-20568: while this patch avoids the redundant cost of listing, SPARK-20568 removes the listing cost for processed files. Once the query has processed all files in the initial load, the listing cost for those files is gone.

Why are the changes needed?

Spark pays a huge cost to fetch the list of files from the input paths, yet uses only a limited part of that list in each batch. If the streaming query starts from a huge amount of input data for whatever reason (initial load, reprocessing, etc.), the cost of fetching the files applies to every batch, as it is unusual to let the first microbatch process the entire initial load.

SPARK-20568 will help to reduce the fetch cost, as processed files will be either deleted or moved outside of the input paths, but it still won't help in the early phase.

Does this PR introduce any user-facing change?

Yes. The driver process may require more memory than before to cache the fetched files, if maxFilesPerTrigger is set and latestFirst is "false". Previously Spark only took some files from the front of the list and discarded the rest, so technically the peak memory is the same, but that memory could be freed sooner.

It may not hurt much, as the peak memory is still similar, and a similar amount of memory would be required anyway when maxFilesPerTrigger is unset.

How was this patch tested?

New unit tests. Manually tested under the test environment:

  • input files
    • 171,839 files distributed evenly into 24 directories
    • each file contains 200 lines
  • query: read from the file stream source, repartition to 50, and write to the file stream sink
    • maxFilesPerTrigger is set to 100

before applying the patch

[screenshot: Screen Shot 2020-02-18 at 11 53 12 PM]

after applying the patch

[screenshot: Screen Shot 2020-02-18 at 11 56 01 PM]

The brown area represents "latestOffset", where the listing operation is performed for FileStreamSource. After the patch the cost of listing is paid only once, whereas before the patch it was paid for every batch.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Feb 18, 2020

The patch is actually very straightforward in how it works and how it helps (the changeset, excluding test code, is very small).

I'll attach the test result for the "initial load" use case to the "How was this patch tested?" section soon. I already have screenshots of the UI, but would like to run against the latest master.

EDIT: Just updated the description of PR.

@HeartSaVioR
Contributor Author

cc. @tdas @zsxwing @gaborgsomogyi

@SparkQA

SparkQA commented Feb 18, 2020

Test build #118640 has finished for PR 27620 at commit b417911.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Apr 14, 2020

Test build #121231 has finished for PR 27620 at commit b417911.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Apr 15, 2020

Test build #121303 has finished for PR 27620 at commit b417911.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@gaborgsomogyi gaborgsomogyi left a comment

Just wondering what would happen in the following scenario?

  • "latestFirst" -> "true"
  • "maxFilesPerTrigger" -> "5"
  • 6 files are available and 5 processed in batch0 -> 1 stored in unreadFiles
  • 1 new file arrives
  • batch1 processed in next round

The question is with what content will be batch1 executed?

}

override def listStatus(f: Path): Array[FileStatus] = {
val path = f.toUri.getPath
Contributor

Nit: f.toUri.getPath can be inlined.

}

object CountListingLocalFileSystem {
val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
Contributor

Maybe we can use the object name since there are multiple filesystems declared here?

Contributor Author

Ah yes good point. Will do.


source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
.asInstanceOf[FileStreamSourceOffset]
assert(1 === CountListingLocalFileSystem.pathToNumListStatusCalled
Contributor

Maybe it's worth checking that nothing irrelevant is inside. This probably indicates the need for some reset functionality for pathToNumListStatusCalled...

Contributor

What I've meant here is that the test should fail if some nasty code puts irrelevant data into the map. For example when I put (just for the sake of representation) the following:

        CountListingLocalFileSystem.resetCount()
        CountListingLocalFileSystem.pathToNumListStatusCalled.put("foo", new AtomicLong(1))

it would be good to fail.

Contributor Author

Your example now fails because I added a check that counts the elements of pathToNumListStatusCalled. Does that address your comment?

Contributor Author

Sigh I realized I didn't push the change. Sorry about it. Will push.

Contributor Author

Sorry, I have to revert it. My bad. I remembered why I only checked the directory: this would require verifying all input files, which is redundant, as we already verified that behavior in the UT "Caches and leverages unread files".

Contributor

Even if it's checked in the positive case, the check still holds value in this negative case, unless we find a good reason why it's not possible. Negative-case code paths can list unnecessary dirs/files.

Contributor Author

@HeartSaVioR HeartSaVioR Apr 21, 2020

I'm not sure we want to verify the whole behavior of the file stream source in this PR. This test only makes sure the calls that list the input directory (and input files) are as expected; other checks are redundant and error-prone. E.g. suppose the file stream source changes something on the read side; then this test would fail unintentionally.

EDIT: that might be true for input files as well, but those may be one of the important things we want to watch. (And we checked it in another test I've added.) Other paths are relatively less important.

Contributor

This test only makes sure the calls of listing input directory (and input files as well) are expected

Making sure that the modified code doesn't introduce further unintended directory listing is also important, but I agree it's not worth the price of test failures whenever somebody modifies the stream source code. All in all I agree not to add it, since we've double-checked that no further unintended directory listing is introduced.

var lastModified = 0
val inputFiles = (0 to 19).map { idx =>
val f = createFile(idx.toString, new File(src, idx.toString), tmp)
f.setLastModified(lastModified)
Contributor

Maybe idx * 10000?

Contributor Author

Nice finding. I guess I used the variable and forgot to clean up when the variable was no longer needed.

var lastModified = 0
(0 to 19).map { idx =>
val f = createFile(idx.toString, new File(src, idx.toString), tmp)
f.setLastModified(lastModified)
Contributor

Same here.

@HeartSaVioR
Contributor Author

Just wondering what would happen in the following scenario?

"latestFirst" -> "true"
"maxFilesPerTrigger" -> "5"
6 files are available and 5 processed in batch0 -> 1 stored in unreadFiles
1 new file arrives
batch1 processed in next round
The question is with what content will be batch1 executed?

I've explained the conditions under which the functionality takes effect in the PR description: it won't cache the list of files if latestFirst is true, so the behavior stays the same as before.

@SparkQA

SparkQA commented Apr 16, 2020

Test build #121348 has finished for PR 27620 at commit 07eed68.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Apr 16, 2020

Test build #121353 has finished for PR 27620 at commit 07eed68.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor

Just wondering what would happen in the following scenario?
"latestFirst" -> "true"
"maxFilesPerTrigger" -> "5"
6 files are available and 5 processed in batch0 -> 1 stored in unreadFiles
1 new file arrives
batch1 processed in next round
The question is with what content will be batch1 executed?

I've explained the condition when the functionality takes effect in the description of PR - it won't cache the list of files if latestFirst is true, so it should be same as it is.

I meant to write "latestFirst" -> "false", but with the corrected config my question still stands.

@HeartSaVioR
Contributor Author

Only the one file left in unread will be used for the batch in that case.

It's designed to avoid calling the list operation whenever possible, but in some cases it might be valid to drop the unread files and call the list operation, if the number of remaining files is relatively small compared to the max files per trigger. I think it affects only a few batches, though.

@gaborgsomogyi
Contributor

I've double-checked the maxFilesPerTrigger semantics and it's only a maximum to consider, so this doesn't break it. Since it affects only a small number of batches, I agree the overall gain is positive.

@HeartSaVioR
Contributor Author

Hmm... I thought about that more, and maybe it's good to add a lower bar to avoid the weird case where listing provides slightly more files than maxFilesPerTrigger. The tricky part is deciding the condition for discarding unread files (a ratio based on maxFilesPerTrigger? a static number?); the logic to add would be straightforward.

@SparkQA

SparkQA commented Apr 17, 2020

Test build #121395 has finished for PR 27620 at commit 57981cd.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor

gaborgsomogyi commented Apr 17, 2020

Hmm, seems the issue is relevant.

maybe it's good to add a lower bar to avoid the weird case, listing files provides slightly more than maxFilesPerTrigger.

+1 on this. Maybe we can add a new test to cover this case.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Apr 17, 2020

Just addressed the lower bar for unseen files: the threshold ratio is set to 0.2 (20%) of max files for now, and we can adjust it later if we find a better value (or condition).

@SparkQA

SparkQA commented Apr 17, 2020

Test build #121415 has finished for PR 27620 at commit 8251b74.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor

Basically looks good; only one thing is under discussion.

@gaborgsomogyi
Contributor

Does the modified code behave the same way as shown in the attached pictures?

@SparkQA

SparkQA commented Jun 14, 2020

Test build #123989 has finished for PR 27620 at commit 8251b74.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jun 14, 2020

Test build #124000 has finished for PR 27620 at commit 8251b74.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

After looking at a couple more issues in the file stream source, I feel we also need an upper bound on the cache, as the file stream source already contributes to memory usage on the driver and this adds a (possibly) unbounded amount of memory.

I guess 10,000 entries is good enough: it covers 100 batches when maxFilesPerTrigger is set to 100, and 10 batches when it is set to 1,000. If we find that a higher value is OK for memory usage and helpful for the majority of workloads, we can make it configurable with a higher default value.
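The back-of-the-envelope math in the comment above can be written out (the helper is hypothetical, not Spark code):

```scala
val MAX_CACHED_UNSEEN_FILES = 10000

// A cache of maxCached entries feeds maxCached / maxFilesPerTrigger batches
// before the source must perform a fresh listing.
def batchesCoveredByCache(maxCached: Int, maxFilesPerTrigger: Int): Int =
  maxCached / maxFilesPerTrigger
```

So 10,000 cached entries cover 100 batches at maxFilesPerTrigger = 100, and 10 batches at maxFilesPerTrigger = 1,000, matching the figures above.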

@SparkQA

SparkQA commented Jun 29, 2020

Test build #124638 has started for PR 27620 at commit 0e972fc.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jun 30, 2020

Test build #124675 has finished for PR 27620 at commit 0e972fc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jul 1, 2020

Test build #124725 has finished for PR 27620 at commit 0e972fc.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jul 1, 2020

Test build #124763 has finished for PR 27620 at commit 0e972fc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

HiveSuite failed

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jul 1, 2020

Test build #124773 has finished for PR 27620 at commit 0e972fc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jul 13, 2020

Test build #125728 has finished for PR 27620 at commit 0e972fc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Retest this please.

Member

@dongjoon-hyun dongjoon-hyun left a comment

For the test case, could you rebase this to the master and deduplicate CountListingLocalFileSystem?

@SparkQA

SparkQA commented Aug 9, 2020

Test build #127239 has finished for PR 27620 at commit 0e972fc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

I just revisited deduplicating CountListingLocalFileSystem and felt that deduplication isn't safe: the instance refers to a singleton object that would be shared once we deduplicate, and calling reset may affect other tests, depending on the parallelism of the test suite run. If test suites are guaranteed never to run in parallel in the same JVM, the change would be safe; otherwise it wouldn't.

@dongjoon-hyun
Member

Got it, @HeartSaVioR .

Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM. Thank you, @HeartSaVioR and @gaborgsomogyi .
Merged to master for Apache Spark 3.1.0 in December 2020.

@dongjoon-hyun
Member

cc @tdas , @zsxwing , @jose-torres , @dbtsai

type Timestamp = Long

val DISCARD_UNSEEN_FILES_RATIO = 0.2
val MAX_CACHED_UNSEEN_FILES = 10000
Member

Any reason for keeping these two parameters hardcoded instead of making them configurable? Is it too much detail to expose to the end user?

Contributor Author

I just wanted to avoid the Spark configuration becoming an "airplane control panel": end users already have a bunch of things to tune. It's completely OK to make them configurable if we find a case where the default values don't work.

HeartSaVioR pushed a commit that referenced this pull request May 20, 2024
### What changes were proposed in this pull request?
This change adds configuration options for the streaming input File Source for `maxCachedFiles` and `discardCachedInputRatio`.  These values were originally introduced with #27620 but were hardcoded to 10,000 and 0.2, respectively.

### Why are the changes needed?
Under certain workloads with large `maxFilesPerTrigger` settings, the performance gain from caching the input files capped at 10,000 can cause a cluster to be underutilized and jobs to take longer to finish if each batch takes a while to finish.  For example, a job with `maxFilesPerTrigger` set to 100,000 would do all 100k in batch 1, then only 10k in batch 2, but both batches could take just as long since some of the files cause skewed processing times.  This results in a cluster spending nearly the same amount of time while processing only 1/10 of the files it could have.
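A toy simulation of that pattern (assumptions: the cap is the hardcoded 10,000, no new files arrive, and the discard ratio is ignored):

```scala
// Returns the number of files processed per batch. Batch 1 lists and reads
// maxFilesPerTrigger files, batch 2 drains the cached leftovers (at most
// maxCached), then the cycle repeats with a fresh listing.
def simulateBatchSizes(totalFiles: Int, maxFilesPerTrigger: Int, maxCached: Int): List[Int] = {
  var remaining = totalFiles
  var cached = 0
  val sizes = scala.collection.mutable.ListBuffer.empty[Int]
  while (remaining > 0) {
    val visible = if (cached > 0) cached else remaining   // reuse cache, else list
    val processed = math.min(visible, maxFilesPerTrigger)
    cached = math.min(visible - processed, maxCached)      // leftovers, capped
    remaining -= processed
    sizes += processed
  }
  sizes.toList
}
```

With 210,000 input files and maxFilesPerTrigger = 100,000, the simulated batch sizes alternate between 100,000 and the 10,000-file cache drain, which is the underutilization described above.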

### Does this PR introduce _any_ user-facing change?
Updated documentation for structured streaming sources to describe new configurations options

### How was this patch tested?
New and existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45362 from ragnarok56/filestream-cached-files-config.

Authored-by: ragnarok56 <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>