Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
SPARK-1170 Added histogram(buckets) to pyspark and not histogram(noOf…
…Buckets).
  • Loading branch information
ScrapCodes authored and Chandan Kumar committed Aug 5, 2014
commit 0c2bbdd2956db71d227f89822f246293fc6e6458
136 changes: 79 additions & 57 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
get_used_memory

from py4j.java_collections import ListConverter, MapConverter
from bisect import bisect_left

__all__ = ["RDD"]

Expand Down Expand Up @@ -902,66 +903,87 @@ def sampleVariance(self):
1.0
"""
return self.stats().sampleVariance()

def histogram(self, buckets=None, evenBuckets=False, bucketCount=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can define this API as

def histogram(self, buckets, even=False):

buckets can be list or int.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. Why didn't I come up with it :)

"""
Compute a histogram using the provided buckets. The buckets are all open
to the right except for the last, which is closed, e.g. for the array
[1, 10, 20, 50] the buckets are [1, 10) [10, 20) [20, 50], i.e. 1<=x<10,
10<=x<20, 20<=x<=50. On the input of 1 and 50 we would have a
histogram of 1, 0, 1.

If bucketCount is supplied, evenly-spaced buckets are automatically
constructed using the minimum and maximum of the RDD. For example if the
min value is 0 and the max is 100 and there are two buckets the resulting
buckets will be [0, 50) [50, 100]. bucketCount must be at least 1.
Exactly one of buckets and bucketCount must be provided.

Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can
be switched from an O(log n) computation to O(1) per element (where n is
the number of buckets) if you set evenBuckets to true.
buckets must be sorted, must not contain any duplicates, and must have
at least two elements.

>>> a = sc.parallelize(range(100))
>>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], evenBuckets=True)
[10, 10, 10, 10, 10, 10, 10, 10, 11]
>>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90])
[10, 10, 10, 10, 10, 10, 10, 10, 11]
>>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99])
[10, 10, 10, 10, 10, 10, 10, 10, 10, 10]
"""

if (buckets and bucketCount) or (not buckets and not bucketCount):
raise ValueError("Pass either buckets or bucketCount but not both")

if bucketCount <= 0:
raise ValueError("bucketCount must be positive")

def getBuckets():
#use the statscounter as a quick way of getting max and min
mm_stats = self.stats()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call it stats for short

min = mm_stats.min()
max = mm_stats.max()
increment = (max - min) / bucketCount
if increment != 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if increment:

buckets = range(min, max, increment)
else:
buckets = [min, max]
return buckets

def _getBuckets(self, bucketCount):
    """
    Derive evenly spaced bucket start points from the RDD's min and max.

    The step is (max - min) // bucketCount and the start points are
    min, min + step, ... for every value strictly below max.

    Returns a dict with three keys: "min" and "max" hold the values
    reported by self.stats(), and "buckets" holds the list of start
    points (empty when the step is 0, e.g. when min == max).

    Raises ZeroDivisionError if bucketCount is 0.
    """
    # use the statscounter as a quick way of getting max and min
    stats = self.stats()
    lowest = stats.min()
    highest = stats.max()

    # Floor division keeps the step an integer even when "/" means true
    # division; range() rejects a float step.
    step = (highest - lowest) // bucketCount
    # Materialise the range so the result is a list on every interpreter.
    buckets = list(range(lowest, highest, step)) if step != 0 else []

    return {"min": lowest, "max": highest, "buckets": buckets}

def histogram(self, bucketCount, buckets=None):
    """
    Compute a histogram of the data using bucketCount number of buckets
    evenly spaced between the min and max of the RDD.

    If buckets (a sorted list of bucket start points) is supplied it is
    used as is and bucketCount is ignored; the outermost buckets are then
    unbounded (-inf below the first start point, +inf above the last).

    Returns a dict-like object mapping (low, high) tuples to the number
    of elements that fell in that bucket. Only non-empty buckets appear.

    Raises ValueError if fewer than two bucket start points are
    available.

    >>> h = sc.parallelize([1,49, 23, 100, 12, 13, 20, 22, 75, 50]).histogram(3)
    >>> sorted(h.items())
    [((1, 33), 6), ((34, 66), 2), ((67, 100), 2)]
    """
    # Imported locally so this method does not rely on module-level
    # imports that may be absent.
    from bisect import bisect_right
    from collections import defaultdict

    lower = float("-inf")
    upper = float("inf")
    if not buckets:
        computed = self._getBuckets(bucketCount)
        buckets = computed["buckets"]
        lower = computed["min"]
        upper = computed["max"]

    if len(buckets) < 2:
        raise ValueError("requires more than 1 bucket")

    # histogram partition
    def histogramPartition(iterator):
        counters = defaultdict(int)
        for obj in iterator:
            k = bisect_right(buckets, obj)
            if k == 0:
                # below the first start point
                key = (lower, buckets[0] - 1)
            elif k == len(buckets):
                # at or above the last start point
                key = (buckets[-1], upper)
            else:
                key = (buckets[k - 1], buckets[k] - 1)
            counters[key] += 1
        yield counters

    # merge counters
    def mergeCounters(d1, d2):
        # Every key of d2 must be carried over: a bucket seen only in a
        # later partition would otherwise vanish from the result.
        for key, count in d2.items():
            d1[key] = d1.get(key, 0) + count
        return d1

    return self.mapPartitions(histogramPartition).reduce(mergeCounters)
counters = [0 for i in range(len(buckets) - 1)]
for i in iterator:
if evenBuckets:
t = fastBucketFunction(buckets[0], buckets[1] - buckets[0], len(buckets), i)
else:
t = basicBucketFunction(i)
if t:
counters[t] += 1
return [counters]

def mergeCounters(a1, a2):
    """
    Add the counts of a2 onto a1, position by position.

    a1 is updated in place and returned; a2 is only read and must have
    at least as many entries as a1.
    """
    for pos, current in enumerate(a1):
        a1[pos] = current + a2[pos]
    return a1

def basicBucketFunction(e):
    """
    Find the bucket of e by binary search over the sorted buckets list.

    Bucket i covers buckets[i] <= e < buckets[i + 1]; the last bucket
    also includes its upper boundary. buckets is read from the enclosing
    scope.

    Returns the bucket index, or None when e lies outside
    [buckets[0], buckets[-1]] or fewer than two boundaries exist.
    """
    last = len(buckets) - 1
    # The chained comparison also rejects NaN, for which both tests fail.
    if last < 1 or not (buckets[0] <= e <= buckets[last]):
        return None
    if e == buckets[last]:
        # the final bucket is closed on the right
        return last - 1
    loc = bisect_left(buckets, e)
    # bisect_left stops *at* an equal boundary, which is the lower
    # (inclusive) edge of bucket loc; otherwise e sits inside the bucket
    # that starts one boundary earlier.
    if buckets[loc] == e:
        return loc
    return loc - 1

def fastBucketFunction(minimum, inc, count, e):
    """
    Find the bucket of e in constant time for evenly spaced buckets.

    minimum is the lower edge of the first bucket, inc the width of each
    bucket and count the number of buckets. Bucket i covers
    minimum + i * inc <= e < minimum + (i + 1) * inc; the last bucket
    also includes its upper edge minimum + count * inc.

    NOTE(review): the visible call site passes len(buckets) as count,
    i.e. the number of boundaries rather than the number of buckets -
    confirm whether it should pass len(buckets) - 1.

    Returns the bucket index as an int, or None when e is out of range.
    Raises ZeroDivisionError if inc is 0.
    """
    offset = e - minimum
    # Also rejects NaN, for which both comparisons are false.
    if not (0 <= offset <= inc * count):
        return None
    # int() keeps the result usable as a list index for float input; the
    # clamp folds the closed upper edge into the last bucket.
    return min(int(offset // inc), count - 1)

if bucketCount:
evenBuckets = True
buckets = getBuckets()
return self.mapPartitions(lambda x: histogramPartition(x)).reduce(mergeCounters)

def countByValue(self):
"""
Expand Down