[SPARK-29434][Core] Improve the MapStatuses Serialization Performance #26085
Conversation
Thank you for pinging me, @dbtsai. I'll take a look tomorrow.
advancedxy left a comment:
Regarding the current approach: what if the chunks are big enough that compressing directly is faster than the two-step approach?
Should we add an adaptive approach to choose the optimal one?
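A rough sketch of what such an adaptive choice could look like (purely hypothetical; the object name, threshold name, and threshold value below are invented for illustration and are not part of this PR):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream, OutputStream}

import com.github.luben.zstd.ZstdOutputStream

object AdaptiveMapStatusSerializer {
  // Invented threshold purely for illustration; a real adaptive scheme would need
  // benchmarking to find the crossover point (if one exists at all).
  private val TwoStepThresholdBytes = 64 * 1024

  def serialize(statuses: AnyRef, out: OutputStream, estimatedSize: Int): Unit = {
    if (estimatedSize <= TwoStepThresholdBytes) {
      // Two-step: serialize into an uncompressed buffer, then compress it in one go.
      val buffer = new ByteArrayOutputStream(4096)
      val objOut = new ObjectOutputStream(buffer)
      objOut.writeObject(statuses)
      objOut.close()
      val zos = new ZstdOutputStream(out)
      buffer.writeTo(zos)
      zos.close()
    } else {
      // One-step: stream the objects straight through the compressor.
      val objOut = new ObjectOutputStream(new ZstdOutputStream(out))
      objOut.writeObject(statuses)
      objOut.close()
    }
  }
}
```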
out.write(DIRECT)
val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
import scala.language.reflectiveCalls
val out = new ByteArrayOutputStream(4096) {
Just curious, is there any specific reason to choose 4096 here?
I just chose something bigger. Any suggestion?
With Apache's ByteArrayOutputStream implementation, I don't need to put a larger initial value anymore, since that implementation doesn't have to allocate a new memory buffer and then copy the contents over.
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import com.github.luben.zstd.ZstdInputStream
import com.github.luben.zstd.ZstdOutputStream
I recall we put third-party imports under the java and scala ones.
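For reference, the grouping being referred to looks roughly like this (the group comments and the specific Spark import are illustrative):

```scala
// java and scala imports first
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.mutable

// third-party imports next
import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}

// org.apache.spark imports last
import org.apache.spark.internal.Logging
```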
I haven't looked at the code yet; can you clarify what ops/ms is measuring here?

Just to verify, does this hold true for small jobs, where you have 10 or 100 outputs? Did you try with, say, 2000 or 5000 blocks? The zstd ops/ms for the two-step approach dropped a lot between 5 and 500 blocks, whereas the one-step approach didn't drop nearly as much; I'm wondering if one-step becomes faster at some point.
|
@tgravescs it's record / ms. When the num of blocks are large, two steps and one step will have similar result, but two step will never be slower. I switched to use In Apache's /* In contrast
* to the original it doesn't reallocate the whole memory block but allocates
* additional buffers. This way no buffers need to be garbage collected and
* the contents don't have to be copied to the new buffer.
*/ |
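A small sketch of the difference being discussed, assuming the Apache implementation meant here is Commons IO's `org.apache.commons.io.output.ByteArrayOutputStream`:

```scala
import java.io.{ByteArrayOutputStream => JavaByteArrayOutputStream}

import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOutputStream}

object BufferGrowthSketch {
  // java.io.ByteArrayOutputStream grows by allocating a bigger array and copying the
  // old contents over, so choosing a good initial size matters for large payloads.
  val javaOut = new JavaByteArrayOutputStream(4096)

  // The Commons IO version grows by chaining additional buffers instead: no
  // reallocation, no copy, and no discarded buffers left for the garbage collector.
  val apacheOut = new ApacheByteArrayOutputStream(4096)

  // Both expose writeTo(OutputStream), so the accumulated bytes can be streamed into
  // a compressor (e.g. ZstdOutputStream) without materializing an extra array copy.
}
```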
Hi, @dbtsai. I created a PR to your branch. Could you review and merge the updated benchmark result?
@dongjoon-hyun thanks. Merged into my branch.
dongjoon-hyun left a comment:
+1, LGTM. It's a nice improvement, @dbtsai. Thanks!
Thank you, @dbtsai, @tgravescs, @viirya, @MaxGekk, @advancedxy.
Sorry for my late comment here; I didn't have time last week to follow up. Just curious, did you try lz4 (or another codec) instead of zstd? I would expect zstd to be the better choice, but I'm wondering whether you tried it, what the performance difference was, and whether it makes sense to make this configurable in the future.
That sounds like a reasonable idea. Could you make a JIRA and a PR for the configuration, @tgravescs? I can help you with the benchmark. Thanks!
So does that mean you know lz4 (or others) weren't tried? I was curious whether it was tried already before spending time on it.
No, I don't think it was tried. To make sure, let's ping @dbtsai. :)
@tgravescs let me try lz4 quickly, and I will post the result. Thanks.
@tgravescs The following is the result run on my desktop. LZ4 is 5x faster but creates 1.6x bigger data. Should we trade the serialization time for larger data?
Java HotSpot(TM) 64-Bit Server VM 1.8.0_161-b12 on Mac OS X 10.14.2
Intel(R) Xeon(R) CPU E5-1650 v2 @ 3.50GHz
200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 3340 3355 21 0.1 16700.1 1.0X
Deserialization 650 660 14 0.3 3248.6 5.1X
Compressed Serialized MapStatus sizes: 123 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
Running benchmark: 200000 MapOutputs, 1000 blocks w/o broadcast
Running case: Serialization
Stopped after 3 iterations, 2109 ms
Running case: Deserialization
Stopped after 5 iterations, 2424 ms
Java HotSpot(TM) 64-Bit Server VM 1.8.0_161-b12 on Mac OS X 10.14.2
Intel(R) Xeon(R) CPU E5-1650 v2 @ 3.50GHz
200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 677 703 32 0.3 3383.6 1.0X
Deserialization 466 485 27 0.4 2331.1 1.5X
Compressed Serialized MapStatus sizes: 194 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
Java HotSpot(TM) 64-Bit Server VM 1.8.0_161-b12 on Mac OS X 10.14.2
Intel(R) Xeon(R) CPU E5-1650 v2 @ 3.50GHz
200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 2199 2202 4 0.1 10994.6 1.0X
Deserialization 690 720 46 0.3 3450.6 3.2X
Compressed Serialized MapStatus sizes: 182 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
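A sketch of how the codec might be swapped for such a comparison (assuming lz4-java's `LZ4BlockOutputStream`; the benchmark's serialization code is unchanged and elided here):

```scala
import java.io.OutputStream

import com.github.luben.zstd.ZstdOutputStream
import net.jpountz.lz4.LZ4BlockOutputStream

object CodecSwapSketch {
  // Only the stream wrapped around the output buffer changes between the runs above;
  // the Java serialization of the MapStatuses stays identical.
  def wrapCodec(out: OutputStream, codec: String): OutputStream = codec match {
    case "zstd" => new ZstdOutputStream(out)      // smaller output, slower in this test
    case "lz4"  => new LZ4BlockOutputStream(out)  // ~5x faster here, ~1.6x larger output
    case other  => throw new IllegalArgumentException(s"Unknown codec: $other")
  }
}
```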
In this PR, ZSTD reduces the size like the following. If we choose
Thanks for running the tests, @dbtsai. It's actually faster than I expected. Yes, it needs to distribute it, and this is obviously using memory on the driver side. Normally, if the map status is of any size, it ends up being broadcast to the hosts, with the message going over the wire just indicating that it's a broadcast. To me this isn't much different from any other broadcast, which normally has the spark.io.compression.codec config applied for what compression to use. You may actually want it faster if you have ample network. I assume that originally, before we were broadcasting it, the size was definitely an issue, because if it went over the max message size it would just fail. It also took a long time for large statuses and slower networks. That was also before we had highly compressed statuses and such. This is definitely an improvement over what we had. Perhaps we just wait and see if it's an issue or if someone wants to use something other than zstd, and at that point we can make it configurable if needed. I hate to add more configs if not really needed.
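For reference, the `spark.io.compression.codec` setting mentioned above is an ordinary Spark conf; a minimal example (the codec values noted are the standard ones Spark supports):

```scala
import org.apache.spark.SparkConf

object CodecConfExample {
  // spark.io.compression.codec controls compression for internal data such as broadcast
  // variables and shuffle spills. lz4 is the default; lzf, snappy, and zstd are alternatives.
  val conf: SparkConf = new SparkConf()
    .setAppName("codec-example")
    .set("spark.io.compression.codec", "zstd")
}
```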
Got it. Thanks, @tgravescs.
Agreed, this PR already speeds up the serialization a bit, and it unblocks our use case. I was initially thinking of adding another configuration, but I also feel we have too many tuning parameters in Spark now. Let's see what other people think, and if they need to make it faster, we can consider the option of making it configurable. Thanks.
}
val outArr = {
  compressedOut.reset()
  val zos = new ZstdOutputStream(compressedOut)
Hi @dbtsai, I am back-porting this into our internal repo. It looks like this compression is unnecessary, since arr is already compressed by zstd. Compressing an already-compressed byte[] again is a waste of CPU time. WDYT?
The actual value of the data (which is already compressed) will not be in the serialized form of out.writeTo(zos), as it's transient. Here, we are just serializing the reference to the actual data, and the actual data will be broadcast through TorrentBroadcast. See the next log, "Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length, for your real data; the broadcast one is very small.
Thanks for the clarification. It indeed does not include the compressed data.
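A minimal, self-contained sketch of the mechanism described above, with illustrative class and field names: a `@transient` field is skipped by Java serialization, so only the small wrapper goes over the wire while the big payload is distributed separately (via TorrentBroadcast in Spark's case):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Illustrative wrapper: `payload` is transient, so serializing an instance of this
// class does NOT write the (potentially huge, already-compressed) bytes.
class BroadcastedStatuses(@transient val payload: Array[Byte],
                          val broadcastId: Long) extends Serializable

object TransientDemo {
  def main(args: Array[String]): Unit = {
    val big = Array.fill[Byte](10 * 1024 * 1024)(1) // 10 MB "compressed" payload
    val wrapper = new BroadcastedStatuses(big, broadcastId = 42L)

    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(wrapper)
    oos.close()

    // Only the reference/metadata is serialized; the transient payload is skipped.
    println(s"serialized wrapper size = ${bos.size()} bytes, payload size = ${big.length} bytes")
  }
}
```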
What changes were proposed in this pull request?

Instead of using GZIP for compressing the serialized `MapStatuses`, ZStd provides a better compression rate and faster compression time. The original approach serializes and writes data directly into a `GZIPOutputStream` in one step; however, the compression time is faster if a bigger chunk of the data is processed by the codec at once. As a result, in this PR, the serialized data is written into an uncompressed byte array first, and then the data is compressed. For smaller `MapStatuses`, we find it's 2x faster.

Here is the benchmark result.

20k map outputs, and each has 500 blocks
1. ZStd two steps in this PR: 0.402 ops/ms, 89,066 bytes
2. ZStd one step as the original approach: 0.370 ops/ms, 89,069 bytes
3. GZip: 0.092 ops/ms, 217,345 bytes

20k map outputs, and each has 5 blocks
1. ZStd two steps in this PR: 0.9 ops/ms, 75,449 bytes
2. ZStd one step as the original approach: 0.38 ops/ms, 75,452 bytes
3. GZip: 0.21 ops/ms, 160,094 bytes

Why are the changes needed?

Decrease the time for serializing the `MapStatuses` in large-scale jobs.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests.
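A minimal sketch of the one-step vs. two-step contrast described above (helper names are illustrative; the actual change lives in `MapOutputTracker`, and the Apache Commons IO `ByteArrayOutputStream` is assumed as the buffer):

```scala
import java.io.ObjectOutputStream
import java.util.zip.GZIPOutputStream

import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOutputStream}
import com.github.luben.zstd.ZstdOutputStream

object MapStatusSerializationSketch {
  // One-step (original approach): serialize straight through the compressor, so the
  // codec sees many small writes.
  def serializeOneStepGzip(statuses: AnyRef): Array[Byte] = {
    val out = new ApacheByteArrayOutputStream(4096)
    val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
    objOut.writeObject(statuses)
    objOut.close()
    out.toByteArray
  }

  // Two-step (this PR): serialize into an uncompressed buffer first, then compress the
  // whole buffer at once, letting the codec work on one big chunk.
  def serializeTwoStepZstd(statuses: AnyRef): Array[Byte] = {
    val uncompressed = new ApacheByteArrayOutputStream(4096)
    val objOut = new ObjectOutputStream(uncompressed)
    objOut.writeObject(statuses)
    objOut.close()

    val compressed = new ApacheByteArrayOutputStream(4096)
    val zos = new ZstdOutputStream(compressed)
    uncompressed.writeTo(zos)
    zos.close()
    compressed.toByteArray
  }
}
```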