[SPARK-1065] [PySpark] improve support for large broadcasts #1912
Conversation
Passing large objects through Py4J is very slow (and costs a lot of memory), so this patch passes broadcast objects via files instead (similar to parallelize()). It also adds an option to keep the object in the driver (False by default) to save driver memory.
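As a hedged sketch of the file-based handoff described above (not the actual broadcast.py code; `dump_to_file` and `load_from_file` are hypothetical helper names): the driver pickles the value into a temporary file and hands only the short path string across the Py4J bridge, so the large payload never travels through the Py4J socket.

```python
import os
import pickle
import tempfile

def dump_to_file(value, temp_dir=None):
    """Pickle `value` into a temp file and return only the path."""
    fd, path = tempfile.mkstemp(dir=temp_dir, suffix=".pkl")
    with os.fdopen(fd, "wb") as f:
        pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
    # Hand this small string to the JVM instead of the pickled bytes.
    return path

def load_from_file(path):
    """Read the pickled value back, e.g. in a worker process."""
    with open(path, "rb") as f:
        return pickle.load(f)
```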
QA tests have started for PR 1912. This patch merges cleanly.

QA results for PR 1912:

The failed tests were not related to this PR.
does this fit in the line above?
test this please

QA tests have started for PR 1912. This patch merges cleanly.
I was talking to Jenkins when I said "test this please", but thanks @davies for adding tests too.

LoL, I realized this just after pushing the commit :)

QA results for PR 1912:

QA tests have started for PR 1912. This patch merges cleanly.

QA results for PR 1912:

@davies I am about to test it again with CompressedSerializer. Am I right that I don't need to change anything in my project, but just rebuild Spark?

@frol, yes, thanks again!

QA tests have started for PR 1912. This patch merges cleanly.
python/pyspark/broadcast.py
Outdated
Good call here; it was a bad idea to expose these internals in user-facing module doctests.
This looks good to me, and I'm really glad to read the JIRA comments saying how it sped things up. I left one minor usability-related comment, but otherwise this looks great.

QA results for PR 1912:
add a better message when trying to access Broadcast.value in the driver.
QA tests have started for PR 1912. This patch merges cleanly.

QA results for PR 1912:
@davies Compression improved things, but my tasks have heavy computations inside, so it saved only 10 seconds on a 4.5-minute task and about 10-20 seconds on an 18-minute task. In both cases I have only 340 partitions. I'm still investigating where the second copy of my fat object is, because I can clearly see it when comparing with my local tests. Also, if I cut my big object in half, the memory consumption decreases as if it had been cut to a quarter in a local run.
@frol The big win from compression may be saving memory in the JVM. It's also a win if it doesn't increase the runtime. In the future we could try LZ4; it may help a little with runtime, but it won't contribute much in your case. Which memory are you talking about: the Python driver, the JVM, or the Python workers?
@davies I'm talking about memory in the Python workers, and it turned out to be my own issue. (I found a mistake in my local test; after fixing it, the local test and the Spark Python workers consume the same amount of memory.) Sorry for the confusion.
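For readers following the compression thread above: a minimal standalone sketch of what a compressing serializer can look like, assuming zlib over pickle. The PR's actual CompressedSerializer wraps another serializer; the class below is a made-up illustration, not Spark's implementation.

```python
import pickle
import zlib

class SimpleCompressedSerializer(object):
    """Compress pickled bytes with zlib; decompress before unpickling."""

    def dumps(self, obj):
        return zlib.compress(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL))

    def loads(self, data):
        return pickle.loads(zlib.decompress(data))
```

The trade-off discussed above: compression shrinks what the JVM has to hold and ship, at the cost of some CPU, so it helps most when tasks are not already compute-bound.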
@frol After fixing your local test, are you still noticing any broadcast performance issues? If you still see any odd behavior, could you post a small script or set of pyspark shell commands so we can test it out?
@JoshRosen No, I'm not noticing any broadcast performance issues now. PySpark works like a charm again. Thank you! |
python/pyspark/broadcast.py
Outdated
Typo: should be spelled "accessible." Also, maybe the error message could be a little clearer about how broadcast variables are created and why the call failed. I'm thinking of something like "please call sc.broadcast() with keep=True to make values accessible in the driver".
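As a rough sketch of the suggested behavior (purely illustrative; `keep` is the flag under discussion in this PR, which the later comments below supersede with a disk-based approach, and the attribute names are assumptions):

```python
class Broadcast(object):
    """Illustrative sketch only, not the actual pyspark.broadcast code."""

    def __init__(self, value=None, keep=False):
        # With keep=False, the driver drops its copy to save memory.
        self._value = value if keep else None

    @property
    def value(self):
        if self._value is None:
            raise Exception("broadcast value is not accessible in the driver; "
                            "please call sc.broadcast() with keep=True to make "
                            "values accessible in the driver")
        return self._value
```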
It occurs to me: what if we had .value retrieve and depickle the value from the JVM? Also, won't we still experience memory leaks in the JVM if we iteratively create broadcast variables, since we will never clean up those pickled values? One approach is to have .value depickle the JVM value (so we're not changing the user-facing API) and add a Python equivalent of Broadcast.destroy() for performing permanent cleanup of a broadcast's resources. What do you think of this approach?
add Broadcast.unpersist()
QA tests have started for PR 1912 at commit
I have added Broadcast.unpersist(blocking=False). Because we keep a copy on disk, we can read the value from there when the user wants to access it in the driver, so SparkContext.broadcast() can stay unchanged.
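A minimal sketch of that approach under stated assumptions (`_path` and `_jbroadcast` are made-up names, not necessarily the real broadcast.py attributes): the driver keeps only the path to the pickled file and reloads the value lazily on first access, while unpersist() forwards to the JVM-side broadcast.

```python
import pickle

class Broadcast(object):
    """Sketch of the disk-backed design described above; illustrative only."""

    def __init__(self, bid, path, java_broadcast=None):
        self.bid = bid
        self._path = path                  # driver-side pickled copy on disk
        self._jbroadcast = java_broadcast  # py4j handle to the JVM broadcast
        self._value = None

    @property
    def value(self):
        # Lazily reload from the on-disk copy on first driver access, so
        # SparkContext.broadcast() keeps its old user-facing behavior.
        if self._value is None:
            with open(self._path, "rb") as f:
                self._value = pickle.load(f)
        return self._value

    def unpersist(self, blocking=False):
        # Ask the JVM to drop the cached copies on the executors.
        if self._jbroadcast is not None:
            self._jbroadcast.unpersist(blocking)
```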
Hmm, looks like this was affected by the Jenkins timeouts last night. Jenkins, retest this please.

Jenkins, retest this please.

QA tests have started for PR 1912 at commit

QA tests have finished for PR 1912 at commit

Jenkins, retest this please.

QA tests have started for PR 1912 at commit

QA tests have finished for PR 1912 at commit
Can you add a docstring? It's fine to just copy it over from the Scala equivalent. In this case:

/**
 * Delete cached copies of this broadcast on the executors. If the broadcast is used after
 * this is called, it will need to be re-sent to each executor.
 * @param blocking Whether to block until unpersisting has completed
 */
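For reference, a direct Python rendering of that Scala doc might look like the following (a sketch; the `_jbroadcast` attribute name is an assumption):

```python
def unpersist(self, blocking=False):
    """
    Delete cached copies of this broadcast on the executors. If the
    broadcast is used after this is called, it will need to be re-sent
    to each executor.

    :param blocking: Whether to block until unpersisting has completed
    """
    self._jbroadcast.unpersist(blocking)
```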
I guess we don't necessarily want to expose
Actually, I'm just going to merge this now, and I'll add the docstring as part of a subsequent documentation-improvement PR (I also want to edit some Scala / Java docs, too).
I've merged this into
Passing large object by py4j is very slow (cost much memory), so pass broadcast objects via files (similar to parallelize()). Add an option to keep object in driver (it's False by default) to save memory in driver.

Author: Davies Liu <[email protected]>

Closes #1912 from davies/broadcast and squashes the following commits:

e06df4a [Davies Liu] load broadcast from disk in driver automatically
db3f232 [Davies Liu] fix serialization of accumulator
631a827 [Davies Liu] Merge branch 'master' into broadcast
c7baa8c [Davies Liu] compress serialized broadcast and command
9a7161f [Davies Liu] fix doc tests
e93cf4b [Davies Liu] address comments: add test
6226189 [Davies Liu] improve large broadcast

(cherry picked from commit 2fc8aca)
Signed-off-by: Josh Rosen <[email protected]>