Closed
Changes from 1 commit
Commits (33)
69da6cf  Merge pull request #1 from apache/master (nchammas, Jun 10, 2014)
6544b7e  [SPARK-2065] give launched instances names (nchammas, Jun 10, 2014)
2627247  broke up lines before they hit 100 chars (nchammas, Jun 11, 2014)
69f6e22  PEP8 fixes (nchammas, Jun 11, 2014)
89fde08  Merge pull request #2 from apache/master (nchammas, Jun 13, 2014)
2e4fe00  Merge pull request #3 from apache/master (nchammas, Jun 20, 2014)
de7292a  Merge pull request #4 from apache/master (nchammas, Jul 9, 2014)
a36eed0  name ec2 instances and security groups consistently (nchammas, Jul 9, 2014)
f7e4581  unrelated pep8 fix (nchammas, Jul 9, 2014)
4dd148f  Merge pull request #5 from apache/master (nchammas, Jul 20, 2014)
f0a7ebf  [SPARK-2470] PEP8 fixes to rddsampler.py (nchammas, Jul 20, 2014)
a6d5e4b  [SPARK-2470] PEP8 fixes to cloudpickle.py (nchammas, Jul 20, 2014)
f4e0039  [SPARK-2470] PEP8 fixes to conf.py (nchammas, Jul 20, 2014)
ca2d28b  [SPARK-2470] PEP8 fixes to context.py (nchammas, Jul 20, 2014)
7fc849c  [SPARK-2470] PEP8 fixes to daemon.py (nchammas, Jul 20, 2014)
1bde265  [SPARK-2470] PEP8 fixes to java_gateway.py (nchammas, Jul 20, 2014)
81fcb20  [SPARK-2470] PEP8 fixes to resultiterable.py (nchammas, Jul 20, 2014)
d14f2f1  [SPARK-2470] PEP8 fixes to __init__.py (nchammas, Jul 20, 2014)
c85e1e5  [SPARK-2470] PEP8 fixes to join.py (nchammas, Jul 20, 2014)
a0fec2e  [SPARK-2470] PEP8 fixes to mllib (nchammas, Jul 20, 2014)
95d1d95  [SPARK-2470] PEP8 fixes to serializers.py (nchammas, Jul 20, 2014)
1916859  [SPARK-2470] PEP8 fixes to shell.py (nchammas, Jul 20, 2014)
aa3a7b6  [SPARK-2470] PEP8 fixes to sql.py (nchammas, Jul 20, 2014)
d644477  [SPARK-2470] PEP8 fixes to worker.py (nchammas, Jul 20, 2014)
b3b96cf  [SPARK-2470] PEP8 fixes to statcounter.py (nchammas, Jul 20, 2014)
8f8e4c0  [SPARK-2470] PEP8 fixes to storagelevel.py (nchammas, Jul 20, 2014)
7d557b7  [SPARK-2470] PEP8 fixes to tests.py (nchammas, Jul 20, 2014)
24639bc  [SPARK-2470] fix whitespace for doctest (nchammas, Jul 21, 2014)
22132a4  [SPARK-2470] wrap conditionals in parentheses (nchammas, Jul 21, 2014)
9127d2b  [SPARK-2470] wrap expression lists in parentheses (nchammas, Jul 21, 2014)
e178dbe  [SPARK-2470] style - change position of line break (nchammas, Jul 21, 2014)
cba7768  [SPARK-2470] wrap expression list in parentheses (nchammas, Jul 21, 2014)
98171af  [SPARK-2470] revert PEP 8 fixes to cloudpickle (nchammas, Jul 21, 2014)
[SPARK-2470] PEP8 fixes to context.py
nchammas committed Jul 20, 2014
commit ca2d28b63776b1901afc255f81b59cd532f39fcb
45 changes: 25 additions & 20 deletions python/pyspark/context.py
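The commit addresses violations reported by a PEP 8 checker. As a minimal sketch (an assumption, not something shown in the PR itself), the same kind of check can be reproduced locally with the pep8 package (since renamed pycodestyle), using the 100-character limit that other commits in this PR reference:

```python
# Sketch only: run the pep8 checker programmatically against context.py.
# Assumes the third-party "pep8" package is installed; modern installs expose
# the same API under the name "pycodestyle".
import pep8

style = pep8.StyleGuide(max_line_length=100)  # 100-char limit, per this PR
report = style.check_files(["python/pyspark/context.py"])
print("PEP 8 violations found:", report.total_errors)
```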
@@ -29,7 +29,7 @@
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
PairDeserializer
PairDeserializer
from pyspark.storagelevel import StorageLevel
from pyspark import rdd
from pyspark.rdd import RDD
@@ -50,12 +50,11 @@ class SparkContext(object):
_next_accum_id = 0
_active_spark_context = None
_lock = Lock()
_python_includes = None # zip and egg files that need to be added to PYTHONPATH

_python_includes = None # zip and egg files that need to be added to PYTHONPATH

def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
gateway=None):
environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
gateway=None):
"""
Create a new SparkContext. At least the master and app name should be set,
either through the named parameters here or through C{conf}.
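For orientation, the signature being re-indented above is the public constructor. A hypothetical construction sketch using only parameters and attributes that appear in this diff; the master URL and application name are placeholder values:

```python
# Hypothetical usage sketch of the constructor shown above; "local[2]" and
# "pep8-demo" are placeholders, not values taken from the PR.
from pyspark import SparkContext

sc = SparkContext(master="local[2]", appName="pep8-demo", batchSize=1024)
print(sc.master)    # attributes referenced later in this diff
print(sc.appName)
sc.stop()
```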
@@ -138,8 +137,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
self._accumulatorServer = accumulators._start_update_server()
(host, port) = self._accumulatorServer.server_address
self._javaAccumulator = self._jsc.accumulator(
self._jvm.java.util.ArrayList(),
self._jvm.PythonAccumulatorParam(host, port))
self._jvm.java.util.ArrayList(),
self._jvm.PythonAccumulatorParam(host, port))

self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')

@@ -165,7 +164,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
(dirname, filename) = os.path.split(path)
self._python_includes.append(filename)
sys.path.append(path)
if not dirname in sys.path:
if dirname not in sys.path:
sys.path.append(dirname)

# Create a temporary directory inside spark.local.dir:
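The change above is the membership-test rule the pep8 checker reports as E713: "x not in y" is preferred over "not x in y". A small self-contained sketch of the same pattern, with a placeholder path:

```python
# Sketch of the membership-test style preference fixed above (pep8 code E713).
# Both forms are equivalent; "not in" reads as a single operator.
import sys

dirname = "/tmp/example-deps"   # hypothetical path, not from the PR
if dirname not in sys.path:     # preferred over: if not dirname in sys.path:
    sys.path.append(dirname)
print(sys.path[-1])
```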
@@ -192,15 +191,19 @@ def _ensure_initialized(cls, instance=None, gateway=None):
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile

if instance:
if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
if SparkContext._active_spark_context and
SparkContext._active_spark_context != instance:
Contributor commented:

??

currentMaster = SparkContext._active_spark_context.master
currentAppName = SparkContext._active_spark_context.appName
callsite = SparkContext._active_spark_context._callsite

# Raise error if there is already a running Spark context
raise ValueError("Cannot run multiple SparkContexts at once; existing SparkContext(app=%s, master=%s)" \
" created by %s at %s:%s " \
% (currentAppName, currentMaster, callsite.function, callsite.file, callsite.linenum))
raise ValueError(
"Cannot run multiple SparkContexts at once; "
"existing SparkContext(app=%s, master=%s)"
" created by %s at %s:%s "
% (currentAppName, currentMaster,
callsite.function, callsite.file, callsite.linenum))
else:
SparkContext._active_spark_context = instance

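The bare line break after "and" that drew the reviewer's "??" above is not valid Python on its own; a later commit in this pull request, 22132a4 ([SPARK-2470] wrap conditionals in parentheses), reworks such splits. A minimal self-contained sketch of the parenthesized form, using placeholder values rather than the real SparkContext state:

```python
# Sketch of a multi-line conditional wrapped in parentheses, the approach taken
# by commit 22132a4 later in this PR. The variables are placeholders.
active_context = "ctx-a"
instance = "ctx-b"

# Parentheses let the condition span lines without a trailing backslash.
if (active_context and
        active_context != instance):
    print("another context is already active")
```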
@@ -290,7 +293,7 @@ def textFile(self, name, minPartitions=None):
Read a text file from HDFS, a local file system (available on all
nodes), or any Hadoop-supported file system URI, and return it as an
RDD of Strings.

>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
... testFile.write("Hello world!")
@@ -584,11 +587,12 @@ def addPyFile(self, path):
HTTP, HTTPS or FTP URI.
"""
self.addFile(path)
(dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix
(dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix

if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
self._python_includes.append(filename)
sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode
# for tests in local mode
sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename))

def setCheckpointDir(self, dirName):
"""
@@ -649,9 +653,9 @@ def setJobGroup(self, groupId, description, interruptOnCancel=False):
Cancelled

If interruptOnCancel is set to true for the job group, then job cancellation will result
in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
where HDFS may respond to Thread.interrupt() by marking nodes as dead.
in Thread.interrupt() being called on the job's executor threads. This is useful to help
ensure that the tasks are actually stopped in a timely manner, but is off by default due
to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
"""
self._jsc.setJobGroup(groupId, description, interruptOnCancel)

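The reflowed docstring above describes the interruptOnCancel trade-off. A hypothetical sketch of the job-group workflow, using only methods that appear in this file; `sc` is assumed to be an existing SparkContext and the group id is a placeholder:

```python
# Hypothetical sketch: tag subsequent jobs with a group id and description so
# they can be cancelled together. interruptOnCancel=False avoids the HDFS-1208
# caveat described in the docstring above.
sc.setJobGroup("example-group", "demo of job groups", interruptOnCancel=False)
total = sc.parallelize(range(100)).sum()
print(total)
sc.cancelAllJobs()   # cancel anything still running
```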
@@ -688,7 +692,7 @@ def cancelAllJobs(self):
"""
self._jsc.sc().cancelAllJobs()

def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
"""
Executes the given partitionFunc on the specified set of partitions,
returning the result as an array of elements.
@@ -703,7 +707,7 @@ def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]
"""
if partitions == None:
if partitions is None:
partitions = range(rdd._jrdd.partitions().size())
javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client)

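The `partitions == None` comparison replaced above is what the checker flags as E711; identity is the right test because equality can be customised per class. A small self-contained sketch, with a deliberately contrived class as the hypothetical example:

```python
# Sketch of why "is None" is preferred over "== None" (pep8 code E711):
# __eq__ can claim equality with anything, but identity cannot be faked.
class AlwaysEqual(object):
    def __eq__(self, other):
        return True   # compares equal to everything, including None

value = AlwaysEqual()
print(value == None)   # True, which is misleading
print(value is None)   # False, the check the code above really wants
```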
@@ -714,6 +718,7 @@ def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
return list(mappedRDD._collect_iterator_through_file(it))


def _test():
import atexit
import doctest