@@ -425,6 +425,11 @@ private[spark] object PythonRDD extends Logging {
     iter.foreach(write)
   }
 
+  /** Create an RDD that has no partitions or elements. */
+  def emptyRDD[T](sc: JavaSparkContext): JavaRDD[Array[Byte]] = {
+    sc.emptyRDD[Array[Byte]]
+  }
+
   /**
    * Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]],
    * key and value class.

Member: emptyRDD[T] should produce an RDD of T, not byte arrays, right?

Member Author: T in the return type is not used. Actually, it can be an arbitrary type.

Member Author: I will update it to T.
python/pyspark/context.py (6 additions, 0 deletions)

@@ -324,6 +324,12 @@ def stop(self):
         with SparkContext._lock:
             SparkContext._active_spark_context = None
 
+    def emptyRDD(self):
+        """
+        Create an RDD that has no partitions or elements.
+        """
+        return RDD(self._jsc.emptyRDD(), self, NoOpSerializer())
+
     def range(self, start, end=None, step=1, numSlices=None):
         """
         Create a new RDD of int containing elements from `start` to `end`
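For reference, a minimal doctest-style sketch of how the new method behaves from a PySpark shell, assuming `sc` is an active SparkContext (not part of the patch):

>>> rdd = sc.emptyRDD()
>>> rdd.getNumPartitions()
0
>>> rdd.collect()
[]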
python/pyspark/rdd.py (1 addition, 1 deletion)

@@ -960,7 +960,7 @@ def sum(self):
         >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
         6.0
         """
-        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
+        return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
 
     def count(self):
         """
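The switch from reduce to fold is what makes sum() safe on the new empty RDD: reduce raises a ValueError when there are no elements to combine, while fold starts from its zero value and returns it unchanged. A minimal doctest-style sketch of the difference, assuming an active `sc` (not part of the patch):

>>> import operator
>>> sc.emptyRDD().fold(0, operator.add)
0
>>> sc.emptyRDD().reduce(operator.add)  # no elements to combine
Traceback (most recent call last):
    ...
ValueError: Can not reduce() empty RDD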
python/pyspark/tests.py (8 additions, 0 deletions)

@@ -458,6 +458,14 @@ def test_id(self):
         self.assertEqual(id + 1, id2)
         self.assertEqual(id2, rdd2.id())
 
+    def test_empty_rdd(self):
+        rdd = self.sc.emptyRDD()
+        self.assertTrue(rdd.isEmpty())
+
+    def test_sum(self):
+        self.assertEqual(0, self.sc.emptyRDD().sum())
+        self.assertEqual(6, self.sc.parallelize([1, 2, 3]).sum())
+
     def test_save_as_textfile_with_unicode(self):
         # Regression test for SPARK-970
         x = u"\u00A1Hola, mundo!"