Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import sys
import shlex
import traceback
from bisect import bisect_right
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This `bisect_right` import doesn't seem to be used.

from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
Expand Down Expand Up @@ -534,7 +535,26 @@ def func(iterator):
return reduce(op, vals, zeroValue)

# TODO: aggregate


def max(self):
    """
    Return the largest item in this RDD.

    The value is read from the statistics object produced by stats().

    >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
    43.0
    """
    summary = self.stats()
    return summary.max()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use a more direct `reduce` here too, instead of going through `stats()`.


def min(self):
    """
    Find the minimum item in this RDD.

    The value is read from the statistics object produced by stats().

    >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
    1.0
    """
    return self.stats().min()

def sum(self):
"""
Add up the elements in this RDD.
Expand Down
23 changes: 21 additions & 2 deletions python/pyspark/statcounter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ def __init__(self, values=[]):
self.n = 0L # Running count of our values
self.mu = 0.0 # Running mean of our values
self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2)

self.max_v = float("-inf")
self.min_v = float("inf")

for v in values:
self.merge(v)

Expand All @@ -36,6 +38,11 @@ def merge(self, value):
self.n += 1
self.mu += delta / self.n
self.m2 += delta * (value - self.mu)
if self.max_v < value:
self.max_v = value
if self.min_v > value:
self.min_v = value

return self

# Merge another StatCounter into this one, adding up the internal statistics.
Expand All @@ -49,7 +56,10 @@ def mergeStats(self, other):
if self.n == 0:
self.mu = other.mu
self.m2 = other.m2
self.n = other.n
self.n = other.n
self.max_v = other.max_v
self.min_v = other.min_v

elif other.n != 0:
delta = other.mu - self.mu
if other.n * 10 < self.n:
Expand All @@ -58,6 +68,9 @@ def mergeStats(self, other):
self.mu = other.mu - (delta * self.n) / (self.n + other.n)
else:
self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n)

self.max_v = max(self.max_v, other.max_v)
self.min_v = min(self.min_v, other.min_v)

self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n)
self.n += other.n
Expand All @@ -76,6 +89,12 @@ def mean(self):
def sum(self):
return self.n * self.mu

def min(self):
    # Return the running minimum held in self.min_v.
    smallest = self.min_v
    return smallest

def max(self):
    # Return the running maximum held in self.max_v.
    largest = self.max_v
    return largest

# Return the variance of the values.
def variance(self):
if self.n == 0:
Expand Down