[SPARK-4398][PySpark] specialize sc.parallelize(xrange) #3264
Conversation
Test build #23360 has started for PR 3264 at commit
python/pyspark/context.py
How about pre-calculating all the boundaries for all the partitions?
This only serializes an xrange object. If we pre-calculate the boundaries, the cost is O(p).
Yes, but the size + 1 is tricky. How about this one:

```python
start = c[0]

def getStart(split):
    return start + size * split / numSlices * step

def f(split, iterator):
    return xrange(getStart(split), getStart(split + 1), step)
```
Yes, this is better!
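The exchange above can be sketched as a runnable Python 3 function (the helper name `slice_range` and the demo values are assumptions for illustration; Python 3's `range` stands in for Python 2's `xrange`, and `//` makes explicit the floor division that `/` performs on Python 2 ints):

```python
# A minimal sketch of the boundary-based slicing suggested above.
# Each partition is described only by its start/stop/step, so no
# elements are materialized when slicing.

def slice_range(c, numSlices):
    """Split range `c` into `numSlices` contiguous lazy sub-ranges."""
    size = len(c)
    start = c[0]
    step = c[1] - c[0] if size > 1 else 1

    def getStart(split):
        # Each boundary is O(1) arithmetic; nothing is pre-computed.
        return start + (size * split // numSlices) * step

    return [range(getStart(split), getStart(split + 1), step)
            for split in range(numSlices)]

parts = slice_range(range(0, 20, 2), 3)
# The sub-ranges cover the original range exactly, with no overlap.
assert [x for p in parts for x in p] == list(range(0, 20, 2))
```

Because each partition is described only by its boundaries, serializing a task costs O(1) per partition instead of shipping the elements themselves.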
Test build #23360 has finished for PR 3264 at commit

Test PASSed.

Test build #23374 has started for PR 3264 at commit

Test build #23375 has started for PR 3264 at commit

LGTM, thanks!

Test build #23375 has finished for PR 3264 at commit

Test PASSed.

Test build #23374 has finished for PR 3264 at commit

Test PASSed.
@davies Thanks! I've merged this into master and branch-1.2.
`sc.parallelize(range(1 << 20), 1).count()` may take 15 seconds to finish, and the RDD object stores the entire list, making the task size very large. This PR adds a specialized version for xrange.

@JoshRosen @davies

Author: Xiangrui Meng <[email protected]>

Closes #3264 from mengxr/SPARK-4398 and squashes the following commits:

8953c41 [Xiangrui Meng] follow davies' suggestion
cbd58e3 [Xiangrui Meng] specialize sc.parallelize(xrange)

(cherry picked from commit abd5817)
Signed-off-by: Xiangrui Meng <[email protected]>
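The task-size argument can be checked outside Spark with plain `pickle` (a hedged illustration only — Spark uses its own serializers, but the asymmetry is the same): a lazy range pickles to a handful of bytes, while the materialized list pickles to megabytes.

```python
import pickle

# A range object serializes as just (start, stop, step);
# a list serializes every element.
n = 1 << 20
lazy = pickle.dumps(range(n))
materialized = pickle.dumps(list(range(n)))

assert len(lazy) < 100            # constant-size representation
assert len(materialized) > 1_000_000  # grows with element count
```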
… parallelize lazy iterable range

## What changes were proposed in this pull request?

During the follow-up work (#23435) for the PySpark worker reuse scenario, we found that worker reuse takes no effect for `sc.parallelize(xrange(...))`. This happened because the specialized rdd.parallelize logic for xrange (introduced in #3264) generated data from a lazy iterable range, which does not need to use the passed-in iterator. But this breaks the end-of-stream checking in the Python worker and finally causes worker reuse to take no effect. See more details in the [SPARK-26549](https://issues.apache.org/jira/browse/SPARK-26549) description. We fix this by forcing use of the passed-in iterator.

## How was this patch tested?

New UT in test_worker.py.

Closes #23470 from xuanyuanking/SPARK-26549.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
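The end-of-stream problem described above can be modeled with a toy example (illustrative assumptions only — `run_task`, `broken`, and `fixed` are not Spark internals): a partition function that ignores its input iterator leaves the underlying stream unconsumed, so a reader expecting to reach end-of-stream never does.

```python
# Toy model of the worker-reuse bug: the "broken" function generates
# data lazily and never touches `iterator`, leaving the stream
# unconsumed; the "fixed" function forces consumption of it.

def run_task(stream, func):
    it = iter(stream)
    result = list(func(0, it))
    leftover = list(it)  # anything left here breaks the stream protocol
    return result, leftover

# Broken: produces data lazily, never touching `iterator`.
broken = lambda split, iterator: range(3)
# Fixed: forces use of the passed-in iterator.
fixed = lambda split, iterator: list(iterator)

_, leftover_broken = run_task([0, 1, 2], broken)
_, leftover_fixed = run_task([0, 1, 2], fixed)
assert leftover_broken == [0, 1, 2]  # stream left unconsumed
assert leftover_fixed == []          # stream fully drained
```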