reviewer comments and moved more ser/de
dorx committed Aug 13, 2014
commit de9cbbe3d0925cc9ec7680693415121ab1fad35a
@@ -255,45 +255,6 @@ class PythonMLLibAPI extends Serializable {
ret
}

- /** Unpack a Rating object from an array of bytes */
- private def unpackRating(ratingBytes: Array[Byte]): Rating = {
- val bb = ByteBuffer.wrap(ratingBytes)
- bb.order(ByteOrder.nativeOrder())
- val user = bb.getInt()
- val product = bb.getInt()
- val rating = bb.getDouble()
- new Rating(user, product, rating)
- }
-
- /** Unpack a tuple of Ints from an array of bytes */
- private[spark] def unpackTuple(tupleBytes: Array[Byte]): (Int, Int) = {
- val bb = ByteBuffer.wrap(tupleBytes)
- bb.order(ByteOrder.nativeOrder())
- val v1 = bb.getInt()
- val v2 = bb.getInt()
- (v1, v2)
- }
-
- /**
- * Serialize a Rating object into an array of bytes.
- * It can be deserialized using RatingDeserializer().
- *
- * @param rate the Rating object to serialize
- * @return
- */
- private[spark] def serializeRating(rate: Rating): Array[Byte] = {
- val len = 3
- val bytes = new Array[Byte](4 + 8 * len)
- val bb = ByteBuffer.wrap(bytes)
- bb.order(ByteOrder.nativeOrder())
- bb.putInt(len)
- val db = bb.asDoubleBuffer()
- db.put(rate.user.toDouble)
- db.put(rate.product.toDouble)
- db.put(rate.rating)
- bytes
- }
-
/**
* Java stub for Python mllib ALS.train(). This stub returns a handle
* to the Java object instead of the content of the Java object. Extra care
@@ -306,7 +267,7 @@ class PythonMLLibAPI extends Serializable {
iterations: Int,
lambda: Double,
blocks: Int): MatrixFactorizationModel = {
- val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
+ val ratings = ratingsBytesJRDD.rdd.map(SerDe.unpackRating)
ALS.train(ratings, rank, iterations, lambda, blocks)
}

@@ -323,7 +284,7 @@
lambda: Double,
blocks: Int,
alpha: Double): MatrixFactorizationModel = {
- val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
+ val ratings = ratingsBytesJRDD.rdd.map(SerDe.unpackRating)
ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
}
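// For context (an editorial sketch, not part of this patch): each element of
// ratingsBytesJRDD holds one (user: Int, product: Int, rating: Double) record in
// native byte order, which is exactly what SerDe.unpackRating (moved into the SerDe
// object below) reads back. A JVM-side payload for tests could be built like this:
//
//   val bb = ByteBuffer.allocate(16).order(ByteOrder.nativeOrder())
//   bb.putInt(1).putInt(42).putDouble(4.5)   // user = 1, product = 42, rating = 4.5
//   val rating = SerDe.unpackRating(bb.array())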

@@ -405,7 +366,7 @@
def corr(X: JavaRDD[Array[Byte]], method: String): Array[Byte] = {
val inputMatrix = X.rdd.map(SerDe.deserializeDoubleVector(_))
val result = Statistics.corr(inputMatrix, getCorrNameOrDefault(method))
- SerDe.serializeDoubleMatrix(to2dArray(result))
+ SerDe.serializeDoubleMatrix(SerDe.to2dArray(result))
}

/**
@@ -422,12 +383,6 @@
if (method == null) CorrelationNames.defaultCorrName else method
}

- // Reformat a Matrix into Array[Array[Double]] for serialization
- private[python] def to2dArray(matrix: Matrix): Array[Array[Double]] = {
- val values = matrix.toArray
- Array.tabulate(matrix.numRows, matrix.numCols)((i, j) => values(i + j * matrix.numRows))
- }
-
// Used by the *RDD methods to get default seed if not passed in from pyspark
private def getSeedOrDefault(seed: java.lang.Long): Long = {
if (seed == null) Utils.random.nextLong else seed
@@ -528,6 +483,7 @@ class PythonMLLibAPI extends Serializable {
/**
* MultivariateStatisticalSummary with Vector fields serialized.
*/
+ @DeveloperApi
class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary)
Contributor review comment: @DeveloperApi?

extends Serializable {

@@ -547,13 +503,13 @@ class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisti
/**
* SerDe utility functions for PythonMLLibAPI.
*/
- private object SerDe extends Serializable {
+ private[spark] object SerDe extends Serializable {
private val DENSE_VECTOR_MAGIC: Byte = 1
private val SPARSE_VECTOR_MAGIC: Byte = 2
private val DENSE_MATRIX_MAGIC: Byte = 3
private val LABELED_POINT_MAGIC: Byte = 4

- def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
+ private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
require(bytes.length - offset >= 5, "Byte array too short")
val magic = bytes(offset)
if (magic == DENSE_VECTOR_MAGIC) {
@@ -565,14 +521,14 @@ private object SerDe extends Serializable {
}
}

- def deserializeDouble(bytes: Array[Byte], offset: Int = 0): Double = {
+ private[python] def deserializeDouble(bytes: Array[Byte], offset: Int = 0): Double = {
require(bytes.length - offset == 8, "Wrong size byte array for Double")
val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
bb.order(ByteOrder.nativeOrder())
bb.getDouble
}

- def deserializeDenseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
+ private[python] def deserializeDenseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
val packetLength = bytes.length - offset
require(packetLength >= 5, "Byte array too short")
val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
@@ -587,7 +543,7 @@ private object SerDe extends Serializable {
Vectors.dense(ans)
}

- def deserializeSparseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
+ private[python] def deserializeSparseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
val packetLength = bytes.length - offset
require(packetLength >= 9, "Byte array too short")
val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
@@ -615,15 +571,15 @@ private object SerDe extends Serializable {
* The corresponding deserializer, deserializeDouble, needs to be modified as well if the
* serialization scheme changes.
*/
- def serializeDouble(double: Double): Array[Byte] = {
+ private[python] def serializeDouble(double: Double): Array[Byte] = {
val bytes = new Array[Byte](8)
val bb = ByteBuffer.wrap(bytes)
bb.order(ByteOrder.nativeOrder())
bb.putDouble(double)
bytes
}
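// A round trip through the pair above preserves the value; a minimal sketch
// (both sides assume the platform's native byte order):
//
//   val x = -10.5
//   assert(SerDe.deserializeDouble(SerDe.serializeDouble(x)) == x)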

- def serializeDenseVector(doubles: Array[Double]): Array[Byte] = {
+ private[python] def serializeDenseVector(doubles: Array[Double]): Array[Byte] = {
val len = doubles.length
val bytes = new Array[Byte](5 + 8 * len)
val bb = ByteBuffer.wrap(bytes)
@@ -635,7 +591,7 @@
bytes
}

- def serializeSparseVector(vector: SparseVector): Array[Byte] = {
+ private[python] def serializeSparseVector(vector: SparseVector): Array[Byte] = {
val nonZeros = vector.indices.length
val bytes = new Array[Byte](9 + 12 * nonZeros)
val bb = ByteBuffer.wrap(bytes)
@@ -651,14 +607,14 @@
bytes
}

- def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
+ private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
case s: SparseVector =>
serializeSparseVector(s)
case _ =>
serializeDenseVector(vector.toArray)
}
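// Wire formats implied by the serializers above (multi-byte fields in native order):
//   dense:  [DENSE_VECTOR_MAGIC: 1B][length: 4B][values: 8B * length]
//   sparse: [SPARSE_VECTOR_MAGIC: 1B][size: 4B][nnz: 4B][indices: 4B * nnz][values: 8B * nnz]
// matching the 5- and 9-byte minimum-length checks in the deserializers. An editorial
// sketch (not part of this patch) of the dispatch and layout:
//
//   val v = Vectors.sparse(4, Array(1, 3), Array(2.0, -1.0))
//   val bytes = SerDe.serializeDoubleVector(v)   // dispatches to serializeSparseVector
//   assert(bytes.length == 9 + 12 * 2)
//   assert(SerDe.deserializeDoubleVector(bytes) == v)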

- def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
+ private[python] def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
val packetLength = bytes.length
if (packetLength < 9) {
throw new IllegalArgumentException("Byte array too short.")
@@ -683,7 +639,7 @@
ans
}

- def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
+ private[python] def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
val rows = doubles.length
var cols = 0
if (rows > 0) {
Expand All @@ -702,7 +658,7 @@ private object SerDe extends Serializable {
bytes
}
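// Matrix payload layout, as written above (row-major values, native order; the
// header fields are an editorial reading of the code, parts of which are collapsed
// in this view):
//   [DENSE_MATRIX_MAGIC: 1B][rows: 4B][cols: 4B][values: 8B * rows * cols]
// hence the 9-byte minimum enforced by deserializeDoubleMatrix.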

- def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = {
+ private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = {
val fb = serializeDoubleVector(p.features)
val bytes = new Array[Byte](1 + 8 + fb.length)
val bb = ByteBuffer.wrap(bytes)
@@ -713,7 +669,7 @@
bytes
}

- def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
+ private[python] def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
require(bytes.length >= 9, "Byte array too short")
val magic = bytes(0)
if (magic != LABELED_POINT_MAGIC) {
@@ -724,4 +680,50 @@ private object SerDe extends Serializable {
val label = labelBytes.asDoubleBuffer().get(0)
LabeledPoint(label, deserializeDoubleVector(bytes, 9))
}
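// Labeled point payload layout, tying the two methods above together:
//   [LABELED_POINT_MAGIC: 1B][label: 8B][serialized feature vector]
// which is why the feature vector is recovered via deserializeDoubleVector(bytes, 9).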

+ // Reformat a Matrix into Array[Array[Double]] for serialization
+ private[python] def to2dArray(matrix: Matrix): Array[Array[Double]] = {
+ val values = matrix.toArray
+ Array.tabulate(matrix.numRows, matrix.numCols)((i, j) => values(i + j * matrix.numRows))
+ }
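// Matrix.toArray is column-major, so entry (i, j) sits at values(i + j * numRows);
// e.g. Matrices.dense(2, 3, Array(0, 1.2, 3, 4.56, 7, 8)) yields rows (0, 3, 7) and
// (1.2, 4.56, 8), as the "matrix to 2D array" test further down exercises.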


+ /** Unpack a Rating object from an array of bytes */
+ private[python] def unpackRating(ratingBytes: Array[Byte]): Rating = {
+ val bb = ByteBuffer.wrap(ratingBytes)
+ bb.order(ByteOrder.nativeOrder())
+ val user = bb.getInt()
+ val product = bb.getInt()
+ val rating = bb.getDouble()
+ new Rating(user, product, rating)
+ }

+ /** Unpack a tuple of Ints from an array of bytes */
+ def unpackTuple(tupleBytes: Array[Byte]): (Int, Int) = {
+ val bb = ByteBuffer.wrap(tupleBytes)
+ bb.order(ByteOrder.nativeOrder())
+ val v1 = bb.getInt()
+ val v2 = bb.getInt()
+ (v1, v2)
+ }
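// unpackTuple expects just two native-order Ints back to back; a minimal test-payload
// sketch (editorial, not part of this patch):
//
//   val bb = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder())
//   bb.putInt(12).putInt(34)                 // e.g. (user, product) keys
//   val (user, product) = SerDe.unpackTuple(bb.array())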

+ /**
+ * Serialize a Rating object into an array of bytes.
+ * It can be deserialized using RatingDeserializer().
+ *
+ * @param rate the Rating object to serialize
+ * @return a byte array that the Python-side RatingDeserializer can decode
+ */
+ def serializeRating(rate: Rating): Array[Byte] = {
+ val len = 3
+ val bytes = new Array[Byte](4 + 8 * len)
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ bb.putInt(len)
+ val db = bb.asDoubleBuffer()
+ db.put(rate.user.toDouble)
+ db.put(rate.product.toDouble)
+ db.put(rate.rating)
+ bytes
+ }
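// Note the asymmetry between the two Rating helpers: unpackRating reads the packed
// (Int, Int, Double) struct sent from Python, while serializeRating writes a
// length-prefixed array of three Doubles ([len: 4B][user, product, rating: 8B each])
// for the Python-side RatingDeserializer, so the two methods are not inverses.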
}
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
- import org.apache.spark.mllib.api.python.PythonMLLibAPI
+ import org.apache.spark.mllib.api.python.SerDe

/**
* Model representing the result of matrix factorization.
@@ -117,9 +117,8 @@ class MatrixFactorizationModel private[mllib] (
*/
@DeveloperApi
def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = {
- val pythonAPI = new PythonMLLibAPI()
- val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes))
- predict(usersProducts).map(rate => pythonAPI.serializeRating(rate))
+ val usersProducts = usersProductsJRDD.rdd.map(xBytes => SerDe.unpackTuple(xBytes))
+ predict(usersProducts).map(rate => SerDe.serializeRating(rate))
}

}
@@ -23,7 +23,6 @@ import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint

class PythonMLLibAPISuite extends FunSuite {
- val py = new PythonMLLibAPI

test("vector serialization") {
val vectors = Seq(
@@ -34,8 +33,8 @@ class PythonMLLibAPISuite extends FunSuite {
Vectors.sparse(1, Array.empty[Int], Array.empty[Double]),
Vectors.sparse(2, Array(1), Array(-2.0)))
vectors.foreach { v =>
- val bytes = py.serializeDoubleVector(v)
- val u = py.deserializeDoubleVector(bytes)
+ val bytes = SerDe.serializeDoubleVector(v)
+ val u = SerDe.deserializeDoubleVector(bytes)
assert(u.getClass === v.getClass)
assert(u === v)
}
@@ -50,8 +49,8 @@
LabeledPoint(1.0, Vectors.sparse(1, Array.empty[Int], Array.empty[Double])),
LabeledPoint(-0.5, Vectors.sparse(2, Array(1), Array(-2.0))))
points.foreach { p =>
- val bytes = py.serializeLabeledPoint(p)
- val q = py.deserializeLabeledPoint(bytes)
+ val bytes = SerDe.serializeLabeledPoint(p)
+ val q = SerDe.deserializeLabeledPoint(bytes)
assert(q.label === p.label)
assert(q.features.getClass === p.features.getClass)
assert(q.features === p.features)
@@ -60,8 +59,8 @@

test("double serialization") {
for (x <- List(123.0, -10.0, 0.0, Double.MaxValue, Double.MinValue, Double.NaN)) {
- val bytes = py.serializeDouble(x)
- val deser = py.deserializeDouble(bytes)
+ val bytes = SerDe.serializeDouble(x)
+ val deser = SerDe.deserializeDouble(bytes)
// We use `equals` here for comparison because we cannot use `==` for NaN
assert(x.equals(deser))
}
@@ -70,14 +69,14 @@
test("matrix to 2D array") {
val values = Array[Double](0, 1.2, 3, 4.56, 7, 8)
val matrix = Matrices.dense(2, 3, values)
- val arr = py.to2dArray(matrix)
+ val arr = SerDe.to2dArray(matrix)
val expected = Array(Array[Double](0, 3, 7), Array[Double](1.2, 4.56, 8))
assert(arr === expected)

// Test conversion for empty matrix
val empty = Array[Double]()
val emptyMatrix = Matrices.dense(0, 0, empty)
- val empty2D = py.to2dArray(emptyMatrix)
+ val empty2D = SerDe.to2dArray(emptyMatrix)
assert(empty2D === Array[Array[Double]]())
}
}
1 change: 0 additions & 1 deletion python/pyspark/mllib/stat.py
@@ -91,7 +91,6 @@ def colStats(X):
cStats = sc._jvm.PythonMLLibAPI().colStats(Xser._jrdd)
return MultivariateStatisticalSummary(sc, cStats)


@staticmethod
def corr(x, y=None, method=None):
"""