[SPARK-27188][SS] FileStreamSink: provide a new option to have retention on output files #28363
Conversation
|
This PR is just a revival of #24128, as the problem definition and the solution still apply. |
|
Test build #121886 has finished for PR 28363 at commit
|
|
I've read the discussion on #24128 and I agree that TTL would be the way to go. I like, for instance, how Kafka handles the situation (even if retention generates some confusion on the Spark user side when retention deleted data that Spark wanted to process and couldn't find). I think first the metadata must be compacted (remove file entries whose TTL expired), but what I miss is deleting the files themselves. There are two types of files without this patch:
With this change this will be extended with a third one:
If we want to do full TTL then a separate GC would be good to delete files matching the 2nd and 3rd bullet points (of course, only after they are removed from the metadata). What I see as a potential problem is that the FS timestamp may differ from local time (I haven't yet checked how Hadoop handles time). |
Yeah, I didn't deal with this because there may be some reader queries which still read from an old version of the metadata, which may contain excluded files. (A batch query would read all available files, so there's still a chance of a race condition.)
While I'm not sure it's a real problem (as we rely on the last modified time while reading files), I eliminated the case by adding a "commit time" to each entry and applying retention based on commit time. So I guess the concern is no longer valid. |
That's a valid consideration. Cleaning junk files doesn't necessarily have to belong to this feature; it can be put behind another flag. I've been thinking about this for a long time (though the initial idea was to delete only the generated junk). Of course, this must be done in a separate thread because directory listing can be pathologically slow in some cases. This could reduce storage costs for users significantly, in an automatic way...
I've played with HDFS and read the docs of the other filesystems and haven't found any glitches. |
gaborgsomogyi
left a comment
First round
<code>path</code>: path to the output directory, must be specified.<br/>
<code>outputRetentionMs</code>: time to live (TTL) for output files. Output files whose batches were
committed longer ago than the TTL will eventually be excluded from the metadata log. This means reader
queries which read the sink's output directory may not process them.
I would mention the default, like other params.
<td>
<code>path</code>: path to the output directory, must be specified.<br/>
<code>outputRetentionMs</code>: time to live (TTL) for output files. Output files whose batches were
There are 2 time fields in SinkFileStatus, modificationTime and commitTime. Maybe it's worth mentioning the exact field used for the comparison, to make it 100% clear.
I guess we avoid exposing implementation details in the docs. E.g., if I'm not mistaken, there's no explanation of the format of the metadata, hence it would be confusing which field is being used, because end users don't even know what the fields are.
You're right, and we don't explain metadata details to users. What users would like to understand, though, is what the TTL reference is bound to. As half developer and half user, I was a bit confused about which field of SinkFileStatus we refer to. Since we've removed the (in my view) duplicate field, I'm fine here.
    blockSize: Long,
    action: String,
    commitTime: Long) {
Since we're supporting this feature in append mode only, isn't it possible to use modificationTime?
So the introduction of "commit time" came from the concern about the uncertainty of HDFS file timestamps in the previous PR. If we are sure about the modification time, there's no need to use "commit time".
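For context, here is roughly what the sink log entry looks like with the proposed field. This is a sketch: the first seven fields mirror Spark's actual SinkFileStatus, while commitTime is the addition being debated (and, per this thread, it was ultimately dropped in favor of modificationTime).

```scala
// Sketch of the sink metadata entry under discussion (not the merged code).
case class SinkFileStatus(
    path: String,            // output file path
    size: Long,              // file size in bytes
    isDir: Boolean,
    modificationTime: Long,  // FS last-modified time; retention ends up comparing against this
    blockReplication: Int,
    blockSize: Long,
    action: String,          // "add" or "delete"
    commitTime: Long)        // proposed: wall-clock time at which the batch committed
```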
|
Test build #122955 has finished for PR 28363 at commit
|
|
Test build #122961 has finished for PR 28363 at commit
|
|
retest this, please |
|
Test build #122975 has finished for PR 28363 at commit
|
val curTime = System.currentTimeMillis()
val deletedFiles = logs.filter { log =>
  log.action == FileStreamSinkLog.DELETE_ACTION || (curTime - log.modificationTime) > ttlMs
}.map(_.path).toSet
Can we add some debug information about why a certain entry was deleted from the log? I was just thinking that a user comes and reports that certain files have been deleted and shouldn't have been, or the opposite. Without debug information it's hard to say anything about such an issue. If you have an idea w/o debug, that's also fine with me...
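A minimal sketch of what such a debug line could look like, layered on the filter from the diff above; the message and placement are illustrative, not the merged code.

```scala
val curTime = System.currentTimeMillis()
val deletedFiles = logs.filter { log =>
  val expired = (curTime - log.modificationTime) > ttlMs
  if (expired) {
    // Record why the entry is being dropped, so operators can reason about
    // "file missing" or "file unexpectedly retained" reports.
    logDebug(s"Excluding ${log.path} from the compact batch: " +
      s"modificationTime=${log.modificationTime}, ttlMs=$ttlMs")
  }
  log.action == FileStreamSinkLog.DELETE_ACTION || expired
}.map(_.path).toSet
```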
private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit =
  withFileStreamSinkLog(None, f)

private def withFileStreamSinkLog(ttl: Option[Long], f: FileStreamSinkLog => Unit): Unit = {
If the order of the two params were flipped, with a default value, then we wouldn't need a method overload.
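In other words, something like the following sketch (the helper body is elided in the quoted diff, so it is only hinted at here):

```scala
// With the TTL parameter moved last and given a default value, the
// single-argument overload above becomes unnecessary.
private def withFileStreamSinkLog(
    f: FileStreamSinkLog => Unit,
    ttl: Option[Long] = None): Unit = {
  // ... construct the FileStreamSinkLog with `ttl` and pass it to `f` ...
}
```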
private def newFakeSinkFileStatus(
    path: String,
    action: String,
    modificationTime: Long): SinkFileStatus = {
Maybe a default here?
|
Test build #123108 has finished for PR 28363 at commit
|
|
Looks good apart from the style issue. |
|
Given it didn't fail on my local machine, I'll try to rebase to see how the code has been affected by the automatic merge. |
Force-pushed from 06be0e4 to 9383fcb
|
OK just fixed it. Let's see the build result. |
|
Test build #123116 has finished for PR 28363 at commit
|
gaborgsomogyi
left a comment
LGTM.
|
retest this, please |
|
Test build #123986 has finished for PR 28363 at commit
|
|
retest this, please |
|
Test build #123999 has finished for PR 28363 at commit
|
Force-pushed from 9383fcb to b648156
|
Test build #124953 has finished for PR 28363 at commit
|
|
retest this, please |
|
retest this, please |
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Test build #130287 has finished for PR 28363 at commit
|
|
retest this, please |
|
cc. @tdas @zsxwing @gaborgsomogyi @viirya @xuanyuanking Just a final reminder. I'll merge this early next week if there are no further comments, per the feedback from the dev@ mailing list. |
|
Test build #131890 has finished for PR 28363 at commit
|
|
retest this, please |
|
Test build #131898 has finished for PR 28363 at commit
|
|
retest this, please |
|
Test build #131903 has finished for PR 28363 at commit
|
zsxwing
left a comment
Left some minor comments. An ideal solution would be building file compaction for the file sink, but that would be a large effort. This option at least provides a workaround for people hitting the large-metadata issue, so I'm +1 for adding this.
    sparkSession.sessionState.conf)
private val fileLog =
  new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
private val outputTimeToLive = options.get("outputRetentionMs").map(_.toLong)
Can we use Utils.timeStringAsMs to parse this? Users will likely set this to multiple days, and asking them to calculate milliseconds is not user friendly.
Nit: regarding the option name, can we call it retention? It's obvious that the query is outputting files, so output sounds redundant to me.
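For reference, Utils.timeStringAsMs accepts human-friendly duration strings; a quick illustration (expected values shown as comments):

```scala
import org.apache.spark.util.Utils

Utils.timeStringAsMs("7d")   // 604800000: seven days in milliseconds
Utils.timeStringAsMs("12h")  // 43200000
Utils.timeStringAsMs("3600") // 3600: a bare number is interpreted as milliseconds
```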
Nit: any reason to use a different name, outputTimeToLive? Using the same name as the option would help other people read the code.
Nit: it would be great to output an info log for this value if it's set. It might be useful when debugging data issues caused by the retention.
Good suggestions! I'll apply all of the inputs.
private val ttlMs = outputTimeToLiveMs.getOrElse(Long.MaxValue)

override def shouldRetain(log: SinkFileStatus): Boolean = {
  val curTime = System.currentTimeMillis()
It would be great to avoid calling System.currentTimeMillis() if the option is not set, considering we'd otherwise make this call (a JNI call) once for each log entry.
Probably we could change the method signature a bit to provide a "context" which would be the same for the same compact batch. We changed the shouldRetain method in SPARK-30462, which is not yet released (3.1.0), hence changing it again doesn't break backward compatibility. (We decided to break the interface, but we only break it once across these changes.)
Once we call System.currentTimeMillis() only once per compact batch, the overhead should be negligible.
Ah also CompactibleFileStreamLog is not a public API (its package is org.apache.spark.sql.execution.streaming), so it shouldn't matter much.
|
Btw, I'm also concerned (probably more so) about the metadata log growing in FileStreamSource (#28422). The format of each entry in FileStreamSource is much smaller than FileStreamSink's, so it's more resilient to the memory issue, but while there are 3rd-party alternatives for FileStreamSink (as we all know), there's no alternative to FileStreamSource for reading from files. That said, users are forced to introduce an external process to produce fewer files in order to put less pressure on the metadata log in FileStreamSource, or to use other data sources as the input of SS. Unlike FileStreamSink, it's not that simple to remove a log entry, just because of what we support. I've also raised a discussion thread but didn't get any committers' voice, though I see some voices wanting FileStreamSource to work just like the Kafka stream source. WDYT? |
 * to change the behavior.
 */
def shouldRetain(log: T, context: Map[String, Any]): Boolean = true
I feel adding context is overkill. How about passing the now timestamp into this instead? We can add the retain context like this in future if that's necessary:
def shouldRetain(log: T, currentTime: Long): Boolean = true
def shouldRetain(log: T, context: Map[String, Any]): Boolean = {
  shouldRetain(log, context("currentTime").asInstanceOf[Long])
}
Actually, the change was made to cover a possible general case, because when changing shouldRetain I got a review comment saying "please avoid breaking backward compatibility." I disagreed, as it's a private API based on the package, but it took considerable time and effort to persuade.
If we agree that this is not a public API and we shouldn't bother with backward compatibility here, I definitely agree this is pretty much overkill as of now. WDYT?
Yep, this is not a public API. It's okay to change it.
Thanks! Will make a quick fix.
Fixed. Please take a look again. Thanks!
|
Test build #131937 has finished for PR 28363 at commit
|
|
Test build #131938 has finished for PR 28363 at commit
|
|
Test build #131940 has finished for PR 28363 at commit
|
zsxwing
left a comment
LGTM except one nit.
val logs =
  getAllValidBatches(latestId, compactInterval).flatMap { id =>
    filterInBatch(id)(shouldRetain).getOrElse {
      val curTime = System.currentTimeMillis()
nit: we can move this out of the flatMap function.
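A sketch of the nit, reusing the names from the diff above: hoisting the call gives one wall-clock read per compaction instead of one per batch.

```scala
// Evaluate the wall clock once, outside the per-batch closure.
val curTime = System.currentTimeMillis()
val logs =
  getAllValidBatches(latestId, compactInterval).flatMap { id =>
    filterInBatch(id)(shouldRetain(_, curTime)).getOrElse {
      // ... fall back to reading and filtering the batch directly (elided
      // in the quoted diff) ...
    }
  }
```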
Could you update this in the PR description? We are using the file modification time now. |
|
Thanks for the detailed review. Just applied both. |
|
LGTM. Thanks for your patience. |
|
Thanks all for the thoughtful reviews! Merging to master. |
What changes were proposed in this pull request?
This patch proposes to provide a new option to specify a time-to-live (TTL) for output file entries in FileStreamSink. The TTL is defined as the current timestamp minus the file's last modified time.
This patch filters outdated output files out of the metadata while compacting batches (other batches don't have the functionality to clean entries), which keeps the metadata from growing linearly; the filtered-out files will "eventually" no longer be seen by reader queries which leverage File(Stream)Source.
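Concretely, the retention check described above boils down to a comparison like this (a simplified, self-contained illustration, not the exact merged code):

```scala
// An entry is dropped at compaction time once its file's last-modified
// time is older than the configured retention.
def isExpired(modificationTime: Long, retentionMs: Long, now: Long): Boolean =
  (now - modificationTime) > retentionMs

val sevenDaysMs = 7L * 24 * 60 * 60 * 1000  // 604800000
isExpired(modificationTime = 0L, retentionMs = sevenDaysMs,
  now = System.currentTimeMillis())         // true: far older than 7 days
```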
Why are the changes needed?
The metadata log greatly helps to achieve exactly-once semantics easily, but given that the output path is open to arbitrary readers, there's no way to compact the metadata log, which ends up growing the metadata files as the query runs for a long time, especially the compacted batches.
Lots of end users have been reporting the issue: see the comments in SPARK-24295, SPARK-29995, and SPARK-30462.
(There are some reports from end users which include their workarounds: SPARK-24295.)
Does this PR introduce any user-facing change?
No, as the configuration is new and is not applied by default.
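For completeness, a hedged usage sketch reflecting the final shape after review (the option ended up named retention and is parsed with Utils.timeStringAsMs; the paths and the DataFrame df are placeholders):

```scala
// Opt in to retention on the file sink: entries whose files were last
// modified more than 7 days ago are dropped when the metadata log compacts.
val query = df.writeStream
  .format("parquet")
  .option("path", "/tmp/ss-output")
  .option("checkpointLocation", "/tmp/ss-checkpoint")
  .option("retention", "7d")
  .start()
```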
How was this patch tested?
New UT.