11 changes: 10 additions & 1 deletion python/pyspark/mllib/_common.py
@@ -16,6 +16,7 @@
 #
 
 import struct
+import sys
 import numpy
 from numpy import ndarray, float64, int64, int32, array_equal, array
 from pyspark import SparkContext, RDD
@@ -78,6 +79,14 @@
 LABELED_POINT_MAGIC = 4
 
 
+# Workaround for SPARK-2954: before Python 2.7, struct.unpack couldn't unpack bytearray()s.
+if sys.version_info[:2] <= (2, 6):
+    def _unpack(fmt, string):
+        return struct.unpack(fmt, buffer(string))
+else:
+    _unpack = struct.unpack
+
+
 def _deserialize_numpy_array(shape, ba, offset, dtype=float64):
     """
     Deserialize a numpy array of the given type from an offset in
@@ -191,7 +200,7 @@ def _deserialize_double(ba, offset=0):
         raise TypeError("_deserialize_double called on a %s; wanted bytearray" % type(ba))
     if len(ba) - offset != 8:
         raise TypeError("_deserialize_double called on a %d-byte array; wanted 8 bytes." % nb)
-    return struct.unpack("d", ba[offset:])[0]
+    return _unpack("d", ba[offset:])[0]
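
For context, a minimal self-contained sketch of what the shim buys us. It is runnable on any Python version: on 2.6 the `buffer()` branch is taken (SPARK-2954), everywhere else `_unpack` is just `struct.unpack`.

```python
import struct
import sys

# Same workaround as in the patch: on Python <= 2.6, struct.unpack()
# rejects bytearray arguments, but accepts a buffer() view of one.
if sys.version_info[:2] <= (2, 6):
    def _unpack(fmt, string):
        return struct.unpack(fmt, buffer(string))
else:
    _unpack = struct.unpack

# Round-trip a double through a bytearray, as _deserialize_double does.
ba = bytearray(struct.pack("d", 3.14))
assert _unpack("d", ba)[0] == 3.14
```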
7 changes: 6 additions & 1 deletion python/pyspark/mllib/tests.py
@@ -19,8 +19,13 @@
 Fuller unit tests for Python MLlib.
 """
 
+import sys
 from numpy import array, array_equal
-import unittest
+
+if sys.version_info[:2] <= (2, 6):
+    import unittest2 as unittest
+else:
+    import unittest
 
 from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \
     _deserialize_double_vector, _dot, _squared_distance
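
This conditional import is what lets the test suite keep using the Python 2.7 unittest API (skipIf, assertIn, and friends) on 2.6, where unittest2 backports it. A sketch of the pattern in use; the `ExampleTest` class below is illustrative, not part of the patch:

```python
import sys

# Same conditional import as the patch: unittest2 backports the
# Python 2.7 unittest API to Python 2.6.
if sys.version_info[:2] <= (2, 6):
    import unittest2 as unittest
else:
    import unittest


class ExampleTest(unittest.TestCase):

    @unittest.skipIf(sys.version_info[:2] <= (2, 6), "illustrative skip on 2.6")
    def test_assert_in(self):
        # assertIn is a 2.7 addition that unittest2 provides on 2.6.
        self.assertIn("spark", "pyspark")


if __name__ == "__main__":
    unittest.main()
```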
4 changes: 2 additions & 2 deletions python/pyspark/serializers.py
@@ -314,8 +314,8 @@ def _copy_func(f):
 
 _old_namedtuple = _copy_func(collections.namedtuple)
 
-def namedtuple(name, fields, verbose=False, rename=False):
-    cls = _old_namedtuple(name, fields, verbose, rename)
+def namedtuple(*args, **kwargs):
+    cls = _old_namedtuple(*args, **kwargs)
 return _hack_namedtuple(cls)
 
 # replace namedtuple with new one
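
The reason for the `*args, **kwargs` change: `collections.namedtuple()` gained the `rename` keyword only in Python 2.7, so hard-coding the 2.7 signature breaks the wrapper on 2.6. Forwarding everything keeps the wrapper signature-agnostic across versions. A self-contained sketch of the pattern; the `_hack_namedtuple` stand-in below is a placeholder, not PySpark's real implementation:

```python
import collections

_old_namedtuple = collections.namedtuple


def _hack_namedtuple(cls):
    # Stand-in for PySpark's real _hack_namedtuple (which patches the
    # class so its instances pickle correctly); here it is a no-op.
    return cls


# Forward *args/**kwargs instead of hard-coding the 2.7 signature:
# on 2.6, namedtuple() has no `rename` parameter.
def namedtuple(*args, **kwargs):
    cls = _old_namedtuple(*args, **kwargs)
    return _hack_namedtuple(cls)


Point = namedtuple("Point", ["x", "y"])
print(Point(1, 2))  # Point(x=1, y=2)
```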
13 changes: 10 additions & 3 deletions python/pyspark/tests.py
@@ -29,9 +29,14 @@
 import sys
 import tempfile
 import time
-import unittest
 import zipfile
 
+if sys.version_info[:2] <= (2, 6):
+    import unittest2 as unittest
+else:
+    import unittest
+
+
 from pyspark.context import SparkContext
 from pyspark.files import SparkFiles
 from pyspark.serializers import read_int
@@ -605,6 +610,7 @@ def test_oldhadoop(self):
                              conf=input_conf).collect())
         self.assertEqual(old_dataset, dict_data)
 
+    @unittest.skipIf(sys.version_info[:2] <= (2, 6), "Skipped on 2.6 until SPARK-2951 is fixed")
     def test_newhadoop(self):
         basepath = self.tempdir.name
         # use custom ArrayWritable types and converters to handle arrays
@@ -905,8 +911,9 @@ def createFileInZip(self, name, content):
         pattern = re.compile(r'^ *\|', re.MULTILINE)
         content = re.sub(pattern, '', content.strip())
         path = os.path.join(self.programDir, name + ".zip")
-        with zipfile.ZipFile(path, 'w') as zip:
-            zip.writestr(name, content)
+        zip = zipfile.ZipFile(path, 'w')
+        zip.writestr(name, content)
+        zip.close()
[Inline review discussion on the ZipFile change]

Contributor:
Why did you change this, does `with` on ZipFiles not work in 2.6?

Contributor Author:
Yeah, support for using ZipFile as a context manager was added in Python 2.7: https://docs.python.org/2.7/library/zipfile.html?highlight=zipfile#zipfile.ZipFile

Contributor:
Okay, makes sense. Patch looks good to me otherwise -- feel free to merge it.

         return path
 
     def test_single_script(self):
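
As the discussion above notes, ZipFile only became a context manager in Python 2.7, so the patch switches to an explicit close(). A slightly more defensive 2.6-compatible variant, sketched below rather than taken from the patch, uses try/finally so the archive is closed even if writestr() raises:

```python
import zipfile


def create_file_in_zip(path, name, content):
    # ZipFile is not a context manager on Python 2.6, so close it
    # explicitly; try/finally guarantees the close on error paths,
    # unlike the straight-line close() in the patch.
    zf = zipfile.ZipFile(path, 'w')
    try:
        zf.writestr(name, content)
    finally:
        zf.close()
    return path
```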
8 changes: 8 additions & 0 deletions python/run-tests
@@ -48,6 +48,14 @@ function run_test() {
 
 echo "Running PySpark tests. Output is in python/unit-tests.log."
 
+# Try to test with Python 2.6, since that's the minimum version that we support:
+if [ $(which python2.6) ]; then
+    export PYSPARK_PYTHON="python2.6"
+fi
+
+echo "Testing with Python version:"
+$PYSPARK_PYTHON --version
+
 run_test "pyspark/rdd.py"
 run_test "pyspark/context.py"
 run_test "pyspark/conf.py"
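
For illustration only, a rough Python rendition of the shell logic above: prefer the oldest supported interpreter when it is installed, then report which one will run the tests. The use of `distutils.spawn.find_executable` is an assumption for the sketch (it is standard in the Python 2.x era this patch targets) and is not part of the patch:

```python
import os
from distutils.spawn import find_executable

# Mirror of the run-tests hunk: pin the test interpreter to Python 2.6
# when it is on PATH, since 2.6 is the minimum supported version.
if find_executable("python2.6"):
    os.environ["PYSPARK_PYTHON"] = "python2.6"

print("Testing with Python version:")
os.system("%s --version" % os.environ.get("PYSPARK_PYTHON", "python"))
```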