diff --git a/bson/__init__.py b/bson/__init__.py index 3cc1fd26c6..bf9f22a560 100644 --- a/bson/__init__.py +++ b/bson/__init__.py @@ -509,6 +509,26 @@ def decode(self, as_class=dict, tz_aware=False): return document +def dumps(document): + """Decode a bson document into its :class:`dict` representation. + + This interface mirrors that of the json module. + :Parameters: + - `document`: mapping type representing a document + """ + return BSON.encode(document) + + +def loads(bsondoc): + """Encode a :class:`dict` document into its BSON data representation. + + This interface mirrors that of the json module. + :Parameters: + - `bsondoc`: string representing a bson document + """ + return BSON(bsondoc).decode() + + def has_c(): """Is the C extension installed? diff --git a/pymongo/connection.py b/pymongo/connection.py index 982c4fa6fa..4bd7897ba9 100644 --- a/pymongo/connection.py +++ b/pymongo/connection.py @@ -38,7 +38,7 @@ import select import socket import struct -import threading +import thread import time import warnings @@ -60,6 +60,21 @@ _CONNECT_TIMEOUT = 20.0 +try: + from greenlet import greenlet +except ImportError: + def _thread_identifier(): + """Return the identifier of the current thread-of-execution.""" + return os.getpid(), thread.get_ident() +else: + def _thread_identifier(): + """Return the identifier of the current thread-of-execution. + Supports greenlets. + """ + return os.getpid(), thread.get_ident(), greenlet.getcurrent() + + + def _closed(sock): """Return True if we know socket has been closed, False otherwise. """ @@ -84,25 +99,20 @@ def _partition_node(node): return host, port -class _Pool(threading.local): + +class _Pool(object): """A simple connection pool. - Uses thread-local socket per thread. By calling return_socket() a - thread can return a socket to the pool. + Uses thread-local socket per thread (including greenlets). + By calling return_socket() a thread can return a socket to the pool. """ - # Non thread-locals - __slots__ = ["sockets", "socket_factory", "pool_size", "pid"] - - # thread-local default - sock = None - def __init__(self, pool_size, network_timeout): self.pid = os.getpid() self.pool_size = pool_size self.network_timeout = network_timeout - if not hasattr(self, "sockets"): - self.sockets = [] + self.sockets = [] + self.active_sockets = {} def connect(self, host, port): """Connect to Mongo and return a new (connected) socket. @@ -126,36 +136,30 @@ def connect(self, host, port): return s def get_socket(self, host, port): - # We use the pid here to avoid issues with fork / multiprocessing. + # We use the _thread_identifier here to avoid issues with multiple + # threads of execution (processes, proper threads, greenlets) # See test.test_connection:TestConnection.test_fork for an example of # what could go wrong otherwise - pid = os.getpid() - - if pid != self.pid: - self.sock = None - self.sockets = [] - self.pid = pid - - if self.sock is not None and self.sock[0] == pid: - return self.sock[1] + sock_id = _thread_identifier() try: - self.sock = (pid, self.sockets.pop()) - except IndexError: - self.sock = (pid, self.connect(host, port)) - - return self.sock[1] + sock = self.active_sockets[sock_id] + except KeyError: + try: + sock = self.sockets.pop() + except IndexError: + sock = self.connect(host, port) + self.active_sockets[sock_id] = sock + return sock def return_socket(self): - if self.sock is not None and self.sock[0] == os.getpid(): + sock = self.active_sockets.pop(_thread_identifier(), None) + if sock is not None: # There's a race condition here, but we deliberately # ignore it. It means that if the pool_size is 10 we # might actually keep slightly more than that. if len(self.sockets) < self.pool_size: - self.sockets.append(self.sock[1]) - else: - self.sock[1].close() - self.sock = None + self.sockets.append(sock) class Connection(common.BaseObject): # TODO support auth for pooling @@ -963,7 +967,7 @@ def unlock(self): .. versionadded:: 1.11+ """ - self.admin['$cmd'].sys.unlock.find_one() + self.admin['$cmd'].sys.unlock.find_one() def __iter__(self): return self diff --git a/test/test_bson.py b/test/test_bson.py index 763b7dd95e..edab93d7a1 100644 --- a/test/test_bson.py +++ b/test/test_bson.py @@ -380,5 +380,62 @@ def test_ordered_dict(self): d = OrderedDict([("one", 1), ("two", 2), ("three", 3), ("four", 4)]) self.assertEqual(d, BSON.encode(d).decode(as_class=OrderedDict)) + def test_loads_dumps(self): + + def _test_conversion(dictrep, bsonrep): + self.assertEqual(bson.dumps(dictrep), bsonrep) + self.assertEqual(bson.loads(bsonrep), dictrep) + self.assertEqual(bson.loads(bson.dumps(dictrep)), dictrep) + self.assertEqual(bson.dumps(bson.loads(bsonrep)), bsonrep) + + _test_conversion({}, "\x05\x00\x00\x00\x00") + _test_conversion({"test": u"hello world"}, + "\x1B\x00\x00\x00\x02\x74\x65\x73\x74\x00\x0C\x00\x00" + "\x00\x68\x65\x6C\x6C\x6F\x20\x77\x6F\x72\x6C\x64\x00" + "\x00") + _test_conversion({u"mike": 100}, + "\x0F\x00\x00\x00\x10\x6D\x69\x6B\x65\x00\x64\x00\x00" + "\x00\x00") + _test_conversion({"hello": 1.5}, + "\x14\x00\x00\x00\x01\x68\x65\x6C\x6C\x6F\x00\x00\x00" + "\x00\x00\x00\x00\xF8\x3F\x00") + _test_conversion({"true": True}, + "\x0C\x00\x00\x00\x08\x74\x72\x75\x65\x00\x01\x00") + _test_conversion({"false": False}, + "\x0D\x00\x00\x00\x08\x66\x61\x6C\x73\x65\x00\x00" + "\x00") + _test_conversion({"empty": []}, + "\x11\x00\x00\x00\x04\x65\x6D\x70\x74\x79\x00\x05\x00" + "\x00\x00\x00\x00") + _test_conversion({"none": {}}, + "\x10\x00\x00\x00\x03\x6E\x6F\x6E\x65\x00\x05\x00\x00" + "\x00\x00\x00") + _test_conversion({"test": Binary("test", 0)}, + "\x14\x00\x00\x00\x05\x74\x65\x73\x74\x00\x04\x00\x00" + "\x00\x00\x74\x65\x73\x74\x00") + _test_conversion({"test": Binary("test")}, + "\x18\x00\x00\x00\x05\x74\x65\x73\x74\x00\x08\x00\x00" + "\x00\x02\x04\x00\x00\x00\x74\x65\x73\x74\x00") + _test_conversion({"test": Binary("test", 128)}, + "\x14\x00\x00\x00\x05\x74\x65\x73\x74\x00\x04\x00\x00" + "\x00\x80\x74\x65\x73\x74\x00") + _test_conversion({"test": None}, + "\x0B\x00\x00\x00\x0A\x74\x65\x73\x74\x00\x00") + _test_conversion({"date": datetime.datetime(2007, 1, 8, 0, 30, 11)}, + "\x13\x00\x00\x00\x09\x64\x61\x74\x65\x00\x38\xBE\x1C" + "\xFF\x0F\x01\x00\x00\x00") + _test_conversion({"$where": Code("test")}, + "\x1F\x00\x00\x00\x0F\x24\x77\x68\x65\x72\x65\x00\x12" + "\x00\x00\x00\x05\x00\x00\x00\x74\x65\x73\x74\x00\x05" + "\x00\x00\x00\x00\x00") + a = ObjectId("\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B") + _test_conversion({"oid": a}, + "\x16\x00\x00\x00\x07\x6F\x69\x64\x00\x00\x01\x02\x03" + "\x04\x05\x06\x07\x08\x09\x0A\x0B\x00") + _test_conversion({"ref": DBRef("coll", a)}, + "\x2F\x00\x00\x00\x03ref\x00\x25\x00\x00\x00\x02$ref" + "\x00\x05\x00\x00\x00coll\x00\x07$id\x00\x00\x01\x02" + "\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x00\x00") + if __name__ == "__main__": unittest.main() diff --git a/test/test_pooling.py b/test/test_pooling.py index a0ead2760e..7d3dd1bb78 100644 --- a/test/test_pooling.py +++ b/test/test_pooling.py @@ -159,13 +159,16 @@ def test_no_disconnect(self): def test_simple_disconnect(self): self.c.test.stuff.find() self.assertEqual(0, len(self.c._Connection__pool.sockets)) - self.assertNotEqual(None, self.c._Connection__pool.sock) + if hasattr(self.c._Connection__pool, 'sock'): + self.assertNotEqual(None, self.c._Connection__pool.sock) self.c.end_request() self.assertEqual(1, len(self.c._Connection__pool.sockets)) - self.assertEqual(None, self.c._Connection__pool.sock) + if hasattr(self.c._Connection__pool, 'sock'): + self.assertEqual(None, self.c._Connection__pool.sock) self.c.disconnect() self.assertEqual(0, len(self.c._Connection__pool.sockets)) - self.assertEqual(None, self.c._Connection__pool.sock) + if hasattr(self.c._Connection__pool, 'sock'): + self.assertEqual(None, self.c._Connection__pool.sock) def test_disconnect(self): run_cases(self, [SaveAndFind, Disconnect, Unique]) @@ -271,6 +274,43 @@ def loop(pipe): self.assert_(b_sock != c_sock) self.assertEqual(a_sock, a._Connection__pool.get_socket(a.host, a.port)) + def test_pool_with_greenlets(self): + try: + from greenlet import greenlet + except ImportError: + raise SkipTest() + + c = get_connection() + c.test.test.find_one() + c.end_request() + self.assertEqual(1, len(c._Connection__pool.sockets)) + a_sock = c._Connection__pool.sockets[0] + + def loop(name, pipe): + c.test.test.find_one() + self.assertEqual(0, len(c._Connection__pool.sockets)) + greenlet.getcurrent().parent.switch() + c.end_request() + pipe.append(c._Connection__pool.sockets[-1]) + + ga1 = [] + ga2 = [] + + g1 = greenlet(loop) + g2 = greenlet(loop) + + g1.switch('g1', ga1) + g2.switch('g2', ga2) + g1.switch() + g2.switch() + + b_sock = ga1[0] + c_sock = ga2[0] + self.assert_(a_sock is b_sock) + self.assert_(a_sock is not c_sock) + self.assert_(b_sock is not c_sock) + + def test_max_pool_size(self): c = get_connection(max_pool_size=4)