3535from bson .son import SON
3636from bson .tz_util import utc
3737import pymongo
38- from pymongo import auth , message
38+ from pymongo import auth , message , monitoring
3939from pymongo .common import CONNECT_TIMEOUT , _UUID_REPRESENTATIONS
4040from pymongo .command_cursor import CommandCursor
4141from pymongo .compression_support import _HAVE_SNAPPY , _HAVE_ZSTD
5858from pymongo .pool import SocketInfo , _METADATA
5959from pymongo .read_preferences import ReadPreference
6060from pymongo .server_description import ServerDescription
61- from pymongo .server_selectors import (any_server_selector ,
61+ from pymongo .server_selectors import (readable_server_selector ,
6262 writable_server_selector )
6363from pymongo .server_type import SERVER_TYPE
6464from pymongo .settings import TOPOLOGY_TYPE
7676from test .pymongo_mocks import MockClient
7777from test .utils import (assertRaisesExactly ,
7878 connected ,
79+ CMAPListener ,
7980 delay ,
8081 FunctionCallRecorder ,
8182 get_pool ,
@@ -452,21 +453,25 @@ def test_uri_security_options(self):
452453
453454class TestClient (IntegrationTest ):
454455
455- def test_max_idle_time_reaper (self ):
456+ def test_max_idle_time_reaper_default (self ):
456457 with client_knobs (kill_cursor_frequency = 0.1 ):
457458 # Assert reaper doesn't remove sockets when maxIdleTimeMS not set
458459 client = rs_or_single_client ()
459- server = client ._get_topology ().select_server (any_server_selector )
460+ server = client ._get_topology ().select_server (
461+ readable_server_selector )
460462 with server ._pool .get_socket ({}) as sock_info :
461463 pass
462464 self .assertEqual (1 , len (server ._pool .sockets ))
463465 self .assertTrue (sock_info in server ._pool .sockets )
464466 client .close ()
465467
468+ def test_max_idle_time_reaper_removes_stale_minPoolSize (self ):
469+ with client_knobs (kill_cursor_frequency = 0.1 ):
466470 # Assert reaper removes idle socket and replaces it with a new one
467471 client = rs_or_single_client (maxIdleTimeMS = 500 ,
468472 minPoolSize = 1 )
469- server = client ._get_topology ().select_server (any_server_selector )
473+ server = client ._get_topology ().select_server (
474+ readable_server_selector )
470475 with server ._pool .get_socket ({}) as sock_info :
471476 pass
472477 # When the reaper runs at the same time as the get_socket, two
@@ -478,11 +483,14 @@ def test_max_idle_time_reaper(self):
478483 "replace stale socket" )
479484 client .close ()
480485
486+ def test_max_idle_time_reaper_does_not_exceed_maxPoolSize (self ):
487+ with client_knobs (kill_cursor_frequency = 0.1 ):
481488 # Assert reaper respects maxPoolSize when adding new sockets.
482489 client = rs_or_single_client (maxIdleTimeMS = 500 ,
483490 minPoolSize = 1 ,
484491 maxPoolSize = 1 )
485- server = client ._get_topology ().select_server (any_server_selector )
492+ server = client ._get_topology ().select_server (
493+ readable_server_selector )
486494 with server ._pool .get_socket ({}) as sock_info :
487495 pass
488496 # When the reaper runs at the same time as the get_socket,
@@ -494,9 +502,12 @@ def test_max_idle_time_reaper(self):
494502 "replace stale socket" )
495503 client .close ()
496504
505+ def test_max_idle_time_reaper_removes_stale (self ):
506+ with client_knobs (kill_cursor_frequency = 0.1 ):
497507 # Assert reaper has removed idle socket and NOT replaced it
498508 client = rs_or_single_client (maxIdleTimeMS = 500 )
499- server = client ._get_topology ().select_server (any_server_selector )
509+ server = client ._get_topology ().select_server (
510+ readable_server_selector )
500511 with server ._pool .get_socket ({}) as sock_info_one :
501512 pass
502513 # Assert that the pool does not close sockets prematurely.
@@ -512,12 +523,14 @@ def test_max_idle_time_reaper(self):
512523 def test_min_pool_size (self ):
513524 with client_knobs (kill_cursor_frequency = .1 ):
514525 client = rs_or_single_client ()
515- server = client ._get_topology ().select_server (any_server_selector )
526+ server = client ._get_topology ().select_server (
527+ readable_server_selector )
516528 self .assertEqual (0 , len (server ._pool .sockets ))
517529
518530 # Assert that pool started up at minPoolSize
519531 client = rs_or_single_client (minPoolSize = 10 )
520- server = client ._get_topology ().select_server (any_server_selector )
532+ server = client ._get_topology ().select_server (
533+ readable_server_selector )
521534 wait_until (lambda : 10 == len (server ._pool .sockets ),
522535 "pool initialized with 10 sockets" )
523536
@@ -532,7 +545,8 @@ def test_max_idle_time_checkout(self):
532545 # Use high frequency to test _get_socket_no_auth.
533546 with client_knobs (kill_cursor_frequency = 99999999 ):
534547 client = rs_or_single_client (maxIdleTimeMS = 500 )
535- server = client ._get_topology ().select_server (any_server_selector )
548+ server = client ._get_topology ().select_server (
549+ readable_server_selector )
536550 with server ._pool .get_socket ({}) as sock_info :
537551 pass
538552 self .assertEqual (1 , len (server ._pool .sockets ))
@@ -546,7 +560,8 @@ def test_max_idle_time_checkout(self):
546560
547561 # Test that sockets are reused if maxIdleTimeMS is not set.
548562 client = rs_or_single_client ()
549- server = client ._get_topology ().select_server (any_server_selector )
563+ server = client ._get_topology ().select_server (
564+ readable_server_selector )
550565 with server ._pool .get_socket ({}) as sock_info :
551566 pass
552567 self .assertEqual (1 , len (server ._pool .sockets ))
@@ -2008,5 +2023,60 @@ def timeout_task():
20082023 self .assertIsNone (ct .get ())
20092024
20102025
2026+ class TestClientPool (MockClientTest ):
2027+
2028+ def test_rs_client_does_not_maintain_pool_to_arbiters (self ):
2029+ listener = CMAPListener ()
2030+ c = MockClient (
2031+ standalones = [],
2032+ members = ['a:1' , 'b:2' , 'c:3' , 'd:4' ],
2033+ mongoses = [],
2034+ arbiters = ['c:3' ], # c:3 is an arbiter.
2035+ down_hosts = ['d:4' ], # d:4 is unreachable.
2036+ host = ['a:1' , 'b:2' , 'c:3' , 'd:4' ],
2037+ replicaSet = 'rs' ,
2038+ minPoolSize = 1 , # minPoolSize
2039+ event_listeners = [listener ],
2040+ )
2041+ self .addCleanup (c .close )
2042+
2043+ wait_until (lambda : len (c .nodes ) == 3 , 'connect' )
2044+ self .assertEqual (c .address , ('a' , 1 ))
2045+ self .assertEqual (c .arbiters , set ([('c' , 3 )]))
2046+ # Assert that we create 2 and only 2 pooled connections.
2047+ listener .wait_for_event (monitoring .ConnectionReadyEvent , 2 )
2048+ self .assertEqual (
2049+ listener .event_count (monitoring .ConnectionCreatedEvent ), 2 )
2050+ # Assert that we do not create connections to arbiters.
2051+ arbiter = c ._topology .get_server_by_address (('c' , 3 ))
2052+ self .assertFalse (arbiter .pool .sockets )
2053+ # Assert that we do not create connections to unknown servers.
2054+ arbiter = c ._topology .get_server_by_address (('d' , 4 ))
2055+ self .assertFalse (arbiter .pool .sockets )
2056+
2057+ def test_direct_client_maintains_pool_to_arbiter (self ):
2058+ listener = CMAPListener ()
2059+ c = MockClient (
2060+ standalones = [],
2061+ members = ['a:1' , 'b:2' , 'c:3' ],
2062+ mongoses = [],
2063+ arbiters = ['c:3' ], # c:3 is an arbiter.
2064+ host = 'c:3' ,
2065+ directConnection = True ,
2066+ minPoolSize = 1 , # minPoolSize
2067+ event_listeners = [listener ],
2068+ )
2069+ self .addCleanup (c .close )
2070+
2071+ wait_until (lambda : len (c .nodes ) == 1 , 'connect' )
2072+ self .assertEqual (c .address , ('c' , 3 ))
2073+ # Assert that we create 1 pooled connection.
2074+ listener .wait_for_event (monitoring .ConnectionReadyEvent , 1 )
2075+ self .assertEqual (
2076+ listener .event_count (monitoring .ConnectionCreatedEvent ), 1 )
2077+ arbiter = c ._topology .get_server_by_address (('c' , 3 ))
2078+ self .assertEqual (len (arbiter .pool .sockets ), 1 )
2079+
2080+
20112081if __name__ == "__main__" :
20122082 unittest .main ()
0 commit comments