[SPARK-30946][SS] Serde entry via DataInputStream/DataOutputStream with LZ4 compression on FileStream(Source/Sink)Log #27694
Conversation
Honestly I have been thinking about larger changes, like:
but I would like to prioritize from the perspective of fewer changes and bigger impact, and make changes incrementally. This patch brings the fewest changes but a great impact on performance. The items above are orthogonal to this improvement, so they can be addressed on demand later.
Test build #118924 has finished for PR 27694 at commit

retest this, please

Test build #121228 has finished for PR 27694 at commit
Suppose there is a streaming job pipeline, and these streaming jobs come from different end users or departments; if an end user in the middle of the pipeline upgrades their Spark and uses
@uncleGen That seems to be a clear case where end users don't want the version to be upgraded implicitly, but would there also be a case where they start to write the output and want it to be compatible with Spark 2.x? If we want the latter, it's worth having an option which specifies the "version" of the format. (The default value would ideally be the same as the existing one in the file, or the highest known version.) If we can live with just the former, then a flag option would be fine.
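The "version" idea above can be sketched roughly like this (hypothetical helper names, not Spark's actual code; Spark's CompactibleFileStreamLog does write a version line such as `v1` at the top of each log file, and a reader can dispatch on it):

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class VersionedLogDemo {
    // Write a metadata log whose first line carries the format version,
    // mimicking the "v1" prefix CompactibleFileStreamLog already writes.
    static byte[] write(int version, String entryLine) {
        return ("v" + version + "\n" + entryLine).getBytes(StandardCharsets.UTF_8);
    }

    // Read the version line first, then dispatch to the matching serde.
    static String read(byte[] bytes) {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new ByteArrayInputStream(bytes), StandardCharsets.UTF_8))) {
            int version = Integer.parseInt(reader.readLine().substring(1));
            switch (version) {
                case 1: // the existing JSON-per-line serde would be used here
                    return "v1:" + reader.readLine();
                case 2: // a newer (e.g. binary/compressed) serde would be used here
                    return "v2:" + reader.readLine();
                default:
                    throw new IllegalArgumentException("Unsupported version: " + version);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] log = write(2, "entry");
        System.out.println(read(log));
    }
}
```

A writer pinned to an older version stays readable by older Spark, while new readers can handle every version they know about.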
I've just picked this up, but it seems like it has a conflict and the linter job failed. Could you resolve them?
Test build #122403 has finished for PR 27694 at commit

Test build #123150 has finished for PR 27694 at commit

retest this please.

Test build #123156 has finished for PR 27694 at commit
One lesson I learned from the past is that UnsafeRow is not designed to be persisted across Spark versions. I'd avoid introducing another place that persists data in the UnsafeRow format. Do you know what the performance numbers are if we just compress the text files?
This sounds like a blocker for SS, as we leverage it to store state, which should be usable across Spark versions and shouldn't be corrupted or lost, since it's time-consuming or sometimes simply not possible to replay from scratch to reconstruct the same state due to retention policies. Do you have references to the discussion/gotchas for that issue? I think we should really fix this for storing state; probably state must not be stored in the UnsafeRow format then.
I haven't experimented, but we can easily imagine that the file size would be smaller, whereas processing time may be affected both positively (less to read from remote storage) and negatively (we still have to serialize/deserialize with JSON, plus the additional cost of compression/decompression). The point of this PR is that we know the schema (and even the versioning) of the file in advance, hence we don't (and shouldn't) pay a huge cost to make the file backward-compatible by itself. We don't do versioning for the data structures used by the event log, so we pay a huge cost there to keep it backward/forward compatible. If we are not sure about the UnsafeRow format for storage, we may be able to just try traditional approaches.
I've spent some time experimenting with more approaches. This is the experiment branch: https://github.com/HeartSaVioR/spark/tree/SPARK-30946-experiments
I've also implemented simple apps to 1) prepare metadata (so that we can experiment on a specific batch) and 2) run a simple test with various versions: HeartSaVioR/spark-delegation-token-experiment@bea7680 The numbers are recorded below: https://docs.google.com/spreadsheets/d/1D5P103F_sKOjkDpNr9PaCC8Ehk4Y4dRtH3oEdytM4_c/edit?usp=sharing
The numbers show that applying compression to the existing format doesn't help reduce the time, while the size is reduced similarly to the other alternatives. The alternatives that integrate directly with the data structure greatly reduce the time, roughly 10 times faster. The size of compact files is similar across alternatives.
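As a rough illustration of why the schema-aware alternatives win (a sketch with hypothetical field names loosely modeled on SinkFileStatus, not Spark's actual code): with JSON, the field names are paid for in every entry, while a fixed binary layout with a known schema writes only the values.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class SerdeSizeDemo {
    // Hypothetical fields resembling one file stream sink metadata entry.
    static final String PATH = "/output/part-00000-abc.parquet";
    static final long SIZE = 1024L;
    static final long MOD_TIME = 1583000000000L;

    // JSON-style serde: field names are repeated in every entry.
    static int jsonSize() {
        String json = String.format(
            "{\"path\":\"%s\",\"size\":%d,\"modificationTime\":%d}",
            PATH, SIZE, MOD_TIME);
        return json.getBytes(StandardCharsets.UTF_8).length;
    }

    // Binary serde: the schema is known up front, so only values are written.
    static int binarySize() {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeUTF(PATH);
            out.writeLong(SIZE);
            out.writeLong(MOD_TIME);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.size();
    }

    public static void main(String[] args) {
        System.out.println("JSON: " + jsonSize() + " bytes, binary: " + binarySize() + " bytes");
    }
}
```

The binary form is smaller per entry even before compression, and it also skips the cost of JSON parsing on read.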
Hi all, are these numbers enough to persuade? Given the concern about "compatibility", I think version 4 is ideal, as Java guarantees its compatibility across architectures. I'll update the patch to use version 4 as the new version (version 2, of course) for now.
Test build #123555 has finished for PR 27694 at commit

Test build #123559 has finished for PR 27694 at commit

Failed UTs are unrelated.

retest this, please

Test build #123565 has finished for PR 27694 at commit

Test build #123564 has finished for PR 27694 at commit
@zsxwing I think addressing all of these would require me to redesign the metadata log as well as the file stream source itself, which wasn't the goal here. As I commented previously, building a holistic solution was not a goal, because I already indicated it would take considerable time, and someone might claim that it's reinventing the wheel (I know these functionalities are extracted from alternatives). That said, shall we do the following?
If it's not desirable to go with 2, I'd ask to at least make the file source/sink metadata log class pluggable (I'll craft a PR soon if that's the way to go), so that someone can move forward with a restricted use case instead of trying to be too general. WDYT?
(IMHO this might still be a good chance to leverage this PR to establish a proper approach to versioning, so that version 2 can be used as an interim with best practices on versioning, and we can get version 3 with such major features included without headaches over versioning.)
Probably we can go with supporting relative paths "as well" in this metadata version, which would heavily reduce the size even without compression. I'll experiment with this approach as well.
Never mind. I simply missed that the metadata log should be readable by older Spark versions, which would have no idea how to handle relative paths. This change would also need to bump up the version.
retest this, please

Test build #125726 has finished for PR 27694 at commit

retest this, please

@zsxwing Would you mind sharing your opinion on #27694 (comment)?

Test build #125737 has finished for PR 27694 at commit

retest this, please

Test build #125748 has finished for PR 27694 at commit

Test build #125876 has finished for PR 27694 at commit

retest this, please

Test build #125900 has finished for PR 27694 at commit

…th LZ4 compression on FileStream(Source/Sink)Log

Test build #125934 has finished for PR 27694 at commit

retest this, please

Test build #125957 has finished for PR 27694 at commit

retest this, please

Test build #127534 has finished for PR 27694 at commit

retest this, please

Test build #127549 has finished for PR 27694 at commit

retest this, please

Test build #127573 has finished for PR 27694 at commit
Closing, as I don't see any support for this, and I'm now in favor of one of the alternatives.
What changes were proposed in this pull request?
This patch uses DataInputStream / DataOutputStream to serialize/deserialize entries in FileStream(Source/Sink)Log, and applies LZ4 compression to the serde.
The ground idea is that both FileEntry and SinkFileStatus haven't changed for 3 years, yet we have been using JSON serde to prepare for possible changes, spending a bunch of overhead on the JSON format. CompactibleFileStreamLog has metadata version information, but it has only ever been version 1.
This patch introduces metadata version 2 of CompactibleFileStreamLog, leveraging DataInputStream / DataOutputStream with LZ4 compression. While we introduce version 2, CompactibleFileStreamLog remains compatible with version 1 for both serialization and deserialization, so we can read from version 2 and write to version 2 normally, and we can also read from version 1 and write to version 2 for smooth migration.
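A minimal, dependency-free sketch of the version-2 serde shape (hypothetical helper names, not the patch's actual code; the JDK ships no LZ4 codec, so GZIP from java.util.zip stands in here for the LZ4 compression the patch actually uses):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CompressedSerdeDemo {
    // Serialize a hypothetical entry (path, size, modification time) with
    // DataOutputStream over a compressed stream. GZIP stands in for LZ4
    // so this sketch needs no external dependency.
    static byte[] writeEntry(String path, long size, long modTime) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(buf))) {
            out.writeUTF(path);
            out.writeLong(size);
            out.writeLong(modTime);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // Deserialize in the same field order; the schema is fixed and known,
    // so no field names or JSON parsing are needed.
    static String readEntry(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(bytes)))) {
            return in.readUTF() + "," + in.readLong() + "," + in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = writeEntry("/output/part-00000.parquet", 2048L, 1583000000000L);
        System.out.println(readEntry(bytes));
    }
}
```

Because writer and reader agree on the field order and types, the on-disk format needs no self-description, which is where the size and speed wins over JSON come from.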
There's an exceptional case for the file stream sink, whose outputs may be read by an older version of Spark. To support such a case, a new configuration (spark.sql.streaming.fileSink.log.writeMetadataVersion) is introduced to specify the write version of the metadata for the file stream sink. If specified, further batch log files will be written with that version. This doesn't rewrite the existing batch log files, so if the query is started from existing metadata, the query should run at least until the next compact batch so that the metadata log of the compact batch is written with the specified version.
Why are the changes needed?
Multiple JIRA issues have been filed reporting that huge metadata logs make queries very slow. While this patch won't stop the metadata log from growing, it may heavily reduce the size of the metadata log and the time to process compaction.
Please find the numbers in the section "How was this patch tested?".
As this also reduces the latency of reading all files from the file stream sink metadata, it also helps speed up queries, both batch and streaming, when the input path points to the output of a file stream sink.
Does this PR introduce any user-facing change?
Further metadata log files for the file stream sink will be written with the configured write version. For other compatible metadata logs, further metadata log files will be written as version 2.
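As a hedged illustration of how the proposed configuration would have been set (the conf name comes from this PR and was never merged into Spark; the application class and jar name are placeholders):

```shell
# Hypothetical: pin the file sink metadata write version to 1 so that the
# output stays readable by older Spark versions.
spark-submit \
  --conf spark.sql.streaming.fileSink.log.writeMetadataVersion=1 \
  --class com.example.StreamingApp \
  app.jar
```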
How was this patch tested?
Note that version 4 in the experiment is taken as version 2 in this patch.
This is the experiment branch: https://github.com/HeartSaVioR/spark/tree/SPARK-30946-experiments
HeartSaVioR@406670a
HeartSaVioR@7c66516
I've also implemented simple apps to 1) prepare metadata (so that we can experiment on a specific batch) and 2) run a simple test with various versions:
HeartSaVioR/spark-delegation-token-experiment@bea7680
The numbers are recorded below:
https://docs.google.com/spreadsheets/d/1D5P103F_sKOjkDpNr9PaCC8Ehk4Y4dRtH3oEdytM4_c/edit?usp=sharing