Changes from 1 commit
41 commits
60e4e2f
support unpickle array.array for Python 2.6
davies Sep 11, 2014
c77c87b
cleanup debugging code
davies Sep 13, 2014
3908f5c
Merge branch 'master' into pickle
davies Sep 13, 2014
f44f771
enable tests about array
davies Sep 13, 2014
b30ef35
use pickle to serialize data for mllib/recommendation
davies Sep 13, 2014
52d1350
use new protocol in mllib/stat
davies Sep 13, 2014
f1544c4
refactor clustering
davies Sep 13, 2014
aa2287e
random
davies Sep 13, 2014
8fe166a
Merge branch 'pickle' into pickle_mllib
davies Sep 13, 2014
cccb8b1
mllib/tree
davies Sep 14, 2014
d9f691f
mllib/util
davies Sep 14, 2014
f2a0856
mllib/regression
davies Sep 14, 2014
c383544
classification
davies Sep 14, 2014
6d26b03
fix tests
davies Sep 14, 2014
4d7963e
remove muanlly serialization
davies Sep 14, 2014
84c721d
Merge branch 'master' into pickle_mllib
davies Sep 14, 2014
b02e34f
remove _common.py
davies Sep 14, 2014
0ee1525
remove outdated tests
davies Sep 14, 2014
722dd96
cleanup _common.py
davies Sep 15, 2014
f3506c5
Merge branch 'master' into pickle_mllib
davies Sep 16, 2014
df19464
memorize the module and class name during pickleing
davies Sep 16, 2014
46a501e
choose batch size automatically
davies Sep 16, 2014
88034f0
rafactor, address comments
davies Sep 16, 2014
9dcfb63
fix style
davies Sep 16, 2014
708dc02
fix tests
davies Sep 16, 2014
e1d1bfc
refactor
davies Sep 16, 2014
44736d7
speed up pickling array in Python 2.7
davies Sep 16, 2014
154d141
fix autobatchedpickler
davies Sep 16, 2014
df625c7
Merge commit '154d141' into pickle_mllib
davies Sep 16, 2014
a379a81
fix pickle array in python2.7
davies Sep 16, 2014
44e0551
fix cache
davies Sep 16, 2014
9ceff73
test size of serialized Rating
davies Sep 16, 2014
a2cc855
fix tests
davies Sep 16, 2014
1fccf1a
address comments
davies Sep 16, 2014
2511e76
cleanup
davies Sep 16, 2014
19d0967
refactor Picklers
davies Sep 17, 2014
e431377
fix cache of rdd, refactor
davies Sep 17, 2014
bd738ab
address comments
davies Sep 18, 2014
032cd62
add more type check and conversion for user_product
davies Sep 18, 2014
810f97f
fix equal of matrix
davies Sep 18, 2014
dffbba2
Merge branch 'master' of github.com:apache/spark into pickle_mllib
davies Sep 19, 2014
use pickle to serialize data for mllib/recommendation
davies committed Sep 13, 2014
commit b30ef35ec7830cee08b4f8d692da26d98cac70e8
@@ -744,8 +744,8 @@ private[spark] object PythonRDD extends Logging {
def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
jRDD.rdd.mapPartitions { iter =>
val pickle = new Pickler
iter.map { row =>
pickle.dumps(row)
iter.grouped(1024).map { rows =>
Contributor:
Shall we divide groups based on the serialized size?

pickle.dumps(rows.toArray)
}
}
}
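Regarding the reviewer's question above about splitting groups by serialized size rather than a fixed count of 1024 rows: a minimal sketch of that alternative is below. It is illustrative only — the 64 KB target, the helper name, and the re-pickling used to estimate sizes are assumptions, not code from this PR (a later commit in the PR, "choose batch size automatically", moves in this direction).

import scala.collection.mutable.ArrayBuffer
import net.razorvine.pickle.Pickler
import org.apache.spark.api.java.JavaRDD

def javaToPythonBySize(jRDD: JavaRDD[Any], targetBytes: Int = 64 * 1024): JavaRDD[Array[Byte]] = {
  val rdd = jRDD.rdd.mapPartitions { iter =>
    val pickle = new Pickler
    new Iterator[Array[Byte]] {
      def hasNext: Boolean = iter.hasNext
      def next(): Array[Byte] = {
        // accumulate rows until the estimated pickled size crosses the target
        val batch = new ArrayBuffer[Any]
        var size = 0
        while (iter.hasNext && size < targetBytes) {
          val row = iter.next()
          size += pickle.dumps(row).length  // crude estimate: pickles each row twice just to measure it
          batch += row
        }
        pickle.dumps(batch.toArray)
      }
    }
  }
  JavaRDD.fromRDD(rdd)
}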
@@ -17,16 +17,19 @@

package org.apache.spark.mllib.api.python

import java.io.OutputStream
import java.nio.{ByteBuffer, ByteOrder}

import scala.collection.JavaConverters._

import net.razorvine.pickle.{Pickler, Unpickler, IObjectConstructor, IObjectPickler, PickleException, Opcodes}
Contributor:
use _
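The "use _" suggestion presumably means collapsing the six-name import above into a wildcard, i.e.:

import net.razorvine.pickle._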


import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.linalg.{Matrix, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.random.{RandomRDDs => RG}
import org.apache.spark.mllib.recommendation._
import org.apache.spark.mllib.regression._
@@ -262,12 +265,12 @@ class PythonMLLibAPI extends Serializable {
* the Py4J documentation.
*/
def trainALSModel(
ratingsBytesJRDD: JavaRDD[Array[Byte]],
ratingsJRDD: JavaRDD[Object],
rank: Int,
iterations: Int,
lambda: Double,
blocks: Int): MatrixFactorizationModel = {
val ratings = ratingsBytesJRDD.rdd.map(SerDe.unpackRating)
val ratings = ratingsJRDD.rdd.map(_.asInstanceOf[Rating])
ALS.train(ratings, rank, iterations, lambda, blocks)
}

@@ -278,13 +281,13 @@
* exit; see the Py4J documentation.
*/
def trainImplicitALSModel(
ratingsBytesJRDD: JavaRDD[Array[Byte]],
ratingsJRDD: JavaRDD[Object],
rank: Int,
iterations: Int,
lambda: Double,
blocks: Int,
alpha: Double): MatrixFactorizationModel = {
val ratings = ratingsBytesJRDD.rdd.map(SerDe.unpackRating)
val ratings = ratingsJRDD.rdd.map(_.asInstanceOf[Rating])
ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
}

@@ -510,6 +513,129 @@ private[spark] object SerDe extends Serializable {
private val DENSE_MATRIX_MAGIC: Byte = 3
private val LABELED_POINT_MAGIC: Byte = 4

private[python] def reduce_object(out: OutputStream, pickler: Pickler,
Contributor:
use camelCase for method names

module: String, name: String, objects: Object*) = {
out.write(Opcodes.GLOBAL)
out.write((module + "\n" + name + "\n").getBytes)
Contributor:
Does it increase the storage cost by a lot for small objects?

out.write(Opcodes.MARK)
objects.foreach(pickler.save(_))
out.write(Opcodes.TUPLE)
out.write(Opcodes.REDUCE)
}
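Per the naming comment above, the camelCase version of this helper would presumably be declared as follows (signature only, with an explicit Unit return type; the body stays the same):

private[python] def reduceObject(out: OutputStream, pickler: Pickler,
    module: String, name: String, objects: Object*): Unit = {
  // same body as reduce_object above
}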

private[python] class DenseVectorPickler extends IObjectPickler {
def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
val vector: DenseVector = obj.asInstanceOf[DenseVector]
reduce_object(out, pickler, "pyspark.mllib.linalg", "DenseVector", vector.toArray)
Contributor:
ditto: what is the cost of using class names?

}
}
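On the two storage-cost questions above: every call to reduce_object writes the GLOBAL opcode followed by the full "module\nname\n" string, so each pickled object carries a class-name header of a few dozen bytes. A quick way to see this from a test in the same package (SerDe.dumps is private[python]); the snippet only prints the measurement, the numbers are whatever your build produces:

SerDe.initialize()
val v = Vectors.dense(1.0, 2.0, 3.0)
val pickled = SerDe.dumps(v)
// the class-name header written by reduce_object accounts for this many of those bytes
val header = 1 + "pyspark.mllib.linalg\nDenseVector\n".getBytes.length  // +1 for the GLOBAL opcode itself
println(s"pickled DenseVector: ${pickled.length} bytes, class-name header: $header bytes")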

private[python] class DenseVectorConstructor extends IObjectConstructor {
def construct(args: Array[Object]) :Object = {
require(args.length == 1)
new DenseVector(args(0).asInstanceOf[Array[Double]])
}
}

private[python] class DenseMatrixPickler extends IObjectPickler {
def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
val m: DenseMatrix = obj.asInstanceOf[DenseMatrix]
reduce_object(out, pickler, "pyspark.mllib.linalg", "DenseMatrix",
m.numRows.asInstanceOf[Object], m.numCols.asInstanceOf[Object], m.values)
}
}

private[python] class DenseMatrixConstructor extends IObjectConstructor {
def construct(args: Array[Object]) :Object = {
require(args.length == 3)
new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
args(2).asInstanceOf[Array[Double]])
}
}

private[python] class SparseVectorPickler extends IObjectPickler {
def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
val v: SparseVector = obj.asInstanceOf[SparseVector]
reduce_object(out, pickler, "pyspark.mllib.linalg", "SparseVector",
v.size.asInstanceOf[Object], v.indices, v.values)
}
}

private[python] class SparseVectorConstructor extends IObjectConstructor {
def construct(args: Array[Object]) :Object = {
require(args.length == 3)
new SparseVector(args(0).asInstanceOf[Int], args(1).asInstanceOf[Array[Int]],
args(2).asInstanceOf[Array[Double]])
}
}

private[python] class LabeledPointPickler extends IObjectPickler {
def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
val point: LabeledPoint = obj.asInstanceOf[LabeledPoint]
reduce_object(out, pickler, "pyspark.mllib.regression", "LabeledPoint",
point.label.asInstanceOf[Object], point.features)
}
}

private[python] class LabeledPointConstructor extends IObjectConstructor {
def construct(args: Array[Object]) :Object = {
if (args.length != 2) {
throw new PickleException("should be 2")
Member:
Use consistent Exception type. (In some other places, require() is used instead.)

}
new LabeledPoint(args(0).asInstanceOf[Double], args(1).asInstanceOf[Vector])
}
}
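One way to address the consistency comment above (a sketch of the suggestion, not necessarily the change that eventually lands) is to use require here as well, matching the Dense/Sparse constructors earlier in this file:

private[python] class LabeledPointConstructor extends IObjectConstructor {
  def construct(args: Array[Object]): Object = {
    require(args.length == 2, s"LabeledPoint expects 2 constructor arguments, got ${args.length}")
    new LabeledPoint(args(0).asInstanceOf[Double], args(1).asInstanceOf[Vector])
  }
}

The trade-off is the exception type: require throws IllegalArgumentException, whereas this code throws PickleException; either works as long as it is consistent across the constructors.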

/**
* Pickle Rating
*/
private[python] class RatingPickler extends IObjectPickler {
def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
val rating: Rating = obj.asInstanceOf[Rating]
reduce_object(out, pickler, "pyspark.mllib.recommendation", "Rating",
rating.user.asInstanceOf[Object], rating.product.asInstanceOf[Object],
rating.rating.asInstanceOf[Object])
}
}

/**
* Unpickle Rating
*/
private[python] class RatingConstructor extends IObjectConstructor {
def construct(args: Array[Object]) :Object = {
if (args.length != 3) {
throw new PickleException("should be 3")
}
new Rating(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
args(2).asInstanceOf[Double])
}
}

def initialize() = {
Contributor:
Please add return type explicitly.

Pickler.registerCustomPickler(classOf[DenseVector], new DenseVectorPickler)
Pickler.registerCustomPickler(classOf[DenseMatrix], new DenseMatrixPickler)
Pickler.registerCustomPickler(classOf[SparseVector], new SparseVectorPickler)
Pickler.registerCustomPickler(classOf[LabeledPoint], new LabeledPointPickler)
Pickler.registerCustomPickler(classOf[Rating], new RatingPickler)
Unpickler.registerConstructor("pyspark.mllib.linalg", "DenseVector",
new DenseVectorConstructor)
Unpickler.registerConstructor("pyspark.mllib.linalg", "DenseMatrix",
new DenseMatrixConstructor)
Unpickler.registerConstructor("pyspark.mllib.linalg", "SparseVector",
new SparseVectorConstructor)
Unpickler.registerConstructor("pyspark.mllib.regression", "LabeledPoint",
new LabeledPointConstructor)
Unpickler.registerConstructor("pyspark.mllib.recommendation", "Rating", new RatingConstructor)
}
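With the explicit return type requested in the comment above, the declaration would read (body unchanged):

def initialize(): Unit = {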

private[python] def dumps(obj: AnyRef): Array[Byte] = {
new Pickler().dumps(obj)
Contributor:
In Python, dumps and loads deal with strings. Do we have load and dump defined in Pickler and Unpickler?

}

private[python] def loads(bytes: Array[Byte]): AnyRef = {
new Unpickler().loads(bytes)
}
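On the dumps/loads question above: these mirror Python's pickle.dumps/pickle.loads, which work on byte strings. If stream-based variants are wanted, pyrolite's Pickler and Unpickler also expose dump(obj, OutputStream) and load(InputStream) (treat the exact signatures as an assumption from memory of the pyrolite API), so helpers along these lines could sit next to dumps/loads:

private[python] def dump(obj: AnyRef, out: OutputStream): Unit = {
  new Pickler().dump(obj, out)
}

private[python] def load(in: java.io.InputStream): AnyRef = {
  new Unpickler().load(in)
}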

private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
require(bytes.length - offset >= 5, "Byte array too short")
val magic = bytes(offset)
@@ -688,43 +814,8 @@ private[spark] object SerDe extends Serializable {
Array.tabulate(matrix.numRows, matrix.numCols)((i, j) => values(i + j * matrix.numRows))
}


/** Unpack a Rating object from an array of bytes */
private[python] def unpackRating(ratingBytes: Array[Byte]): Rating = {
val bb = ByteBuffer.wrap(ratingBytes)
bb.order(ByteOrder.nativeOrder())
val user = bb.getInt()
val product = bb.getInt()
val rating = bb.getDouble()
new Rating(user, product, rating)
}

/** Unpack a tuple of Ints from an array of bytes */
def unpackTuple(tupleBytes: Array[Byte]): (Int, Int) = {
val bb = ByteBuffer.wrap(tupleBytes)
bb.order(ByteOrder.nativeOrder())
val v1 = bb.getInt()
val v2 = bb.getInt()
(v1, v2)
}

/**
* Serialize a Rating object into an array of bytes.
* It can be deserialized using RatingDeserializer().
*
* @param rate the Rating object to serialize
* @return
*/
def serializeRating(rate: Rating): Array[Byte] = {
val len = 3
val bytes = new Array[Byte](4 + 8 * len)
val bb = ByteBuffer.wrap(bytes)
bb.order(ByteOrder.nativeOrder())
bb.putInt(len)
val db = bb.asDoubleBuffer()
db.put(rate.user.toDouble)
db.put(rate.product.toDouble)
db.put(rate.rating)
bytes
/* convert object into Tuple */
def asTupleRDD(rdd: RDD[Array[Object]]): RDD[(Int, Int)] = {
rdd.map(x => (x(0).asInstanceOf[Int], x(1).asInstanceOf[Int]))
}
}
@@ -64,6 +64,12 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double])
override def toArray: Array[Double] = values

private[mllib] override def toBreeze: BM[Double] = new BDM[Double](numRows, numCols, values)

override def equals(o: Any) = o match {
Member:
Should this not check values? Even though that would be expensive, it should be necessary to match the expected behavior of equals().

Contributor (author):
good catch!

case that: DenseMatrix =>
that.numRows == numRows && that.numCols == numCols
case _ => false
}
}
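A version of equals that also compares the values array, as the review exchange above suggests (a sketch; the actual fix may differ, and hashCode should be kept consistent with whatever equality is chosen):

override def equals(o: Any): Boolean = o match {
  case that: DenseMatrix =>
    numRows == that.numRows && numCols == that.numCols &&
      java.util.Arrays.equals(values, that.values)
  case _ => false
}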

/**
@@ -106,19 +106,4 @@ class MatrixFactorizationModel private[mllib] (
}
scored.top(num)(Ordering.by(_._2))
}

/**
* :: DeveloperApi ::
* Predict the rating of many users for many products.
* This is a Java stub for python predictAll()
*
* @param usersProductsJRDD A JavaRDD with serialized tuples (user, product)
* @return JavaRDD of serialized Rating objects.
*/
@DeveloperApi
def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = {
val usersProducts = usersProductsJRDD.rdd.map(xBytes => SerDe.unpackTuple(xBytes))
predict(usersProducts).map(rate => SerDe.serializeRating(rate))
}

}
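With the byte-based stub removed here, the Python-facing prediction path has to go through the new pickle route instead. One plausible shape for it, using the asTupleRDD helper added to SerDe above (illustrative only; the entry point this PR actually adds in PythonMLLibAPI may differ):

def predict(
    model: MatrixFactorizationModel,
    userAndProducts: JavaRDD[Array[Object]]): RDD[Rating] = {
  model.predict(SerDe.asTupleRDD(userAndProducts.rdd))
}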
@@ -19,11 +19,29 @@ package org.apache.spark.mllib.api.python

import org.scalatest.FunSuite

import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.recommendation.Rating

class PythonMLLibAPISuite extends FunSuite {

SerDe.initialize()

test("pickle vector") {
val vectors = Seq(
Vectors.dense(Array.empty[Double]),
Vectors.dense(0.0),
Vectors.dense(0.0, -2.0),
Vectors.sparse(0, Array.empty[Int], Array.empty[Double]),
Vectors.sparse(1, Array.empty[Int], Array.empty[Double]),
Vectors.sparse(2, Array(1), Array(-2.0)))
vectors.foreach { v =>
val u = SerDe.loads(SerDe.dumps(v))
assert(u.getClass === v.getClass)
assert(u === v)
}
}

test("vector serialization") {
val vectors = Seq(
Vectors.dense(Array.empty[Double]),
@@ -40,6 +58,22 @@ class PythonMLLibAPISuite extends FunSuite {
}
}

test("pickle labeled point") {
val points = Seq(
LabeledPoint(0.0, Vectors.dense(Array.empty[Double])),
LabeledPoint(1.0, Vectors.dense(0.0)),
LabeledPoint(-0.5, Vectors.dense(0.0, -2.0)),
LabeledPoint(0.0, Vectors.sparse(0, Array.empty[Int], Array.empty[Double])),
LabeledPoint(1.0, Vectors.sparse(1, Array.empty[Int], Array.empty[Double])),
LabeledPoint(-0.5, Vectors.sparse(2, Array(1), Array(-2.0))))
points.foreach { p =>
val q = SerDe.loads(SerDe.dumps(p)).asInstanceOf[LabeledPoint]
assert(q.label === p.label)
assert(q.features.getClass === p.features.getClass)
assert(q.features === p.features)
}
}

test("labeled point serialization") {
val points = Seq(
LabeledPoint(0.0, Vectors.dense(Array.empty[Double])),
@@ -57,6 +91,14 @@
}
}

test("pickle double") {
for (x <- List(123.0, -10.0, 0.0, Double.MaxValue, Double.MinValue, Double.NaN)) {
val deser = SerDe.loads(SerDe.dumps(x.asInstanceOf[AnyRef])).asInstanceOf[Double]
// We use `equals` here for comparison because we cannot use `==` for NaN
assert(x.equals(deser))
}
}

test("double serialization") {
for (x <- List(123.0, -10.0, 0.0, Double.MaxValue, Double.MinValue, Double.NaN)) {
val bytes = SerDe.serializeDouble(x)
@@ -66,6 +108,19 @@
}
}

test("pickle matrix") {
val values = Array[Double](0, 1.2, 3, 4.56, 7, 8)
val matrix = Matrices.dense(2, 3, values)
val nm = SerDe.loads(SerDe.dumps(matrix)).asInstanceOf[DenseMatrix]
assert(matrix == nm)

// Test conversion for empty matrix
val empty = Array[Double]()
val emptyMatrix = Matrices.dense(0, 0, empty)
val ne = SerDe.loads(SerDe.dumps(emptyMatrix)).asInstanceOf[DenseMatrix]
assert(emptyMatrix == ne)
}

test("matrix to 2D array") {
val values = Array[Double](0, 1.2, 3, 4.56, 7, 8)
val matrix = Matrices.dense(2, 3, values)
@@ -79,4 +134,10 @@
val empty2D = SerDe.to2dArray(emptyMatrix)
assert(empty2D === Array[Array[Double]]())
}

test("pickle rating") {
val rat = new Rating(1, 2, 3.0)
val rat2 = SerDe.loads(SerDe.dumps(rat)).asInstanceOf[Rating]
assert(rat == rat2)
}
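  // Sketch of the size check a later commit in this PR ("test size of serialized Rating") adds;
  // the threshold below is an illustrative assumption, not the value that commit uses.
  test("serialized Rating stays small") {
    val bytes = SerDe.dumps(new Rating(1, 2, 3.0))
    assert(bytes.length < 200)
  }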
}
1 change: 1 addition & 0 deletions python/pyspark/context.py
@@ -214,6 +214,7 @@ def _ensure_initialized(cls, instance=None, gateway=None):
SparkContext._gateway = gateway or launch_gateway()
SparkContext._jvm = SparkContext._gateway.jvm
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
SparkContext._jvm.SerDe.initialize()

if instance:
if (SparkContext._active_spark_context and