Skip to content

Commit 4c7718e

Browse files
authored
PYTHON-2634 Only update pools for data-bearing servers (mongodb#590)
Fixes a noisy "OperationFailure: Authentication failed" error. Do not attempt to create unneeded connections to arbiters, ghosts, hidden members, or unknown members.
1 parent cc029a1 commit 4c7718e

File tree

4 files changed: 124 additions & 24 deletions

pymongo/topology.py

Lines changed: 15 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -429,20 +429,30 @@ def request_check_all(self, wait_time=5):
429429
self._request_check_all()
430430
self._condition.wait(wait_time)
431431

432+
def data_bearing_servers(self):
433+
"""Return a list of all data-bearing servers.
434+
435+
This includes any server that might be selected for an operation.
436+
"""
437+
if self._description.topology_type == TOPOLOGY_TYPE.Single:
438+
return self._description.known_servers
439+
return self._description.readable_servers
440+
432441
def update_pool(self, all_credentials):
433442
# Remove any stale sockets and add new sockets if pool is too small.
434443
servers = []
435444
with self._lock:
436-
for server in self._servers.values():
437-
servers.append((server, server._pool.generation))
445+
# Only update pools for data-bearing servers.
446+
for sd in self.data_bearing_servers():
447+
server = self._servers[sd.address]
448+
servers.append((server, server.pool.generation))
438449

439450
for server, generation in servers:
440-
pool = server._pool
441451
try:
442-
pool.remove_stale_sockets(generation, all_credentials)
452+
server.pool.remove_stale_sockets(generation, all_credentials)
443453
except PyMongoError as exc:
444454
ctx = _ErrorContext(exc, 0, generation, False)
445-
self.handle_error(pool.address, ctx)
455+
self.handle_error(server.description.address, ctx)
446456
raise
447457

448458
def close(self):

test/pymongo_mocks.py

Lines changed: 13 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -106,13 +106,13 @@ def _check_once(self):
106106
class MockClient(MongoClient):
107107
def __init__(
108108
self, standalones, members, mongoses, ismaster_hosts=None,
109-
*args, **kwargs):
109+
arbiters=None, down_hosts=None, *args, **kwargs):
110110
"""A MongoClient connected to the default server, with a mock topology.
111111
112-
standalones, members, mongoses determine the configuration of the
113-
topology. They are formatted like ['a:1', 'b:2']. ismaster_hosts
114-
provides an alternative host list for the server's mocked ismaster
115-
response; see test_connect_with_internal_ips.
112+
standalones, members, mongoses, arbiters, and down_hosts determine the
113+
configuration of the topology. They are formatted like ['a:1', 'b:2'].
114+
ismaster_hosts provides an alternative host list for the server's
115+
mocked ismaster response; see test_connect_with_internal_ips.
116116
"""
117117
self.mock_standalones = standalones[:]
118118
self.mock_members = members[:]
@@ -122,6 +122,9 @@ def __init__(
122122
else:
123123
self.mock_primary = None
124124

125+
# Hosts that should be considered an arbiter.
126+
self.mock_arbiters = arbiters[:] if arbiters else []
127+
125128
if ismaster_hosts is not None:
126129
self.mock_ismaster_hosts = ismaster_hosts
127130
else:
@@ -130,7 +133,7 @@ def __init__(
130133
self.mock_mongoses = mongoses[:]
131134

132135
# Hosts that should raise socket errors.
133-
self.mock_down_hosts = []
136+
self.mock_down_hosts = down_hosts[:] if down_hosts else []
134137

135138
# Hostname -> (min wire version, max wire version)
136139
self.mock_wire_versions = {}
@@ -203,6 +206,10 @@ def mock_is_master(self, host):
203206

204207
if self.mock_primary:
205208
response['primary'] = self.mock_primary
209+
210+
if host in self.mock_arbiters:
211+
response['arbiterOnly'] = True
212+
response['secondary'] = False
206213
elif host in self.mock_mongoses:
207214
response = {
208215
'ok': 1,

test/test_client.py

Lines changed: 82 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,7 @@
3535
from bson.son import SON
3636
from bson.tz_util import utc
3737
import pymongo
38-
from pymongo import message
38+
from pymongo import message, monitoring
3939
from pymongo.common import CONNECT_TIMEOUT, _UUID_REPRESENTATIONS
4040
from pymongo.command_cursor import CommandCursor
4141
from pymongo.compression_support import _HAVE_SNAPPY, _HAVE_ZSTD
@@ -57,7 +57,7 @@
5757
from pymongo.pool import SocketInfo, _METADATA
5858
from pymongo.read_preferences import ReadPreference
5959
from pymongo.server_description import ServerDescription
60-
from pymongo.server_selectors import (any_server_selector,
60+
from pymongo.server_selectors import (readable_server_selector,
6161
writable_server_selector)
6262
from pymongo.server_type import SERVER_TYPE
6363
from pymongo.settings import TOPOLOGY_TYPE
@@ -77,6 +77,7 @@
7777
from test.pymongo_mocks import MockClient
7878
from test.utils import (assertRaisesExactly,
7979
connected,
80+
CMAPListener,
8081
delay,
8182
FunctionCallRecorder,
8283
get_pool,
@@ -448,21 +449,25 @@ def test_uri_security_options(self):
448449

449450
class TestClient(IntegrationTest):
450451

451-
def test_max_idle_time_reaper(self):
452+
def test_max_idle_time_reaper_default(self):
452453
with client_knobs(kill_cursor_frequency=0.1):
453454
# Assert reaper doesn't remove sockets when maxIdleTimeMS not set
454455
client = rs_or_single_client()
455-
server = client._get_topology().select_server(any_server_selector)
456+
server = client._get_topology().select_server(
457+
readable_server_selector)
456458
with server._pool.get_socket({}) as sock_info:
457459
pass
458460
self.assertEqual(1, len(server._pool.sockets))
459461
self.assertTrue(sock_info in server._pool.sockets)
460462
client.close()
461463

464+
def test_max_idle_time_reaper_removes_stale_minPoolSize(self):
465+
with client_knobs(kill_cursor_frequency=0.1):
462466
# Assert reaper removes idle socket and replaces it with a new one
463467
client = rs_or_single_client(maxIdleTimeMS=500,
464468
minPoolSize=1)
465-
server = client._get_topology().select_server(any_server_selector)
469+
server = client._get_topology().select_server(
470+
readable_server_selector)
466471
with server._pool.get_socket({}) as sock_info:
467472
pass
468473
# When the reaper runs at the same time as the get_socket, two
@@ -474,11 +479,14 @@ def test_max_idle_time_reaper(self):
474479
"replace stale socket")
475480
client.close()
476481

482+
def test_max_idle_time_reaper_does_not_exceed_maxPoolSize(self):
483+
with client_knobs(kill_cursor_frequency=0.1):
477484
# Assert reaper respects maxPoolSize when adding new sockets.
478485
client = rs_or_single_client(maxIdleTimeMS=500,
479486
minPoolSize=1,
480487
maxPoolSize=1)
481-
server = client._get_topology().select_server(any_server_selector)
488+
server = client._get_topology().select_server(
489+
readable_server_selector)
482490
with server._pool.get_socket({}) as sock_info:
483491
pass
484492
# When the reaper runs at the same time as the get_socket,
@@ -490,9 +498,12 @@ def test_max_idle_time_reaper(self):
490498
"replace stale socket")
491499
client.close()
492500

501+
def test_max_idle_time_reaper_removes_stale(self):
502+
with client_knobs(kill_cursor_frequency=0.1):
493503
# Assert reaper has removed idle socket and NOT replaced it
494504
client = rs_or_single_client(maxIdleTimeMS=500)
495-
server = client._get_topology().select_server(any_server_selector)
505+
server = client._get_topology().select_server(
506+
readable_server_selector)
496507
with server._pool.get_socket({}) as sock_info_one:
497508
pass
498509
# Assert that the pool does not close sockets prematurely.
@@ -508,12 +519,14 @@ def test_max_idle_time_reaper(self):
508519
def test_min_pool_size(self):
509520
with client_knobs(kill_cursor_frequency=.1):
510521
client = rs_or_single_client()
511-
server = client._get_topology().select_server(any_server_selector)
522+
server = client._get_topology().select_server(
523+
readable_server_selector)
512524
self.assertEqual(0, len(server._pool.sockets))
513525

514526
# Assert that pool started up at minPoolSize
515527
client = rs_or_single_client(minPoolSize=10)
516-
server = client._get_topology().select_server(any_server_selector)
528+
server = client._get_topology().select_server(
529+
readable_server_selector)
517530
wait_until(lambda: 10 == len(server._pool.sockets),
518531
"pool initialized with 10 sockets")
519532

@@ -528,7 +541,8 @@ def test_max_idle_time_checkout(self):
528541
# Use high frequency to test _get_socket_no_auth.
529542
with client_knobs(kill_cursor_frequency=99999999):
530543
client = rs_or_single_client(maxIdleTimeMS=500)
531-
server = client._get_topology().select_server(any_server_selector)
544+
server = client._get_topology().select_server(
545+
readable_server_selector)
532546
with server._pool.get_socket({}) as sock_info:
533547
pass
534548
self.assertEqual(1, len(server._pool.sockets))
@@ -542,7 +556,8 @@ def test_max_idle_time_checkout(self):
542556

543557
# Test that sockets are reused if maxIdleTimeMS is not set.
544558
client = rs_or_single_client()
545-
server = client._get_topology().select_server(any_server_selector)
559+
server = client._get_topology().select_server(
560+
readable_server_selector)
546561
with server._pool.get_socket({}) as sock_info:
547562
pass
548563
self.assertEqual(1, len(server._pool.sockets))
@@ -1944,5 +1959,61 @@ def timeout_task():
19441959
self.assertIsNone(ct.get())
19451960

19461961

1962+
class TestClientPool(MockClientTest):
1963+
1964+
def test_rs_client_does_not_maintain_pool_to_arbiters(self):
1965+
listener = CMAPListener()
1966+
c = MockClient(
1967+
standalones=[],
1968+
members=['a:1', 'b:2', 'c:3', 'd:4'],
1969+
mongoses=[],
1970+
arbiters=['c:3'], # c:3 is an arbiter.
1971+
down_hosts=['d:4'], # d:4 is unreachable.
1972+
host=['a:1', 'b:2', 'c:3', 'd:4'],
1973+
replicaSet='rs',
1974+
minPoolSize=1, # minPoolSize
1975+
event_listeners=[listener],
1976+
)
1977+
self.addCleanup(c.close)
1978+
1979+
wait_until(lambda: len(c.nodes) == 3, 'connect')
1980+
self.assertEqual(c.address, ('a', 1))
1981+
self.assertEqual(c.arbiters, set([('c', 3)]))
1982+
# Assert that we create 2 and only 2 pooled connections.
1983+
listener.wait_for_event(monitoring.ConnectionReadyEvent, 2)
1984+
self.assertEqual(
1985+
listener.event_count(monitoring.ConnectionCreatedEvent), 2)
1986+
# Assert that we do not create connections to arbiters.
1987+
arbiter = c._topology.get_server_by_address(('c', 3))
1988+
self.assertFalse(arbiter.pool.sockets)
1989+
# Assert that we do not create connections to unknown servers.
1990+
arbiter = c._topology.get_server_by_address(('d', 4))
1991+
self.assertFalse(arbiter.pool.sockets)
1992+
1993+
def test_direct_client_maintains_pool_to_arbiter(self):
1994+
listener = CMAPListener()
1995+
c = MockClient(
1996+
standalones=[],
1997+
members=['a:1', 'b:2', 'c:3'],
1998+
mongoses=[],
1999+
arbiters=['c:3'], # c:3 is an arbiter.
2000+
host='c:3',
2001+
directConnection=True,
2002+
minPoolSize=1, # minPoolSize
2003+
event_listeners=[listener],
2004+
)
2005+
self.addCleanup(c.close)
2006+
2007+
print(c.topology_description)
2008+
wait_until(lambda: len(c.nodes) == 1, 'connect')
2009+
self.assertEqual(c.address, ('c', 3))
2010+
# Assert that we create 1 pooled connection.
2011+
listener.wait_for_event(monitoring.ConnectionReadyEvent, 1)
2012+
self.assertEqual(
2013+
listener.event_count(monitoring.ConnectionCreatedEvent), 1)
2014+
arbiter = c._topology.get_server_by_address(('c', 3))
2015+
self.assertEqual(len(arbiter.pool.sockets), 1)
2016+
2017+
19472018
if __name__ == "__main__":
19482019
unittest.main()

test/test_cmap.py

Lines changed: 14 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@
4141
PoolClosedEvent)
4242
from pymongo.read_preferences import ReadPreference
4343
from pymongo.pool import _PoolClosedError, PoolState
44+
from pymongo.topology_description import updated_topology_description
4445

4546
from test import (client_knobs,
4647
IntegrationTest,
@@ -226,12 +227,23 @@ def run_scenario(self, scenario_def, test):
226227
opts = test['poolOptions'].copy()
227228
opts['event_listeners'] = [self.listener]
228229
opts['_monitor_class'] = DummyMonitor
230+
opts['connect'] = False
229231
with client_knobs(kill_cursor_frequency=.05,
230232
min_heartbeat_interval=.05):
231233
client = single_client(**opts)
234+
# Update the SD to a known type because the DummyMonitor will not.
235+
# Note we cannot simply call topology.on_change because that would
236+
# internally call pool.ready() which introduces unexpected
237+
# PoolReadyEvents. Instead, update the initial state before
238+
# opening the Topology.
239+
td = client_context.client._topology.description
240+
sd = td.server_descriptions()[(client_context.host,
241+
client_context.port)]
242+
client._topology._description = updated_topology_description(
243+
client._topology._description, sd)
244+
client._get_topology()
232245
self.addCleanup(client.close)
233-
# self.pool = get_pools(client)[0]
234-
self.pool = list(client._get_topology()._servers.values())[0].pool
246+
self.pool = list(client._topology._servers.values())[0].pool
235247

236248
# Map of target names to Thread objects.
237249
self.targets = dict()

0 commit comments

Comments
 (0)