Skip to content

Commit 6aeba5a

Browse files
committed
Implemented alternative socket pooling to support greenlets (gevent and eventlet). This is entirely based on mgood's fork, commit 2388960.
1 parent 055e42f commit 6aeba5a

File tree

2 files changed

+79
-35
lines changed

2 files changed

+79
-35
lines changed

pymongo/connection.py

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import select
3939
import socket
4040
import struct
41-
import threading
41+
import thread
4242
import time
4343
import warnings
4444

@@ -60,6 +60,21 @@
6060
_CONNECT_TIMEOUT = 20.0
6161

6262

63+
try:
64+
from greenlet import greenlet
65+
except ImportError:
66+
def _thread_identifier():
67+
"""Return the identifier of the current thread-of-execution."""
68+
return os.getpid(), thread.get_ident()
69+
else:
70+
def _thread_identifier():
71+
"""Return the identifier of the current thread-of-execution.
72+
Supports greenlets.
73+
"""
74+
return os.getpid(), thread.get_ident(), greenlet.getcurrent()
75+
76+
77+
6378
def _closed(sock):
6479
"""Return True if we know socket has been closed, False otherwise.
6580
"""
@@ -84,25 +99,20 @@ def _partition_node(node):
8499
return host, port
85100

86101

87-
class _Pool(threading.local):
102+
103+
class _Pool(object):
88104
"""A simple connection pool.
89105
90-
Uses thread-local socket per thread. By calling return_socket() a
91-
thread can return a socket to the pool.
106+
Uses thread-local socket per thread (including greenlets).
107+
By calling return_socket() a thread can return a socket to the pool.
92108
"""
93109

94-
# Non thread-locals
95-
__slots__ = ["sockets", "pool_size", "pid"]
96-
97-
# thread-local default
98-
sock = None
99-
100110
def __init__(self, pool_size, network_timeout):
101111
self.pid = os.getpid()
102112
self.pool_size = pool_size
103113
self.network_timeout = network_timeout
104-
if not hasattr(self, "sockets"):
105-
self.sockets = []
114+
self.sockets = []
115+
self.active_sockets = {}
106116

107117
def connect(self, host, port):
108118
"""Connect to Mongo and return a new (connected) socket.
@@ -126,36 +136,30 @@ def connect(self, host, port):
126136
return s
127137

128138
def get_socket(self, host, port):
129-
# We use the pid here to avoid issues with fork / multiprocessing.
139+
# We use the _thread_identifier here to avoid issues with multiple
140+
# threads of execution (processes, proper threads, greenlets)
130141
# See test.test_connection:TestConnection.test_fork for an example of
131142
# what could go wrong otherwise
132-
pid = os.getpid()
133-
134-
if pid != self.pid:
135-
self.sock = None
136-
self.sockets = []
137-
self.pid = pid
138-
139-
if self.sock is not None and self.sock[0] == pid:
140-
return self.sock[1]
143+
sock_id = _thread_identifier()
141144

142145
try:
143-
self.sock = (pid, self.sockets.pop())
144-
except IndexError:
145-
self.sock = (pid, self.connect(host, port))
146-
147-
return self.sock[1]
146+
sock = self.active_sockets[sock_id]
147+
except KeyError:
148+
try:
149+
sock = self.sockets.pop()
150+
except IndexError:
151+
sock = self.connect(host, port)
152+
self.active_sockets[sock_id] = sock
153+
return sock
148154

149155
def return_socket(self):
150-
if self.sock is not None and self.sock[0] == os.getpid():
156+
sock = self.active_sockets.pop(_thread_identifier(), None)
157+
if sock is not None:
151158
# There's a race condition here, but we deliberately
152159
# ignore it. It means that if the pool_size is 10 we
153160
# might actually keep slightly more than that.
154161
if len(self.sockets) < self.pool_size:
155-
self.sockets.append(self.sock[1])
156-
else:
157-
self.sock[1].close()
158-
self.sock = None
162+
self.sockets.append(sock)
159163

160164

161165
class Connection(common.BaseObject): # TODO support auth for pooling

test/test_pooling.py

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,13 +159,16 @@ def test_no_disconnect(self):
159159
def test_simple_disconnect(self):
160160
self.c.test.stuff.find()
161161
self.assertEqual(0, len(self.c._Connection__pool.sockets))
162-
self.assertNotEqual(None, self.c._Connection__pool.sock)
162+
if hasattr(self.c._Connection__pool, 'sock'):
163+
self.assertNotEqual(None, self.c._Connection__pool.sock)
163164
self.c.end_request()
164165
self.assertEqual(1, len(self.c._Connection__pool.sockets))
165-
self.assertEqual(None, self.c._Connection__pool.sock)
166+
if hasattr(self.c._Connection__pool, 'sock'):
167+
self.assertEqual(None, self.c._Connection__pool.sock)
166168
self.c.disconnect()
167169
self.assertEqual(0, len(self.c._Connection__pool.sockets))
168-
self.assertEqual(None, self.c._Connection__pool.sock)
170+
if hasattr(self.c._Connection__pool, 'sock'):
171+
self.assertEqual(None, self.c._Connection__pool.sock)
169172

170173
def test_disconnect(self):
171174
run_cases(self, [SaveAndFind, Disconnect, Unique])
@@ -271,6 +274,43 @@ def loop(pipe):
271274
self.assert_(b_sock != c_sock)
272275
self.assertEqual(a_sock, a._Connection__pool.get_socket(a.host, a.port))
273276

277+
def test_pool_with_greenlets(self):
278+
try:
279+
from greenlet import greenlet
280+
except ImportError:
281+
raise SkipTest()
282+
283+
c = get_connection()
284+
c.test.test.find_one()
285+
c.end_request()
286+
self.assertEqual(1, len(c._Connection__pool.sockets))
287+
a_sock = c._Connection__pool.sockets[0]
288+
289+
def loop(name, pipe):
290+
c.test.test.find_one()
291+
self.assertEqual(0, len(c._Connection__pool.sockets))
292+
greenlet.getcurrent().parent.switch()
293+
c.end_request()
294+
pipe.append(c._Connection__pool.sockets[-1])
295+
296+
ga1 = []
297+
ga2 = []
298+
299+
g1 = greenlet(loop)
300+
g2 = greenlet(loop)
301+
302+
g1.switch('g1', ga1)
303+
g2.switch('g2', ga2)
304+
g1.switch()
305+
g2.switch()
306+
307+
b_sock = ga1[0]
308+
c_sock = ga2[0]
309+
self.assert_(a_sock is b_sock)
310+
self.assert_(a_sock is not c_sock)
311+
self.assert_(b_sock is not c_sock)
312+
313+
274314
def test_max_pool_size(self):
275315
c = get_connection(max_pool_size=4)
276316

0 commit comments

Comments
 (0)