Closed
SPARK-1165 Implemented RDD.intersection in python.
ScrapCodes committed Mar 5, 2014
commit d6effee4ee967f15210d0d57526beab4e3f9c8e2
13 changes: 13 additions & 0 deletions python/pyspark/rdd.py
@@ -319,6 +319,19 @@ def union(self, other):
        return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
                   self.ctx.serializer)

    def intersection(self, other):
        """
        Return the intersection of this RDD and another one.

        >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
        >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
        >>> rdd1.intersection(rdd2).collect()
        [1, 2, 3]
        """
        return self.map(lambda v: (v, None)).cogroup(
            other.map(lambda v: (v, None))).filter(
            lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)).keys()

It would probably be slightly nicer to write it like this:

return self.map(lambda v: (v, None)) \
    .cogroup(other.map(lambda v: (v, None))) \
    .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \
    .keys()

Or put parens around the whole expression so Python doesn't think each line has ended (but we've used the backslash style before).
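
For reference, the parenthesized variant could look roughly like the sketch below. To keep it runnable without a SparkContext, it uses `LocalRDD`, a hypothetical in-memory stand-in that is not part of PySpark and only mimics the handful of methods the chain needs. The free function `intersection` is likewise just for illustration; in the patch it would remain a method on `RDD`.

```python
from collections import defaultdict


class LocalRDD:
    """Hypothetical in-memory stand-in for an RDD; NOT part of PySpark."""

    def __init__(self, items):
        self.items = list(items)

    def map(self, f):
        return LocalRDD(f(x) for x in self.items)

    def filter(self, f):
        return LocalRDD(x for x in self.items if f(x))

    def keys(self):
        return LocalRDD(k for k, _ in self.items)

    def cogroup(self, other):
        # Group values from both sides by key: key -> (left values, right values).
        groups = defaultdict(lambda: ([], []))
        for k, v in self.items:
            groups[k][0].append(v)
        for k, v in other.items:
            groups[k][1].append(v)
        return LocalRDD(groups.items())

    def collect(self):
        return list(self.items)


def intersection(left, right):
    # The outer parentheses let the chain span several lines without backslashes.
    return (left.map(lambda v: (v, None))
            .cogroup(right.map(lambda v: (v, None)))
            .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0))
            .keys())


if __name__ == "__main__":
    rdd1 = LocalRDD([1, 10, 2, 3, 4, 5])
    rdd2 = LocalRDD([1, 6, 2, 3, 7, 8])
    print(sorted(intersection(rdd1, rdd2).collect()))
```

Either style parses the same way; the parentheses just mean a stray trailing space after a backslash can't break the chain.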

Other than that it looks good.


    def _reserialize(self):
        if self._jrdd_deserializer == self.ctx.serializer:
            return self