Skip to content

Commit 973167d

Browse files
committed
Revert "Pin transactions to a single server address"
This reverts commit 25bc085.
1 parent 7f2dc73 commit 973167d

File tree

6 files changed

+42
-72
lines changed

6 files changed

+42
-72
lines changed

pymongo/bulk.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ def execute(self, write_concern, session):
427427

428428
client = self.collection.database.client
429429
if not write_concern.acknowledged:
430-
with client._socket_for_writes(session) as sock_info:
430+
with client._socket_for_writes() as sock_info:
431431
self.execute_no_results(sock_info, generator)
432432
else:
433433
return self.execute_command(generator, write_concern, session)

pymongo/client_session.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -319,13 +319,6 @@ def in_transaction(self):
319319
"""True if this session has an active multi-statement transaction."""
320320
return self._transaction is not None
321321

322-
def _pin_server_address(self, address):
323-
assert self._transaction.address is None, "Transaction already pinned"
324-
self._transaction.address = address
325-
326-
def _pinned_server_address(self):
327-
return self._transaction.address
328-
329322
def _apply_to(self, command, is_retryable, read_preference):
330323
self._check_ended()
331324

pymongo/collection.py

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -184,16 +184,14 @@ def __init__(self, database, name, create=False, codec_options=None,
184184
unicode_decode_error_handler='replace',
185185
document_class=dict)
186186

187-
def _socket_for_reads(self, session):
188-
return self.__database.client._socket_for_reads(
189-
self.read_preference, session)
187+
def _socket_for_reads(self):
188+
return self.__database.client._socket_for_reads(self.read_preference)
190189

191-
def _socket_for_primary_reads(self, session):
192-
return self.__database.client._socket_for_reads(
193-
ReadPreference.PRIMARY, session)
190+
def _socket_for_primary_reads(self):
191+
return self.__database.client._socket_for_reads(ReadPreference.PRIMARY)
194192

195-
def _socket_for_writes(self, session):
196-
return self.__database.client._socket_for_writes(session)
193+
def _socket_for_writes(self):
194+
return self.__database.client._socket_for_writes()
197195

198196
def _command(self, sock_info, command, slave_ok=False,
199197
read_preference=None,
@@ -254,7 +252,7 @@ def __create(self, options, collation, session):
254252
if "size" in options:
255253
options["size"] = float(options["size"])
256254
cmd.update(options)
257-
with self._socket_for_writes(session) as sock_info:
255+
with self._socket_for_writes() as sock_info:
258256
self._command(
259257
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
260258
write_concern=self.write_concern,
@@ -581,7 +579,7 @@ def _insert_command(session, sock_info, retryable_write):
581579
True, _insert_command, session)
582580
_check_write_command_response(result)
583581
else:
584-
with self._socket_for_writes(session=None) as sock_info:
582+
with self._socket_for_writes() as sock_info:
585583
# Legacy OP_INSERT.
586584
self._legacy_write(
587585
sock_info, 'insert', command, op_id,
@@ -1487,7 +1485,7 @@ def parallel_scan(self, num_cursors, session=None, **kwargs):
14871485
('numCursors', num_cursors)])
14881486
cmd.update(kwargs)
14891487

1490-
with self._socket_for_reads(session) as (sock_info, slave_ok):
1488+
with self._socket_for_reads() as (sock_info, slave_ok):
14911489
result = self._command(sock_info, cmd, slave_ok,
14921490
read_concern=self.read_concern,
14931491
session=session)
@@ -1503,7 +1501,7 @@ def parallel_scan(self, num_cursors, session=None, **kwargs):
15031501

15041502
def _count(self, cmd, collation=None, session=None):
15051503
"""Internal count helper."""
1506-
with self._socket_for_reads(session) as (sock_info, slave_ok):
1504+
with self._socket_for_reads() as (sock_info, slave_ok):
15071505
res = self._command(
15081506
sock_info, cmd, slave_ok,
15091507
allowable_errors=["ns missing"],
@@ -1600,7 +1598,7 @@ def create_indexes(self, indexes, session=None, **kwargs):
16001598
"""
16011599
common.validate_list('indexes', indexes)
16021600
names = []
1603-
with self._socket_for_writes(session) as sock_info:
1601+
with self._socket_for_writes() as sock_info:
16041602
supports_collations = sock_info.max_wire_version >= 5
16051603
def gen_indexes():
16061604
for index in indexes:
@@ -1641,7 +1639,7 @@ def __create_index(self, keys, index_options, session, **kwargs):
16411639
index_options.pop('collation', None))
16421640
index.update(index_options)
16431641

1644-
with self._socket_for_writes(session) as sock_info:
1642+
with self._socket_for_writes() as sock_info:
16451643
if collation is not None:
16461644
if sock_info.max_wire_version < 5:
16471645
raise ConfigurationError(
@@ -1868,7 +1866,7 @@ def drop_index(self, index_or_name, session=None, **kwargs):
18681866
self.__database.name, self.__name, name)
18691867
cmd = SON([("dropIndexes", self.__name), ("index", name)])
18701868
cmd.update(kwargs)
1871-
with self._socket_for_writes(session) as sock_info:
1869+
with self._socket_for_writes() as sock_info:
18721870
self._command(sock_info,
18731871
cmd,
18741872
read_preference=ReadPreference.PRIMARY,
@@ -1905,7 +1903,7 @@ def reindex(self, session=None, **kwargs):
19051903
"""
19061904
cmd = SON([("reIndex", self.__name)])
19071905
cmd.update(kwargs)
1908-
with self._socket_for_writes(session) as sock_info:
1906+
with self._socket_for_writes() as sock_info:
19091907
return self._command(
19101908
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
19111909
parse_write_concern_error=True, session=session)
@@ -1934,7 +1932,7 @@ def list_indexes(self, session=None):
19341932
codec_options = CodecOptions(SON)
19351933
coll = self.with_options(codec_options=codec_options,
19361934
read_preference=ReadPreference.PRIMARY)
1937-
with self._socket_for_primary_reads(session) as (sock_info, slave_ok):
1935+
with self._socket_for_primary_reads() as (sock_info, slave_ok):
19381936
cmd = SON([("listIndexes", self.__name), ("cursor", {})])
19391937
if sock_info.max_wire_version > 2:
19401938
with self.__database.client._tmp_session(session, False) as s:
@@ -2055,7 +2053,7 @@ def _aggregate(self, pipeline, cursor_class, first_batch_size, session,
20552053
"batchSize", kwargs.pop("batchSize", None))
20562054
# If the server does not support the "cursor" option we
20572055
# ignore useCursor and batchSize.
2058-
with self._socket_for_reads(session) as (sock_info, slave_ok):
2056+
with self._socket_for_reads() as (sock_info, slave_ok):
20592057
dollar_out = pipeline and '$out' in pipeline[-1]
20602058
if use_cursor:
20612059
if "cursor" not in kwargs:
@@ -2344,7 +2342,7 @@ def group(self, key, condition, initial, reduce, finalize=None, **kwargs):
23442342
collation = validate_collation_or_none(kwargs.pop('collation', None))
23452343
cmd.update(kwargs)
23462344

2347-
with self._socket_for_reads(session=None) as (sock_info, slave_ok):
2345+
with self._socket_for_reads() as (sock_info, slave_ok):
23482346
return self._command(sock_info, cmd, slave_ok,
23492347
collation=collation)["retval"]
23502348

@@ -2390,7 +2388,7 @@ def rename(self, new_name, session=None, **kwargs):
23902388

23912389
new_name = "%s.%s" % (self.__database.name, new_name)
23922390
cmd = SON([("renameCollection", self.__full_name), ("to", new_name)])
2393-
with self._socket_for_writes(session) as sock_info:
2391+
with self._socket_for_writes() as sock_info:
23942392
with self.__database.client._tmp_session(session) as s:
23952393
if sock_info.max_wire_version >= 5 and self.write_concern:
23962394
cmd['writeConcern'] = self.write_concern.document
@@ -2445,7 +2443,7 @@ def distinct(self, key, filter=None, session=None, **kwargs):
24452443
kwargs["query"] = filter
24462444
collation = validate_collation_or_none(kwargs.pop('collation', None))
24472445
cmd.update(kwargs)
2448-
with self._socket_for_reads(session) as (sock_info, slave_ok):
2446+
with self._socket_for_reads() as (sock_info, slave_ok):
24492447
return self._command(sock_info, cmd, slave_ok,
24502448
read_concern=self.read_concern,
24512449
collation=collation, session=session)["values"]
@@ -2517,7 +2515,7 @@ def map_reduce(self, map, reduce, out, full_response=False, session=None,
25172515
cmd.update(kwargs)
25182516

25192517
inline = 'inline' in cmd['out']
2520-
with self._socket_for_primary_reads(session) as (sock_info, slave_ok):
2518+
with self._socket_for_primary_reads() as (sock_info, slave_ok):
25212519
if (sock_info.max_wire_version >= 5 and self.write_concern and
25222520
not inline):
25232521
cmd['writeConcern'] = self.write_concern.document
@@ -2586,7 +2584,7 @@ def inline_map_reduce(self, map, reduce, full_response=False, session=None,
25862584
("out", {"inline": 1})])
25872585
collation = validate_collation_or_none(kwargs.pop('collation', None))
25882586
cmd.update(kwargs)
2589-
with self._socket_for_reads(session) as (sock_info, slave_ok):
2587+
with self._socket_for_reads() as (sock_info, slave_ok):
25902588
if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd:
25912589
res = self._command(sock_info, cmd, slave_ok,
25922590
read_concern=self.read_concern,

pymongo/database.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -526,8 +526,7 @@ def command(self, command, value=1, check=True,
526526
.. mongodoc:: commands
527527
"""
528528
client = self.__client
529-
with client._socket_for_reads(
530-
read_preference, session) as (sock_info, slave_ok):
529+
with client._socket_for_reads(read_preference) as (sock_info, slave_ok):
531530
return self._command(sock_info, command, slave_ok, value,
532531
check, allowable_errors, read_preference,
533532
codec_options, session=session, **kwargs)
@@ -585,7 +584,7 @@ def list_collections(self, session=None, **kwargs):
585584
.. versionadded:: 3.6
586585
"""
587586
with self.__client._socket_for_reads(
588-
ReadPreference.PRIMARY, session) as (sock_info, slave_okay):
587+
ReadPreference.PRIMARY) as (sock_info, slave_okay):
589588
return self._list_collections(
590589
sock_info, slave_okay, session=session, **kwargs)
591590

@@ -650,7 +649,7 @@ def drop_collection(self, name_or_collection, session=None):
650649
self.__client._purge_index(self.__name, name)
651650

652651
with self.__client._socket_for_reads(
653-
ReadPreference.PRIMARY, session) as (sock_info, slave_ok):
652+
ReadPreference.PRIMARY) as (sock_info, slave_ok):
654653
return self._command(
655654
sock_info, 'drop', slave_ok, _unicode(name),
656655
allowable_errors=['ns not found'],
@@ -731,7 +730,7 @@ def current_op(self, include_all=False, session=None):
731730
Added ``session`` parameter.
732731
"""
733732
cmd = SON([("currentOp", 1), ("$all", include_all)])
734-
with self.__client._socket_for_writes(session) as sock_info:
733+
with self.__client._socket_for_writes() as sock_info:
735734
if sock_info.max_wire_version >= 4:
736735
with self.__client._tmp_session(session) as s:
737736
return sock_info.command("admin", cmd, session=s,

pymongo/mongo_client.py

Lines changed: 14 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -868,8 +868,7 @@ def _end_sessions(self, session_ids):
868868
# Use SocketInfo.command directly to avoid implicitly creating
869869
# another session.
870870
with self._socket_for_reads(
871-
ReadPreference.PRIMARY_PREFERRED,
872-
None) as (sock_info, slave_ok):
871+
ReadPreference.PRIMARY_PREFERRED) as (sock_info, slave_ok):
873872
if not sock_info.supports_sessions:
874873
return
875874

@@ -965,31 +964,13 @@ def _get_socket(self, server):
965964
self.__reset_server(server.description.address)
966965
raise
967966

968-
def _select_server(self, read_preference, session):
969-
topology = self._get_topology()
970-
if session and session.in_transaction:
971-
address = session._pinned_server_address()
972-
if address:
973-
server = topology.select_server_by_address(address)
974-
if not server:
975-
raise AutoReconnect(
976-
'Pinned server %s:%d for transaction no longer'
977-
'available' % address)
978-
return server
979-
980-
server = topology.select_server(read_preference)
981-
session._pin_server_address(server.description.address)
982-
return server
983-
else:
984-
return topology.select_server(read_preference)
985-
986-
def _socket_for_writes(self, session):
987-
return self._get_socket(self._select_server(
988-
ReadPreference.PRIMARY, session))
967+
def _socket_for_writes(self):
968+
server = self._get_topology().select_server(writable_server_selector)
969+
return self._get_socket(server)
989970

990971
@contextlib.contextmanager
991-
def _socket_for_reads(self, read_preference, session):
992-
assert read_preference is not None, "read_preference must not be None"
972+
def _socket_for_reads(self, read_preference):
973+
preference = read_preference or ReadPreference.PRIMARY
993974
# Get a socket for a server matching the read preference, and yield
994975
# sock_info, slave_ok. Server Selection Spec: "slaveOK must be sent to
995976
# mongods with topology type Single. If the server type is Mongos,
@@ -998,11 +979,10 @@ def _socket_for_reads(self, read_preference, session):
998979
# Thread safe: if the type is single it cannot change.
999980
topology = self._get_topology()
1000981
single = topology.description.topology_type == TOPOLOGY_TYPE.Single
1001-
server = self._select_server(read_preference, session)
1002-
982+
server = topology.select_server(read_preference)
1003983
with self._get_socket(server) as sock_info:
1004984
slave_ok = (single and not sock_info.is_mongos) or (
1005-
read_preference != ReadPreference.PRIMARY)
985+
preference != ReadPreference.PRIMARY)
1006986
yield sock_info, slave_ok
1007987

1008988
def _send_message_with_response(self, operation, read_preference=None,
@@ -1022,14 +1002,14 @@ def _send_message_with_response(self, operation, read_preference=None,
10221002
self._kill_cursors_executor.open()
10231003

10241004
topology = self._get_topology()
1025-
session = operation.session
10261005
if address:
10271006
server = topology.select_server_by_address(address)
10281007
if not server:
10291008
raise AutoReconnect('server %s:%d no longer available'
10301009
% address)
10311010
else:
1032-
server = self._select_server(read_preference, session)
1011+
selector = read_preference or writable_server_selector
1012+
server = topology.select_server(selector)
10331013

10341014
# A _Query's slaveOk bit is already set for queries with non-primary
10351015
# read preference. If this is a direct connection to a mongod, override
@@ -1082,7 +1062,8 @@ def is_retrying():
10821062
return bulk.retrying if bulk else retrying
10831063
while True:
10841064
try:
1085-
server = self._select_server(ReadPreference.PRIMARY, session)
1065+
server = self._get_topology().select_server(
1066+
writable_server_selector)
10861067
supports_session = (
10871068
session is not None and
10881069
server.description.retryable_writes_supported)
@@ -1556,7 +1537,7 @@ def drop_database(self, name_or_database, session=None):
15561537

15571538
self._purge_index(name)
15581539
with self._socket_for_reads(
1559-
ReadPreference.PRIMARY, None) as (sock_info, slave_ok):
1540+
ReadPreference.PRIMARY) as (sock_info, slave_ok):
15601541
self[name]._command(
15611542
sock_info,
15621543
"dropDatabase",
@@ -1698,7 +1679,7 @@ def unlock(self, session=None):
16981679
Added ``session`` parameter.
16991680
"""
17001681
cmd = SON([("fsyncUnlock", 1)])
1701-
with self._socket_for_writes(session=None) as sock_info:
1682+
with self._socket_for_writes() as sock_info:
17021683
if sock_info.max_wire_version >= 4:
17031684
try:
17041685
with self._tmp_session(session) as s:

test/test_read_preferences.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -315,9 +315,8 @@ def __init__(self, *args, **kwargs):
315315
super(ReadPrefTester, self).__init__(*args, **client_options)
316316

317317
@contextlib.contextmanager
318-
def _socket_for_reads(self, read_preference, session):
319-
context = super(ReadPrefTester, self)._socket_for_reads(
320-
read_preference, session)
318+
def _socket_for_reads(self, read_preference):
319+
context = super(ReadPrefTester, self)._socket_for_reads(read_preference)
321320
with context as (sock_info, slave_ok):
322321
self.record_a_read(sock_info.address)
323322
yield sock_info, slave_ok

0 commit comments

Comments
 (0)