Skip to content

Conversation

@HeartSaVioR
Copy link
Contributor

@HeartSaVioR HeartSaVioR commented Feb 21, 2019

What changes were proposed in this pull request?

This patch proposes making purge in CompactibleFileStreamLog to throw UnsupportedOperationException to prevent purging necessary batch files, as well as adding javadoc to document its behavior. Actually it would only break when latest compaction batch is requested to be purged, but caller wouldn't be aware of this so safer to just prevent it.

How was this patch tested?

Added UT.

@HeartSaVioR
Copy link
Contributor Author

HeartSaVioR commented Feb 21, 2019

While I think it is safest way to only let CompactibleFileStreamLog to maintain logs, we have alternative options here:

  1. Only purge batches which are safe to delete and don't throw exception

Suppose it has batch 0, 1, 2, 3, 4 which batch 2 is compacted. If 4 is given as parameter, purge will try to remove 0, 1, 2, 3 which removing batch 2 (latest compaction batch) would break the internal state. Instead of this, this method could be overridden to remove only batch 0 and 1 and silently ignore removing 2 and 3.
(I'm a bit hesitant on this cause sometimes it might be against caller's intention.)

  1. Throw exception (maybe IllegalArgumentException, or IllegalStateException?) only when batches to purge contain the latest compaction batch

This would selectively throw exception - when it can break internal state of CompactibleFileStreamLog.

Please let me know if alternative would make more sense. Thanks in advance!

@SparkQA
Copy link

SparkQA commented Feb 21, 2019

Test build #102575 has finished for PR 23850 at commit 9c5fa64.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Copy link
Contributor

gaborgsomogyi commented Feb 21, 2019

Seems like this PR is facing the same jenkins issues like mine and doesn't start new build.
BTW, just started to review this...

@gaborgsomogyi
Copy link
Contributor

As long as the functionality is not needed I'm fine with the UnsupportedOperationException.
I've double checked and not used anywhere.

As a potential user of this or any kind of function I expect to do what I want without any internal thinking. Because of this option 1 and 2 are having side effects from my perspective.

If it would be really required I think the compacted file can be deserialized => remove the batches => serialized again with proper content (maybe you meant the same by maintain logs).

@HeartSaVioR
Copy link
Contributor Author

Thanks @gaborgsomogyi for providing your opinion. Same here, and that's why I took this approach and left such approaches as alternatives.

If it would be really required I think the compacted file can be deserialized => remove the batches => serialized again with proper content (maybe you meant the same by maintain logs).

What I meant by maintain logs is compaction what CompactibleFileStreamLog is doing now. IMHO instrumenting latest compacted batch doesn't seem to be an alternative, because we can't find which entries were added before thresholdBatchId once batches were compacted.

@HeartSaVioR
Copy link
Contributor Author

Btw, purgeAfter would be safe since it'll leave compaction batch which batch id is equals or less than thresholdBatchId, or otherwise remaining batches don't need compaction batch (batches before first compaction).

@gaborgsomogyi
Copy link
Contributor

gaborgsomogyi commented Feb 21, 2019

we can't find which entries were added before thresholdBatchId once batches were compacted

Yeah, that's true. Batch ID is not compacted into the file.

@SparkQA
Copy link

SparkQA commented Feb 21, 2019

Test build #102587 has finished for PR 23850 at commit a51564f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@gaborgsomogyi gaborgsomogyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@HeartSaVioR
Copy link
Contributor Author

cc. @zsxwing @brkyvz @jerryshao since they've authored parts of the file.

@HeartSaVioR
Copy link
Contributor Author

Ping.

@HeartSaVioR
Copy link
Contributor Author

Also cc-ing @tdas and @jose-torres since CompactibleFileStreamLog is only used for SS.

@HeartSaVioR
Copy link
Contributor Author

Kindly reminder.

@HeartSaVioR
Copy link
Contributor Author

Ping again.

@HeartSaVioR
Copy link
Contributor Author

Ping again, as Spark+AI Summit 2019 in SF is end.

@felixcheung
Copy link
Member

where are we on this? @tdas and @jose-torres

@SparkQA
Copy link

SparkQA commented Jun 2, 2019

Test build #106067 has finished for PR 23850 at commit a51564f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor Author

When this is revisited, please consider other PRs in mine as well: https://github.com/apache/spark/pulls/HeartSaVioR
And also consider other PRs in SS area, too: https://spark-prs.appspot.com/open-prs#streaming (title starts with [SS])

@dongjoon-hyun
Copy link
Member

Retest this please.

* of given parameter, and let CompactibleFileStreamLog handles purging by itself.
*/
override def purge(thresholdBatchId: Long): Unit = throw new UnsupportedOperationException(
s"'purge' might break internal state of CompactibleFileStreamLog hence not supported")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • nit. s" -> "?
  • CompactibleFileStreamLog hence not supported seems to need some revision.

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @HeartSaVioR .
This looks like a mismatch between compactInterval and the parameter of purge.
I have a question. If CompactibleFileStreamLog calls purge only when isCompactionBatch returns true, does purge fail in that case?

@SparkQA
Copy link

SparkQA commented Jun 9, 2019

Test build #106320 has finished for PR 23850 at commit a51564f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor Author

@dongjoon-hyun
Thanks for taking a look at the patch.

If CompactibleFileStreamLog calls purge only when isCompactionBatch returns true, does purge fail in that case?

Let me clear the issue - the condition which breaks internal state is, batches to purge contain the latest compaction batch, as further batches will refer the compaction batch.

I've described alternatives as well, so please take a look at previous comment: #23850 (comment)

Btw, even we could purge batches earlier than latest compaction batch, CompactibleFileStreamLog also does the clean up in deleteExpiredLog so it is actually not needed. (I'd like to let CompactibleFileStreamLog be responsible to take care about logs by itself.)

@SparkQA
Copy link

SparkQA commented Jun 12, 2019

Test build #106412 has finished for PR 23850 at commit ba62794.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 12, 2019

Test build #106422 has finished for PR 23850 at commit f8adab3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR makes the abstract class CompactibleFileStreamLog more robust by preventing error situations. The derived classes like FileStreamSinkLog should override this method with a safe and correct implementation.

@tdas , @jose-torres , @cloud-fan , @gatorsmile , @HyukjinKwon . This PR looks reasonable to me. I'm supporting @HeartSaVioR 's suggestion. Please give us your comments (for better alternatives or any regressions)

If there is no more comments, I'll proceed to merge this improvement to master for Apache Spark 3.0.0 in a few days.

@dongjoon-hyun
Copy link
Member

Merged to master. Thank you, @HeartSaVioR and @gaborgsomogyi .
cc @tdas , @jose-torres , @zsxwing

emanuelebardelli pushed a commit to emanuelebardelli/spark that referenced this pull request Jun 15, 2019
…pactibleFileStreamLog

## What changes were proposed in this pull request?

This patch proposes making `purge` in `CompactibleFileStreamLog` to throw `UnsupportedOperationException` to prevent purging necessary batch files, as well as adding javadoc to document its behavior. Actually it would only break when latest compaction batch is requested to be purged, but caller wouldn't be aware of this so safer to just prevent it.

## How was this patch tested?

Added UT.

Closes apache#23850 from HeartSaVioR/SPARK-26949.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
@HeartSaVioR
Copy link
Contributor Author

Thanks all for reviewing and merging!

@HeartSaVioR HeartSaVioR deleted the SPARK-26949 branch June 15, 2019 14:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants