Python API for Spark Streaming #10
One of the module pom.xml files:

```diff
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.1.0-SNAPSHOT</version>
+    <version>1.0.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
```

**Owner:** This should not change at all. Revert this file.

**Author:** Reverted.
A new Python streaming word count example (new file, 22 lines):

```python
import sys
from operator import add

from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import *

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: wordcount <directory>"
        exit(-1)
    ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))

    lines = ssc.textFileStream(sys.argv[1])
    fm_lines = lines.flatMap(lambda x: x.split(" "))
    filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
    mapped_lines = fm_lines.map(lambda x: (x, 1))

    fm_lines.pyprint()
    filtered_lines.pyprint()
    mapped_lines.pyprint()
    ssc.start()
    ssc.awaitTermination()
```
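Exercising this example amounts to pointing it at a directory and dropping text files in while it runs, since `textFileStream` only picks up files that appear after the stream has started. A minimal sketch of a helper for feeding it input (the directory path and file naming are hypothetical, not part of the diff):

```python
# Hypothetical helper for feeding the wordcount example: write a new file
# into the watched directory after the stream has started, so that
# textFileStream sees it on a subsequent batch interval.
import os
import time

def feed(watch_dir, text):
    path = os.path.join(watch_dir, "input-%d.txt" % int(time.time() * 1000))
    tmp = path + ".tmp"
    with open(tmp, "w") as f:   # write to a temp name, then rename, so the
        f.write(text)           # file appears in the directory atomically
    os.rename(tmp, path)

feed("/tmp/streaming-input", "Spark Streaming word count from Python\n")
```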
A new one-line package init file:

```python
__author__ = 'ktakagiw'
```
The new StreamingContext module (new file, 133 lines), beginning with:

```python
__author__ = 'ktakagiw'
```
**Owner:** Please remove the author. We don't have individual author names in any file; the commit record has all the author names. :)

**Owner:** Also remove the extra lines before the license.

**Author:** Deleted.
```python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import shutil
import sys
from threading import Lock
from tempfile import NamedTemporaryFile

from pyspark import accumulators
from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast
from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark.rdd import RDD
from pyspark.context import SparkContext

from py4j.java_collections import ListConverter

from pyspark.streaming.dstream import DStream


class StreamingContext(object):
    """
    Main entry point for Spark functionality. A StreamingContext represents the
```
**Owner:** Main entry point for Spark Streaming functionality.

**Author:** Modified the code based on your comment.
```python
    connection to a Spark cluster, and can be used to create L{RDD}s and
    broadcast variables on that cluster.
    """

    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
                 gateway=None, duration=None):
        """
        Create a new StreamingContext. At least the master, app name, and duration
        should be set, either through the named parameters here or through C{conf}.

        @param master: Cluster URL to connect to
               (e.g. mesos://host:port, spark://host:port, local[4]).
        @param appName: A name for your job, to display on the cluster web UI.
        @param sparkHome: Location where Spark is installed on cluster nodes.
        @param pyFiles: Collection of .zip or .py files to send to the cluster
               and add to PYTHONPATH. These can be paths on the local file
               system or HDFS, HTTP, HTTPS, or FTP URLs.
        @param environment: A dictionary of environment variables to set on
               worker nodes.
        @param batchSize: The number of Python objects represented as a single
               Java object. Set 1 to disable batching or -1 to use an
               unlimited batch size.
        @param serializer: The serializer for RDDs.
        @param conf: A L{SparkConf} object setting Spark properties.
        @param gateway: Use an existing gateway and JVM, otherwise a new JVM
               will be instantiated.
        @param duration: A L{Duration} object, the batch interval for Spark Streaming.
        """
        # Create the Python SparkContext
        self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
                                pyFiles=pyFiles, environment=environment, batchSize=batchSize,
                                serializer=serializer, conf=conf, gateway=gateway)
        self._jvm = self._sc._jvm
        self._jssc = self._initialize_context(self._sc._jsc, duration._jduration)

    # Initialize StreamingContext in a method to allow subclass-specific initialization
    def _initialize_context(self, jspark_context, jduration):
        return self._jvm.JavaStreamingContext(jspark_context, jduration)

    def actorStream(self, props, name, storageLevel, supervisorStrategy):
        raise NotImplementedError

    def addStreamingListener(self, streamingListener):
        raise NotImplementedError

    def awaitTermination(self, timeout=None):
        if timeout:
            self._jssc.awaitTermination(timeout)
        else:
            self._jssc.awaitTermination()

    def checkpoint(self, directory):
        raise NotImplementedError

    def fileStream(self, directory, filter=None, newFilesOnly=None):
        raise NotImplementedError

    def networkStream(self, receiver):
        raise NotImplementedError

    def queueStream(self, queue, oneAtATime=True, defaultRDD=None):
        raise NotImplementedError

    def rawSocketStream(self, hostname, port, storagelevel):
        raise NotImplementedError

    def remember(self, duration):
        raise NotImplementedError

    def socketStream(self, hostname, port, converter, storageLevel):
        raise NotImplementedError

    def start(self):
        self._jssc.start()

    def stop(self, stopSparkContext=True):
        raise NotImplementedError

    def textFileStream(self, directory):
        return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())

    def transform(self, seq):
        raise NotImplementedError

    def union(self, seq):
        raise NotImplementedError
```
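At this stage only the constructor, `textFileStream`, `start`, and `awaitTermination` are wired through to the JVM; everything else raises `NotImplementedError`. A minimal usage sketch of the implemented surface, assuming the classes from the diff above (the `local[2]` master, app name, and directory path are illustrative, not part of the PR):

```python
# Minimal sketch against the StreamingContext above; values are illustrative.
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import Seconds

ssc = StreamingContext(master="local[2]", appName="StreamingSketch",
                       duration=Seconds(1))          # one-second batch interval
lines = ssc.textFileStream("/tmp/streaming-input")   # hypothetical directory
counts = lines.flatMap(lambda x: x.split(" ")).map(lambda w: (w, 1))
counts.pyprint()     # pyprint is this PR's stand-in for DStream.print()
ssc.start()
ssc.awaitTermination()
```

Keeping `_initialize_context` as a separate method, rather than constructing `JavaStreamingContext` inline, leaves room for subclasses to swap in their own JVM-side context.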
**Owner:** Why was this change necessary?

**Author:** This is a hack to make DStream.pyprint work, and it will be removed once pyprint is moved to PythonDStream. The problem is that I wrote the pyprint function in the (Scala) DStream. Whenever Python code is executed, we call PythonDStream, which can pass along pythonExec (which Python executable Spark should use). Since pyprint lives in DStream rather than PythonDStream, Spark cannot get this variable from Python and does not know which Python executable to use. In that case, it gets the Python path from the PYSPARK_PYTHON environment variable. This fix is ongoing in the print branch in my repo.
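For illustration, the fallback the author describes amounts to resolving the worker executable from the environment whenever no explicit pythonExec is handed down. A hedged sketch of that lookup (the helper name is hypothetical and only mirrors the described behavior, not this PR's code):

```python
import os

def resolve_python_exec(python_exec=None):
    """Hypothetical sketch of the fallback described above: prefer an
    explicitly passed pythonExec, else the PYSPARK_PYTHON environment
    variable, else a bare "python" found on the PATH."""
    if python_exec:
        return python_exec
    return os.environ.get("PYSPARK_PYTHON", "python")
```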