[SPARK-2871] [PySpark] add histogram() API #2091
Changes from 1 commit
```diff
@@ -32,7 +32,7 @@
 import heapq
 import bisect
 from random import Random
-from math import sqrt, log
+from math import sqrt, log, isinf, isnan

 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
```
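The two functions added to the import (`isinf`, `isnan`) are used further down to screen out non-finite values. A quick standalone check of their behavior (plain Python, no Spark needed):

```python
from math import isinf, isnan

# isnan/isinf accept ints as well as floats
print(isnan(float("nan")))  # True
print(isinf(float("inf")))  # True
print(isnan(42))            # False
```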
@@ -856,6 +856,104 @@ def redFunc(left_counter, right_counter):

```python
        return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)

    def histogram(self, buckets, evenBuckets=False):
        """
        Compute a histogram using the provided buckets. The buckets
        are all open to the right except for the last, which is closed.
        e.g. [1, 10, 20, 50] means the buckets are [1, 10) [10, 20) [20, 50],
        which means 1<=x<10, 10<=x<20, 20<=x<=50. On the input of 1
        and 50 we would have a histogram of 1, 0, 1.

        If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
        the lookup can be switched from an O(log n) insertion to O(1)
        per element (where n is the number of buckets) by setting
        `evenBuckets` to True.

        Buckets must be sorted, must not contain any duplicates, and
        must have at least two elements.

        If `buckets` is a number, it will generate buckets that are
        evenly spaced between the minimum and maximum of the RDD. For
        example, if the min value is 0 and the max is 100, given
        `buckets` as 2, the resulting buckets will be [0, 50) [50, 100].
        `buckets` must be at least 1. If the RDD contains infinity, an
        exception is raised (NaN values are filtered out). If the
        elements in the RDD do not vary (max == min), a single bucket
        is always returned.

        It will return a tuple of buckets and histogram.

        >>> rdd = sc.parallelize(range(51))
        >>> rdd.histogram(2)
        ([0, 25, 50], [25, 26])
        >>> rdd.histogram([0, 5, 25, 50])
        ([0, 5, 25, 50], [5, 20, 26])
        >>> rdd.histogram([0, 15, 30, 45, 60], True)
        ([0, 15, 30, 45, 60], [15, 15, 15, 6])
        """
```
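The bucket semantics from the docstring (right-open intervals, last one closed) can be sketched in plain Python with `bisect`. The helper `bucket_index` below is hypothetical, introduced only for illustration, not part of the PR:

```python
import bisect

def bucket_index(buckets, x):
    # edges [b0, b1, ..., bn] define buckets [b0,b1) [b1,b2) ... [b_{n-1},bn]
    if x < buckets[0] or x > buckets[-1]:
        return None  # out of range
    i = bisect.bisect_right(buckets, x) - 1
    # the last bucket is closed on the right, so fold its edge back in
    return min(i, len(buckets) - 2)

buckets = [1, 10, 20, 50]
print(bucket_index(buckets, 1))   # 0  (1 <= x < 10)
print(bucket_index(buckets, 50))  # 2  (the last bucket is closed)
print(bucket_index(buckets, 0))   # None (below range)
```

This reproduces the docstring's example: inputs 1 and 50 land in the first and last buckets of [1, 10, 20, 50].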
```python
        if isinstance(buckets, (int, long)):
            if buckets < 1:
                raise ValueError("buckets should not less than 1")
```
> **Review comment** (on the error message): how about "number of buckets must be >= 1"?
```python
            # filter out non-comparable elements
            self = self.filter(lambda x: x is not None and not isnan(x))

            # faster than stats()
            def minmax(a, b):
                return min(a[0], b[0]), max(a[1], b[1])
            try:
                minv, maxv = self.map(lambda x: (x, x)).reduce(minmax)
            except TypeError as e:
                if e.message == "reduce() of empty sequence with no initial value":
```
> **Reviewer:** checking the message is fragile, it can change and break this code. How about letting all TypeErrors propagate?
>
> **Author:** Yes, it's fragile. If we propagate the message from reduce(), the user may treat this as a bug in PySpark, because the user did not call reduce() explicitly. Or should we define a special exception for reduce()?
>
> **Reviewer:** The goal of propagating messages that do not expose implementation details is good, imho. Are you confident that the "empty sequence" message is the only exception that could arise? I was thinking about mixed types in the RDD, but maybe that's not a problem.
>
> **Contributor:** Although this is fragile, nothing bad will happen if this breaks, because we'll end up re-raising the exception on line 908.
```python
                    raise ValueError("can not generate buckets from empty RDD")
                raise

            if minv == maxv or buckets == 1:
                return [minv, maxv], [self.count()]
```
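The single-pass min/max trick above (pair each element with itself, then reduce) can be mimicked in plain Python with `functools.reduce`. Note that, exactly as in the `except` branch above, reducing an empty sequence without an initial value raises `TypeError`:

```python
from functools import reduce

def minmax(a, b):
    return min(a[0], b[0]), max(a[1], b[1])

data = [3, 7, 1, 9, 4]
# one pass over the data computes both extremes
minv, maxv = reduce(minmax, ((x, x) for x in data))
print(minv, maxv)  # 1 9

try:
    reduce(minmax, iter([]))
except TypeError:
    print("empty sequence raises TypeError")  # the case caught above
```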
```python
            inc = (maxv - minv) / buckets
            if isinf(inc):
                raise ValueError("Can not generate buckets with infinite value")

            # keep them as integer if possible
            if inc * buckets != maxv - minv:
                inc = (maxv - minv) * 1.0 / buckets

            buckets = [i * inc + minv for i in range(buckets)]
            buckets.append(maxv)  # fix accumulated error
            evenBuckets = True
```
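The bucket generation above, including pinning the last edge to `maxv` to cancel accumulated floating-point error, can be sketched as a standalone function. `make_buckets` is a hypothetical name; the integer-preserving branch mirrors the "keep them as integer if possible" check (the PR targets Python 2, where `/` truncated ints, so this Python 3 sketch tests divisibility explicitly):

```python
def make_buckets(minv, maxv, n):
    span = maxv - minv
    # keep the edges as integers when the range divides evenly
    if isinstance(span, int) and span % n == 0:
        inc = span // n
    else:
        inc = span / n
    edges = [minv + i * inc for i in range(n)]
    edges.append(maxv)  # pin the last edge to maxv: fixes accumulated error
    return edges

print(make_buckets(0, 50, 2))  # [0, 25, 50]
print(make_buckets(0, 1, 3))   # last edge is exactly 1, not 0.999...
```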
```python
        else:
            if len(buckets) < 2:
                raise ValueError("buckets should have more than one value")

            if any(i is None or isnan(i) for i in buckets):
                raise ValueError("can not have None or NaN in buckets")

            if sorted(buckets) != buckets:
                raise ValueError("buckets should be sorted")

            minv = buckets[0]
            maxv = buckets[-1]
            inc = buckets[1] - buckets[0] if evenBuckets else None

        def histogram(iterator):
            counters = [0] * len(buckets)
            for i in iterator:
                if i is None or isnan(i) or i > maxv or i < minv:
                    continue
                t = (int((i - minv) / inc) if evenBuckets
                     else bisect.bisect_right(buckets, i) - 1)
                counters[t] += 1
            # add last two together
            last = counters.pop()
            counters[-1] += last
            return [counters]

        def mergeCounters(a, b):
            return [i + j for i, j in zip(a, b)]

        return buckets, self.mapPartitions(histogram).reduce(mergeCounters)
```
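Since `mapPartitions` runs the inner `histogram` independently on each partition and `reduce` folds the per-partition counter lists together, the counting and merge logic can be exercised without Spark by simulating partitions directly. This is a sketch; names such as `count_partition` are hypothetical:

```python
import bisect
from functools import reduce
from math import isnan

def count_partition(iterator, buckets, minv, maxv, inc, even):
    # same counting logic as the PR's inner histogram()
    counters = [0] * len(buckets)
    for i in iterator:
        if i is None or isnan(i) or i > maxv or i < minv:
            continue
        t = (int((i - minv) / inc) if even
             else bisect.bisect_right(buckets, i) - 1)
        counters[t] += 1
    last = counters.pop()   # values equal to maxv land in an extra slot;
    counters[-1] += last    # fold them into the last (closed) bucket
    return counters

def merge_counters(a, b):
    return [i + j for i, j in zip(a, b)]

buckets = [0, 5, 25, 50]
parts = [range(0, 25), range(25, 51)]  # two simulated partitions
counts = (count_partition(p, buckets, 0, 50, None, False) for p in parts)
print(reduce(merge_counters, counts))  # [5, 20, 26], matching the doctest
```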
```python
    def mean(self):
        """
        Compute the mean of this RDD's elements.
```
> **Review comment** (on the docstring): nit - "buckets which are"