Skip to content

Commit 01b28de

Browse files
committed
Test changing transaction readPreference
Add original readPreference to getMores to simplify code check.
1 parent 3276cf8 commit 01b28de

File tree

11 files changed

+48
-20
lines changed

11 files changed

+48
-20
lines changed

pymongo/bulk.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
_do_batched_write_command,
3838
_randint,
3939
_BulkWriteContext)
40+
from pymongo.read_preferences import ReadPreference
4041
from pymongo.write_concern import WriteConcern
4142

4243

@@ -266,7 +267,7 @@ def _execute_command(self, generator, write_concern, session,
266267
results = []
267268
while run.idx_offset < len(run.ops):
268269
if session:
269-
session._apply_to(cmd, retryable)
270+
session._apply_to(cmd, retryable, ReadPreference.PRIMARY)
270271
sock_info.send_cluster_time(cmd, session, client)
271272
check_keys = run.op_type == _INSERT
272273
ops = islice(run.ops, run.idx_offset, None)

pymongo/client_session.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ def __init__(self, client, server_session, options, authset):
117117
self._authset = authset
118118
self._cluster_time = None
119119
self._operation_time = None
120+
self._current_txn_read_pref = None
120121
if self.options.auto_start_transaction:
121122
# TODO: Get transaction options from self.options.
122123
self._current_transaction_opts = TransactionOptions()
@@ -234,6 +235,7 @@ def _finish_transaction(self, command_name):
234235
stmtId=self._server_session.statement_id,
235236
session=self,
236237
write_concern=self._current_transaction_opts.write_concern,
238+
read_preference=self._current_txn_read_pref,
237239
parse_write_concern_error=True)
238240
finally:
239241
self._server_session.reset_transaction()
@@ -293,7 +295,7 @@ def in_transaction(self):
293295
"""True if this session has an active multi-statement transaction."""
294296
return self._current_transaction_opts is not None
295297

296-
def _apply_to(self, command, is_retryable):
298+
def _apply_to(self, command, is_retryable, read_preference):
297299
self._check_ended()
298300

299301
if self.options.auto_start_transaction and not self.in_transaction:
@@ -310,9 +312,12 @@ def _apply_to(self, command, is_retryable):
310312
if self._current_transaction_opts:
311313
if self._server_session.statement_id == 0:
312314
# First statement begins a new transaction.
315+
self._current_txn_read_pref = read_preference
313316
self._server_session._transaction_id += 1
314317
command['readConcern'] = {'level': 'snapshot'}
315318
command['autocommit'] = False
319+
elif read_preference != self._current_txn_read_pref:
320+
raise InvalidOperation('Transaction readPreference changed')
316321

317322
command['txnNumber'] = self._server_session.transaction_id
318323

pymongo/collection.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1930,7 +1930,8 @@ def list_indexes(self, session=None):
19301930
.. versionadded:: 3.0
19311931
"""
19321932
codec_options = CodecOptions(SON)
1933-
coll = self.with_options(codec_options)
1933+
coll = self.with_options(codec_options=codec_options,
1934+
read_preference=ReadPreference.PRIMARY)
19341935
with self._socket_for_primary_reads() as (sock_info, slave_ok):
19351936
cmd = SON([("listIndexes", self.__name), ("cursor", {})])
19361937
if sock_info.max_wire_version > 2:

pymongo/command_cursor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ def _refresh(self):
229229
self.__batch_size,
230230
self.__id,
231231
self.__collection.codec_options,
232+
self.__collection.read_preference,
232233
self.__session,
233234
self.__collection.database.client,
234235
self.__max_await_time_ms))

pymongo/cursor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,6 +1100,7 @@ def _refresh(self):
11001100
limit,
11011101
self.__id,
11021102
self.__codec_options,
1103+
self.__read_preference,
11031104
self.__session,
11041105
self.__collection.database.client,
11051106
self.__max_await_time_ms)

pymongo/database.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,8 @@ def command(self, command, value=1, check=True,
534534
def _list_collections(self, sock_info, slave_okay, session=None, **kwargs):
535535
"""Internal listCollections helper."""
536536

537-
coll = self["$cmd"]
537+
coll = self.get_collection(
538+
"$cmd", read_preference=ReadPreference.PRIMARY)
538539
if sock_info.max_wire_version > 2:
539540
cmd = SON([("listCollections", 1),
540541
("cursor", {})])

pymongo/message.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ def as_command(self, sock_info):
283283
cmd = SON([('explain', cmd)])
284284
session = self.session
285285
if session:
286-
session._apply_to(cmd, False)
286+
session._apply_to(cmd, False, self.read_preference)
287287
# Explain does not support readConcern.
288288
if (not explain and session.options.causal_consistency
289289
and session.operation_time is not None
@@ -333,17 +333,19 @@ class _GetMore(object):
333333
"""A getmore operation."""
334334

335335
__slots__ = ('db', 'coll', 'ntoreturn', 'cursor_id', 'max_await_time_ms',
336-
'codec_options', 'session', 'client', '__as_command')
336+
'codec_options', 'read_preference', 'session', 'client',
337+
'__as_command')
337338

338339
name = 'getMore'
339340

340-
def __init__(self, db, coll, ntoreturn, cursor_id, codec_options, session,
341-
client, max_await_time_ms=None):
341+
def __init__(self, db, coll, ntoreturn, cursor_id, codec_options,
342+
read_preference, session, client, max_await_time_ms=None):
342343
self.db = db
343344
self.coll = coll
344345
self.ntoreturn = ntoreturn
345346
self.cursor_id = cursor_id
346347
self.codec_options = codec_options
348+
self.read_preference = read_preference
347349
self.session = session
348350
self.client = client
349351
self.max_await_time_ms = max_await_time_ms
@@ -364,7 +366,7 @@ def as_command(self, sock_info):
364366
self.max_await_time_ms)
365367

366368
if self.session:
367-
self.session._apply_to(cmd, False)
369+
self.session._apply_to(cmd, False, self.read_preference)
368370
sock_info.send_cluster_time(cmd, self.session, self.client)
369371
self.__as_command = cmd, self.db
370372
return self.__as_command

pymongo/mongo_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,15 +1475,15 @@ def list_databases(self, session=None, **kwargs):
14751475
"""
14761476
cmd = SON([("listDatabases", 1)])
14771477
cmd.update(kwargs)
1478-
res = self._database_default_options(
1479-
"admin").command(cmd, session=session)
1478+
admin = self._database_default_options("admin")
1479+
res = admin.command(cmd, session=session)
14801480
# listDatabases doesn't return a cursor (yet). Fake one.
14811481
cursor = {
14821482
"id": 0,
14831483
"firstBatch": res["databases"],
14841484
"ns": "admin.$cmd",
14851485
}
1486-
return CommandCursor(self.admin["$cmd"], cursor, None)
1486+
return CommandCursor(admin["$cmd"], cursor, None)
14871487

14881488
def list_database_names(self, session=None):
14891489
"""Get a list of the names of all databases on the connected server.

pymongo/pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,7 @@ def command(self, dbname, spec, slave_ok=False,
502502
# Ensure command name remains in first place.
503503
spec = SON(spec)
504504
if session:
505-
session._apply_to(spec, retryable_write)
505+
session._apply_to(spec, retryable_write, read_preference)
506506
self.send_cluster_time(spec, session, client)
507507
listeners = self.listeners if publish_events else None
508508
try:

test/test_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,6 +1130,7 @@ def test_stale_getmore(self):
11301130
client._send_message_with_response(
11311131
operation=message._GetMore('pymongo_test', 'collection',
11321132
101, 1234, client.codec_options,
1133+
ReadPreference.PRIMARY,
11331134
None, client),
11341135
address=('not-a-member', 27017))
11351136

0 commit comments

Comments
 (0)