Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
60e4e2f
support unpickle array.array for Python 2.6
davies Sep 11, 2014
c77c87b
cleanup debugging code
davies Sep 13, 2014
3908f5c
Merge branch 'master' into pickle
davies Sep 13, 2014
f44f771
enable tests about array
davies Sep 13, 2014
b30ef35
use pickle to serialize data for mllib/recommendation
davies Sep 13, 2014
52d1350
use new protocol in mllib/stat
davies Sep 13, 2014
f1544c4
refactor clustering
davies Sep 13, 2014
aa2287e
random
davies Sep 13, 2014
8fe166a
Merge branch 'pickle' into pickle_mllib
davies Sep 13, 2014
cccb8b1
mllib/tree
davies Sep 14, 2014
d9f691f
mllib/util
davies Sep 14, 2014
f2a0856
mllib/regression
davies Sep 14, 2014
c383544
classification
davies Sep 14, 2014
6d26b03
fix tests
davies Sep 14, 2014
4d7963e
remove muanlly serialization
davies Sep 14, 2014
84c721d
Merge branch 'master' into pickle_mllib
davies Sep 14, 2014
b02e34f
remove _common.py
davies Sep 14, 2014
0ee1525
remove outdated tests
davies Sep 14, 2014
722dd96
cleanup _common.py
davies Sep 15, 2014
f3506c5
Merge branch 'master' into pickle_mllib
davies Sep 16, 2014
df19464
memorize the module and class name during pickleing
davies Sep 16, 2014
46a501e
choose batch size automatically
davies Sep 16, 2014
88034f0
rafactor, address comments
davies Sep 16, 2014
9dcfb63
fix style
davies Sep 16, 2014
708dc02
fix tests
davies Sep 16, 2014
e1d1bfc
refactor
davies Sep 16, 2014
44736d7
speed up pickling array in Python 2.7
davies Sep 16, 2014
154d141
fix autobatchedpickler
davies Sep 16, 2014
df625c7
Merge commit '154d141' into pickle_mllib
davies Sep 16, 2014
a379a81
fix pickle array in python2.7
davies Sep 16, 2014
44e0551
fix cache
davies Sep 16, 2014
9ceff73
test size of serialized Rating
davies Sep 16, 2014
a2cc855
fix tests
davies Sep 16, 2014
1fccf1a
address comments
davies Sep 16, 2014
2511e76
cleanup
davies Sep 16, 2014
19d0967
refactor Picklers
davies Sep 17, 2014
e431377
fix cache of rdd, refactor
davies Sep 17, 2014
bd738ab
address comments
davies Sep 18, 2014
032cd62
add more type check and conversion for user_product
davies Sep 18, 2014
810f97f
fix equal of matrix
davies Sep 18, 2014
dffbba2
Merge branch 'master' of github.com:apache/spark into pickle_mllib
davies Sep 19, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
support unpickle array.array for Python 2.6
  • Loading branch information
davies committed Sep 11, 2014
commit 60e4e2f8e0897555af6e657967bd39c150fc4726
52 changes: 52 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.api.python

import java.nio.ByteOrder

import scala.collection.JavaConversions._
import scala.util.Failure
import scala.util.Try
Expand All @@ -28,6 +30,56 @@ import org.apache.spark.rdd.RDD

/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
private[python] object SerDeUtil extends Logging {
// Unpickle array.array generated by Python 2.6
class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor {
// /* Description of types */
// static struct arraydescr descriptors[] = {
// {'c', sizeof(char), c_getitem, c_setitem},
// {'b', sizeof(char), b_getitem, b_setitem},
// {'B', sizeof(char), BB_getitem, BB_setitem},
// #ifdef Py_USING_UNICODE
// {'u', sizeof(Py_UNICODE), u_getitem, u_setitem},
// #endif
// {'h', sizeof(short), h_getitem, h_setitem},
// {'H', sizeof(short), HH_getitem, HH_setitem},
// {'i', sizeof(int), i_getitem, i_setitem},
// {'I', sizeof(int), II_getitem, II_setitem},
// {'l', sizeof(long), l_getitem, l_setitem},
// {'L', sizeof(long), LL_getitem, LL_setitem},
// {'f', sizeof(float), f_getitem, f_setitem},
// {'d', sizeof(double), d_getitem, d_setitem},
// {'\0', 0, 0, 0} /* Sentinel */
// };
// TODO: support Py_UNICODE with 2 bytes
// FIXME: unpickle array of float is wrong in Pyrolite, so we reverse the
// machine code for float/double here to work arround it.
// we should fix this after Pyrolite fix them
val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
'L' -> 11, 'l' -> 13, 'f' -> 14, 'd' -> 16, 'u' -> 21
)
} else {
Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
'L' -> 10, 'l' -> 12, 'f' -> 15, 'd' -> 17, 'u' -> 20
)
}
override def construct(args: Array[Object]): Object = {
if (args.length == 1) {
construct(args ++ Array(""))
} else if (args.length == 2 && args(1).isInstanceOf[String]) {
val typecode = args(0).asInstanceOf[String].charAt(0)
val data: String = args(1).asInstanceOf[String]
println(typecode, machineCodes(typecode), data.length, data.toList)
construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1"))
} else {
super.construct(args)
}
}
}

def initialize() = {
Unpickler.registerConstructor("array", "array", new ArrayConstructor())
}

private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = {
val pickle = new Pickler
Expand Down
1 change: 1 addition & 0 deletions python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ def _ensure_initialized(cls, instance=None, gateway=None):
SparkContext._gateway = gateway or launch_gateway()
SparkContext._jvm = SparkContext._gateway.jvm
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
SparkContext._jvm.SerDeUtil.initialize()

if instance:
if (SparkContext._active_spark_context and
Expand Down
1 change: 0 additions & 1 deletion python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,6 @@ def test_oldhadoop(self):
conf=input_conf).collect())
self.assertEqual(old_dataset, dict_data)

@unittest.skipIf(sys.version_info[:2] <= (2, 6), "Skipped on 2.6 until SPARK-2951 is fixed")
def test_newhadoop(self):
basepath = self.tempdir.name
# use custom ArrayWritable types and converters to handle arrays
Expand Down