
Commit 0c134f3

[SPARK-2024] Test refactoring and adding a couple of unbatched cases
1 parent: 7a176df

File tree

2 files changed (+53, -35 lines)


python/pyspark/context.py

Lines changed: 0 additions & 1 deletion
@@ -53,7 +53,6 @@ class SparkContext(object):
     _python_includes = None # zip and egg files that need to be added to PYTHONPATH
     _default_batch_size_for_serialized_input = 10
 
-
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                  environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
                  gateway=None):
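
The deletion above is whitespace-only cleanup next to _default_batch_size_for_serialized_input, which, as the name suggests, appears to be the batch size the Hadoop-format readers fall back to when no explicit batchSize argument is given. A minimal sketch of the two read modes the tests below exercise, assuming a live SparkContext named sc and a hypothetical SequenceFile of (IntWritable, Text) pairs under /tmp/sfint/:

    # Default read: records are deserialized in batches
    # (10 per pickled object, per the class attribute above).
    batched = sc.sequenceFile("/tmp/sfint/",
                              "org.apache.hadoop.io.IntWritable",
                              "org.apache.hadoop.io.Text").collect()

    # batchSize=1 disables batching: one record per serialized object.
    # These are the "unbatched" cases this commit adds tests for.
    unbatched = sc.sequenceFile("/tmp/sfint/",
                                "org.apache.hadoop.io.IntWritable",
                                "org.apache.hadoop.io.Text",
                                batchSize=1).collect()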

python/pyspark/tests.py

Lines changed: 53 additions & 34 deletions
@@ -295,12 +295,6 @@ def test_sequencefiles(self):
         ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
         self.assertEqual(ints, ei)
 
-        unbatched_ints = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfint/",
-                                                     "org.apache.hadoop.io.IntWritable",
-                                                     "org.apache.hadoop.io.Text",
-                                                     batchSize=1).collect())
-        self.assertEqual(unbatched_ints, ei)
-
         doubles = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfdouble/",
                                               "org.apache.hadoop.io.DoubleWritable",
                                               "org.apache.hadoop.io.Text").collect())
@@ -395,18 +389,12 @@ def test_oldhadoop(self):
         ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
         self.assertEqual(ints, ei)
 
-        unbatched_ints = sorted(self.sc.hadoopFile(basepath + "/sftestdata/sfint/",
-                                                   "org.apache.hadoop.mapred.SequenceFileInputFormat",
-                                                   "org.apache.hadoop.io.IntWritable",
-                                                   "org.apache.hadoop.io.Text",
-                                                   batchSize=1).collect())
-        self.assertEqual(unbatched_ints, ei)
-
         hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
-        hello = self.sc.hadoopFile(hellopath,
-                                   "org.apache.hadoop.mapred.TextInputFormat",
-                                   "org.apache.hadoop.io.LongWritable",
-                                   "org.apache.hadoop.io.Text").collect()
+        oldconf = {"mapred.input.dir" : hellopath}
+        hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
+                                  "org.apache.hadoop.io.LongWritable",
+                                  "org.apache.hadoop.io.Text",
+                                  conf=oldconf).collect()
         result = [(0, u'Hello World!')]
         self.assertEqual(hello, result)

@@ -420,19 +408,12 @@ def test_newhadoop(self):
         ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
         self.assertEqual(ints, ei)
 
-        unbatched_ints = sorted(self.sc.newAPIHadoopFile(
-            basepath + "/sftestdata/sfint/",
-            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
-            "org.apache.hadoop.io.IntWritable",
-            "org.apache.hadoop.io.Text",
-            batchSize=1).collect())
-        self.assertEqual(unbatched_ints, ei)
-
         hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
-        hello = self.sc.newAPIHadoopFile(hellopath,
-                                         "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
-                                         "org.apache.hadoop.io.LongWritable",
-                                         "org.apache.hadoop.io.Text").collect()
+        newconf = {"mapred.input.dir" : hellopath}
+        hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
+                                        "org.apache.hadoop.io.LongWritable",
+                                        "org.apache.hadoop.io.Text",
+                                        conf=newconf).collect()
         result = [(0, u'Hello World!')]
         self.assertEqual(hello, result)
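
Both hunks above swap the path-argument readers (hadoopFile / newAPIHadoopFile) for their conf-driven counterparts (hadoopRDD / newAPIHadoopRDD), where the input path travels in a Hadoop job-configuration dict instead of a positional argument. A minimal sketch of that pattern, assuming a live SparkContext named sc and a hypothetical text file at /tmp/hello.txt:

    # Old (mapred) API: the input path is supplied via the conf dict.
    oldconf = {"mapred.input.dir": "/tmp/hello.txt"}
    old_api = sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
                           "org.apache.hadoop.io.LongWritable",
                           "org.apache.hadoop.io.Text",
                           conf=oldconf).collect()

    # New (mapreduce) API: same pattern with the new-API input format class.
    newconf = {"mapred.input.dir": "/tmp/hello.txt"}
    new_api = sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                                 "org.apache.hadoop.io.LongWritable",
                                 "org.apache.hadoop.io.Text",
                                 conf=newconf).collect()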

@@ -620,14 +601,17 @@ def test_reserialization(self):
         rdd.saveAsSequenceFile(basepath + "/reserialize/sequence")
         result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect())
         self.assertEqual(result1, data)
+
         rdd.saveAsHadoopFile(basepath + "/reserialize/hadoop",
                              "org.apache.hadoop.mapred.SequenceFileOutputFormat")
         result2 = sorted(self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect())
         self.assertEqual(result2, data)
+
         rdd.saveAsNewAPIHadoopFile(basepath + "/reserialize/newhadoop",
                                    "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
         result3 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect())
         self.assertEqual(result3, data)
+
         conf4 = {
             "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
             "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",

@@ -636,6 +620,7 @@ def test_reserialization(self):
         rdd.saveAsHadoopDataset(conf4)
         result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect())
         self.assertEqual(result4, data)
+
         conf5 = {"mapreduce.outputformat.class" :
                      "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
                  "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",

@@ -645,13 +630,47 @@ def test_reserialization(self):
         result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
         self.assertEqual(result5, data)
 
-        # unbatched save and read
+    def test_unbatched_save_and_read(self):
+        basepath = self.tempdir.name
         ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
         self.sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile(
-            basepath + "/reserialize/unbatched")
-        unbatched_ints = sorted(self.sc.sequenceFile(basepath + "/reserialize/unbatched",
-                                                     batchSize=1).collect())
-        self.assertEqual(unbatched_ints, ei)
+            basepath + "/unbatched/")
+
+        unbatched_sequence = sorted(self.sc.sequenceFile(basepath + "/unbatched/",
+                                                         batchSize=1).collect())
+        self.assertEqual(unbatched_sequence, ei)
+
+        unbatched_hadoopFile = sorted(self.sc.hadoopFile(basepath + "/unbatched/",
+                                                         "org.apache.hadoop.mapred.SequenceFileInputFormat",
+                                                         "org.apache.hadoop.io.IntWritable",
+                                                         "org.apache.hadoop.io.Text",
+                                                         batchSize=1).collect())
+        self.assertEqual(unbatched_hadoopFile, ei)
+
+        unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(basepath + "/unbatched/",
+                                                                     "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+                                                                     "org.apache.hadoop.io.IntWritable",
+                                                                     "org.apache.hadoop.io.Text",
+                                                                     batchSize=1).collect())
+        self.assertEqual(unbatched_newAPIHadoopFile, ei)
+
+        oldconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+        unbatched_hadoopRDD = sorted(self.sc.hadoopRDD(
+            "org.apache.hadoop.mapred.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.Text",
+            conf=oldconf,
+            batchSize=1).collect())
+        self.assertEqual(unbatched_hadoopRDD, ei)
+
+        newconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+        unbatched_newAPIHadoopRDD = sorted(self.sc.newAPIHadoopRDD(
+            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.Text",
+            conf=newconf,
+            batchSize=1).collect())
+        self.assertEqual(unbatched_newAPIHadoopRDD, ei)
 
 class TestDaemon(unittest.TestCase):
     def connect(self, port):
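
For reference, a condensed, standalone version of the round trip the new test_unbatched_save_and_read performs. The scratch directory here is hypothetical, and numSlices=len(ei) (one record per partition) appears to be how the test forces the smallest possible write-side batches:

    import tempfile
    from pyspark import SparkContext

    sc = SparkContext("local", "unbatched-roundtrip")
    basepath = tempfile.mkdtemp()  # hypothetical scratch directory

    ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]

    # One element per partition, so each partition serializes a single record.
    sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile(basepath + "/unbatched/")

    # Read back with batchSize=1 (one record per pickled object) and verify.
    roundtrip = sorted(sc.sequenceFile(basepath + "/unbatched/", batchSize=1).collect())
    assert roundtrip == ei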
