add zipWithIndex and zipWithUniqueId
davies committed Aug 5, 2014
commit a95eca01ebfd023a5b016015b49d98abbd658287
9 changes: 9 additions & 0 deletions python/pyspark/context.py
@@ -781,6 +781,15 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
        it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
        return list(mappedRDD._collect_iterator_through_file(it))

+    def runApproximateJob(self, rdd, func, evaluator, timeout):
+        """
+        :: DeveloperApi ::
+        Run a job that can return approximate results.
+
+        Not implemented.
+        """
+        raise NotImplementedError
+

def _test():
    import atexit
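For orientation, the context above is the tail of SparkContext.runJob, next to which the runApproximateJob stub is added. A minimal usage sketch of runJob itself (an illustration, not part of the diff; assumes a live SparkContext `sc` as in the doctests):

    >>> rdd = sc.parallelize(range(10), 2)
    >>> sc.runJob(rdd, lambda part: [sum(part)])
    [10, 35]

Each partition's iterator is passed to the function and the per-partition results are concatenated: partitions [0..4] and [5..9] yield 10 and 35.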
80 changes: 61 additions & 19 deletions python/pyspark/rdd.py
@@ -907,10 +907,10 @@ def histogram(self, buckets=None, even=False):

        >>> rdd = sc.parallelize(range(51))
        >>> rdd.histogram(2)
-        ([0L, 25L, 50L], [25L, 26L]
-        >>> rdd.histogram(3, [0, 5, 25, 50])
-        [5L, 20L, 25L]
-        >>> rdd.histogram(4, [0, 15, 30, 45, 60], True)
+        ([0.0, 25.0, 50.0], [25L, 26L])
+        >>> rdd.histogram([0, 5, 25, 50])
+        [5L, 20L, 26L]
+        >>> rdd.histogram([0, 15, 30, 45, 60], True)
        [15L, 15L, 15L, 6L]
        """

@@ -923,12 +923,12 @@ def histogram(self, buckets=None, even=False):
                raise ValueError("buckets should be greater than 1")

            r = jdrdd.histogram(buckets)
-            return list(r._r1()), list(r._2())
+            return list(r._1()), list(r._2())

        jbuckets = self.ctx._gateway.new_array(self.ctx._gateway.jvm.java.lang.Double, len(buckets))
        for i in range(len(buckets)):
            jbuckets[i] = float(buckets[i])
-        return list(jdrdd.histogram(jbuckets, evenBuckets))
+        return list(jdrdd.histogram(jbuckets, even))

    def mean(self):
        """
@@ -1750,29 +1750,56 @@ def zip(self, other):
        >>> x.zip(y).collect()
        [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
        """
+        if self.getNumPartitions() != other.getNumPartitions():
+            raise ValueError("the number of partitions does not match"
+                             " with each other")
+
        pairRDD = self._jrdd.zip(other._jrdd)
        deserializer = PairDeserializer(self._jrdd_deserializer,
                                        other._jrdd_deserializer)
        return RDD(pairRDD, self.ctx, deserializer)

-    def zipPartitions(self, other, f):
+    def zipPartitions(self, other, f, preservesPartitioning=False):
        """
        Zip this RDD's partitions with one (or more) RDD(s) and return a
        new RDD by applying a function to the zipped partitions.

        Not implemented.
        """
        raise NotImplementedError

    def zipWithIndex(self):
        """
        Zips this RDD with its element indices.
[Inline review comment, Contributor]
The Scala documentation is much more descriptive about what this method does:

    /**
     * Zips this RDD with its element indices. The ordering is first based on the partition index
     * and then the ordering of items within each partition. So the first item in the first
     * partition gets index 0, and the last item in the last partition receives the largest index.
     * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
     * This method needs to trigger a spark job when this RDD contains more than one partitions.
     */
    def zipWithIndex(): RDD[(T, Long)] = new ZippedWithIndexRDD(this)

The Python documentation should explain these subtleties, too.
[End of review comment]
+        >>> sc.parallelize(range(4), 2).zipWithIndex().collect()
+        [(0, 0), (1, 1), (2, 2), (3, 3)]
        """
-        raise NotImplementedError
+        nums = self.glom().map(lambda it: sum(1 for i in it)).collect()
+        starts = [0]
+        for i in range(len(nums) - 1):
+            starts.append(starts[-1] + nums[i])
+
+        def func(k, it):
+            for i, v in enumerate(it):
+                yield starts[k] + i, v
+
+        return self.mapPartitionsWithIndex(func)
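A quick illustration of the ordering this implements, matching the reviewer's quoted Scala docs: indices run through partition 0 first, then partition 1, and so on, with `starts` holding the prefix sums of the partition sizes. (Illustration only, not part of the diff; assumes a live SparkContext `sc`. Note that this version of the code yields (index, value) pairs.)

    >>> rdd = sc.parallelize(["a", "b", "c", "d", "e"], 2)   # partitions: ["a", "b"] and ["c", "d", "e"]
    >>> rdd.glom().map(len).collect()                        # the sizes the implementation collects
    [2, 3]
    >>> rdd.zipWithIndex().collect()                         # starts == [0, 2]
    [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e')]

Collecting the partition sizes costs one extra Spark job, which is the subtlety the Scala docs call out for RDDs with more than one partition.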

    def zipWithUniqueId(self):
        """
        Zips this RDD with generated unique Long ids.
[Inline review comment, Contributor]
Same case here: this should be similarly descriptive to the Scala docs:

    /**
     * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
     * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
     * won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
     */
    def zipWithUniqueId(): RDD[(T, Long)] = {

[End of review comment]
+        >>> sc.parallelize(range(4), 2).zipWithUniqueId().collect()
+        [(0, 0), (2, 1), (1, 2), (3, 3)]
        """
-        raise NotImplementedError
+        n = self.getNumPartitions()
+
+        def func(k, it):
+            for i, v in enumerate(it):
+                yield i * n + k, v
+
+        return self.mapPartitionsWithIndex(func)
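The id formula i * n + k interleaves the partitions round-robin, so no job is needed to size them, at the cost of gaps in the id sequence. (Illustration only, not part of the diff; assumes a live SparkContext `sc`. As above, this version yields (id, value) pairs.)

    >>> rdd = sc.parallelize(["a", "b", "c", "d", "e"], 2)   # partitions: ["a", "b"] and ["c", "d", "e"]
    >>> rdd.zipWithUniqueId().collect()
    [(0, 'a'), (2, 'b'), (1, 'c'), (3, 'd'), (5, 'e')]

Partition 0 gets ids 0 and 2 while partition 1 gets ids 1, 3, and 5; id 4 is never assigned because partition 0 has one fewer element, exactly the gap the Scala docs mention.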

    def name(self):
        """
@@ -1842,63 +1869,78 @@ def _defaultReducePartitions(self):
    def lookup(self, key):
        """
        Return the list of values in the RDD for key `key`.

        Not implemented.
        """
        raise NotImplementedError

-    def countApprox(self, timeout, confidence=1.0):
+    def countApprox(self, timeout, confidence=0.95):
        """
        :: Experimental ::
        Approximate version of count() that returns a potentially incomplete
        result within a timeout, even if not all tasks have finished.

        Not implemented.
        """
        raise NotImplementedError

-    def countApproxDistinct(self, timeout, confidence=1.0):
+    def countApproxDistinct(self, timeout, confidence=0.95):
        """
        :: Experimental ::
        Return approximate number of distinct elements in the RDD.

        Not implemented.
        """
        raise NotImplementedError

-    def countByValueApprox(self, timeout, confidence=1.0):
+    def countByValueApprox(self, timeout, confidence=0.95):
        """
        :: Experimental ::
        Approximate version of countByValue().

        Not implemented.
        """
        raise NotImplementedError

-    def sumApprox(self, timeout, confidence=1.0):
+    def sumApprox(self, timeout, confidence=0.95):
        """
        :: Experimental ::
        Approximate operation to return the sum within a timeout
        or meet the confidence.

        Not implemented.
        """
        raise NotImplementedError

-    def meanApprox(self, timeout, confidence=1.0):
+    def meanApprox(self, timeout, confidence=0.95):
        """
        :: Experimental ::
        Approximate operation to return the mean within a timeout
        or meet the confidence.

        Not implemented.
        """
        raise NotImplementedError

-    def countApproxDistinctByKey(self):
+    def countApproxDistinctByKey(self, timeout, confidence=0.95):
        """
        :: Experimental ::
        Return approximate number of distinct values for each key in this RDD.

        Not implemented.
        """
        raise NotImplementedError

-    def countByKeyApprox(self, timeout, confidence=1.0):
+    def countByKeyApprox(self, timeout, confidence=0.95):
        """
        :: Experimental ::
        Approximate version of countByKey that can return a partial result
        if it does not finish within a timeout.

        Not implemented.
        """
        raise NotImplementedError


class PipelinedRDD(RDD):

    """