Merged

Changes from 1 commit

36 commits
85a8718
[SPARK-11394][SQL] Throw IllegalArgumentException for unsupported typ…
maropu Dec 29, 2015
c069ffc
[SPARK-12526][SPARKR] ifelse`, `when`, `otherwise` unable to take Col…
saurfang Dec 29, 2015
8dc6549
[SPARK-12300] [SQL] [PYSPARK] fix schema inferance on local collections
holdenk Dec 30, 2015
cd86075
[SPARK-12399] Display correct error message when accessing REST API w…
carsonwang Dec 30, 2015
4e9dd16
[SPARK-12327][SPARKR] fix code for lintr warning for commented code
felixcheung Jan 3, 2016
f7a3223
[SPARK-12562][SQL] DataFrame.write.format(text) requires the column n…
xguo27 Jan 4, 2016
cd02038
[SPARK-12486] Worker should kill the executors more forcefully if pos…
nongli Jan 4, 2016
b5a1f56
[SPARK-12470] [SQL] Fix size reduction calculation
robbinspg Jan 4, 2016
7f37c1e
[SPARK-12579][SQL] Force user-specified JDBC driver to take precedence
JoshRosen Jan 4, 2016
1005ee3
[DOC] Adjust coverage for partitionBy()
tedyu Jan 4, 2016
8ac9198
[SPARK-12589][SQL] Fix UnsafeRowParquetRecordReader to properly set t…
nongli Jan 4, 2016
6f4a224
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Jan 4, 2016
8950482
[SPARKR][DOC] minor doc update for version in migration guide
felixcheung Jan 5, 2016
d9e4438
[SPARK-12568][SQL] Add BINARY to Encoders
marmbrus Jan 5, 2016
5afa62b
[SPARK-12647][SQL] Fix o.a.s.sqlexecution.ExchangeCoordinatorSuite.de…
robbinspg Jan 5, 2016
f31d0fd
[SPARK-12617] [PYSPARK] Clean up the leak sockets of Py4J
zsxwing Jan 5, 2016
83fe5cf
[SPARK-12511] [PYSPARK] [STREAMING] Make sure PythonDStream.registerS…
zsxwing Jan 5, 2016
0afad66
[SPARK-12450][MLLIB] Un-persist broadcasted variables in KMeans
rnowling Jan 5, 2016
bf3dca2
[SPARK-12453][STREAMING] Remove explicit dependency on aws-java-sdk
BrianLondon Jan 5, 2016
c3135d0
[SPARK-12393][SPARKR] Add read.text and write.text for SparkR
yanboliang Jan 6, 2016
1756819
[SPARK-12006][ML][PYTHON] Fix GMM failure if initialModel is not None
zero323 Jan 6, 2016
d821fae
[SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming
zsxwing Jan 6, 2016
8f0ead3
[SPARK-12672][STREAMING][UI] Use the uiRoot function instead of defau…
SaintBacchus Jan 6, 2016
39b0a34
Revert "[SPARK-12672][STREAMING][UI] Use the uiRoot function instead …
zsxwing Jan 6, 2016
11b901b
[SPARK-12016] [MLLIB] [PYSPARK] Wrap Word2VecModel when loading it in…
viirya Dec 14, 2015
94af69c
[SPARK-12673][UI] Add missing uri prepending for job description
jerryshao Jan 7, 2016
d061b85
[SPARK-12678][CORE] MapPartitionsRDD clearDependencies
gpoulin Jan 7, 2016
34effc4
Revert "[SPARK-12006][ML][PYTHON] Fix GMM failure if initialModel is …
yhuai Jan 7, 2016
47a58c7
[DOC] fix 'spark.memory.offHeap.enabled' default value to false
zzcclp Jan 7, 2016
13895cb
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Jan 7, 2016
69a885a
[SPARK-12006][ML][PYTHON] Fix GMM failure if initialModel is not None
zero323 Jan 7, 2016
017b73e
[SPARK-12662][SQL] Fix DataFrame.randomSplit to avoid creating overla…
sameeragarwal Jan 7, 2016
6ef8235
[SPARK-12598][CORE] bug in setMinPartitions
datafarmer Jan 7, 2016
a7c3636
[SPARK-12507][STREAMING][DOCUMENT] Expose closeFileAfterWrite and all…
zsxwing Jan 8, 2016
0d96c54
[SPARK-12591][STREAMING] Register OpenHashMapBasedStateMap for Kryo (…
zsxwing Jan 8, 2016
a77a7c5
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Jan 8, 2016
[SPARK-12511] [PYSPARK] [STREAMING] Make sure PythonDStream.registerSerializer is called only once

There is an issue where Py4J's PythonProxyHandler.finalize blocks forever (py4j/py4j#184).

Py4J creates a PythonProxyHandler in Java for "transformer_serializer" when "registerSerializer" is called. If we call "registerSerializer" twice, the second PythonProxyHandler overrides the first one; the first one is then GCed and triggers "PythonProxyHandler.finalize". To avoid that, we should not call "registerSerializer" more than once, so that the "PythonProxyHandler" on the Java side won't be GCed.

Author: Shixiong Zhu <[email protected]>

Closes apache#10514 from zsxwing/SPARK-12511.

(cherry picked from commit 6cfe341)
Signed-off-by: Davies Liu <[email protected]>
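The patch avoids the double registration by keeping a single class-level TransformFunctionSerializer and only re-initializing it on subsequent calls to _ensure_initialized. Below is a minimal, self-contained sketch of that guard pattern; FakeGateway, SerializerRegistry, and ensure_registered are illustrative stand-ins invented for this example, not PySpark or Py4J APIs.

class FakeGateway(object):
    """Stand-in for the Py4J gateway; it just records registrations so we can
    see how many Java-side proxies would be created."""

    def __init__(self):
        self.registered = []

    def registerSerializer(self, proxy):
        # In real Py4J this is where a PythonProxyHandler is created on the Java side.
        self.registered.append(proxy)


class SerializerRegistry(object):
    """Illustrative guard mirroring the patch: create and register the callback
    proxy once, and only re-initialize the existing one on later calls."""

    _serializer = None  # class-level singleton, like StreamingContext._transformerSerializer

    @classmethod
    def ensure_registered(cls, gateway, state):
        if cls._serializer is None:
            proxy = {"state": state}           # stands in for TransformFunctionSerializer()
            gateway.registerSerializer(proxy)  # happens at most once per Python process
            cls._serializer = proxy
        else:
            # Equivalent of calling init() again: refresh state, never re-register,
            # so the old Java-side proxy is never orphaned and finalized.
            cls._serializer["state"] = state
        return cls._serializer


gw = FakeGateway()
SerializerRegistry.ensure_registered(gw, state="first run")
SerializerRegistry.ensure_registered(gw, state="after checkpoint recovery")
assert len(gw.registered) == 1  # only one registration ever reaches the JVM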
zsxwing authored and davies committed Jan 5, 2016
commit 83fe5cf9a2621d7e53b5792a7c7549c9da7f130a
33 changes: 25 additions & 8 deletions python/pyspark/streaming/context.py
@@ -98,8 +98,28 @@ def _ensure_initialized(cls):
 
         # register serializer for TransformFunction
         # it happens before creating SparkContext when loading from checkpointing
-        cls._transformerSerializer = TransformFunctionSerializer(
-            SparkContext._active_spark_context, CloudPickleSerializer(), gw)
+        if cls._transformerSerializer is None:
+            transformer_serializer = TransformFunctionSerializer()
+            transformer_serializer.init(
+                SparkContext._active_spark_context, CloudPickleSerializer(), gw)
+            # SPARK-12511 streaming driver with checkpointing unable to finalize leading to OOM
+            # There is an issue that Py4J's PythonProxyHandler.finalize blocks forever.
+            # (https://github.com/bartdag/py4j/pull/184)
+            #
+            # Py4j will create a PythonProxyHandler in Java for "transformer_serializer" when
+            # calling "registerSerializer". If we call "registerSerializer" twice, the second
+            # PythonProxyHandler will override the first one, then the first one will be GCed and
+            # trigger "PythonProxyHandler.finalize". To avoid that, we should not call
+            # "registerSerializer" more than once, so that "PythonProxyHandler" in Java side won't
+            # be GCed.
+            #
+            # TODO Once Py4J fixes this issue, we should upgrade Py4j to the latest version.
+            transformer_serializer.gateway.jvm.PythonDStream.registerSerializer(
+                transformer_serializer)
+            cls._transformerSerializer = transformer_serializer
+        else:
+            cls._transformerSerializer.init(
+                SparkContext._active_spark_context, CloudPickleSerializer(), gw)
 
     @classmethod
     def getOrCreate(cls, checkpointPath, setupFunc):
@@ -116,16 +136,13 @@ def getOrCreate(cls, checkpointPath, setupFunc):
         gw = SparkContext._gateway
 
         # Check whether valid checkpoint information exists in the given path
-        if gw.jvm.CheckpointReader.read(checkpointPath).isEmpty():
+        ssc_option = gw.jvm.StreamingContextPythonHelper().tryRecoverFromCheckpoint(checkpointPath)
+        if ssc_option.isEmpty():
             ssc = setupFunc()
             ssc.checkpoint(checkpointPath)
             return ssc
 
-        try:
-            jssc = gw.jvm.JavaStreamingContext(checkpointPath)
-        except Exception:
-            print("failed to load StreamingContext from checkpoint", file=sys.stderr)
-            raise
+        jssc = gw.jvm.JavaStreamingContext(ssc_option.get())
 
         # If there is already an active instance of Python SparkContext use it, or create a new one
         if not SparkContext._active_spark_context:
3 changes: 1 addition & 2 deletions python/pyspark/streaming/util.py
@@ -89,11 +89,10 @@ class TransformFunctionSerializer(object):
     it uses this class to invoke Python, which returns the serialized function
     as a byte array.
     """
-    def __init__(self, ctx, serializer, gateway=None):
+    def init(self, ctx, serializer, gateway=None):
         self.ctx = ctx
         self.serializer = serializer
         self.gateway = gateway or self.ctx._gateway
-        self.gateway.jvm.PythonDStream.registerSerializer(self)
         self.failure = None
 
     def dumps(self, id):
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -897,3 +897,15 @@ object StreamingContext extends Logging {
     result
   }
 }
+
+private class StreamingContextPythonHelper {
+
+  /**
+   * This is a private method only for Python to implement `getOrCreate`.
+   */
+  def tryRecoverFromCheckpoint(checkpointPath: String): Option[StreamingContext] = {
+    val checkpointOption = CheckpointReader.read(
+      checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, false)
+    checkpointOption.map(new StreamingContext(null, _, null))
+  }
+}
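For context, here is a hedged end-to-end usage sketch of the recovery path this patch touches: StreamingContext.getOrCreate delegates to the new StreamingContextPythonHelper.tryRecoverFromCheckpoint on the JVM side, and _ensure_initialized registers the TransformFunctionSerializer only once even after a restart from the checkpoint. The checkpoint directory, app name, and socket source are made up for illustration; this assumes a Spark 1.6-era PySpark installation, submission via spark-submit, and something listening on localhost:9999.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

CHECKPOINT_DIR = "/tmp/streaming-checkpoint"   # illustrative path

def create_context():
    # Runs only when tryRecoverFromCheckpoint finds no usable checkpoint.
    sc = SparkContext(appName="CheckpointedWordCount")
    ssc = StreamingContext(sc, 2)  # 2-second batches
    lines = ssc.socketTextStream("localhost", 9999)
    counts = lines.flatMap(lambda line: line.split()) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()
    ssc.checkpoint(CHECKPOINT_DIR)
    return ssc

# On a restart, getOrCreate() now asks the JVM-side StreamingContextPythonHelper to
# recover the context, and _ensure_initialized() registers the serializer only once.
ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
ssc.start()
ssc.awaitTermination()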