add countApprox and countApproxDistinct
meanApprox() and sumApprox()
davies committed Aug 6, 2014
commit 1218b3b8c997e81cecc23a57f83cd79b5eac9147
19 changes: 18 additions & 1 deletion core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat}
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import org.apache.spark.api.java.{JavaDoubleRDD, JavaSparkContext, JavaPairRDD, JavaRDD}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
@@ -741,6 +741,23 @@ private[spark] object PythonRDD extends Logging {
}
}
}

/**
* Convert an RDD of serialized Python objects to an RDD of Java objects, which is usable by PySpark.
*/
def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
Contributor

Minor point, but we created the SerDeUtil object for the SequenceFile/InputFormat stuff; maybe these Java <-> Python conversion methods could all live there instead. It's a little cleaner.

Contributor Author

Good point. Then we can move all the pickle stuff into SerDeUtil in another PR.

pyRDD.rdd.mapPartitions { iter =>
val unpickle = new Unpickler
iter.flatMap { row =>
val obj = unpickle.loads(row)
if (batched) {
obj.asInstanceOf[JArrayList[_]]
} else {
Seq(obj)
}
}
}.toJavaRDD()
}
}

private
103 changes: 81 additions & 22 deletions python/pyspark/rdd.py
@@ -216,6 +216,22 @@ def _parse_memory(s):
return int(float(s[:-1]) * units[s[-1].lower()])


class BoundedFloat(float):
"""
Bounded value generated by an approximate job, carrying the confidence
and the low and high bounds of the estimate.

>>> BoundedFloat(100.0, 0.95, 95.0, 105.0)
100.0
"""
def __new__(cls, mean, confidence, low, high):
obj = float.__new__(cls, mean)
obj.confidence = confidence
obj.low = low
obj.high = high
return obj

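Since BoundedFloat subclasses float, it can be used anywhere a plain float is expected while still exposing the interval; a small usage sketch, not part of the commit:

x = BoundedFloat(100.0, 0.95, 95.0, 105.0)
assert x == 100.0                # behaves as the point estimate
assert x.low <= x <= x.high      # and carries its confidence interval
assert x.confidence == 0.95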

class RDD(object):

"""
@@ -1911,71 +1927,114 @@ def lookup(self, key):
"""
raise NotImplementedError

def countApprox(self, timeout, confidence=0.95):
def _is_pickled(self):
""" Return this RDD is serialized by Pickle or not. """
der = self._jrdd_deserializer
if isinstance(der, PickleSerializer):
return True
if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
return True
return False

def _to_jrdd(self):
""" Return an JavaRDD """
if not self._is_pickled():
self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024))
batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)

def countApproxDistinct(self, relativeSD=0.05):
"""
:: Experimental ::
Approximate version of count() that returns a potentially incomplete
result within a timeout, even if not all tasks have finished.
Return approximate number of distinct elements in the RDD.

Not implemented.
The algorithm used is based on streamlib's implementation of
"HyperLogLog in Practice: Algorithmic Engineering of a State
of The Art Cardinality Estimation Algorithm", available at
<http://dx.doi.org/10.1145/2452376.2452456>.

:param relativeSD: Relative accuracy. Smaller values create
counters that require more space.
It must be greater than 0.000017.

>>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
>>> 950 < n < 1050
True
"""
raise NotImplementedError
return self._to_jrdd().countApproxDistinct(relativeSD)
Contributor

Does this really work? The Java version of this RDD will be batched, and it will contain byte arrays that may not be comparable directly with the algorithm in countApproxDistinct. Please add some tests for this.

Contributor

Actually I guess you removed the batching, but still, it's not super clear that countApproxDistinct will work on byte arrays.

Contributor Author

What countApproxDistinct() needs is the hash code of the items; this means we would create a hash code for Python objects via hash(pickle.dumps(obj)), so I think this could work.

Contributor Author

No, the JavaRDD is an RDD[Object], created by unpickling the Python objects, so it supports most of the built-in types (those supported by Pyrolite).

Contributor

I see, okay. Please add some tests for this in tests.py to make sure we cover all the data types we'll care about (e.g. strings, integers, tuples, etc.). We might also want to document which data types it works with above.

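A sketch of the kind of tests requested above, assuming they would live in python/pyspark/tests.py alongside the existing PySparkTestCase suites; the class name, data, and tolerances are illustrative, not part of this commit:

class ApproximateTests(PySparkTestCase):  # hypothetical test case

    def test_count_approx_distinct(self):
        rdd = self.sc.parallelize(range(1000))
        # cover the built-in types Pyrolite round-trips: ints, strings, tuples
        self.assertTrue(950 < rdd.countApproxDistinct() < 1050)
        self.assertTrue(950 < rdd.map(str).countApproxDistinct() < 1050)
        self.assertTrue(950 < rdd.map(lambda x: (x, -x)).countApproxDistinct() < 1050)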

def countApproxDistinct(self, timeout, confidence=0.95):
def countApproxDistinctByKey(self, timeout, confidence=0.95):
"""
:: Experimental ::
Return approximate number of distinct elements in the RDD.
Return approximate number of distinct values for each key in this RDD.

Not implemented.
"""
raise NotImplementedError

def countByValueApprox(self, timeout, confidence=0.95):
def countApprox(self, timeout, confidence=0.95):
"""
:: Experimental::
Approximate version of countByValue().
:: Experimental ::
Approximate version of count() that returns a potentially incomplete
result within a timeout, even if not all tasks have finished.

Not implemented.
>>> rdd = sc.parallelize(range(1000), 10)
>>> rdd.countApprox(1000, 1.0)
1000
"""
raise NotImplementedError
drdd = self.mapPartitions(lambda it: [float(sum(1 for i in it))])
return int(drdd.sumApprox(timeout, confidence))

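A usage note on the implementation above: countApprox is sumApprox applied to one partial count per partition, so a short timeout can yield an estimate built from only the partitions that finished in time. A hedged sketch; the timeout and confidence values are illustrative:

rdd = sc.parallelize(range(1000), 10)
exact = rdd.countApprox(10000, 1.0)  # generous timeout: should converge to 1000
rough = rdd.countApprox(1, 0.5)      # tiny timeout: may be an extrapolated estimate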
def sumApprox(self, timeout, confidence=0.95):
"""
:: Experimental ::
Approximate operation to return the sum within a timeout
or meet the confidence.

Not implemented.
>>> rdd = sc.parallelize(range(1000), 10)
>>> r = sum(xrange(1000))
>>> abs(rdd.sumApprox(1000) - r) / r < 0.05
True
"""
raise NotImplementedError
jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_jrdd()
jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())

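Because the return value is a BoundedFloat, callers can inspect the interval as well as the point estimate; a minimal sketch:

r = sc.parallelize(range(1000), 10).sumApprox(1000)
assert r.low <= r <= r.high         # the estimate lies within its own bounds
assert 0.0 < r.confidence <= 1.0    # e.g. the default 0.95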
def meanApprox(self, timeout, confidence=0.95):
"""
:: Experimental ::
Approximate operation to return the mean within a timeout
or meet the confidence.

Not implemented.
>>> rdd = sc.parallelize(range(1000), 10)
>>> r = sum(xrange(1000)) / 1000.0
>>> abs(rdd.meanApprox(1000) - r) / r < 0.05
True
"""
raise NotImplementedError
jrdd = self.map(float)._to_jrdd()
jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
r = jdrdd.meanApprox(timeout, confidence).getFinalValue()
return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())

def countApproxDistinctByKey(self, timeout, confidence=0.95):
def countByKeyApprox(self, timeout, confidence=0.95):
"""
:: Experimental ::
Return approximate number of distinct values for each key in this RDD.
Approximate version of countByKey that can return a partial result
if it does not finish within a timeout.

Not implemented.
"""
raise NotImplementedError

def countByKeyApprox(self, timeout, confidence=0.95):
def countByValueApprox(self, timeout, confidence=0.95):
"""
:: Experimental ::
Approximate version of countByKey that can return a partial result if it does not finish within a timeout.
:: Experimental::
Approximate version of countByValue().

Not implemented.
"""
raise NotImplementedError
return self.map(lambda x: (x, None)).countByKeyApprox(timeout, confidence)


class PipelinedRDD(RDD):