SPARK-1438 RDD: Replace System.nanoTime with a Random-generated number. Python: use a separate instance of Random instead of seeding the language API's global Random instance.
arun-rama committed Apr 23, 2014
commit 8d05b1a7719d0fdf262f130ed5a71f1030b153e4
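
For context on why this change matters: two sampling calls that read the clock in the same tick (or on a platform where System.nanoTime has coarse granularity) receive identical seeds and silently produce identical "random" samples. Minting seeds from a single shared java.util.Random avoids that collision. A minimal sketch of the failure mode and the fix pattern; the object and value names here are illustrative, not from the patch:

```scala
import java.util.Random

object SeedCollisionSketch extends App {
  // Two generators seeded from the same nanoTime reading produce
  // identical streams, so two "independent" samples would coincide.
  val tick = System.nanoTime
  val a = new Random(tick)
  val b = new Random(tick)
  println(a.nextLong == b.nextLong) // true: the samples would match

  // The fix this patch adopts: mint seeds from one shared PRNG, whose
  // consecutive outputs never repeat the way a stalled clock reading can.
  val seedSource = new Random()
  println(seedSource.nextLong == seedSource.nextLong) // false
}
```
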
core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -30,6 +30,7 @@ import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.StatCounter
+import org.apache.spark.util.Utils

 class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, JavaDoubleRDD] {

@@ -134,7 +135,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja
    * Return a sampled subset of this RDD.
    */
   def sample(withReplacement: Boolean, fraction: JDouble): JavaDoubleRDD =
-    sample(withReplacement, fraction, System.nanoTime)
+    sample(withReplacement, fraction, Utils.random.nextLong)

   /**
    * Return a sampled subset of this RDD.

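With this change, the two-argument sample overload gets a fresh pseudo-random seed instead of a clock reading, while the explicit-seed overload remains the reproducible path; the same one-line change repeats in JavaPairRDD, JavaRDD, and JavaRDDLike below. A short usage sketch against the Scala RDD API, which has the same overload pattern (assumes a live SparkContext named sc; not part of the patch):

```scala
val rdd = sc.parallelize(1 to 1000)

// Two-argument form: the seed now comes from Utils.random.nextLong,
// so repeated calls draw different samples.
val varied = rdd.sample(false, 0.1)

// Explicit-seed form: unchanged, still deterministic across runs.
val fixed = rdd.sample(false, 0.1, 42L)
```
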
core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -39,6 +39,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils

 class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     (implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V])

@@ -120,7 +121,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Return a sampled subset of this RDD.
    */
   def sample(withReplacement: Boolean, fraction: Double): JavaPairRDD[K, V] =
-    sample(withReplacement, fraction, System.nanoTime)
+    sample(withReplacement, fraction, Utils.random.nextLong)

   /**
    * Return a sampled subset of this RDD.

3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark._
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils

 class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
   extends JavaRDDLike[T, JavaRDD[T]] {

@@ -99,7 +100,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
    * Return a sampled subset of this RDD.
    */
   def sample(withReplacement: Boolean, fraction: Double): JavaRDD[T] =
-    sample(withReplacement, fraction, System.nanoTime)
+    sample(withReplacement, fraction, Utils.random.nextLong)

   /**
    * Return a sampled subset of this RDD.

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -34,6 +34,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils

 trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def wrapRDD(rdd: RDD[T]): This

@@ -395,7 +396,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }

   def takeSample(withReplacement: Boolean, num: Int): JList[T] =
-    takeSample(withReplacement, num, System.nanoTime)
+    takeSample(withReplacement, num, Utils.random.nextLong)

   def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T] = {
     import scala.collection.JavaConversions._

core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
@@ -23,6 +23,7 @@ import scala.reflect.ClassTag

 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.util.random.RandomSampler
+import org.apache.spark.util.Utils

 private[spark]
 class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)

@@ -38,14 +39,15 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
  *
  * @param prev RDD to be sampled
  * @param sampler a random sampler
- * @param seed random seed, default to System.nanoTime
+ * @param seed random seed, default to a Long value generated by an instance of
+ *        java.util.Random shared within the library code
  * @tparam T input RDD item type
  * @tparam U sampled RDD item type
  */
 private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
     prev: RDD[T],
     sampler: RandomSampler[T, U],
-    @transient seed: Long = System.nanoTime)
+    @transient seed: Long = Utils.random.nextLong)
   extends RDD[U](prev) {

   override def getPartitions: Array[Partition] = {

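The class-level seed here is only a root: getPartitions derives a distinct seed per partition from it, so partitions do not replay one another's streams while the whole RDD stays reproducible for a fixed root seed. A simplified sketch of that kind of derivation (illustrative; the function name is not from the patch):

```scala
import java.util.Random

// Derive one child seed per partition from a single root seed, so each
// partition samples from its own deterministic stream.
def partitionSeeds(rootSeed: Long, numPartitions: Int): Array[Long] = {
  val rng = new Random(rootSeed)
  Array.fill(numPartitions)(rng.nextLong)
}

val seeds = partitionSeeds(rootSeed = 42L, numPartitions = 4)
// Same rootSeed => same per-partition seeds, which is what keeps
// sampling reproducible when the caller supplies an explicit seed.
```
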
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -321,7 +321,9 @@ abstract class RDD[T: ClassTag](
   /**
    * Return a sampled subset of this RDD.
    */
-  def sample(withReplacement: Boolean, fraction: Double, seed: Long = System.nanoTime): RDD[T] = {
+  def sample(withReplacement: Boolean,
+      fraction: Double,
+      seed: Long = Utils.random.nextLong): RDD[T] = {
     require(fraction >= 0.0, "Invalid fraction value: " + fraction)
     if (withReplacement) {
       new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), seed)

@@ -334,19 +336,20 @@ abstract class RDD[T: ClassTag](
    * Randomly splits this RDD with the provided weights.
    *
    * @param weights weights for splits, will be normalized if they don't sum to 1
-   * @param seed random seed, default to System.nanoTime
+   * @param seed random seed, default to rand.nextLong

[Contributor review comment] You don't need to say what the seed defaults to here since users won't understand it; just say "@param seed random seed" and they can guess that if you don't specify it, we will choose one.

    *
    * @return split RDDs in an array
    */
-  def randomSplit(weights: Array[Double], seed: Long = System.nanoTime): Array[RDD[T]] = {
+  def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]] = {
     val sum = weights.sum
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
     normalizedCumWeights.sliding(2).map { x =>
       new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](x(0), x(1)), seed)
     }.toArray
   }

-  def takeSample(withReplacement: Boolean, num: Int, seed: Long = System.nanoTime): Array[T] = {
+  def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T] =
+  {
     var fraction = 0.0
     var total = 0
     val multiplier = 3.0

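The weight handling in randomSplit is easy to verify by hand: weights are scaled to sum to 1 and turned into cumulative boundaries, and each split keeps the records whose uniform draw falls in its [low, high) slice. A standalone sketch of just that arithmetic (not Spark code):

```scala
// The boundary computation inside randomSplit, in isolation.
val weights = Array(1.0, 2.0, 7.0)
val sum = weights.sum // 10.0
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0)(_ + _)
// => Array(0.0, 0.1, 0.3, 1.0), up to floating-point rounding
val ranges = normalizedCumWeights.sliding(2).map(x => (x(0), x(1))).toArray
// => Array((0.0, 0.1), (0.1, 0.3), (0.3, 1.0))
// Each pair becomes a BernoulliSampler keeping items whose uniform draw
// lands in that interval; because all three samplers share one seed,
// the splits are disjoint and together cover the whole RDD.
println(ranges.mkString(", "))
```
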
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -46,6 +46,8 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
 private[spark] object Utils extends Logging {

   val osName = System.getProperty("os.name")
+
+  val random = new Random()

   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {

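One detail worth noting about the shared instance: java.util.Random advances its state through an atomic compare-and-set, so a single process-wide generator can safely mint seeds for concurrent callers, at the cost of some contention. A small sketch of that sharing pattern (the object name is illustrative, mirroring Utils.random rather than quoting it):

```scala
import java.util.Random

object SharedSeedSource {
  // One process-wide PRNG used only to mint seeds.
  val random = new Random()
}

// Concurrent callers each get a distinct seed; java.util.Random updates
// its internal state atomically, so sharing one instance is thread-safe.
val results = new Array[Long](8)
val threads = (0 until 8).map { i =>
  new Thread(() => results(i) = SharedSeedSource.random.nextLong)
}
threads.foreach(_.start())
threads.foreach(_.join())
println(results.toSet.size) // 8: no collisions, unlike same-tick nanoTime seeds
```
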
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -467,11 +467,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     val data = sc.parallelize(1 to 100, 2)

     for (num <- List(5,20,100)) {

[Contributor review comment] Put spaces after the commas here

-      val sample = data.takeSample(withReplacement=false, num=num)
+      val sample = data.takeSample(withReplacement=false, num=num)
       assert(sample.size === num) // Got exactly num elements
       assert(sample.toSet.size === num) // Elements are distinct
       assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
     }
   }
   for (seed <- 1 to 5) {
     val sample = data.takeSample(withReplacement=false, 20, seed)
     assert(sample.size === 20) // Got exactly 20 elements

8 changes: 4 additions & 4 deletions python/pyspark/rdd.py
@@ -30,7 +30,7 @@
 from threading import Thread
 import warnings
 import heapq
-import random
+from random import Random

 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long

@@ -382,11 +382,11 @@ def takeSample(self, withReplacement, num, seed=None):
         # If the first sample didn't turn out large enough, keep trying to take samples;
         # this shouldn't happen often because we use a big multiplier for their initial size.
         # See: scala/spark/RDD.scala
-        random.seed(seed)
+        rand = Random(seed)
         while len(samples) < total:
-            samples = self.sample(withReplacement, fraction, random.randint(0,sys.maxint)).collect()
+            samples = self.sample(withReplacement, fraction, rand.randint(0,sys.maxint)).collect()

-        sampler = RDDSampler(withReplacement, fraction, random.randint(0,sys.maxint))
+        sampler = RDDSampler(withReplacement, fraction, rand.randint(0,sys.maxint))

[Contributor review comment] Put spaces after the comma here and in other instances of randint(0,sys.maxint)

         sampler.shuffle(samples)
         return samples[0:total]

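The Python change follows the same principle as the Scala side: seed a private generator instance rather than the module-global one, so library code never clobbers the random state of user code sharing the interpreter. The same idea expressed in Scala terms (illustrative only; the patch itself is Python):

```scala
import scala.util.Random

// Anti-pattern: reseeding the shared global generator perturbs every
// other caller in the process that relies on scala.util.Random.
Random.setSeed(42L)

// Preferred, and what the patch does with Random(seed) in pyspark/rdd.py:
// a private instance, so only this sampler's stream is affected.
val rand = new Random(42L)
val sampleSeed = rand.nextLong // mint per-operation seeds from it
```
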
28 changes: 13 additions & 15 deletions python/pyspark/rddsampler.py
@@ -27,7 +27,7 @@ def __init__(self, withReplacement, fraction, seed=None):
             print >> sys.stderr, "NumPy does not appear to be installed. Falling back to default random generator for sampling."
             self._use_numpy = False

-        self._seed = seed
+        self._seed = seed if seed is not None else random.randint(0,sys.maxint)
         self._withReplacement = withReplacement
         self._fraction = fraction
         self._random = None

@@ -38,17 +38,15 @@ def initRandomGenerator(self, split):
         if self._use_numpy:
             import numpy
             self._random = numpy.random.RandomState(self._seed)
-            for _ in range(0, split):
-                # discard the next few values in the sequence to have a
-                # different seed for the different splits
-                self._random.randint(sys.maxint)
         else:
             import random

[Contributor review comment] Since we have imported random at the beginning, this line is unnecessary.

-            random.seed(self._seed)
-            for _ in range(0, split):
-                # discard the next few values in the sequence to have a
-                # different seed for the different splits
-                random.randint(0, sys.maxint)
+            self._random = random.Random(self._seed)
+
+        for _ in range(0, split):
+            # discard the next few values in the sequence to have a
+            # different seed for the different splits
+            self._random.randint(0, sys.maxint)

         self._split = split
         self._rand_initialized = True

@@ -59,7 +57,7 @@ def getUniformSample(self, split):
         if self._use_numpy:
             return self._random.random_sample()
         else:
-            return random.uniform(0.0, 1.0)
+            return self._random.uniform(0.0, 1.0)

     def getPoissonSample(self, split, mean):
         if not self._rand_initialized or split != self._split:

@@ -73,26 +71,26 @@ def getPoissonSample(self, split, mean):
         num_arrivals = 1
         cur_time = 0.0

-        cur_time += random.expovariate(mean)
+        cur_time += self._random.expovariate(mean)

         if cur_time > 1.0:
             return 0

         while(cur_time <= 1.0):
-            cur_time += random.expovariate(mean)
+            cur_time += self._random.expovariate(mean)
             num_arrivals += 1

         return (num_arrivals - 1)

     def shuffle(self, vals):
-        if self._random == None or split != self._split:
+        if self._random == None:
             self.initRandomGenerator(0)  # this should only ever called on the master so
                                          # the split does not matter

         if self._use_numpy:
             self._random.shuffle(vals)
         else:
-            random.shuffle(vals, self._random)
+            self._random.shuffle(vals, self._random.random)

     def func(self, split, iterator):
         if self._withReplacement:

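The loop that discards `split` draws is the sampler's cheap way of positioning each partition at a different point in one seeded stream: every split skips a different prefix, so splits sharing a seed do not emit identical sequences. A Scala sketch of the same skip-ahead idea (the real code is the Python above; the function name is illustrative):

```scala
import java.util.Random

// Mirror of initRandomGenerator: one seed per job, decorrelated per
// split by discarding the first `split` values of the stream.
def generatorForSplit(seed: Long, split: Int): Random = {
  val rng = new Random(seed)
  var i = 0
  while (i < split) {
    rng.nextInt() // throw away one value per earlier split
    i += 1
  }
  rng
}

val g0 = generatorForSplit(7L, 0)
val g1 = generatorForSplit(7L, 1)
println(g0.nextInt() == g1.nextInt()) // usually false: the streams are offset
```

Note the limitation this inherits from the patch: the per-split streams are the same stream at different offsets (g0's second value equals g1's first), so skipping a handful of values is only a weak decorrelator; it matches what the Python code does rather than improving on it.
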