Merged · 36 commits
85a8718
[SPARK-11394][SQL] Throw IllegalArgumentException for unsupported typ…
maropu Dec 29, 2015
c069ffc
[SPARK-12526][SPARKR] `ifelse`, `when`, `otherwise` unable to take Col…
saurfang Dec 29, 2015
8dc6549
[SPARK-12300] [SQL] [PYSPARK] fix schema inference on local collections
holdenk Dec 30, 2015
cd86075
[SPARK-12399] Display correct error message when accessing REST API w…
carsonwang Dec 30, 2015
4e9dd16
[SPARK-12327][SPARKR] fix code for lintr warning for commented code
felixcheung Jan 3, 2016
f7a3223
[SPARK-12562][SQL] DataFrame.write.format(text) requires the column n…
xguo27 Jan 4, 2016
cd02038
[SPARK-12486] Worker should kill the executors more forcefully if pos…
nongli Jan 4, 2016
b5a1f56
[SPARK-12470] [SQL] Fix size reduction calculation
robbinspg Jan 4, 2016
7f37c1e
[SPARK-12579][SQL] Force user-specified JDBC driver to take precedence
JoshRosen Jan 4, 2016
1005ee3
[DOC] Adjust coverage for partitionBy()
tedyu Jan 4, 2016
8ac9198
[SPARK-12589][SQL] Fix UnsafeRowParquetRecordReader to properly set t…
nongli Jan 4, 2016
6f4a224
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Jan 4, 2016
8950482
[SPARKR][DOC] minor doc update for version in migration guide
felixcheung Jan 5, 2016
d9e4438
[SPARK-12568][SQL] Add BINARY to Encoders
marmbrus Jan 5, 2016
5afa62b
[SPARK-12647][SQL] Fix o.a.s.sqlexecution.ExchangeCoordinatorSuite.de…
robbinspg Jan 5, 2016
f31d0fd
[SPARK-12617] [PYSPARK] Clean up the leak sockets of Py4J
zsxwing Jan 5, 2016
83fe5cf
[SPARK-12511] [PYSPARK] [STREAMING] Make sure PythonDStream.registerS…
zsxwing Jan 5, 2016
0afad66
[SPARK-12450][MLLIB] Un-persist broadcasted variables in KMeans
rnowling Jan 5, 2016
bf3dca2
[SPARK-12453][STREAMING] Remove explicit dependency on aws-java-sdk
BrianLondon Jan 5, 2016
c3135d0
[SPARK-12393][SPARKR] Add read.text and write.text for SparkR
yanboliang Jan 6, 2016
1756819
[SPARK-12006][ML][PYTHON] Fix GMM failure if initialModel is not None
zero323 Jan 6, 2016
d821fae
[SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming
zsxwing Jan 6, 2016
8f0ead3
[SPARK-12672][STREAMING][UI] Use the uiRoot function instead of defau…
SaintBacchus Jan 6, 2016
39b0a34
Revert "[SPARK-12672][STREAMING][UI] Use the uiRoot function instead …
zsxwing Jan 6, 2016
11b901b
[SPARK-12016] [MLLIB] [PYSPARK] Wrap Word2VecModel when loading it in…
viirya Dec 14, 2015
94af69c
[SPARK-12673][UI] Add missing uri prepending for job description
jerryshao Jan 7, 2016
d061b85
[SPARK-12678][CORE] MapPartitionsRDD clearDependencies
gpoulin Jan 7, 2016
34effc4
Revert "[SPARK-12006][ML][PYTHON] Fix GMM failure if initialModel is …
yhuai Jan 7, 2016
47a58c7
[DOC] fix 'spark.memory.offHeap.enabled' default value to false
zzcclp Jan 7, 2016
13895cb
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Jan 7, 2016
69a885a
[SPARK-12006][ML][PYTHON] Fix GMM failure if initialModel is not None
zero323 Jan 7, 2016
017b73e
[SPARK-12662][SQL] Fix DataFrame.randomSplit to avoid creating overla…
sameeragarwal Jan 7, 2016
6ef8235
[SPARK-12598][CORE] bug in setMinPartitions
datafarmer Jan 7, 2016
a7c3636
[SPARK-12507][STREAMING][DOCUMENT] Expose closeFileAfterWrite and all…
zsxwing Jan 8, 2016
0d96c54
[SPARK-12591][STREAMING] Register OpenHashMapBasedStateMap for Kryo (…
zsxwing Jan 8, 2016
a77a7c5
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Jan 8, 2016
[SPARK-12617] [PYSPARK] Clean up the leak sockets of Py4J
This patch adds Py4jCallbackConnectionCleaner to clean up the leaked sockets of Py4J every 30 seconds. This is a workaround until Py4J fixes the leak issue py4j/py4j#187

Author: Shixiong Zhu <[email protected]>

Closes apache#10579 from zsxwing/SPARK-12617.

(cherry picked from commit 047a31b)
Signed-off-by: Davies Liu <[email protected]>
zsxwing authored and davies committed Jan 5, 2016
commit f31d0fd9ea12bfe94434671fbcfe3d0e06a4a97d
61 changes: 61 additions & 0 deletions python/pyspark/context.py
@@ -54,6 +54,64 @@
}


class Py4jCallbackConnectionCleaner(object):

    """
    A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
    It will scan all callback connections every 30 seconds and close the dead connections.
    """

    def __init__(self, gateway):
        self._gateway = gateway
        self._stopped = False
        self._timer = None
        self._lock = RLock()

    def start(self):
        if self._stopped:
            return

        def clean_closed_connections():
            from py4j.java_gateway import quiet_close, quiet_shutdown

            callback_server = self._gateway._callback_server
            with callback_server.lock:
                try:
                    closed_connections = []
                    for connection in callback_server.connections:
                        if not connection.isAlive():
                            quiet_close(connection.input)
                            quiet_shutdown(connection.socket)
                            quiet_close(connection.socket)
                            closed_connections.append(connection)

                    for closed_connection in closed_connections:
                        callback_server.connections.remove(closed_connection)
                except Exception:
                    import traceback
                    traceback.print_exc()

            # Re-arm the timer so the next scan runs 30 seconds from now.
            self._start_timer(clean_closed_connections)

        # Schedule the first scan.
        self._start_timer(clean_closed_connections)

    def _start_timer(self, f):
        from threading import Timer

        with self._lock:
            if not self._stopped:
                self._timer = Timer(30.0, f)
                self._timer.daemon = True
                self._timer.start()

    def stop(self):
        with self._lock:
            self._stopped = True
            if self._timer:
                self._timer.cancel()
                self._timer = None

class SparkContext(object):

    """
@@ -68,6 +126,7 @@ class SparkContext(object):
    _active_spark_context = None
    _lock = RLock()
    _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
    _py4j_cleaner = None

    PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
@@ -244,6 +303,8 @@ def _ensure_initialized(cls, instance=None, gateway=None):
        if not SparkContext._gateway:
            SparkContext._gateway = gateway or launch_gateway()
            SparkContext._jvm = SparkContext._gateway.jvm
            _py4j_cleaner = Py4jCallbackConnectionCleaner(SparkContext._gateway)
            _py4j_cleaner.start()

        if instance:
            if (SparkContext._active_spark_context and
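
For reference, here is a minimal standalone sketch of the self-rescheduling daemon-Timer pattern the patch is built on. PeriodicTask, its names, and the one-second interval are illustrative inventions, not part of the patch; the real cleaner hard-codes a 30-second scan of the Py4J callback server.

    # Illustrative sketch (not from the patch): each run of the task re-arms a
    # daemon threading.Timer, so the work repeats at a fixed interval until
    # stop() flips the flag and cancels the pending timer.
    from threading import RLock, Timer


    class PeriodicTask(object):
        """Runs `task` every `interval` seconds on daemon Timer threads."""

        def __init__(self, task, interval=30.0):
            self._task = task
            self._interval = interval
            self._stopped = False
            self._timer = None
            self._lock = RLock()

        def start(self):
            def run_and_reschedule():
                try:
                    self._task()
                except Exception:
                    import traceback
                    traceback.print_exc()
                # Re-arm the timer, mirroring the recursive _start_timer call
                # in Py4jCallbackConnectionCleaner.
                self._schedule(run_and_reschedule)

            self._schedule(run_and_reschedule)

        def _schedule(self, f):
            with self._lock:
                if not self._stopped:
                    self._timer = Timer(self._interval, f)
                    self._timer.daemon = True  # never blocks interpreter exit
                    self._timer.start()

        def stop(self):
            with self._lock:
                self._stopped = True
                if self._timer:
                    self._timer.cancel()
                    self._timer = None


    if __name__ == "__main__":
        import time

        task = PeriodicTask(lambda: print("scan"), interval=1.0)
        task.start()
        time.sleep(3.5)  # expect roughly three "scan" lines
        task.stop()

Because each run re-arms its own Timer, the chain keeps itself alive without a dedicated thread loop, and the `_stopped` check inside the lock ensures an in-flight run cannot re-arm after stop() has been called.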