Closed
Changes from 1 commit
Commits (75)
d86325f
Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop …
MLnick Dec 9, 2013
4b0a43f
Refactoring utils into own objects. Cleaning up old commented-out code
MLnick Dec 12, 2013
c304cc8
Adding supporting sequencefiles for tests. Cleaning up
MLnick Dec 15, 2013
4e7c9e3
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Dec 15, 2013
818a1e6
Add sequencefile and Hadoop InputFormat support to PythonRDD
MLnick Dec 15, 2013
4294cbb
Add old Hadoop api methods. Clean up and expand comments. Clean up ar…
MLnick Dec 19, 2013
0f5cd84
Remove unused pair UTF8 class. Add comments to msgpack deserializer
MLnick Dec 19, 2013
f1d73e3
mergeConfs returns a copy rather than mutating one of the input argum…
MLnick Dec 19, 2013
4d7ef2e
Fix indentation
MLnick Dec 19, 2013
eb40036
Remove unused comment lines
MLnick Dec 19, 2013
1c8efbc
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Jan 13, 2014
619c0fa
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Jan 20, 2014
703ee65
Add back msgpack
MLnick Jan 20, 2014
174f520
Add back graphx settings
MLnick Jan 20, 2014
795a763
Change name to WriteInputFormatTestDataGenerator. Cleanup some var na…
MLnick Jan 20, 2014
2beeedb
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Feb 8, 2014
97ef708
Remove old writeToStream
MLnick Feb 14, 2014
41856a5
Merge branch 'master' into pyspark-inputformats
MLnick Mar 19, 2014
f2d76a0
Merge branch 'master' into pyspark-inputformats
MLnick Mar 19, 2014
e67212a
Add back msgpack dependency
MLnick Mar 19, 2014
dd57922
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Apr 10, 2014
d72bf18
msgpack
MLnick Apr 10, 2014
0c612e5
Merge branch 'master' into pyspark-inputformats
MLnick Apr 12, 2014
65360d5
Adding test SequenceFiles
MLnick Apr 18, 2014
25da1ca
Add generator for nulls, bools, bytes and maps
MLnick Apr 18, 2014
7237263
Add back msgpack serializer and hadoop file code lost during merging
MLnick Apr 18, 2014
a67dfad
Clean up Msgpack serialization and registering
MLnick Apr 18, 2014
1bbbfb0
Clean up SparkBuild from merge
MLnick Apr 18, 2014
9d2256e
Merge branch 'master' into pyspark-inputformats
MLnick Apr 18, 2014
f6aac55
Bring back msgpack
MLnick Apr 18, 2014
951c117
Merge branch 'master' into pyspark-inputformats
MLnick Apr 19, 2014
b20ec7e
Clean up merge duplicate dependencies
MLnick Apr 19, 2014
4e08983
Clean up docs for PySpark context methods
MLnick Apr 19, 2014
fc5099e
Add Apache license headers
MLnick Apr 19, 2014
31a2fff
Scalastyle fixes
MLnick Apr 21, 2014
450e0a2
Merge branch 'master' into pyspark-inputformats
MLnick Apr 21, 2014
f60959e
Remove msgpack dependency and serializer from PySpark
MLnick Apr 21, 2014
17a656b
remove binary sequencefile for tests
MLnick Apr 21, 2014
1d7c17c
Amend tests to auto-generate sequencefile data in temp dir
MLnick Apr 21, 2014
c0ebfb6
Change sequencefile test data generator to easily be called from PySp…
MLnick Apr 21, 2014
44f2857
Remove msgpack dependency and switch serialization to Pyrolite, plus …
MLnick Apr 21, 2014
e7552fa
Merge branch 'master' into pyspark-inputformats
MLnick Apr 22, 2014
64eb051
Scalastyle fix
MLnick Apr 22, 2014
78978d9
Add doc for SequenceFile and InputFormat support to Python programmin…
MLnick Apr 22, 2014
e001b94
Fix test failures due to ordering
MLnick Apr 23, 2014
bef3afb
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Apr 23, 2014
35b8e3a
Another fix for test ordering
MLnick Apr 23, 2014
5af4770
Merge branch 'master' into pyspark-inputformats
MLnick Apr 23, 2014
077ecb2
Recover earlier changes lost in previous merge for context.py
MLnick Apr 23, 2014
9ef1896
Recover earlier changes lost in previous merge for serializers.py
MLnick Apr 23, 2014
93ef995
Add back context.py changes
MLnick Apr 23, 2014
7caa73a
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick May 23, 2014
d0f52b6
Python programming guide
MLnick May 23, 2014
84fe8e3
Python programming guide space formatting
MLnick May 23, 2014
9fe6bd5
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick May 31, 2014
15a7d07
Remove default args for key/value classes. Arg names to camelCase
MLnick Jun 3, 2014
01e0813
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Jun 3, 2014
1a4a1d6
Address @mateiz style comments
MLnick Jun 3, 2014
94beedc
Clean up args in PythonRDD. Set key/value converter defaults to None …
MLnick Jun 3, 2014
43eb728
PySpark InputFormats docs into programming guide
MLnick Jun 3, 2014
085b55f
Move input format tests to tests.py and clean up docs
MLnick Jun 3, 2014
5757f6e
Default key/value classes for sequenceFile are None
MLnick Jun 3, 2014
b65606f
Add converter interface
MLnick Jun 4, 2014
2c18513
Add examples for reading HBase and Cassandra InputFormats from Python
MLnick Jun 4, 2014
3f90c3e
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Jun 4, 2014
1eaa08b
HBase -> Cassandra app name oversight
MLnick Jun 4, 2014
eeb8205
Fix path relative to SPARK_HOME in tests
MLnick Jun 4, 2014
365d0be
Make classes private[python]. Add docs and @Experimental annotation t…
MLnick Jun 5, 2014
a985492
Move Converter examples to own package
MLnick Jun 5, 2014
5ebacfa
Update docs for PySpark input formats
MLnick Jun 5, 2014
cde6af9
Parameterize converter trait
MLnick Jun 6, 2014
d150431
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick Jun 6, 2014
4c972d8
Add license headers
MLnick Jun 6, 2014
761269b
Address @pwendell comments, simplify default writable conversions and…
MLnick Jun 7, 2014
268df7e
Documentation changes per @pwendell comments
MLnick Jun 8, 2014
Scalastyle fixes
MLnick committed Apr 21, 2014
commit 31a2ffff2b4518b9672078bc4f52095ad2f51b64
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -31,7 +31,10 @@ private[python] object PythonHadoopUtil {
conf
}

-/** Merges two configurations, returns a copy of left with keys from right overwriting any matching keys in left */
+/**
+ * Merges two configurations, returns a copy of left with keys from right overwriting
+ * any matching keys in left
+ */
def mergeConfs(left: Configuration, right: Configuration) = {
import collection.JavaConversions._
val copy = new Configuration(left)
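The body of mergeConfs is collapsed in this view, so here is a minimal sketch of the behaviour the reworded Scaladoc describes, assuming only the Hadoop Configuration API and the JavaConversions import already used in this file. The wrapper object and method body are illustrative, not the PR's exact code:

```scala
import org.apache.hadoop.conf.Configuration
import scala.collection.JavaConversions._

object MergeConfsSketch {
  // Sketch only: copy `left`, then let every key in `right` overwrite the copy.
  def mergeConfs(left: Configuration, right: Configuration): Configuration = {
    val copy = new Configuration(left)                 // neither input is mutated
    // Configuration iterates as java.util.Map.Entry[String, String]
    right.foreach(entry => copy.set(entry.getKey, entry.getValue))
    copy
  }
}
```

Returning a fresh copy is the point of the earlier commit "mergeConfs returns a copy rather than mutating one of the input argum…".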
102 changes: 54 additions & 48 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -272,8 +272,6 @@ private[spark] object PythonRDD {
}
}

-// PySpark / Hadoop InputFormat//
-
/** Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]] */
def sequenceFile[K, V](sc: JavaSparkContext,
path: String,
@@ -295,14 +293,15 @@
* Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
* key and value class
*/
-def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
-path: String,
-inputFormatClazz: String,
-keyClazz: String,
-valueClazz: String,
-keyWrapper: String,
-valueWrapper: String,
-confAsMap: java.util.HashMap[String, String]) = {
+def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+sc: JavaSparkContext,
+path: String,
+inputFormatClazz: String,
+keyClazz: String,
+valueClazz: String,
+keyWrapper: String,
+valueWrapper: String,
+confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val baseConf = sc.hadoopConfiguration()
val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
@@ -314,16 +313,18 @@
}

/**
-* Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is passed in from Python,
-* using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]], key and value class
+* Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is
+* passed in from Python, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
+* key and value class
*/
-def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
-inputFormatClazz: String,
-keyClazz: String,
-valueClazz: String,
-keyWrapper: String,
-valueWrapper: String,
-confAsMap: java.util.HashMap[String, String]) = {
+def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+sc: JavaSparkContext,
+inputFormatClazz: String,
+keyClazz: String,
+valueClazz: String,
+keyWrapper: String,
+valueWrapper: String,
+confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
@@ -332,12 +333,13 @@
JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
}

-private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
-path: Option[String] = None,
-inputFormatClazz: String,
-keyClazz: String,
-valueClazz: String,
-conf: Configuration) = {
+private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
+sc: JavaSparkContext,
+path: Option[String] = None,
+inputFormatClazz: String,
+keyClazz: String,
+valueClazz: String,
+conf: Configuration) = {
implicit val kcm = ClassTag(Class.forName(keyClazz)).asInstanceOf[ClassTag[K]]
implicit val vcm = ClassTag(Class.forName(valueClazz)).asInstanceOf[ClassTag[V]]
implicit val fcm = ClassTag(Class.forName(inputFormatClazz)).asInstanceOf[ClassTag[F]]
@@ -356,14 +358,15 @@
* Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
* key and value class
*/
-def hadoopFile[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
-path: String,
-inputFormatClazz: String,
-keyClazz: String,
-valueClazz: String,
-keyWrapper: String,
-valueWrapper: String,
-confAsMap: java.util.HashMap[String, String]) = {
+def hadoopFile[K, V, F <: InputFormat[K, V]](
+sc: JavaSparkContext,
+path: String,
+inputFormatClazz: String,
+keyClazz: String,
+valueClazz: String,
+keyWrapper: String,
+valueWrapper: String,
+confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val baseConf = sc.hadoopConfiguration()
val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
@@ -375,16 +378,18 @@
}

/**
-* Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is passed in from Python,
-* using an arbitrary [[org.apache.hadoop.mapred.InputFormat]], key and value class
+* Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map
+* that is passed in from Python, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
+* key and value class
*/
-def hadoopRDD[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
-inputFormatClazz: String,
-keyClazz: String,
-valueClazz: String,
-keyWrapper: String,
-valueWrapper: String,
-confAsMap: java.util.HashMap[String, String]) = {
+def hadoopRDD[K, V, F <: InputFormat[K, V]](
+sc: JavaSparkContext,
+inputFormatClazz: String,
+keyClazz: String,
+valueClazz: String,
+keyWrapper: String,
+valueWrapper: String,
+confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
@@ -393,12 +398,13 @@
JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
}

-private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
-path: Option[String] = None,
-inputFormatClazz: String,
-keyClazz: String,
-valueClazz: String,
-conf: Configuration) = {
+private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
+sc: JavaSparkContext,
+path: Option[String] = None,
+inputFormatClazz: String,
+keyClazz: String,
+valueClazz: String,
+conf: Configuration) = {
implicit val kcm = ClassTag(Class.forName(keyClazz)).asInstanceOf[ClassTag[K]]
implicit val vcm = ClassTag(Class.forName(valueClazz)).asInstanceOf[ClassTag[V]]
implicit val fcm = ClassTag(Class.forName(inputFormatClazz)).asInstanceOf[ClassTag[F]]
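The remainder of each *FromClassNames helper is collapsed in this view. As a hedged sketch of their overall shape (names mirror the diff, but the body is an illustration rather than the PR's exact code): class names arrive from Python as plain strings, are resolved reflectively, and the call is delegated to the typed SparkContext API.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.api.java.JavaSparkContext

object NewAPIHadoopFileSketch {
  def newAPIHadoopFileFromClassNames[K, V, F <: NewInputFormat[K, V]](
      sc: JavaSparkContext,
      path: String,
      inputFormatClazz: String,
      keyClazz: String,
      valueClazz: String,
      conf: Configuration) = {
    // Resolve the string class names sent over from Python
    val fc = Class.forName(inputFormatClazz).asInstanceOf[Class[F]]
    val kc = Class.forName(keyClazz).asInstanceOf[Class[K]]
    val vc = Class.forName(valueClazz).asInstanceOf[Class[V]]
    // Delegate to the typed Scala API; the result is an RDD[(K, V)] of Writables
    sc.sc.newAPIHadoopFile(path, fc, kc, vc, conf)
  }
}
```

The real helpers then run the resulting RDD through the Writable conversion and serialization in SerDeUtil (see the calls to convertRDD and serMsgPack in this file) before handing it back to Python.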
18 changes: 11 additions & 7 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -27,14 +27,15 @@ import scala.util.Failure

/**
* Utilities for serialization / deserialization between Python and Java, using MsgPack.
[Review comment, Contributor]: This is actually using Pickle now instead of MsgPack
-* Also contains utilities for converting [[org.apache.hadoop.io.Writable]] -> Scala objects and primitives
+* Also contains utilities for converting [[org.apache.hadoop.io.Writable]] ->
+* Scala objects and primitives
*/
private[python] object SerDeUtil extends Logging {

/**
-* Checks whether a Scala object needs to be registered with MsgPack. String, primitives and the standard collections
-* don't need to be registered as MsgPack takes care of serializing them and registering them throws scary looking
-* errors (but still works).
+* Checks whether a Scala object needs to be registered with MsgPack. String, primitives
+* and the standard collections don't need to be registered as MsgPack takes care of serializing
+* them and registering them throws scary looking errors (but still works).
*/
def needsToBeRegistered[T](t: T) = {
t match {
@@ -101,8 +102,8 @@
}

/**
-* Converts an RDD of key-value pairs, where key and/or value could be instances of [[org.apache.hadoop.io.Writable]],
-* into an RDD[(K, V)]
+* Converts an RDD of key-value pairs, where key and/or value could be instances of
+* [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
*/
def convertRDD[K, V](rdd: RDD[(K, V)]) = {
rdd.map{
@@ -113,7 +114,9 @@
}
}

-/** Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or object representation */
+/** Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
+* object representation
+*/
def convert(writable: Writable): Any = {
import collection.JavaConversions._
writable match {
@@ -132,3 +135,4 @@
}

}

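The match inside convert is truncated at the end of the hunk above, so the sketch below illustrates the kind of Writable unwrapping its Scaladoc describes; the exact set of cases the PR handles is assumed, not copied from it.

```scala
import org.apache.hadoop.io._
import scala.collection.JavaConversions._

object ConvertSketch {
  // Unwrap common Writables to plain JVM values; unknown types pass through unchanged.
  def convert(writable: Writable): Any = writable match {
    case iw: IntWritable      => iw.get()
    case dw: DoubleWritable   => dw.get()
    case lw: LongWritable     => lw.get()
    case bw: BooleanWritable  => bw.get()
    case t: Text              => t.toString
    case _: NullWritable      => null
    case bytes: BytesWritable => bytes.getBytes
    case aw: ArrayWritable    => aw.get().map(convert)
    case mw: MapWritable      => mw.map { case (k, v) => (convert(k), convert(v)) }.toMap
    case other                => other
  }
}
```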
core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -21,6 +21,7 @@ import org.apache.spark.SparkContext
import org.apache.hadoop.io._
import scala.Array
import java.io.{DataOutput, DataInput}
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

/**
* A class to test MsgPack serialization on the Scala side, that will be deserialized
@@ -66,15 +67,20 @@ object WriteInputFormatTestDataGenerator extends App {
val boolPath = s"$basePath/sfbool"
val nullPath = s"$basePath/sfnull"

-// Create test data for IntWritable, DoubleWritable, Text, BytesWritable, BooleanWritable and NullWritable
+/*
+* Create test data for IntWritable, DoubleWritable, Text, BytesWritable,
+* BooleanWritable and NullWritable
+*/
val intKeys = Seq((1, "aa"), (2, "bb"), (2, "aa"), (3, "cc"), (2, "bb"), (1, "aa"))
sc.parallelize(intKeys).saveAsSequenceFile(intPath)
sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes) }).saveAsSequenceFile(bytesPath)
val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
sc.parallelize(bools).saveAsSequenceFile(boolPath)
-sc.parallelize(intKeys).map{ case (k, v) => (new IntWritable(k), NullWritable.get()) }.saveAsSequenceFile(nullPath)
+sc.parallelize(intKeys).map{ case (k, v) =>
+(new IntWritable(k), NullWritable.get())
+}.saveAsSequenceFile(nullPath)

// Create test data for ArrayWritable
val data = Seq(
@@ -86,7 +92,7 @@
.map{ case (k, v) =>
(new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
}
-.saveAsNewAPIHadoopFile[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
+.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)

// Create test data for MapWritable, with keys DoubleWritable and values Text
val mapData = Seq(
@@ -116,6 +122,6 @@
val rdd = sc.parallelize(testClass, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
rdd.saveAsNewAPIHadoopFile(classPath,
classOf[Text], classOf[TestWritable],
-classOf[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[Text, TestWritable]])
+classOf[SequenceFileOutputFormat[Text, TestWritable]])

}
}
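The SequenceFiles written by this generator are what the new PySpark tests read back. For completeness, a standalone sketch of checking one of them from the Scala side; the local master, app name and base path below are assumptions for illustration, not values taken from the PR:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // implicit WritableConverters for sequenceFile

object CheckGeneratedSequenceFiles extends App {
  val sc = new SparkContext("local", "check-generated-seqfiles")
  val basePath = "/tmp/pyspark-inputformat-test"   // assumed output dir of the generator

  // Read back the (IntWritable, Text) data written to s"$basePath/sfint"
  val ints = sc.sequenceFile[Int, String](s"$basePath/sfint")
  ints.collect().foreach { case (k, v) => println(s"$k -> $v") }   // e.g. 1 -> aa, 2 -> bb

  sc.stop()
}
```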