Closed
Changes from 1 commit
Commits (41)
60e4e2f
support unpickle array.array for Python 2.6
davies Sep 11, 2014
c77c87b
cleanup debugging code
davies Sep 13, 2014
3908f5c
Merge branch 'master' into pickle
davies Sep 13, 2014
f44f771
enable tests about array
davies Sep 13, 2014
b30ef35
use pickle to serialize data for mllib/recommendation
davies Sep 13, 2014
52d1350
use new protocol in mllib/stat
davies Sep 13, 2014
f1544c4
refactor clustering
davies Sep 13, 2014
aa2287e
random
davies Sep 13, 2014
8fe166a
Merge branch 'pickle' into pickle_mllib
davies Sep 13, 2014
cccb8b1
mllib/tree
davies Sep 14, 2014
d9f691f
mllib/util
davies Sep 14, 2014
f2a0856
mllib/regression
davies Sep 14, 2014
c383544
classification
davies Sep 14, 2014
6d26b03
fix tests
davies Sep 14, 2014
4d7963e
remove manual serialization
davies Sep 14, 2014
84c721d
Merge branch 'master' into pickle_mllib
davies Sep 14, 2014
b02e34f
remove _common.py
davies Sep 14, 2014
0ee1525
remove outdated tests
davies Sep 14, 2014
722dd96
cleanup _common.py
davies Sep 15, 2014
f3506c5
Merge branch 'master' into pickle_mllib
davies Sep 16, 2014
df19464
memorize the module and class name during pickling
davies Sep 16, 2014
46a501e
choose batch size automatically
davies Sep 16, 2014
88034f0
refactor, address comments
davies Sep 16, 2014
9dcfb63
fix style
davies Sep 16, 2014
708dc02
fix tests
davies Sep 16, 2014
e1d1bfc
refactor
davies Sep 16, 2014
44736d7
speed up pickling array in Python 2.7
davies Sep 16, 2014
154d141
fix AutoBatchedPickler
davies Sep 16, 2014
df625c7
Merge commit '154d141' into pickle_mllib
davies Sep 16, 2014
a379a81
fix pickling array in Python 2.7
davies Sep 16, 2014
44e0551
fix cache
davies Sep 16, 2014
9ceff73
test size of serialized Rating
davies Sep 16, 2014
a2cc855
fix tests
davies Sep 16, 2014
1fccf1a
address comments
davies Sep 16, 2014
2511e76
cleanup
davies Sep 16, 2014
19d0967
refactor Picklers
davies Sep 17, 2014
e431377
fix cache of rdd, refactor
davies Sep 17, 2014
bd738ab
address comments
davies Sep 18, 2014
032cd62
add more type check and conversion for user_product
davies Sep 18, 2014
810f97f
fix equal of matrix
davies Sep 18, 2014
dffbba2
Merge branch 'master' of github.com:apache/spark into pickle_mllib
davies Sep 19, 2014
refactor Picklers
davies committed Sep 17, 2014
commit 19d096783b60e741173f48f2944d91f650616140
@@ -43,6 +43,8 @@ import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

import scala.reflect.ClassTag
Review comment: Imports out of order.
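A sketch of the reordering this comment asks for, assuming Spark's usual import grouping (java, scala, third-party, then org.apache.spark); only the nearby imports are shown:

    import scala.reflect.ClassTag

    import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.rdd.RDD
    import org.apache.spark.util.Utils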


/**
* :: DeveloperApi ::
* The Java stubs necessary for the Python mllib bindings.
@@ -447,42 +449,57 @@ class PythonMLLibAPI extends Serializable {
*/
private[spark] object SerDe extends Serializable {

val PYSPARK_PACKAGE = "pyspark.mllib"

/**
* Base class used for pickle
* @param module the module of class in pyspark.mllib
* @param cls the class
*/
private[python] abstract case class BasePickler(module: String, cls: Class[_])
private[python] abstract class BasePickler[T: ClassTag]
extends IObjectPickler with IObjectConstructor {

def name = cls.getSimpleName
private val cls = implicitly[ClassTag[T]].runtimeClass
private val module = PYSPARK_PACKAGE + "." + cls.getName.split('.')(4)
private val name = cls.getSimpleName

// register this to Pickler and Unpickler
def register(): Unit = {
Pickler.registerCustomPickler(this.getClass, this)
Pickler.registerCustomPickler(cls, this)
Unpickler.registerConstructor(module, name, this)
}

def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
pickler.save(this) // it will be memorized by Pickler
saveState(obj, out, pickler)
out.write(Opcodes.REDUCE)
if (obj == this) {
out.write(Opcodes.GLOBAL)
out.write((module + "\n" + name + "\n").getBytes())
} else {
pickler.save(this) // it will be memorized by Pickler
saveState(obj, out, pickler)
out.write(Opcodes.REDUCE)
}
}

private[python] def saveObjects(out: OutputStream, pickler: Pickler,
objects: Any*) = {
Review comment: Can fit def on 1 line.
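A sketch of that suggestion, assuming the 100-character line limit from the Spark style guide (signature only; the body stays as in the diff below):

    private[python] def saveObjects(out: OutputStream, pickler: Pickler, objects: Any*) = {
      // ... body unchanged ...
    }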

out.write(Opcodes.MARK)
if (objects.length == 0 || objects.length > 3) {
out.write(Opcodes.MARK)
}
objects.foreach(pickler.save(_))
out.write(Opcodes.TUPLE)
val code = objects.length match {
case 1 => Opcodes.TUPLE1
case 2 => Opcodes.TUPLE2
case 3 => Opcodes.TUPLE3
case _ => Opcodes.TUPLE
}
out.write(code)
}

def saveState(obj: Object, out: OutputStream, pickler: Pickler)
}

private[python] class MetaPickler extends IObjectPickler {
def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
out.write(Opcodes.GLOBAL)
val cons = obj.asInstanceOf[BasePickler]
out.write((cons.module + "\n" + cons.name + "\n").getBytes())
}
private[python] def saveState(obj: Object, out: OutputStream, pickler: Pickler)
}
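A worked example (illustrative) of the module/name derivation and of the GLOBAL opcode written in the obj == this branch above, using DenseVector:

    // classOf[DenseVector].getName               == "org.apache.spark.mllib.linalg.DenseVector"
    // classOf[DenseVector].getName.split('.')(4) == "linalg"
    // so module == "pyspark.mllib.linalg" and name == "DenseVector".
    // Pickling the registered constructor therefore writes
    //   GLOBAL "pyspark.mllib.linalg\nDenseVector\n"
    // and the Python-side unpickler resolves that to pyspark.mllib.linalg.DenseVector
    // before REDUCE applies it to the state written by saveState().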

// Pickler for DenseVector
private[python] class DenseVectorPickler
extends BasePickler("pyspark.mllib.linalg", classOf[DenseVector]) {
extends BasePickler[DenseVector] {

def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
val vector: DenseVector = obj.asInstanceOf[DenseVector]
@@ -495,8 +512,9 @@ private[spark] object SerDe extends Serializable {
}
}

// Pickler for DenseMatrix
private[python] class DenseMatrixPickler
extends BasePickler("pyspark.mllib.linalg", classOf[DenseMatrix]) {
extends BasePickler[DenseMatrix] {

def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
val m: DenseMatrix = obj.asInstanceOf[DenseMatrix]
@@ -510,8 +528,9 @@ private[spark] object SerDe extends Serializable {
}
}

// Pickler for SparseVector
private[python] class SparseVectorPickler
extends BasePickler("pyspark.mllib.linalg", classOf[SparseVector]) {
extends BasePickler[SparseVector] {

def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
val v: SparseVector = obj.asInstanceOf[SparseVector]
@@ -525,8 +544,9 @@ private[spark] object SerDe extends Serializable {
}
}

// Pickler for LabeledPoint
private[python] class LabeledPointPickler
extends BasePickler("pyspark.mllib.regression", classOf[LabeledPoint]) {
extends BasePickler[LabeledPoint] {

def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
val point: LabeledPoint = obj.asInstanceOf[LabeledPoint]
@@ -541,11 +561,9 @@ private[spark] object SerDe extends Serializable {
}
}

/**
* Pickle Rating
*/
// Pickler for Rating
private[python] class RatingPickler
extends BasePickler("pyspark.mllib.recommendation", classOf[Rating]) {
extends BasePickler[Rating] {

def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
val rating: Rating = obj.asInstanceOf[Rating]
@@ -561,18 +579,12 @@ private[spark] object SerDe extends Serializable {
}
}

def registerPickler[T](pickler: BasePickler): Unit = {
Pickler.registerCustomPickler(pickler.getClass, new MetaPickler)
Pickler.registerCustomPickler(pickler.cls, pickler)
Unpickler.registerConstructor(pickler.module, pickler.name, pickler)
}

def initialize(): Unit = {
registerPickler(new DenseVectorPickler)
registerPickler(new DenseMatrixPickler)
registerPickler(new SparseVectorPickler)
registerPickler(new LabeledPointPickler)
registerPickler(new RatingPickler)
new DenseVectorPickler().register()
new DenseMatrixPickler().register()
new SparseVectorPickler().register()
new LabeledPointPickler().register()
new RatingPickler().register()
}

def dumps(obj: AnyRef): Array[Byte] = {
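As a usage note, a minimal sketch of how a new pickler would plug into the refactored base class. MyPoint and MyPointPickler are hypothetical, for illustration only; a real type would have to live under org.apache.spark.mllib.<module> so that getName.split('.')(4) finds the pyspark submodule, with a matching Python class on the pyspark.mllib side. Imports of java.io.OutputStream and the net.razorvine.pickle classes are assumed, as in the surrounding file.

    // Hypothetical model class, for illustration only.
    case class MyPoint(x: Double, y: Double)

    // Hypothetical pickler: a subclass only provides saveState() (Scala object -> pickle
    // bytes) and construct() (unpickled args -> Scala object); BasePickler handles the
    // GLOBAL/REDUCE opcodes and registration.
    private[python] class MyPointPickler extends BasePickler[MyPoint] {

      def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
        val p = obj.asInstanceOf[MyPoint]
        // two fields => saveObjects emits TUPLE2; pickle() then appends REDUCE
        saveObjects(out, pickler, p.x, p.y)
      }

      def construct(args: Array[Object]): Object = {
        if (args.length != 2) {
          throw new PickleException("should be 2")
        }
        MyPoint(args(0).asInstanceOf[Double], args(1).asInstanceOf[Double])
      }
    }

    // registered next to the others in SerDe.initialize():
    //   new MyPointPickler().register()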