Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
remove evenBuckets, add more tests (including str)
  • Loading branch information
davies committed Aug 25, 2014
commit 84e85fa3f4358f608e6a362b3931d5a11ec2755f
59 changes: 44 additions & 15 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ def redFunc(left_counter, right_counter):

return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)

def histogram(self, buckets, evenBuckets=False):
def histogram(self, buckets):
"""
Compute a histogram using the provided buckets. The buckets
are all open to the right except for the last, which is closed.
Expand All @@ -866,7 +866,7 @@ def histogram(self, buckets, evenBuckets=False):

If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
this can be switched from an O(log n) insertion to O(1) per
element(where n = # buckets), if you set `even` to True.
element (where n = # buckets).

Buckets must be sorted, must not contain any duplicates, and
must contain at least two elements.
Expand All @@ -886,31 +886,45 @@ def histogram(self, buckets, evenBuckets=False):
([0, 25, 50], [25, 26])
>>> rdd.histogram([0, 5, 25, 50])
([0, 5, 25, 50], [5, 20, 26])
>>> rdd.histogram([0, 15, 30, 45, 60], True)
>>> rdd.histogram([0, 15, 30, 45, 60]) # evenly spaced buckets
([0, 15, 30, 45, 60], [15, 15, 15, 6])
>>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
>>> rdd.histogram(("a", "b", "c"))
(('a', 'b', 'c'), [2, 2])
"""

if isinstance(buckets, (int, long)):
if buckets < 1:
raise ValueError("number of buckets must be >= 1")

# filter out non-comparable elements
self = self.filter(lambda x: x is not None and not isnan(x))
def comparable(x):
if x is None:
return False
if type(x) is float and isnan(x):
return False
return True

filtered = self.filter(comparable)

# faster than stats()
def minmax(a, b):
return min(a[0], b[0]), max(a[1], b[1])
try:
minv, maxv = self.map(lambda x: (x, x)).reduce(minmax)
minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax)
except TypeError as e:
if e.message == "reduce() of empty sequence with no initial value":
if " empty " in e.message:
raise ValueError("can not generate buckets from empty RDD")
raise

if minv == maxv or buckets == 1:
return [minv, maxv], [self.count()]
return [minv, maxv], [filtered.count()]

try:
inc = (maxv - minv) / buckets
except TypeError:
raise TypeError("Can not generate buckets with non-number in RDD")

inc = (maxv - minv) / buckets
if isinf(inc):
raise ValueError("Can not generate buckets with infinite value")

Expand All @@ -920,28 +934,43 @@ def minmax(a, b):

buckets = [i * inc + minv for i in range(buckets)]
buckets.append(maxv) # fix accumulated error
evenBuckets = True
even = True

else:
elif isinstance(buckets, (list, tuple)):
if len(buckets) < 2:
raise ValueError("buckets should have more than one value")

if any(i is None or isnan(i) for i in buckets):
if any(i is None or isinstance(i, float) and isnan(i) for i in buckets):
raise ValueError("can not have None or NaN in buckets")

if sorted(buckets) != buckets:
if sorted(buckets) != list(buckets):
raise ValueError("buckets should be sorted")

if len(set(buckets)) != len(buckets):
raise ValueError("buckets should not contain duplicated values")

minv = buckets[0]
maxv = buckets[-1]
inc = buckets[1] - buckets[0] if evenBuckets else None
even = False
inc = None
try:
steps = [buckets[i + 1] - buckets[i] for i in range(len(buckets) - 1)]
except TypeError:
pass # objects in buckets do not support '-'
else:
if max(steps) - min(steps) < 1e-10: # handle precision errors
even = True
inc = (maxv - minv) / (len(buckets) - 1)

else:
raise TypeError("buckets should be a list or tuple or number(int or long)")

def histogram(iterator):
counters = [0] * len(buckets)
for i in iterator:
if i is None or isnan(i) or i > maxv or i < minv:
if i is None or (type(i) is float and isnan(i)) or i > maxv or i < minv:
continue
t = (int((i - minv) / inc) if evenBuckets
t = (int((i - minv) / inc) if even
else bisect.bisect_right(buckets, i) - 1)
counters[t] += 1
# add last two together
Expand Down
30 changes: 21 additions & 9 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,26 +368,25 @@ def test_histogram(self):
# empty
rdd = self.sc.parallelize([])
self.assertEquals([0], rdd.histogram([0, 10])[1])
self.assertEquals([0], rdd.histogram([0, 10], True)[1])
self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1])
self.assertRaises(ValueError, lambda: rdd.histogram(1))

# out of range
rdd = self.sc.parallelize([10.01, -0.01])
self.assertEquals([0], rdd.histogram([0, 10])[1])
self.assertEquals([0], rdd.histogram([0, 10], True)[1])
self.assertEquals([0, 0], rdd.histogram((0, 4, 10))[1])

# in range with one bucket
rdd = self.sc.parallelize(range(1, 5))
self.assertEquals([4], rdd.histogram([0, 10])[1])
self.assertEquals([4], rdd.histogram([0, 10], True)[1])
self.assertEquals([3, 1], rdd.histogram([0, 4, 10])[1])

# in range with one bucket exact match
self.assertEquals([4], rdd.histogram([1, 4])[1])
self.assertEquals([4], rdd.histogram([1, 4], True)[1])

# out of range with two buckets
rdd = self.sc.parallelize([10.01, -0.01])
self.assertEquals([0, 0], rdd.histogram([0, 5, 10])[1])
self.assertEquals([0, 0], rdd.histogram([0, 5, 10], True)[1])

# out of range with two uneven buckets
rdd = self.sc.parallelize([10.01, -0.01])
Expand All @@ -396,12 +395,10 @@ def test_histogram(self):
# in range with two buckets
rdd = self.sc.parallelize([1, 2, 3, 5, 6])
self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1])
self.assertEquals([3, 2], rdd.histogram([0, 5, 10], True)[1])

# in range with two bucket and None
rdd = self.sc.parallelize([1, 2, 3, 5, 6, None, float('nan')])
self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1])
self.assertEquals([3, 2], rdd.histogram([0, 5, 10], True)[1])

# in range with two uneven buckets
rdd = self.sc.parallelize([1, 2, 3, 5, 6])
Expand All @@ -421,12 +418,14 @@ def test_histogram(self):
self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1])

# out of range with infinite buckets
rdd = self.sc.parallelize([10.01, -0.01, float('nan')])
self.assertEquals([1, 1], rdd.histogram([float('-inf'), 0, float('inf')])[1])
rdd = self.sc.parallelize([10.01, -0.01, float('nan'), float("inf")])
self.assertEquals([1, 2], rdd.histogram([float('-inf'), 0, float('inf')])[1])

# invalid buckets
self.assertRaises(ValueError, lambda: rdd.histogram([]))
self.assertRaises(ValueError, lambda: rdd.histogram([1]))
self.assertRaises(ValueError, lambda: rdd.histogram(0))
self.assertRaises(TypeError, lambda: rdd.histogram({}))

# without buckets
rdd = self.sc.parallelize(range(1, 5))
Expand Down Expand Up @@ -456,6 +455,19 @@ def test_histogram(self):
rdd = self.sc.parallelize([float('nan')])
self.assertRaises(ValueError, lambda: rdd.histogram(2))

# string
rdd = self.sc.parallelize(["ab", "ac", "b", "bd", "ef"], 2)
self.assertEquals([2, 2], rdd.histogram(["a", "b", "c"])[1])
self.assertEquals((["ab", "ef"], [5]), rdd.histogram(1))
self.assertRaises(TypeError, lambda: rdd.histogram(2))

# mixed RDD
rdd = self.sc.parallelize([1, 4, "ab", "ac", "b"], 2)
self.assertEquals([1, 1], rdd.histogram([0, 4, 10])[1])
self.assertEquals([2, 1], rdd.histogram(["a", "b", "c"])[1])
self.assertEquals(([1, "b"], [5]), rdd.histogram(1))
self.assertRaises(TypeError, lambda: rdd.histogram(2))


class TestIO(PySparkTestCase):

Expand Down