
Conversation

@nolanliou (Contributor) commented May 26, 2021

What changes were proposed in this pull request?

Limit the batch size used by add_shuffle_key in the partitionBy function, to fix "OverflowError: cannot convert float infinity to integer".
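
In rough terms, the change caps the growth of batch so it can never reach float infinity. A minimal sketch of the capped update, based on the test snippet later in this description (next_batch_size is a hypothetical helper for illustration, not the literal diff):

    import sys

    def next_batch_size(batch, avg):
        # Grow the batch when serialized chunks are small (avg < 1 MB), but cap
        # the growth so repeated * 1.5 can never overflow to float("inf").
        if avg < 1:
            return min(sys.maxsize, batch * 1.5)
        # Shrink when chunks are large (avg > 10 MB); int() is safe on a finite value.
        if avg > 10:
            return max(int(batch / 1.5), 1)
        return batch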

Why are the changes needed?

It's not easy to write a unit test for this, but some simple code can explain the bug.

  • Original code
    def add_shuffle_key(split, iterator):
        buckets = defaultdict(list)
        c, batch = 0, min(10 * numPartitions, 1000)

        for k, v in iterator:
            buckets[partitionFunc(k) % numPartitions].append((k, v))
            c += 1

            # check used memory and avg size of chunk of objects
            if (c % 1000 == 0 and get_used_memory() > limit
                    or c > batch):
                n, size = len(buckets), 0
                for split in list(buckets.keys()):
                    yield pack_long(split)
                    d = outputSerializer.dumps(buckets[split])
                    del buckets[split]
                    yield d
                    size += len(d)

                avg = int(size / n) >> 20
                # let 1M < avg < 10M
                if avg < 1:
                    batch *= 1.5  # unbounded growth: batch can overflow to float("inf")
                elif avg > 10:
                    batch = max(int(batch / 1.5), 1)  # int(inf / 1.5) raises OverflowError
                c = 0
If get_used_memory() > limit is always True and avg < 1 is always True, the variable batch grows without bound and eventually overflows to float infinity. Then, the first time avg > 10, batch = max(int(batch / 1.5), 1) raises OverflowError, because int() cannot convert float infinity to an integer.
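
The failure is easy to see in isolation: once a float reaches infinity, int() refuses to convert it:

    >>> batch = float("inf")
    >>> int(batch / 1.5)  # inf / 1.5 is still inf
    Traceback (most recent call last):
      ...
    OverflowError: cannot convert float infinity to integer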

  • Sample code to reproduce the bug:

    limit = 100
    used_memory = 200
    numPartitions = 64
    c, batch = 0, min(10 * numPartitions, 1000)

    while True:
        c += 1
        if (c % 1000 == 0 and used_memory > limit or c > batch):
            batch = batch * 1.5            # grows without bound, eventually float("inf")
            d = max(int(batch / 1.5), 1)   # raises OverflowError once batch is inf
            print(c, batch)

Does this PR introduce any user-facing change?

no

How was this patch tested?

It's not easy to write a unit test; here is sample code that exercises the fix:

    import sys

    limit = 100
    used_memory = 200
    numPartitions = 64
    c, batch = 0, min(10 * numPartitions, 1000)

    while True:
        c += 1
        if (c % 1000 == 0 and used_memory > limit or c > batch):
            batch = min(sys.maxsize, batch * 1.5)  # the cap keeps batch finite
            d = max(int(batch / 1.5), 1)           # no longer raises OverflowError
            print(c, batch)
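
For a run that terminates, a bounded variant of the same loop (hypothetical; 200,000 iterations is far more than batch needs to reach the cap) finishes without raising:

    import sys

    limit, used_memory, numPartitions = 100, 200, 64
    batch = min(10 * numPartitions, 1000)

    for c in range(1, 200_001):
        if (c % 1000 == 0 and used_memory > limit or c > batch):
            batch = min(sys.maxsize, batch * 1.5)
            # batch is always finite now, so int() cannot overflow
            assert max(int(batch / 1.5), 1) >= 1

    print("no OverflowError; final batch =", batch)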

@HyukjinKwon (Member) left a comment

Looks like it makes sense. cc @viirya and @ueshin too for double-checking.

@HyukjinKwon (Member)

ok to test

@HyukjinKwon (Member)

@nolanliou please take a look at https://github.com/apache/spark/pull/32667/checks?check_run_id=2671465975 and enable GitHub Actions. The Apache Spark repository uses the resources in each forked repository in PR builds.

@HyukjinKwon HyukjinKwon changed the title [SPARK-35512][PYTHON]: fix OverflowError(cannot convert float infinity to integer) in partitionBy function [SPARK-35512][PYTHON] Fix OverflowError(cannot convert float infinity to integer) in partitionBy function May 26, 2021
Member

Actually, when get_used_memory() > limit is true, I don't know why we want to increase batch *= 1.5.

Member

I guess it's to increase the size of the batch and use more memory ...?

@viirya (Member) commented May 26, 2021

Hm... I thought increasing batch is for the c > batch case. In other words, it increases the batch size when the count reaches the current batch size but used memory is still under the limit (and the average bucket size is small).

If it reaches the memory limit before reaching the batch size (which means the current batch size is already more than memory allows), it does not seem to make sense to increase the batch size (even if the average bucket size is small).
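
(To make that distinction concrete, the two triggers in the spill condition can be named separately; this is just an illustrative rewrite, not code from this PR:)

    def should_spill(c, batch, used_memory, limit):
        batch_full = c > batch  # batch reached: growing it afterwards is reasonable
        over_memory = c % 1000 == 0 and used_memory > limit  # growing batch here only adds pressure
        return batch_full or over_memory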

@nolanliou (Contributor, Author)

Agreed. The batch size should not increase when the memory limit is reached.

@viirya (Member) left a comment

Looks reasonable, although I have a question about why we increase batch when get_used_memory() > limit.

@SparkQA commented May 26, 2021

Test build #138957 has finished for PR 32667 at commit 9710887.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

Should we use sys.float_info.max instead, @nolanliou? Hm, by the way, I just realised it's funny that it can reach the maximum of float ...

Member

Anyway, avoiding a failure on the batch size makes sense.

@nolanliou (Contributor, Author)

I think sys.maxsize is ok.

It’s not easy to encounter this problem, but I ran into it...
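
(For reference, the two caps discussed here; either keeps the conversion finite:)

    import sys

    print(sys.maxsize)         # 9223372036854775807, i.e. 2**63 - 1 on 64-bit CPython
    print(sys.float_info.max)  # ~1.7976931348623157e+308

    # Both are finite, so int(batch / 1.5) succeeds under either cap;
    # sys.maxsize is already far larger than any sensible batch size.
    batch = min(sys.maxsize, sys.maxsize * 1.5)
    print(max(int(batch / 1.5), 1))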

Member

Okay; a sys.maxsize batch size already doesn't make much sense anyway.

@SparkQA commented May 26, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43477/

@nolanliou (Contributor, Author)

> @nolanliou please take a look at https://github.com/apache/spark/pull/32667/checks?check_run_id=2671465975 and enable GitHub Actions. The Apache Spark repository uses the resources in each forked repository in PR builds.

I have enabled Actions (Allow all actions), but it still doesn't work...

@HyukjinKwon (Member)

@nolanliou did you face something like this: #32400 (comment)?

@SparkQA commented May 26, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43477/

@SparkQA commented May 26, 2021

Test build #138964 has finished for PR 32667 at commit b6241fa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 26, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43483/

@SparkQA commented May 26, 2021

Test build #138973 has finished for PR 32667 at commit 67e6d71.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 26, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43483/

@nolanliou (Contributor, Author) commented May 26, 2021

> @nolanliou did you face something like this: #32400 (comment)?

All tests passed?

@HyukjinKwon (Member)

Let's wait a few days to make sure other people can review.

@SparkQA commented May 26, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43493/

@SparkQA commented May 26, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43493/

@nolanliou (Contributor, Author)

Any updates?

@viirya (Member) commented Jun 8, 2021

retest this please

@SparkQA commented Jun 8, 2021

Test build #139511 has finished for PR 32667 at commit 67e6d71.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 9, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44037/

@SparkQA commented Jun 9, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44037/

@HyukjinKwon (Member)

Merged to master.

@nolanliou nolanliou deleted the fix_partitionby branch June 9, 2021 03:18