Closed
Changes from 1 commit
Commits (75)
d86325f
Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop …
MLnick Dec 9, 2013
4b0a43f
Refactoring utils into own objects. Cleaning up old commented-out code
MLnick Dec 12, 2013
c304cc8
Adding supporting sequncefiles for tests. Cleaning up
MLnick Dec 15, 2013
4e7c9e3
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Dec 15, 2013
818a1e6
Add seqencefile and Hadoop InputFormat support to PythonRDD
MLnick Dec 15, 2013
4294cbb
Add old Hadoop api methods. Clean up and expand comments. Clean up ar…
MLnick Dec 19, 2013
0f5cd84
Remove unused pair UTF8 class. Add comments to msgpack deserializer
MLnick Dec 19, 2013
f1d73e3
mergeConfs returns a copy rather than mutating one of the input argum…
MLnick Dec 19, 2013
4d7ef2e
Fix indentation
MLnick Dec 19, 2013
eb40036
Remove unused comment lines
MLnick Dec 19, 2013
1c8efbc
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Jan 13, 2014
619c0fa
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Jan 20, 2014
703ee65
Add back msgpack
MLnick Jan 20, 2014
174f520
Add back graphx settings
MLnick Jan 20, 2014
795a763
Change name to WriteInputFormatTestDataGenerator. Cleanup some var na…
MLnick Jan 20, 2014
2beeedb
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Feb 8, 2014
97ef708
Remove old writeToStream
MLnick Feb 14, 2014
41856a5
Merge branch 'master' into pyspark-inputformats
MLnick Mar 19, 2014
f2d76a0
Merge branch 'master' into pyspark-inputformats
MLnick Mar 19, 2014
e67212a
Add back msgpack dependency
MLnick Mar 19, 2014
dd57922
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Apr 10, 2014
d72bf18
msgpack
MLnick Apr 10, 2014
0c612e5
Merge branch 'master' into pyspark-inputformats
MLnick Apr 12, 2014
65360d5
Adding test SequenceFiles
MLnick Apr 18, 2014
25da1ca
Add generator for nulls, bools, bytes and maps
MLnick Apr 18, 2014
7237263
Add back msgpack serializer and hadoop file code lost during merging
MLnick Apr 18, 2014
a67dfad
Clean up Msgpack serialization and registering
MLnick Apr 18, 2014
1bbbfb0
Clean up SparkBuild from merge
MLnick Apr 18, 2014
9d2256e
Merge branch 'master' into pyspark-inputformats
MLnick Apr 18, 2014
f6aac55
Bring back msgpack
MLnick Apr 18, 2014
951c117
Merge branch 'master' into pyspark-inputformats
MLnick Apr 19, 2014
b20ec7e
Clean up merge duplicate dependencies
MLnick Apr 19, 2014
4e08983
Clean up docs for PySpark context methods
MLnick Apr 19, 2014
fc5099e
Add Apache license headers
MLnick Apr 19, 2014
31a2fff
Scalastyle fixes
MLnick Apr 21, 2014
450e0a2
Merge branch 'master' into pyspark-inputformats
MLnick Apr 21, 2014
f60959e
Remove msgpack dependency and serializer from PySpark
MLnick Apr 21, 2014
17a656b
remove binary sequencefile for tests
MLnick Apr 21, 2014
1d7c17c
Amend tests to auto-generate sequencefile data in temp dir
MLnick Apr 21, 2014
c0ebfb6
Change sequencefile test data generator to easily be called from PySp…
MLnick Apr 21, 2014
44f2857
Remove msgpack dependency and switch serialization to Pyrolite, plus …
MLnick Apr 21, 2014
e7552fa
Merge branch 'master' into pyspark-inputformats
MLnick Apr 22, 2014
64eb051
Scalastyle fix
MLnick Apr 22, 2014
78978d9
Add doc for SequenceFile and InputFormat support to Python programmin…
MLnick Apr 22, 2014
e001b94
Fix test failures due to ordering
MLnick Apr 23, 2014
bef3afb
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Apr 23, 2014
35b8e3a
Another fix for test ordering
MLnick Apr 23, 2014
5af4770
Merge branch 'master' into pyspark-inputformats
MLnick Apr 23, 2014
077ecb2
Recover earlier changes lost in previous merge for context.py
MLnick Apr 23, 2014
9ef1896
Recover earlier changes lost in previous merge for serializers.py
MLnick Apr 23, 2014
93ef995
Add back context.py changes
MLnick Apr 23, 2014
7caa73a
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick May 23, 2014
d0f52b6
Python programming guide
MLnick May 23, 2014
84fe8e3
Python programming guide space formatting
MLnick May 23, 2014
9fe6bd5
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick May 31, 2014
15a7d07
Remove default args for key/value classes. Arg names to camelCase
MLnick Jun 3, 2014
01e0813
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Jun 3, 2014
1a4a1d6
Address @mateiz style comments
MLnick Jun 3, 2014
94beedc
Clean up args in PythonRDD. Set key/value converter defaults to None …
MLnick Jun 3, 2014
43eb728
PySpark InputFormats docs into programming guide
MLnick Jun 3, 2014
085b55f
Move input format tests to tests.py and clean up docs
MLnick Jun 3, 2014
5757f6e
Default key/value classes for sequenceFile asre None
MLnick Jun 3, 2014
b65606f
Add converter interface
MLnick Jun 4, 2014
2c18513
Add examples for reading HBase and Cassandra InputFormats from Python
MLnick Jun 4, 2014
3f90c3e
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Jun 4, 2014
1eaa08b
HBase -> Cassandra app name oversight
MLnick Jun 4, 2014
eeb8205
Fix path relative to SPARK_HOME in tests
MLnick Jun 4, 2014
365d0be
Make classes private[python]. Add docs and @Experimental annotation t…
MLnick Jun 5, 2014
a985492
Move Converter examples to own package
MLnick Jun 5, 2014
5ebacfa
Update docs for PySpark input formats
MLnick Jun 5, 2014
cde6af9
Parameterize converter trait
MLnick Jun 6, 2014
d150431
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Jun 6, 2014
4c972d8
Add license headers
MLnick Jun 6, 2014
761269b
Address @pwendell comments, simplify default writable conversions and…
MLnick Jun 7, 2014
268df7e
Documentation changes mer @pwendell comments
MLnick Jun 8, 2014
Refactoring utils into own objects. Cleaning up old commented-out code
MLnick committed Dec 12, 2013
commit 4b0a43fa2236ed997a5f9bd273151f21c1cc23f8
24 changes: 24 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -0,0 +1,24 @@
package org.apache.spark.api.python

import org.apache.hadoop.conf.Configuration

/**
* Utilities for working with Python objects -> Hadoop-related objects
*/
private[python] object PythonHadoopUtil {

def mapToConf(map: java.util.HashMap[String, String]) = {
import collection.JavaConversions._
val conf = new Configuration()
map.foreach{ case (k, v) => conf.set(k, v) }
conf
}

/* Merges two configurations, keys from right overwrite any matching keys in left */
def mergeConfs(left: Configuration, right: Configuration) = {
import collection.JavaConversions._
right.iterator().foreach(entry => left.set(entry.getKey, entry.getValue))
left
}

}
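A minimal, self-contained sketch of the merge semantics above, assuming only Hadoop's Configuration on the classpath: keys from the right-hand configuration overwrite matching keys on the left, and every other key on the left survives. The property names are placeholders, and note that this version of mergeConfs mutates left in place (a later commit in this PR changes it to return a copy instead).

```scala
import org.apache.hadoop.conf.Configuration
import scala.collection.JavaConversions._

object MergeConfsSketch {
  def main(args: Array[String]): Unit = {
    // loadDefaults = false so only explicitly-set keys show up when iterating
    val left = new Configuration(false)
    left.set("my.input.path", "/data/default")
    left.set("my.buffer.size", "4096")

    val right = new Configuration(false)
    right.set("my.input.path", "/data/override")

    // Same loop mergeConfs uses: copy every entry of `right` onto `left`
    right.iterator().foreach(entry => left.set(entry.getKey, entry.getValue))

    println(left.get("my.input.path"))   // /data/override -- right wins on conflicts
    println(left.get("my.buffer.size"))  // 4096 -- untouched keys on the left survive
  }
}
```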
171 changes: 7 additions & 164 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -281,51 +281,7 @@ private[spark] object PythonRDD extends Logging {

// PySpark / Hadoop InputFormat stuff

def register[T](clazz: Class[T], msgpack: ScalaMessagePack) = {
Try {
if (!clazz.isPrimitive) msgpack.register(clazz)
}.getOrElse(log.warn("Failed to register class (%s) with MsgPack. " +
"Falling back to default MsgPack serialization, or 'toString' as last resort".format(clazz.toString)))
}

// serialize and RDD[(K, V)] -> RDD[Array[Byte]] using MsgPack
def serMsgPack[K, V](rdd: RDD[(K, V)]) = {
import org.msgpack.ScalaMessagePack._
val msgpack = new ScalaMessagePack with Serializable
val first = rdd.first()
val kc = ClassManifest.fromClass(first._1.getClass).asInstanceOf[ClassManifest[K]].erasure.asInstanceOf[Class[K]]
val vc = ClassManifest.fromClass(first._2.getClass).asInstanceOf[ClassManifest[V]].erasure.asInstanceOf[Class[V]]
register(kc, msgpack)
register(vc, msgpack)
/*
Try {
if (!kc.isPrimitive) msgpack.register(kc)
if (!vc.isPrimitive) msgpack.register(vc)
} match {
case Failure(err) => log.warn(("Failed to register key/value class (%s/%s) with MsgPack. " +
"Falling back to default MsgPack serialization, or 'toString' as last resort. " +
"Exception: %s").format(kc, vc, err.getMessage))
}
*/
rdd.map{ pair =>
Try {
msgpack.write(pair)
} match {
case Failure(err) =>
Try {
write((pair._1.toString, pair._2.toString))
} match {
case Success(result) => result
case Failure(e) => throw e
}
case Success(result) => result

}
//write(_)
}
}

// SequenceFile converted to Text and then to String
// SequenceFile
def sequenceFile[K ,V](sc: JavaSparkContext,
path: String,
keyClass: String,
@@ -339,37 +295,8 @@ private[spark] object PythonRDD extends Logging {
val vc = vcm.erasure.asInstanceOf[Class[V]]

val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val converted = convertRDD[K, V](rdd)
JavaRDD.fromRDD(serMsgPack[K, V](converted))
//JavaRDD.fromRDD(
// .map{ case (a, b) => (a.toString, b.toString) }.map(stuff => write(stuff)))
}

/*
def sequenceFile[K, V](sc: JavaSparkContext,
path: String,
keyWrapper: String,
valueWrapper: String,
minSplits: Int): JavaRDD[Array[Byte]] = {
val rdd = sc.sc.sequenceFile(path, classOf[Any], classOf[Any], minSplits)
val converted = convertRDD[K, V](rdd)
JavaRDD.fromRDD(serMsgPack[K, V](converted))
//sequenceFile(sc, path, "java.lang.String", "java.lang.String", keyWrapper, valueWrapper, minSplits)
}
*/

def mapToConf(map: java.util.HashMap[String, String]) = {
import collection.JavaConversions._
val conf = new Configuration()
map.foreach{ case (k, v) => conf.set(k, v) }
conf
}

/* Merges two configurations, keys from right overwrite any matching keys in left */
def mergeConfs(left: Configuration, right: Configuration) = {
import collection.JavaConversions._
right.iterator().foreach(entry => left.set(entry.getKey, entry.getValue))
left
val converted = SerDeUtil.convertRDD[K, V](rdd)
JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
}

// Arbitrary Hadoop InputFormat, key class and value class
@@ -381,19 +308,14 @@ private[spark] object PythonRDD extends Logging {
keyWrapper: String,
valueWrapper: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = mapToConf(confAsMap)
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val baseConf = sc.hadoopConfiguration()
val mergedConf = mergeConfs(baseConf, conf)
val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
val rdd =
newHadoopFileFromClassNames[K, V, F](sc,
path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper, mergedConf)
//.map{ case (k, v) => (k.toString, v.toString) }
val converted = convertRDD[K, V](rdd)
JavaRDD.fromRDD(serMsgPack[K, V](converted))
//JavaPairRDD.fromRDD(
// newHadoopFileFromClassNames(sc, path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper)
// .map(new PairFunction[(K, V), String, String] { def call(t: (K, V)) = (t._1.toString, t._2.toString) } )
//)
val converted = SerDeUtil.convertRDD[K, V](rdd)
JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
}

private def newHadoopFileFromClassNames[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
@@ -413,82 +335,6 @@ private[spark] object PythonRDD extends Logging {
sc.sc.newAPIHadoopFile(path, fc, kc, vc, conf)
}

/*
private def sequenceFile[K, V](sc: JavaSparkContext,
path: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
minSplits: Int) = {
implicit val kcm = ClassManifest.fromClass(Class.forName("Any")).asInstanceOf[ClassManifest[K]]
implicit val vcm = ClassManifest.fromClass(Class.forName("Any")).asInstanceOf[ClassManifest[V]]
val kc = kcm.erasure.asInstanceOf[Class[K]]
val vc = vcm.erasure.asInstanceOf[Class[V]]

val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val converted = convertRDD[K, V](rdd)
JavaRDD.fromRDD(serMsgPack[K, V](converted))

/*
val rdd = if (kc.isInstanceOf[Writable] && vc.isInstanceOf[Writable]) {
val writables = sc.sc.sequenceFile(path, kc.asInstanceOf[Class[Writable]], vc.asInstanceOf[Class[Writable]], minSplits)
val w = writables.map{case (k,v) => (t.convert(k), t.convert(v))}
//implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K <:< Writable]]
//ClassManifest.fromClass(kc.asInstanceOf[Class[Writable]])
//sequenceFileWritable(sc, path ,minSplits).asInstanceOf[RDD[(K, V)]]
//sequenceFileWritable(sc, kc, vc, path, minSplits)
}
else {
sc.sc.sequenceFile[K, V](path, minSplits)

}

*/
}
*/

private def convertRDD[K, V](rdd: RDD[(K, V)]) = {
rdd.map{
case (k: Writable, v: Writable) => (convert(k).asInstanceOf[K], convert(v).asInstanceOf[V])
case (k: Writable, v) => (convert(k).asInstanceOf[K], v.asInstanceOf[V])
case (k, v: Writable) => (k.asInstanceOf[K], convert(v).asInstanceOf[V])
case (k, v) => (k.asInstanceOf[K], v.asInstanceOf[V])
}
}

private def convert(writable: Writable): Any = {
writable match {
case iw: IntWritable => SparkContext.intWritableConverter().convert(iw)
case dw: DoubleWritable => SparkContext.doubleWritableConverter().convert(dw)
case lw: LongWritable => SparkContext.longWritableConverter().convert(lw)
case fw: FloatWritable => SparkContext.floatWritableConverter().convert(fw)
case t: Text => SparkContext.stringWritableConverter().convert(t)
case bw: BooleanWritable => SparkContext.booleanWritableConverter().convert(bw)
case byw: BytesWritable => SparkContext.bytesWritableConverter().convert(byw)
case n: NullWritable => None
case aw: ArrayWritable => aw.get().map(convert(_))
case mw: MapWritable => mw.map{ case (k, v) => (convert(k), convert(v)) }.toMap
case other => other
}
}

/*
def sequenceFileWritable[K, V](sc: JavaSparkContext,
path: String,
minSplits: Int)
//(implicit km: ClassManifest[K], vm: ClassManifest[V])
// kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
= {

import SparkContext._
implicit val kcm = ClassManifest.fromClass(keyClazz) //.asInstanceOf[ClassManifest[K]]
//implicit val vcm = ClassManifest.fromClass(valueClazz) //.asInstanceOf[ClassManifest[V]]
sc.sc.sequenceFile(path) //, kc, vc, minSplits)
// JavaRDD.fromRDD(serMsgPack[K, V](rdd))
}
*/

//

def writeToStream(elem: Any, dataOut: DataOutputStream)(implicit m: ClassManifest[Any]) {
@@ -503,9 +349,6 @@ private[spark] object PythonRDD extends Logging {
dataOut.write(b)
case str: String =>
dataOut.writeUTF(str)
//case (a: String, b: String) =>
// dataOut.writeUTF(a)
// dataOut.writeUTF(b)
case other =>
throw new SparkException("Unexpected element type " + other.getClass)
}
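To make the data path above concrete, here is a hedged, stand-alone Scala sketch of roughly what newHadoopFileFromClassNames ends up calling once the class names shipped from Python have been resolved (in PythonRDD that resolution happens reflectively via ClassManifest and Class.forName). The path, InputFormat choice and local master are placeholders for illustration, not part of this PR.

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkContext

object NewHadoopFileSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "newHadoopFileSketch")

    // The merged Hadoop configuration would normally come from
    // PythonHadoopUtil.mergeConfs(sc.hadoopConfiguration, mapToConf(confAsMap))
    val conf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)

    // New-API InputFormat, key class and value class: the same trio the
    // Python side passes by name
    val rdd = sc.newAPIHadoopFile(
      "/path/to/some/textfile",   // placeholder path
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      conf)

    // Convert Writables to plain Scala values before doing anything else,
    // mirroring what convertRDD does ahead of the Python serializer
    rdd.map { case (offset, line) => (offset.get(), line.toString) }
      .take(5)
      .foreach(println)

    sc.stop()
  }
}
```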
81 changes: 81 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -0,0 +1,81 @@
package org.apache.spark.api.python

import org.msgpack.ScalaMessagePack
import scala.util.Try
import org.apache.spark.rdd.RDD
import java.io.Serializable
import org.apache.spark.{SparkContext, Logging}
import org.apache.hadoop.io._
import scala.util.Success
import scala.util.Failure

/**
* Utilities for serialization / deserialization between Python and Java, using MsgPack.
Review comment (Contributor): This is actually using Pickle now instead of MsgPack
* Also contains utilities for converting [[org.apache.hadoop.io.Writable]] -> Scala objects and primitives
*/
private[python] object SerDeUtil extends Logging {

def register[T](clazz: Class[T], msgpack: ScalaMessagePack) {
Try {
log.info("%s".format(clazz))
clazz match {
case c if c.isPrimitive =>
case c if c.isInstanceOf[java.lang.String] =>
case _ => msgpack.register(clazz)
}
}.getOrElse(log.warn("Failed to register class (%s) with MsgPack. ".format(clazz.getName) +
"Falling back to default MsgPack serialization, or 'toString' as last resort"))
}

// serialize an RDD[(K, V)] -> RDD[Array[Byte]] using MsgPack
def serMsgPack[K, V](rdd: RDD[(K, V)]) = {
import org.msgpack.ScalaMessagePack._
val msgpack = new ScalaMessagePack with Serializable
val first = rdd.first()
val kc = ClassManifest.fromClass(first._1.getClass).asInstanceOf[ClassManifest[K]].erasure.asInstanceOf[Class[K]]
val vc = ClassManifest.fromClass(first._2.getClass).asInstanceOf[ClassManifest[V]].erasure.asInstanceOf[Class[V]]
register(kc, msgpack)
register(vc, msgpack)
rdd.map{ pair =>
Try {
msgpack.write(pair)
} match {
case Failure(err) =>
Try {
write((pair._1.toString, pair._2.toString))
} match {
case Success(result) => result
case Failure(e) => throw e
}
case Success(result) => result
}
}
}

def convertRDD[K, V](rdd: RDD[(K, V)]) = {
rdd.map{
case (k: Writable, v: Writable) => (convert(k).asInstanceOf[K], convert(v).asInstanceOf[V])
case (k: Writable, v) => (convert(k).asInstanceOf[K], v.asInstanceOf[V])
case (k, v: Writable) => (k.asInstanceOf[K], convert(v).asInstanceOf[V])
case (k, v) => (k.asInstanceOf[K], v.asInstanceOf[V])
}
}

def convert(writable: Writable): Any = {
import collection.JavaConversions._
writable match {
case iw: IntWritable => SparkContext.intWritableConverter().convert(iw)
case dw: DoubleWritable => SparkContext.doubleWritableConverter().convert(dw)
case lw: LongWritable => SparkContext.longWritableConverter().convert(lw)
case fw: FloatWritable => SparkContext.floatWritableConverter().convert(fw)
case t: Text => SparkContext.stringWritableConverter().convert(t)
case bw: BooleanWritable => SparkContext.booleanWritableConverter().convert(bw)
case byw: BytesWritable => SparkContext.bytesWritableConverter().convert(byw)
case n: NullWritable => None
case aw: ArrayWritable => aw.get().map(convert(_))
case mw: MapWritable => mw.map{ case (k, v) => (convert(k), convert(v)) }.toMap
case other => other
}
}

}
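For readers unfamiliar with the Writable pattern match above, here is a hedged illustration of the same Writable-to-Scala mapping, written against the Hadoop Writables' own accessors instead of Spark's WritableConverters so that it runs stand-alone; the sample values are made up.

```scala
import org.apache.hadoop.io._
import scala.collection.JavaConversions._

object ConvertSketch {
  // Mirror of SerDeUtil.convert: unwrap common Writables, recurse into
  // arrays and maps, and pass anything unrecognised through untouched
  def toScala(w: Writable): Any = w match {
    case iw: IntWritable     => iw.get()
    case dw: DoubleWritable  => dw.get()
    case lw: LongWritable    => lw.get()
    case t: Text             => t.toString
    case bw: BooleanWritable => bw.get()
    case _: NullWritable     => None
    case aw: ArrayWritable   => aw.get().map(toScala)
    case mw: MapWritable     => mw.map { case (k, v) => (toScala(k), toScala(v)) }.toMap
    case other               => other
  }

  def main(args: Array[String]): Unit = {
    val mw = new MapWritable()
    mw.put(new Text("count"), new IntWritable(3))

    println(toScala(new Text("hello")))       // hello
    println(toScala(new DoubleWritable(1.5))) // 1.5
    println(toScala(mw))                      // Map(count -> 3)
  }
}
```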
11 changes: 7 additions & 4 deletions python/pyspark/context.py
@@ -225,22 +225,25 @@ def sequenceFileAsText(self, name):
jrdd = self._jvm.PythonRDD.sequenceFileAsText(self._jsc, name)
return RDD(jrdd, self, MsgPackDeserializer()) # MsgPackDeserializer PairMUTF8Deserializer

def sequenceFile(self, name, keyClass, valueClass, keyWrapper="", valueWrapper="", minSplits=None):
def sequenceFile(self, name, keyClass="org.apache.hadoop.io.Text", valueClass="org.apache.hadoop.io.Text",
keyWrapper="", valueWrapper="", minSplits=None):
"""
Read a Hadoop SequenceFile with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI,
and return it as an RDD of (String, String) where the key and value representations
are generated using the 'toString()' method of the relevant Java class.

>>> sc.sequenceFile("/tmp/spark/test/sfint/").collect()
>>> sc.sequenceFile("test_support/data/sfint/").collect()
[(1, 'aa'), (2, 'bb'), (2, 'aa'), (3, 'cc'), (2, 'bb'), (1, 'aa')]
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, keyClass, valueClass, keyWrapper, valueWrapper, minSplits)
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, keyClass, valueClass, keyWrapper, valueWrapper,
minSplits)
#jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, keyWrapper, valueWrapper, minSplits)
return RDD(jrdd, self, MsgPackDeserializer()) # MsgPackDeserializer PairMUTF8Deserializer

def newHadoopFile(self, name, inputFormat, keyClass, valueClass, keyWrapper="toString", valueWrapper="toString", conf = {}):
def newHadoopFile(self, name, inputFormat, keyClass, valueClass, keyWrapper="toString", valueWrapper="toString",
conf = {}):
"""
Read a Hadoop file with arbitrary InputFormat, key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI,
Binary file added python/test_support/data/sfdouble/._SUCCESS.crc
Binary file not shown.
Binary file added python/test_support/data/sfdouble/.part-00000.crc
Binary file not shown.
Binary file added python/test_support/data/sfdouble/.part-00001.crc
Binary file not shown.
Empty file.
Binary file added python/test_support/data/sfdouble/part-00000
Binary file not shown.
Binary file added python/test_support/data/sfdouble/part-00001
Binary file not shown.
Binary file added python/test_support/data/sfint/._SUCCESS.crc
Binary file not shown.
Binary file added python/test_support/data/sfint/.part-00000.crc
Binary file not shown.
Binary file added python/test_support/data/sfint/.part-00001.crc
Binary file not shown.
Empty file.
Binary file added python/test_support/data/sfint/part-00000
Binary file not shown.
Binary file added python/test_support/data/sfint/part-00001
Binary file not shown.
Binary file added python/test_support/data/sftext/.part-00000.crc
Binary file not shown.
Binary file added python/test_support/data/sftext/.part-00001.crc
Binary file not shown.
Binary file added python/test_support/data/sftext/part-00000
Binary file not shown.
Binary file added python/test_support/data/sftext/part-00001
Binary file not shown.