Conversation

@HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Apr 30, 2020

What changes were proposed in this pull request?

This PR introduces a new option, inputRetention, to provide a way to specify a retention period on input files.

maxAgeMs acts as a soft limit (it doesn't apply under some conditions, such as the first batch, and it is applied relative to the modification time of input files). Since it is not applied consistently across the matrix of configurations, Spark cannot purge entries based on it. (A streaming query can be relaunched with changed configurations.)

inputRetention acts as a hard limit: Spark will not include files older than the retention as input files, and it also tries to exclude file entries older than the retention (this actually happens during compaction, as that is the only phase where entries are removed).

Unlike maxAgeMs, inputRetention is relative to the system timestamp, which is easier for end users to reason about. This requires end users to keep the nodes' clocks correctly set, but in most cases they would do so for other reasons anyway. Note that this option also filters out old files when the query intends to replay from input files, so that case should be considered as well.
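As a rough sketch of the intended semantics (illustrative only; the function and parameter names below are hypothetical, not the actual Spark implementation), a file is excluded once its modification time falls behind the current system time minus the retention:

```python
import time

def apply_input_retention(entries, retention_ms, now_ms=None):
    """Keep only entries newer than the retention threshold.

    entries: list of (path, modification_time_ms) tuples.
    The threshold is relative to the current system time, unlike
    maxAgeMs, which is relative to the latest file's timestamp.
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    threshold = now_ms - retention_ms
    return [(p, m) for p, m in entries if m >= threshold]

# With a 1-hour retention evaluated at t=4_000_000ms, the threshold is
# 400_000ms, so the file modified at t=1_000ms is treated as nonexistent.
entries = [("a.txt", 1_000), ("b.txt", 4_000_000)]
assert apply_input_retention(entries, 3_600_000, now_ms=4_000_000) == [("b.txt", 4_000_000)]
```

Under this sketch, the same filter would apply both when listing new input files and when rewriting entries during compaction.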

Why are the changes needed?

This has been a pain point: metadata keeps growing in both the file stream source and the file stream sink. For the file stream source, all processed input files are tracked, so the metadata grows continuously, and there is no way to reduce its size or number of entries. In a compact batch, Spark reads all previous input file entries to write a new compact file, which introduces major latency.

Does this PR introduce any user-facing change?

This doesn't change any behavior by default, as the new configuration is optional. (The default value is set to an unrealistically large one, making it effectively disabled.)

This adds a new configuration; the previous sections describe the behavior.

How was this patch tested?

New unit tests, each verifying the following two behaviors:

  1. old files should not be included as input files if input retention is specified
  2. when compacting, outdated entries should be filtered out

I've manually tested with above two behaviors as well.

@SparkQA

SparkQA commented Apr 30, 2020

Test build #122127 has finished for PR 28422 at commit 738caa1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 2, 2020

Test build #122203 has finished for PR 28422 at commit 2af1df1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented May 27, 2020

Test build #123141 has finished for PR 28422 at commit 2af1df1.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented May 27, 2020

Test build #123161 has finished for PR 28422 at commit 2af1df1.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented May 27, 2020

Test build #123172 has finished for PR 28422 at commit 2af1df1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor

Just picked this up. Maybe the user group reference can be added as a comment and not as commit msg.

I've manually tested with above two behaviors as well.

How do you mean that exactly?

NOTE 1: Please be careful to set the value if the query replays from the old input files.
NOTE 2: Please make sure the timestamp is in sync between nodes which run the query.

"file:///dataset.txt"

While reviewing this, I pinpointed a parameter description split and opened #28739.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Jun 8, 2020

Maybe the user group reference can be added as a comment and not as commit msg.

I'll move it to the JIRA description. Personally I'd be OK with leaving the rationale in the commit message, but I agree it's redundant and makes the commit message verbose.

(EDIT: I already did that, skipping)

I've manually tested with above two behaviors as well.

How do you mean that exactly?

I meant that both behaviors were manually tested:

  1. old files should not be included as input files if input retention is specified
  2. when compacting, outdated entries should be filtered out

@SparkQA

SparkQA commented Jun 8, 2020

Test build #123621 has finished for PR 28422 at commit 06ee53d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor

retest this please

@gaborgsomogyi
Contributor

I see that maxFileAge doesn't apply in all cases, which means it can't be used as an exclusion criterion (at least I presume that's what you've considered). Please correct me if I'm wrong.

First, I've looked at this purely from a user perspective. I'm a user and I've realized that the compact file is growing infinitely. Taking a first glance at this solution:

  • I see that all of a sudden there are 2 parameters to configure the maximum age of a file. This raised a lot of questions in my head, and most probably I'd fall into several edge cases which may end up in data loss.
  • I've started to wonder why I need to configure an additional parameter to influence something I don't want to care about. My expectation as a user is that Spark saves a couple of things to recover from, but I don't want to know what they are; it should just work by default.

From the engineering side, I think that having 2 params meant to configure almost the same thing is something we should take a closer look at: "Maximum age of a file that can be found in this directory, before it is ignored."

I have the feeling something is not fully consistent with the general approach.

@SparkQA

SparkQA commented Jun 8, 2020

Test build #123632 has finished for PR 28422 at commit 06ee53d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

I agree that adding another, similar option feels tricky.

As you've already indicated, there are some cases where maxFileAge has to be ignored, which means Spark is never able to drop entries from metadata (e.g. when latestFirst is true and maxFilesPerTrigger is set). Given that all of these options can be changed for further runs, I wasn't sure whether it would be safe to drop entries based on the current set of options and the status of entries. There appeared to be an edge case where input files could be processed more than once.

I also felt it's less intuitive to reason about the way the max age is specified: it is relative to the timestamp of the latest file as determined by Spark, not the current system time. (But, well, that might be just me.)

The new option ensures that the behavior is consistent regardless of these options. It simply acts as a hard limit: in no case will Spark handle files older than the threshold. (Think of these files as having been deleted by a retention policy, though not physically.) It applies to both forward and backward reads, no matter how many files Spark reads in a batch.
(Personally, I think maxFileAge itself should work this way, and then we wouldn't have such confusion.)
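A minimal sketch of the "hard limit regardless of read direction" idea (hypothetical names, not the actual FileStreamSource code): the retention filter runs before the latestFirst ordering and the per-batch cap, so neither option can bring an expired file back into scope.

```python
def select_batch(files, retention_ms, now_ms, latest_first, max_files):
    """files: list of (path, mtime_ms). Returns the files for one batch."""
    threshold = now_ms - retention_ms
    # Hard limit first: expired files are invisible to every later step.
    alive = [(p, m) for p, m in files if m >= threshold]
    # Ordering and the per-batch cap apply only to surviving files.
    alive.sort(key=lambda e: e[1], reverse=latest_first)
    return alive[:max_files]

files = [("old", 100), ("mid", 6_000), ("new", 9_000)]
# With a 5s retention at t=10_000ms, "old" can never be selected,
# whether we read latest-first or oldest-first.
assert select_batch(files, 5_000, 10_000, True, 2) == [("new", 9_000), ("mid", 6_000)]
assert select_batch(files, 5_000, 10_000, False, 2) == [("mid", 6_000), ("new", 9_000)]
```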

@gaborgsomogyi
Contributor

I've analyzed this further. I have the same opinion about maxFileAge, namely that the way it's programmed is unintuitive. I think it should work like this:

  • maxFileAge should behave like inputRetention. Retention is normally based on the current timestamp; we needn't look far, Kafka and similar components do exactly that.
  • The current feature should depend on maxFileAge.

If the user wants to operate a query with latestFirst in the long term, then I see these options:

  • The user sets maxFileAge properly => no file loss, just some fluctuation in the number of unprocessed files.
  • The user doesn't set maxFileAge properly, but the cluster is sized properly => a configuration issue, because with a proper value all files must be processed within maxFileAge.
  • The user doesn't set maxFileAge properly, and the cluster is sized badly => a sizing and configuration issue. Cluster computation power must be increased to leave room for the old, not-yet-processed files. As in the previous case, choosing an appropriate maxFileAge is important.

The last point can be problematic and can end up in data loss, but this is exactly what happens when processing data from Kafka: if retention fires, the data just disappears without any notification. This situation is better, though, because if the query is not able to catch up, it can be restarted with a bigger maxFileAge and cluster, allowing it to catch up properly.

@HeartSaVioR
Contributor Author

I can even tolerate the fact that maxFileAge is derived from the paths' latest timestamp. If we don't trust the nodes' wall clocks (though I suspect other logic would also break in that case), then yes, the file timestamps might be the source of truth across nodes.

I feel all the confusion comes from the behavior of latestFirst. Yes, in some cases we'd like to read from the latest files because those are the only ones we're interested in. But should we really keep open the possibility of tracing back to older files? Could we simply do what we do with Kafka's "latest" option, which only affects the first batch and is a no-op in further batches?
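The Kafka-"latest" analogy above could be sketched like this (purely illustrative, hypothetical names): the option would only pick the starting position for the first batch, and every later batch moves strictly forward, so older files never come back into scope.

```python
def plan_batches(listings, latest_only):
    """listings: one directory listing per batch, each a list of (path, mtime_ms).
    With Kafka-'latest'-style semantics, latest_only skips everything
    already present at query start; afterwards the watermark only advances.
    """
    watermark = max((m for _, m in listings[0]), default=0) if latest_only else 0
    batches = []
    for listing in listings:
        new_files = [(p, m) for p, m in listing if m > watermark]
        batches.append(new_files)
        if new_files:
            watermark = max(m for _, m in new_files)
    return batches

# Files a and b exist at query start; c arrives before the second batch.
listings = [[("a", 1), ("b", 2)], [("a", 1), ("b", 2), ("c", 3)]]
assert plan_batches(listings, latest_only=True) == [[], [("c", 3)]]
assert plan_batches(listings, latest_only=False) == [[("a", 1), ("b", 2)], [("c", 3)]]
```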

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jun 14, 2020

Test build #123985 has finished for PR 28422 at commit 06ee53d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jun 14, 2020

Test build #123998 has finished for PR 28422 at commit 06ee53d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 16, 2020

Test build #125975 has finished for PR 28422 at commit 06ee53d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

FYI, I've started a discussion on the dev@ mailing list about how to deal with the "latestFirst" option and metadata growth.
https://lists.apache.org/thread.html/r08e3a8d7df74354b38d19ffdebe1afe7fa73c2f611f0a812a867dffb%40%3Cdev.spark.apache.org%3E

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Aug 18, 2020

Test build #127531 has finished for PR 28422 at commit 06ee53d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Aug 18, 2020

Test build #127547 has finished for PR 28422 at commit 06ee53d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

Just rebased. I haven't reached consensus in the dev mailing list discussion yet, though. I'll bump it again.

@SparkQA

SparkQA commented Aug 25, 2020

Test build #127882 has finished for PR 28422 at commit b7d94f7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Sep 15, 2020

Test build #128682 has finished for PR 28422 at commit b7d94f7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Sep 15, 2020

Test build #128702 has finished for PR 28422 at commit b7d94f7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Dec 25, 2020
@github-actions github-actions bot closed this Dec 26, 2020
@HeartSaVioR HeartSaVioR removed the Stale label Dec 26, 2020
@HeartSaVioR HeartSaVioR reopened this Dec 26, 2020
@SparkQA

SparkQA commented Dec 26, 2020

Test build #133389 has finished for PR 28422 at commit b7d94f7.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Dec 28, 2020

Test build #133435 has finished for PR 28422 at commit b7d94f7.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 8, 2021

Test build #135034 has finished for PR 28422 at commit b7d94f7.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@darxriggs

The PR for FileStreamSink was already merged quite some time ago.
So I'm wondering whether the goal for FileStreamSource here is to also reach consensus and integrate it, or whether it has been abandoned.
