Skip to content

Commit e7ddc29

Browse files
committed
PYTHON-1781 Raise a client side error when attempting a sharded transaction
1 parent ea62ce5 commit e7ddc29

File tree

8 files changed

+56
-7
lines changed

8 files changed

+56
-7
lines changed

doc/changelog.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ Changes in Version 3.8.0.dev0
3333
the :class:`~bson.codec_options.TypeCodec` and
3434
:class:`~bson.codec_options.TypeRegistry` APIs. For more information, see
3535
the :doc:`custom type example <examples/custom_type>`.
36-
36+
- Attempting a multi-document transaction on a sharded cluster now raises a
37+
:exc:`~pymongo.errors.ConfigurationError`.
3738

3839
Issues Resolved
3940
...............

pymongo/bulk.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,8 @@ def _execute_command(self, generator, write_concern, session,
281281
if retryable and not self.started_retryable_write:
282282
session._start_retryable_write()
283283
self.started_retryable_write = True
284-
session._apply_to(cmd, retryable, ReadPreference.PRIMARY)
284+
session._apply_to(cmd, retryable, ReadPreference.PRIMARY,
285+
sock_info)
285286
sock_info.send_cluster_time(cmd, session, client)
286287
check_keys = run.op_type == _INSERT
287288
ops = islice(run.ops, run.idx_offset, None)

pymongo/client_session.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@
9090
from bson.py3compat import abc, reraise_instance
9191
from bson.timestamp import Timestamp
9292

93-
from pymongo import monotonic
93+
from pymongo import monotonic, __version__
9494
from pymongo.errors import (ConfigurationError,
9595
ConnectionFailure,
9696
InvalidOperation,
@@ -263,6 +263,10 @@ def _reraise_with_unknown_commit(exc):
263263
64, # WriteConcernFailed
264264
])
265265

266+
_MONGOS_NOT_SUPPORTED_MSG = (
267+
'PyMongo %s does not support running multi-document transactions on '
268+
'sharded clusters') % (__version__,)
269+
266270

267271
class ClientSession(object):
268272
"""A session for ordering sequential operations."""
@@ -356,6 +360,9 @@ def start_transaction(self, read_concern=None, write_concern=None,
356360
"""
357361
self._check_ended()
358362

363+
if self._client._is_mongos_non_blocking():
364+
raise ConfigurationError(_MONGOS_NOT_SUPPORTED_MSG)
365+
359366
if self._in_transaction:
360367
raise InvalidOperation("Transaction already in progress")
361368

@@ -534,7 +541,7 @@ def _txn_read_preference(self):
534541
return self._transaction.opts.read_preference
535542
return None
536543

537-
def _apply_to(self, command, is_retryable, read_preference):
544+
def _apply_to(self, command, is_retryable, read_preference, sock_info):
538545
self._check_ended()
539546

540547
self._server_session.last_use = monotonic.time()
@@ -548,6 +555,9 @@ def _apply_to(self, command, is_retryable, read_preference):
548555
return
549556

550557
if self._in_transaction:
558+
if sock_info.is_mongos:
559+
raise ConfigurationError(_MONGOS_NOT_SUPPORTED_MSG)
560+
551561
if read_preference != ReadPreference.PRIMARY:
552562
raise InvalidOperation(
553563
'read preference in a transaction must be primary, not: '

pymongo/message.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ def as_command(self, sock_info):
289289
cmd = SON([('explain', cmd)])
290290
session = self.session
291291
if session:
292-
session._apply_to(cmd, False, self.read_preference)
292+
session._apply_to(cmd, False, self.read_preference, sock_info)
293293
# Explain does not support readConcern.
294294
if (not explain and session.options.causal_consistency
295295
and session.operation_time is not None
@@ -379,7 +379,7 @@ def as_command(self, sock_info):
379379
self.max_await_time_ms)
380380

381381
if self.session:
382-
self.session._apply_to(cmd, False, self.read_preference)
382+
self.session._apply_to(cmd, False, self.read_preference, sock_info)
383383
sock_info.send_cluster_time(cmd, self.session, self.client)
384384
self._as_command = cmd, self.db
385385
return self._as_command

pymongo/mongo_client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1530,6 +1530,9 @@ def _process_periodic_tasks(self):
15301530
except Exception:
15311531
helpers._handle_exception()
15321532

1533+
def _is_mongos_non_blocking(self):
1534+
return self._topology.is_mongos_non_blocking()
1535+
15331536
def __start_session(self, implicit, **kwargs):
15341537
# Driver Sessions Spec: "If startSession is called when multiple users
15351538
# are authenticated drivers MUST raise an error with the error message

pymongo/pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ def command(self, dbname, spec, slave_ok=False,
560560
'Must be connected to MongoDB 3.4+ to use a collation.')
561561

562562
if session:
563-
session._apply_to(spec, retryable_write, read_preference)
563+
session._apply_to(spec, retryable_write, read_preference, self)
564564
self.send_cluster_time(spec, session, client)
565565
listeners = self.listeners if publish_events else None
566566
unacknowledged = write_concern and not write_concern.acknowledged

pymongo/topology.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from pymongo import periodic_executor
3131
from pymongo.pool import PoolOptions
3232
from pymongo.topology_description import (updated_topology_description,
33+
SERVER_TYPE,
3334
TOPOLOGY_TYPE,
3435
TopologyDescription)
3536
from pymongo.errors import ServerSelectionTimeoutError, ConfigurationError
@@ -451,6 +452,22 @@ def return_server_session(self, server_session, lock):
451452
# Called from a __del__ method, can't use a lock.
452453
self._session_pool.return_server_session_no_lock(server_session)
453454

455+
def is_mongos_non_blocking(self):
456+
"""Return if we are connected to a Mongos without blocking.
457+
458+
If the state is unknown, return False.
459+
"""
460+
with self._lock:
461+
if not self._opened:
462+
return False
463+
if self._description.topology_type == TOPOLOGY_TYPE.Sharded:
464+
return True
465+
server_descriptions = self._description.apply_selector(
466+
writable_server_selector, None)
467+
if not server_descriptions:
468+
return False
469+
return server_descriptions[0].server_type == SERVER_TYPE.Mongos
470+
454471
def _new_selection(self):
455472
"""A Selection object, initially including all known servers.
456473

test/test_transactions.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,23 @@ def set_fail_point(self, command_args):
8686
cmd.update(command_args)
8787
self.client.admin.command(cmd)
8888

89+
@client_context.require_mongos
90+
@client_context.require_version_min(4, 0)
91+
def test_transactions_not_supported(self):
92+
with self.client.start_session() as s:
93+
with self.assertRaisesRegex(
94+
ConfigurationError,
95+
'does not support running multi-document transactions on '
96+
'sharded clusters'):
97+
s.start_transaction()
98+
self.client.close()
99+
with s.start_transaction():
100+
with self.assertRaisesRegex(
101+
ConfigurationError,
102+
'does not support running multi-document transactions '
103+
'on sharded clusters'):
104+
self.client.test.test.insert_one({}, session=s)
105+
89106
@client_context.require_transactions
90107
def test_transaction_options_validation(self):
91108
default_options = TransactionOptions()

0 commit comments

Comments
 (0)