-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-2377] Python API for Streaming #2538
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
1fd12ae
c05922c
1f68b78
3dda31a
7f96294
fa75d71
8efa266
3a671cc
774f18d
33c0f94
4f2d7e6
9767712
35933e1
7051a84
99e4bb3
580fbc2
94f2b65
e9fab72
4aa99e4
6d8190a
14d4c0e
97742fe
e162822
e70f706
f1798c4
185fdbf
199e37f
58150f5
09a28bf
268a6a5
4dedd2d
171edeb
f0ea311
1d84142
583e66d
b7dab85
0d30109
24f95db
9c85e48
7339df2
9d1de23
4f82c89
50fd6f9
93f7637
acfcaeb
3b27bd4
2ea769e
c97377c
67473a9
d9d59fe
1fd6bc7
4afa390
d68b568
da09768
f5bfb70
fdc9125
ee50c5a
f7bc8f9
150b94c
454981d
b406252
87438e2
d7b4d6f
1a0f065
17a74c6
494cae5
e1df940
5bac7ec
d2099d8
224fc5e
bb7ccf3
f746109
0d1b954
ccfd214
b31446a
dc6995d
c455c8d
6f98e50
15feea9
d3ee86a
72b9738
bab31c1
0a8bbbb
678e854
b1d2a30
05e991b
9ab8952
84a9668
3b498e1
b349649
3c45cd2
d2c01ba
c462bb3
4d40d63
29c2bc5
fe648e3
8a0fbbc
1523b66
1df77f5
9ad6855
ce2acd2
878bad7
f21cab3
3d37822
253a863
bb10956
270a9e1
bcdec33
ff14070
3000b2b
13fb44c
18c8723
f76c182
74535d4
16aa64f
e54f986
10b5b04
10ab87b
5625bdc
c214199
0b99bec
41886c2
66fcfff
38adf95
4bcb318
247fd74
dd6de81
f485b1d
0df7111
58591d2
98c2a00
eb4bf48
6197a11
2ad7bd3
fe02547
4f07163
54b5358
88f7506
1b83354
92e333e
0b09cff
932372a
376e3ac
1934726
019ef38
5c04a5f
bd3ba53
9cde7c9
b3b0362
99410be
c1d546e
af610d3
953deb0
f67cf57
1e126bf
795b2cd
8dcda84
c5ecfc1
2a06cdb
99ce042
ddd4ee1
af336b7
455e5af
58e41ff
e80647e
c00e091
3166d31
f198d14
b171ec3
f04882c
62dc7a3
7dc7391
6ae3caa
fa4af88
066ba90
8ed93af
fbed8da
bebb3f3
b0f2015
f385976
c0a06bc
2fdf0de
d542743
d39f102
63c881a
d5f5fcb
8ffdbf1
4a59e1e
2d32a74
e685853
5cdb6fa
550dfd9
df098fc
7f53086
7339be0
bd27874
9a57685
eec401e
bd13026
d357b70
c28f520
3f0fb4b
c499ba0
604323f
b32774c
74df565
26ea396
7001b51
fce0ef5
e059ca2
847f9b9
b983f0f
98ac6c2
c40c52d
6ebceca
19797f9
338580a
069a94c
e00136b
eed6e2a
b98d63f
9a16bd1
8466916
a13ff34
fa7261b
6f0da2f
d328aca
ff88bec
7797c70
bd8a4c2
7a88f9f
c2b31cb
54bd92b
4d0ea8b
6bb9d91
c7bbbce
8071541
be5e5ff
d05871e
37fe06f
e108ec1
52c535b
8380064
6db00da
bebeb4a
02d0575
182be73
3e2492b
331ecce
64561e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,10 +17,11 @@ | |
|
|
||
| package org.apache.spark.streaming.api.python | ||
|
|
||
| import java.io.{ObjectInputStream, ObjectOutputStream} | ||
| import java.lang.reflect.Proxy | ||
| import java.util.{ArrayList => JArrayList, List => JList} | ||
| import scala.collection.JavaConversions._ | ||
| import scala.collection.JavaConverters._ | ||
| import scala.collection.mutable | ||
|
|
||
| import org.apache.spark.api.java._ | ||
| import org.apache.spark.api.python._ | ||
|
|
@@ -35,14 +36,14 @@ import org.apache.spark.streaming.api.java._ | |
| * Interface for Python callback function with three arguments | ||
| */ | ||
| private[python] trait PythonRDDFunction { | ||
| // callback in Python | ||
| def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]] | ||
| } | ||
|
|
||
| /** | ||
| * Wrapper for PythonRDDFunction | ||
| * TODO: support checkpoint | ||
| */ | ||
| private[python] class RDDFunction(pfunc: PythonRDDFunction) | ||
| private[python] class RDDFunction(@transient var pfunc: PythonRDDFunction) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I still find the name very confusing, even after looking at this code twice. It's more intuitive to call it TransformFunction (that is, a function for the transform operation) than RDDFunction (which makes me think of a function for an RDD operation, which is wrong). Please rename. |
||
| extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function2 is already Serializable. |
||
|
|
||
| def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = { | ||
|
|
@@ -58,30 +59,62 @@ private[python] class RDDFunction(pfunc: PythonRDDFunction) | |
| def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] = { | ||
| pfunc.call(time.milliseconds, rdds) | ||
| } | ||
| } | ||
|
|
||
| private def writeObject(out: ObjectOutputStream): Unit = { | ||
| assert(PythonDStream.serializer != null, "Serializer has not been registered!") | ||
| val bytes = PythonDStream.serializer.serialize(pfunc) | ||
| out.writeInt(bytes.length) | ||
| out.write(bytes) | ||
| } | ||
|
|
||
| private def readObject(in: ObjectInputStream): Unit = { | ||
| assert(PythonDStream.serializer != null, "Serializer has not been registered!") | ||
| val length = in.readInt() | ||
| val bytes = new Array[Byte](length) | ||
| in.readFully(bytes) | ||
| pfunc = PythonDStream.serializer.deserialize(bytes) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Base class for PythonDStream with some common methods | ||
| * Interface for Python Serializer to serialize PythonRDDFunction | ||
| */ | ||
| private[python] | ||
| abstract class PythonDStream(parent: DStream[_], pfunc: PythonRDDFunction) | ||
| extends DStream[Array[Byte]] (parent.ssc) { | ||
|
|
||
| val func = new RDDFunction(pfunc) | ||
|
|
||
| override def dependencies = List(parent) | ||
| private[python] trait PythonRDDFunctionSerializer { | ||
| def dumps(id: String): Array[Byte] // | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Extra trailing `//` at the end of this line. |
||
| def loads(bytes: Array[Byte]): PythonRDDFunction | ||
| } | ||
|
|
||
| override def slideDuration: Duration = parent.slideDuration | ||
| /** | ||
| * Wrapper for PythonRDDFunctionSerializer | ||
| */ | ||
| private[python] class RDDFunctionSerializer(pser: PythonRDDFunctionSerializer) { | ||
| def serialize(func: PythonRDDFunction): Array[Byte] = { | ||
| // get the id of PythonRDDFunction in py4j | ||
| val h = Proxy.getInvocationHandler(func.asInstanceOf[Proxy]) | ||
| val f = h.getClass().getDeclaredField("id"); | ||
| f.setAccessible(true); | ||
| val id = f.get(h).asInstanceOf[String]; | ||
| pser.dumps(id) | ||
| } | ||
|
|
||
| val asJavaDStream = JavaDStream.fromDStream(this) | ||
| def deserialize(bytes: Array[Byte]): PythonRDDFunction = { | ||
| pser.loads(bytes) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Helper functions | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe add a note here saying that these functions are called from Python via Py4J. |
||
| */ | ||
| private[python] object PythonDStream { | ||
|
|
||
| // A serializer in Python, used to serialize PythonRDDFunction | ||
| var serializer: RDDFunctionSerializer = _ | ||
|
|
||
| // Register a serializer from Python, should be called during initialization | ||
| def registerSerializer(ser: PythonRDDFunctionSerializer) = { | ||
| serializer = new RDDFunctionSerializer(ser) | ||
| } | ||
|
|
||
| // convert Option[RDD[_]] to JavaRDD, handle null gracefully | ||
| def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isnt this just |
||
| if (rdd.isDefined) { | ||
|
|
@@ -123,14 +156,30 @@ private[python] object PythonDStream { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Base class for PythonDStream with some common methods | ||
| */ | ||
| private[python] | ||
| abstract class PythonDStream(parent: DStream[_], @transient pfunc: PythonRDDFunction) | ||
| extends DStream[Array[Byte]] (parent.ssc) { | ||
|
|
||
| val func = new RDDFunction(pfunc) | ||
|
|
||
| override def dependencies = List(parent) | ||
|
|
||
| override def slideDuration: Duration = parent.slideDuration | ||
|
|
||
| val asJavaDStream = JavaDStream.fromDStream(this) | ||
| } | ||
|
|
||
| /** | ||
| * Transformed DStream in Python. | ||
| * | ||
| * If `reuse` is true and the result of the `func` is a PythonRDD, then it will be cached | ||
| * as a template for future use; this can reduce the number of Python callbacks. | ||
| */ | ||
| private[python] | ||
| class PythonTransformedDStream (parent: DStream[_], pfunc: PythonRDDFunction, | ||
| class PythonTransformedDStream (parent: DStream[_], @transient pfunc: PythonRDDFunction, | ||
| var reuse: Boolean = false) | ||
| extends PythonDStream(parent, pfunc) { | ||
|
|
||
|
|
@@ -170,7 +219,7 @@ class PythonTransformedDStream (parent: DStream[_], pfunc: PythonRDDFunction, | |
| */ | ||
| private[python] | ||
| class PythonTransformed2DStream(parent: DStream[_], parent2: DStream[_], | ||
| pfunc: PythonRDDFunction) | ||
| @transient pfunc: PythonRDDFunction) | ||
| extends DStream[Array[Byte]] (parent.ssc) { | ||
|
|
||
| val func = new RDDFunction(pfunc) | ||
|
|
@@ -190,7 +239,7 @@ class PythonTransformed2DStream(parent: DStream[_], parent2: DStream[_], | |
| * similar to StateDStream | ||
| */ | ||
| private[python] | ||
| class PythonStateDStream(parent: DStream[Array[Byte]], reduceFunc: PythonRDDFunction) | ||
| class PythonStateDStream(parent: DStream[Array[Byte]], @transient reduceFunc: PythonRDDFunction) | ||
| extends PythonDStream(parent, reduceFunc) { | ||
|
|
||
| super.persist(StorageLevel.MEMORY_ONLY) | ||
|
|
@@ -212,8 +261,8 @@ class PythonStateDStream(parent: DStream[Array[Byte]], reduceFunc: PythonRDDFunc | |
| */ | ||
| private[python] | ||
| class PythonReducedWindowedDStream(parent: DStream[Array[Byte]], | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's called in Python, should private[spark] also work?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Incorrect Scala style. See http://docs.scala-lang.org/style/declarations.html |
||
| preduceFunc: PythonRDDFunction, | ||
| pinvReduceFunc: PythonRDDFunction, | ||
| @transient preduceFunc: PythonRDDFunction, | ||
| @transient pinvReduceFunc: PythonRDDFunction, | ||
| _windowDuration: Duration, | ||
| _slideDuration: Duration | ||
| ) extends PythonStateDStream(parent, preduceFunc) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this now only takes two arguments.