Skip to content

Commit 4f3b646

Browse files
committed
PYTHON-1598 Fix transaction write concern inheritance
1 parent bc5a6e0 commit 4f3b646

File tree

8 files changed

+113
-40
lines changed

8 files changed

+113
-40
lines changed

pymongo/bulk.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -497,8 +497,7 @@ def execute(self, write_concern, session):
497497
raise InvalidOperation('Bulk operations can '
498498
'only be executed once.')
499499
self.executed = True
500-
write_concern = (WriteConcern(**write_concern) if
501-
write_concern else self.collection.write_concern)
500+
write_concern = write_concern or self.collection.write_concern
502501
session = _validate_session_write_concern(session, write_concern)
503502

504503
if self.ordered:
@@ -690,5 +689,5 @@ def execute(self, write_concern=None):
690689
execution.
691690
"""
692691
if write_concern is not None:
693-
validate_is_mapping("write_concern", write_concern)
692+
write_concern = WriteConcern(**write_concern)
694693
return self.__bulk.execute(write_concern, session=None)

pymongo/client_session.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -548,11 +548,6 @@ def _apply_to(self, command, is_retryable, read_preference):
548548
return
549549

550550
if self._in_transaction:
551-
# TODO: hack
552-
name = next(iter(command))
553-
if name not in ('commitTransaction', 'abortTransaction'):
554-
command.pop('writeConcern', None)
555-
556551
if read_preference != ReadPreference.PRIMARY:
557552
raise InvalidOperation(
558553
'read preference in a transaction must be primary, not: '

pymongo/collection.py

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ def __create(self, options, collation, session):
254254
with self._socket_for_writes() as sock_info:
255255
self._command(
256256
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
257-
write_concern=self.write_concern,
257+
write_concern=self._write_concern_for(session),
258258
collation=collation, session=session)
259259

260260
def __getattr__(self, name):
@@ -493,7 +493,8 @@ def bulk_write(self, requests, ordered=True,
493493
except AttributeError:
494494
raise TypeError("%r is not a valid request" % (request,))
495495

496-
bulk_api_result = blk.execute(self.write_concern.document, session)
496+
write_concern = self._write_concern_for(session)
497+
bulk_api_result = blk.execute(write_concern, session)
497498
if bulk_api_result is not None:
498499
return BulkWriteResult(bulk_api_result, True)
499500
return BulkWriteResult({}, False)
@@ -633,11 +634,11 @@ def gen():
633634
ids.append(doc.get('_id'))
634635
yield doc
635636

636-
concern = (write_concern or self.write_concern).document
637+
write_concern = write_concern or self._write_concern_for(session)
637638
blk = _Bulk(self, ordered, bypass_doc_val)
638639
blk.ops = [(message._INSERT, doc) for doc in gen()]
639640
try:
640-
blk.execute(concern, session=session)
641+
blk.execute(write_concern, session=session)
641642
except BulkWriteError as bwe:
642643
_raise_last_error(bwe.details)
643644
return ids
@@ -684,11 +685,13 @@ def insert_one(self, document, bypass_document_validation=False,
684685
if not (isinstance(document, RawBSONDocument) or "_id" in document):
685686
document["_id"] = ObjectId()
686687

688+
write_concern = self._write_concern_for(session)
687689
return InsertOneResult(
688690
self._insert(document,
691+
write_concern=write_concern,
689692
bypass_doc_val=bypass_document_validation,
690693
session=session),
691-
self.write_concern.acknowledged)
694+
write_concern.acknowledged)
692695

693696
def insert_many(self, documents, ordered=True,
694697
bypass_document_validation=False, session=None):
@@ -744,10 +747,11 @@ def gen():
744747
inserted_ids.append(document["_id"])
745748
yield (message._INSERT, document)
746749

750+
write_concern = self._write_concern_for(session)
747751
blk = _Bulk(self, ordered, bypass_document_validation)
748752
blk.ops = [doc for doc in gen()]
749-
blk.execute(self.write_concern.document, session=session)
750-
return InsertManyResult(inserted_ids, self.write_concern.acknowledged)
753+
blk.execute(write_concern, session=session)
754+
return InsertManyResult(inserted_ids, write_concern.acknowledged)
751755

752756
def _update(self, sock_info, criteria, document, upsert=False,
753757
check_keys=True, multi=False, manipulate=False,
@@ -912,12 +916,14 @@ def replace_one(self, filter, replacement, upsert=False,
912916
common.validate_is_mapping("filter", filter)
913917
common.validate_ok_for_replace(replacement)
914918

919+
write_concern = self._write_concern_for(session)
915920
return UpdateResult(
916921
self._update_retryable(
917922
filter, replacement, upsert,
923+
write_concern=write_concern,
918924
bypass_doc_val=bypass_document_validation,
919925
collation=collation, session=session),
920-
self.write_concern.acknowledged)
926+
write_concern.acknowledged)
921927

922928
def update_one(self, filter, update, upsert=False,
923929
bypass_document_validation=False,
@@ -979,13 +985,15 @@ def update_one(self, filter, update, upsert=False,
979985
common.validate_ok_for_update(update)
980986
common.validate_list_or_none('array_filters', array_filters)
981987

988+
write_concern = self._write_concern_for(session)
982989
return UpdateResult(
983990
self._update_retryable(
984991
filter, update, upsert, check_keys=False,
992+
write_concern=write_concern,
985993
bypass_doc_val=bypass_document_validation,
986994
collation=collation, array_filters=array_filters,
987995
session=session),
988-
self.write_concern.acknowledged)
996+
write_concern.acknowledged)
989997

990998
def update_many(self, filter, update, upsert=False, array_filters=None,
991999
bypass_document_validation=False, collation=None,
@@ -1047,13 +1055,15 @@ def update_many(self, filter, update, upsert=False, array_filters=None,
10471055
common.validate_ok_for_update(update)
10481056
common.validate_list_or_none('array_filters', array_filters)
10491057

1058+
write_concern = self._write_concern_for(session)
10501059
return UpdateResult(
10511060
self._update_retryable(
10521061
filter, update, upsert, check_keys=False, multi=True,
1062+
write_concern=write_concern,
10531063
bypass_doc_val=bypass_document_validation,
10541064
collation=collation, array_filters=array_filters,
10551065
session=session),
1056-
self.write_concern.acknowledged)
1066+
write_concern.acknowledged)
10571067

10581068
def drop(self, session=None):
10591069
"""Alias for :meth:`~pymongo.database.Database.drop_collection`.
@@ -1173,10 +1183,13 @@ def delete_one(self, filter, collation=None, session=None):
11731183
11741184
.. versionadded:: 3.0
11751185
"""
1186+
write_concern = self._write_concern_for(session)
11761187
return DeleteResult(
11771188
self._delete_retryable(
1178-
filter, False, collation=collation, session=session),
1179-
self.write_concern.acknowledged)
1189+
filter, False,
1190+
write_concern=write_concern,
1191+
collation=collation, session=session),
1192+
write_concern.acknowledged)
11801193

11811194
def delete_many(self, filter, collation=None, session=None):
11821195
"""Delete one or more documents matching the filter.
@@ -1208,10 +1221,13 @@ def delete_many(self, filter, collation=None, session=None):
12081221
12091222
.. versionadded:: 3.0
12101223
"""
1224+
write_concern = self._write_concern_for(session)
12111225
return DeleteResult(
12121226
self._delete_retryable(
1213-
filter, True, collation=collation, session=session),
1214-
self.write_concern.acknowledged)
1227+
filter, True,
1228+
write_concern=write_concern,
1229+
collation=collation, session=session),
1230+
write_concern.acknowledged)
12151231

12161232
def find_one(self, filter=None, *args, **kwargs):
12171233
"""Get a single document from the database.
@@ -1809,7 +1825,7 @@ def gen_indexes():
18091825
self._command(
18101826
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
18111827
codec_options=_UNICODE_REPLACE_CODEC_OPTIONS,
1812-
write_concern=self.write_concern,
1828+
write_concern=self._write_concern_for(session),
18131829
session=session)
18141830
return names
18151831

@@ -1840,7 +1856,7 @@ def __create_index(self, keys, index_options, session, **kwargs):
18401856
self._command(
18411857
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
18421858
codec_options=_UNICODE_REPLACE_CODEC_OPTIONS,
1843-
write_concern=self.write_concern,
1859+
write_concern=self._write_concern_for(session),
18441860
session=session)
18451861

18461862
def create_index(self, keys, session=None, **kwargs):
@@ -2059,7 +2075,7 @@ def drop_index(self, index_or_name, session=None, **kwargs):
20592075
cmd,
20602076
read_preference=ReadPreference.PRIMARY,
20612077
allowable_errors=["ns not found"],
2062-
write_concern=self.write_concern,
2078+
write_concern=self._write_concern_for(session),
20632079
session=session)
20642080

20652081
def reindex(self, session=None, **kwargs):
@@ -2268,7 +2284,7 @@ def _aggregate(self, pipeline, cursor_class, first_batch_size, session,
22682284
else:
22692285
read_concern = None
22702286
if 'writeConcern' not in cmd and dollar_out:
2271-
write_concern = self.write_concern
2287+
write_concern = self._write_concern_for(session)
22722288
else:
22732289
write_concern = None
22742290

@@ -2588,7 +2604,7 @@ def rename(self, new_name, session=None, **kwargs):
25882604
new_name = "%s.%s" % (self.__database.name, new_name)
25892605
cmd = SON([("renameCollection", self.__full_name), ("to", new_name)])
25902606
cmd.update(kwargs)
2591-
write_concern = self._write_concern_for_cmd(cmd)
2607+
write_concern = self._write_concern_for_cmd(cmd, session)
25922608

25932609
with self._socket_for_writes() as sock_info:
25942610
with self.__database.client._tmp_session(session) as s:
@@ -2724,7 +2740,7 @@ def map_reduce(self, map, reduce, out, full_response=False, session=None,
27242740
else:
27252741
read_concern = None
27262742
if 'writeConcern' not in cmd and not inline:
2727-
write_concern = self.write_concern
2743+
write_concern = self._write_concern_for(session)
27282744
else:
27292745
write_concern = None
27302746

@@ -2798,12 +2814,12 @@ def inline_map_reduce(self, map, reduce, full_response=False, session=None,
27982814
else:
27992815
return res.get("results")
28002816

2801-
def _write_concern_for_cmd(self, cmd):
2817+
def _write_concern_for_cmd(self, cmd, session):
28022818
raw_wc = cmd.get('writeConcern')
28032819
if raw_wc is not None:
28042820
return WriteConcern(**raw_wc)
28052821
else:
2806-
return self.write_concern
2822+
return self._write_concern_for(session)
28072823

28082824
def __find_and_modify(self, filter, projection, sort, upsert=None,
28092825
return_document=ReturnDocument.BEFORE,
@@ -2827,15 +2843,15 @@ def __find_and_modify(self, filter, projection, sort, upsert=None,
28272843
common.validate_boolean("upsert", upsert)
28282844
cmd["upsert"] = upsert
28292845

2830-
write_concern = self._write_concern_for_cmd(cmd)
2846+
write_concern = self._write_concern_for_cmd(cmd, session)
28312847

28322848
def _find_and_modify(session, sock_info, retryable_write):
28332849
if array_filters is not None:
28342850
if sock_info.max_wire_version < 6:
28352851
raise ConfigurationError(
28362852
'Must be connected to MongoDB 3.6+ to use '
28372853
'arrayFilters.')
2838-
if not self.write_concern.acknowledged:
2854+
if not write_concern.acknowledged:
28392855
raise ConfigurationError(
28402856
'arrayFilters is unsupported for unacknowledged '
28412857
'writes.')
@@ -3250,7 +3266,7 @@ def find_and_modify(self, query={}, update=None,
32503266
cmd = SON([("findAndModify", self.__name)])
32513267
cmd.update(kwargs)
32523268

3253-
write_concern = self._write_concern_for_cmd(cmd)
3269+
write_concern = self._write_concern_for_cmd(cmd, None)
32543270

32553271
def _find_and_modify(session, sock_info, retryable_write):
32563272
if (sock_info.max_wire_version >= 4 and

pymongo/common.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
from pymongo.read_concern import ReadConcern
3434
from pymongo.read_preferences import _MONGOS_MODES, _ServerMode
3535
from pymongo.ssl_support import validate_cert_reqs
36-
from pymongo.write_concern import WriteConcern
36+
from pymongo.write_concern import DEFAULT_WRITE_CONCERN, WriteConcern
3737

3838
try:
3939
from collections import OrderedDict
@@ -679,6 +679,14 @@ def write_concern(self):
679679
"""
680680
return self.__write_concern
681681

682+
def _write_concern_for(self, session):
683+
"""Read only access to the write concern of this instance or session.
684+
"""
685+
# Override this operation's write concern with the transaction's.
686+
if session and session._in_transaction:
687+
return DEFAULT_WRITE_CONCERN
688+
return self.write_concern
689+
682690
@property
683691
def read_preference(self):
684692
"""Read only access to the read preference of this instance.

pymongo/database.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from pymongo.message import _first_batch
3333
from pymongo.read_preferences import ReadPreference
3434
from pymongo.son_manipulator import SONManipulator
35-
from pymongo.write_concern import WriteConcern
35+
from pymongo.write_concern import DEFAULT_WRITE_CONCERN
3636

3737

3838
_INDEX_REGEX = {"name": {"$regex": r"^(?!.*\$)"}}
@@ -749,7 +749,7 @@ def drop_collection(self, name_or_collection, session=None):
749749
return self._command(
750750
sock_info, 'drop', value=_unicode(name),
751751
allowable_errors=['ns not found'],
752-
write_concern=self.write_concern,
752+
write_concern=self._write_concern_for(session),
753753
parse_write_concern_error=True,
754754
session=session)
755755

@@ -1362,7 +1362,7 @@ def __init__(self, database):
13621362

13631363
if not database.write_concern.acknowledged:
13641364
database = database.client.get_database(
1365-
database.name, write_concern=WriteConcern())
1365+
database.name, write_concern=DEFAULT_WRITE_CONCERN)
13661366
# can't just assign it since we've overridden __setattr__
13671367
object.__setattr__(self, "_db", database)
13681368

pymongo/mongo_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
from pymongo.topology import Topology
7272
from pymongo.topology_description import TOPOLOGY_TYPE
7373
from pymongo.settings import TopologySettings
74-
from pymongo.write_concern import WriteConcern
74+
from pymongo.write_concern import DEFAULT_WRITE_CONCERN
7575

7676

7777
class MongoClient(common.BaseObject):
@@ -1716,7 +1716,7 @@ def drop_database(self, name_or_database, session=None):
17161716
sock_info,
17171717
"dropDatabase",
17181718
read_preference=ReadPreference.PRIMARY,
1719-
write_concern=self.write_concern,
1719+
write_concern=self._write_concern_for(session),
17201720
parse_write_concern_error=True,
17211721
session=session)
17221722

@@ -1802,7 +1802,7 @@ def _database_default_options(self, name):
18021802
return self.get_database(
18031803
name, codec_options=DEFAULT_CODEC_OPTIONS,
18041804
read_preference=ReadPreference.PRIMARY,
1805-
write_concern=WriteConcern())
1805+
write_concern=DEFAULT_WRITE_CONCERN)
18061806

18071807
@property
18081808
def is_locked(self):

pymongo/write_concern.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,3 +117,6 @@ def __eq__(self, other):
117117

118118
def __ne__(self, other):
119119
return self.__document != other.document
120+
121+
122+
DEFAULT_WRITE_CONCERN = WriteConcern()

0 commit comments

Comments
 (0)