2323import time
2424
2525from nose .plugins .skip import SkipTest
26+
27+ from bson .son import SON
2628from pymongo import MongoClient , MongoReplicaSetClient
2729from pymongo .errors import AutoReconnect , ConnectionFailure , OperationFailure
2830from pymongo .pool import NO_REQUEST , NO_SOCKET_YET , SocketInfo
@@ -588,15 +590,6 @@ def test_max_bson_size(self):
588590 c .max_message_size )
589591
590592
591- def collect_until (fn ):
592- start = time .time ()
593- while not fn ():
594- if (time .time () - start ) > 5 :
595- raise AssertionError ("timed out" )
596-
597- gc .collect ()
598-
599-
600593class _TestExhaustCursorMixin (object ):
601594 """Test that clients properly handle errors from exhaust cursors.
602595
@@ -609,15 +602,18 @@ def test_exhaust_query_server_error(self):
609602 client = self ._get_client (max_pool_size = 1 )
610603 if is_mongos (client ):
611604 raise SkipTest ("Can't use exhaust cursors with mongos" )
605+ if not version .at_least (client , (2 , 2 , 0 )):
606+ raise SkipTest ("mongod < 2.2.0 closes exhaust socket on error" )
612607
613608 collection = client .pymongo_test .test
614609 pool = get_pool (client )
615610
616611 sock_info = one (pool .sockets )
617- cursor = collection .find ({'$bad_query_operator' : 1 }, exhaust = True )
612+ # This will cause OperationFailure in all mongo versions since
613+ # the value for $orderby must be a document.
614+ cursor = collection .find (
615+ SON ([('$query' , {}), ('$orderby' , True )]), exhaust = True )
618616 self .assertRaises (OperationFailure , cursor .next )
619- del cursor
620- collect_until (lambda : sock_info in pool .sockets )
621617 self .assertFalse (sock_info .closed )
622618
623619 # The semaphore was decremented despite the error.
@@ -639,7 +635,7 @@ def test_exhaust_getmore_server_error(self):
639635
640636 # Enough data to ensure it streams down for a few milliseconds.
641637 long_str = 'a' * (256 * 1024 )
642- collection .insert ([{'a' : long_str } for _ in range (1000 )])
638+ collection .insert ([{'a' : long_str } for _ in range (200 )])
643639
644640 pool = get_pool (client )
645641 pool ._check_interval_seconds = None # Never check.
@@ -653,12 +649,9 @@ def test_exhaust_getmore_server_error(self):
653649 # Cause a server error on getmore.
654650 client2 .pymongo_test .test .drop ()
655651 self .assertRaises (OperationFailure , list , cursor )
656- del cursor
657- collect_until (lambda : sock_info .closed )
658- self .assertFalse (sock_info in pool .sockets )
659652
660- # The semaphore was decremented despite the error.
661- self .assertTrue ( pool . _socket_semaphore . acquire ( blocking = False ))
653+ # Make sure the socket is still valid
654+ self .assertEqual ( 0 , collection . count ( ))
662655
663656 def test_exhaust_query_network_error (self ):
664657 # When doing an exhaust query, the socket stays checked out on success
0 commit comments