Skip to content

Commit 4730209

Browse files
committed
PYTHON-1339 Retryable multi-statement writes.
MongoClient with retryWrites=true works when the cluster does not support retryable writes.
1 parent 9d7b4c4 commit 4730209

File tree

9 files changed

+1170
-200
lines changed

9 files changed

+1170
-200
lines changed

doc/changelog.rst

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,14 @@ Highlights include:
3131
:meth:`~pymongo.database.Database.list_collection_names`.
3232
- Support for mongodb+srv:// URIs. See
3333
:class:`~pymongo.mongo_client.MongoClient` for details.
34-
- Index management helpers (
35-
:meth:`~pymongo.collection.Collection.create_index`,
34+
- Index management helpers
35+
(:meth:`~pymongo.collection.Collection.create_index`,
3636
:meth:`~pymongo.collection.Collection.create_indexes`,
3737
:meth:`~pymongo.collection.Collection.drop_index`,
3838
:meth:`~pymongo.collection.Collection.drop_indexes`,
3939
:meth:`~pymongo.collection.Collection.reindex`) now support maxTimeMS.
40+
- Support for retryable writes and the ``retryWrites`` URI option. See
41+
:class:`~pymongo.mongo_client.MongoClient` for details.
4042

4143
Deprecations:
4244

pymongo/bulk.py

Lines changed: 97 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
from pymongo.collation import validate_collation_or_none
2929
from pymongo.errors import (BulkWriteError,
3030
ConfigurationError,
31+
ConnectionFailure,
3132
InvalidOperation,
32-
OperationFailure)
33+
OperationFailure,
34+
ServerSelectionTimeoutError)
3335
from pymongo.message import (_INSERT, _UPDATE, _DELETE,
3436
_do_batched_insert,
3537
_do_batched_write_command,
@@ -64,6 +66,7 @@ def __init__(self, op_type):
6466
self.op_type = op_type
6567
self.index_map = []
6668
self.ops = []
69+
self.idx_offset = 0
6770

6871
def index(self, idx):
6972
"""Get the original index of an operation in this run.
@@ -145,6 +148,10 @@ def __init__(self, collection, ordered, bypass_document_validation):
145148
self.bypass_doc_val = bypass_document_validation
146149
self.uses_collation = False
147150
self.uses_array_filters = False
151+
self.is_retryable = True
152+
self.retrying = False
153+
# Extra state so that we know where to pick up on a retry attempt.
154+
self.current_run = None
148155

149156
def add_insert(self, document):
150157
"""Add an insert document to the list of ops.
@@ -169,6 +176,9 @@ def add_update(self, selector, update, multi=False, upsert=False,
169176
if array_filters is not None:
170177
self.uses_array_filters = True
171178
cmd['arrayFilters'] = array_filters
179+
if multi:
180+
# A bulk_write containing an update_many is not retryable.
181+
self.is_retryable = False
172182
self.ops.append((_UPDATE, cmd))
173183

174184
def add_replace(self, selector, replacement, upsert=False,
@@ -192,6 +202,9 @@ def add_delete(self, selector, limit, collation=None):
192202
if collation is not None:
193203
self.uses_collation = True
194204
cmd['collation'] = collation
205+
if limit == _DELETE_ALL:
206+
# A bulk_write containing a delete_many is not retryable.
207+
self.is_retryable = False
195208
self.ops.append((_DELETE, cmd))
196209

197210
def gen_ordered(self):
@@ -220,7 +233,70 @@ def gen_unordered(self):
220233
if run.ops:
221234
yield run
222235

223-
def execute_command(self, sock_info, generator, write_concern, session):
236+
def _execute_command(self, generator, write_concern, session,
237+
sock_info, op_id, retryable, full_result):
238+
if sock_info.max_wire_version < 5 and self.uses_collation:
239+
raise ConfigurationError(
240+
'Must be connected to MongoDB 3.4+ to use a collation.')
241+
if sock_info.max_wire_version < 6 and self.uses_array_filters:
242+
raise ConfigurationError(
243+
'Must be connected to MongoDB 3.6+ to use arrayFilters.')
244+
245+
db_name = self.collection.database.name
246+
client = self.collection.database.client
247+
listeners = client._event_listeners
248+
249+
if not self.current_run:
250+
self.current_run = next(generator)
251+
run = self.current_run
252+
253+
# sock_info.command validates the session, but we use
254+
# sock_info.write_command.
255+
sock_info.validate_session(client, session)
256+
while run:
257+
cmd = SON([(_COMMANDS[run.op_type], self.collection.name),
258+
('ordered', self.ordered)])
259+
if write_concern.document:
260+
cmd['writeConcern'] = write_concern.document
261+
if self.bypass_doc_val and sock_info.max_wire_version >= 4:
262+
cmd['bypassDocumentValidation'] = True
263+
if session:
264+
cmd['lsid'] = session._use_lsid()
265+
bwc = _BulkWriteContext(db_name, cmd, sock_info, op_id,
266+
listeners, session)
267+
268+
results = []
269+
while run.idx_offset < len(run.ops):
270+
if session and retryable:
271+
cmd['txnNumber'] = session._transaction_id()
272+
client._send_cluster_time(cmd, session)
273+
check_keys = run.op_type == _INSERT
274+
ops = islice(run.ops, run.idx_offset, None)
275+
# Run as many ops as possible.
276+
request_id, msg, to_send = _do_batched_write_command(
277+
self.namespace, run.op_type, cmd, ops, check_keys,
278+
self.collection.codec_options, bwc)
279+
if not to_send:
280+
raise InvalidOperation("cannot do an empty bulk write")
281+
result = bwc.write_command(request_id, msg, to_send)
282+
client._receive_cluster_time(result, session)
283+
results.append((run.idx_offset, result))
284+
# We're no longer in a retry once a command succeeds.
285+
self.retrying = False
286+
if self.ordered and "writeErrors" in result:
287+
break
288+
run.idx_offset += len(to_send)
289+
290+
_merge_command(run, full_result, results)
291+
292+
# We're supposed to continue if errors are
293+
# at the write concern level (e.g. wtimeout)
294+
if self.ordered and full_result['writeErrors']:
295+
break
296+
# Reset our state
297+
self.current_run = run = next(generator, None)
298+
299+
def execute_command(self, generator, write_concern, session):
224300
"""Execute using write commands.
225301
"""
226302
# nModified is only reported for write commands, not legacy ops.
@@ -235,51 +311,16 @@ def execute_command(self, sock_info, generator, write_concern, session):
235311
"upserted": [],
236312
}
237313
op_id = _randint()
238-
db_name = self.collection.database.name
239-
client = self.collection.database.client
240-
listeners = client._event_listeners
241314

242-
with self.collection.database.client._tmp_session(session) as s:
243-
# sock_info.command validates the session, but we use
244-
# sock_info.write_command.
245-
sock_info.validate_session(client, s)
246-
for run in generator:
247-
cmd = SON([(_COMMANDS[run.op_type], self.collection.name),
248-
('ordered', self.ordered)])
249-
if write_concern.document:
250-
cmd['writeConcern'] = write_concern.document
251-
if self.bypass_doc_val and sock_info.max_wire_version >= 4:
252-
cmd['bypassDocumentValidation'] = True
253-
if s:
254-
cmd['lsid'] = s._use_lsid()
255-
bwc = _BulkWriteContext(db_name, cmd, sock_info, op_id,
256-
listeners, s)
257-
258-
results = []
259-
idx_offset = 0
260-
while idx_offset < len(run.ops):
261-
check_keys = run.op_type == _INSERT
262-
ops = islice(run.ops, idx_offset, None)
263-
# Run as many ops as possible.
264-
client._send_cluster_time(cmd, s)
265-
request_id, msg, to_send = _do_batched_write_command(
266-
self.namespace, run.op_type, cmd, ops, check_keys,
267-
self.collection.codec_options, bwc)
268-
if not to_send:
269-
raise InvalidOperation("cannot do an empty bulk write")
270-
result = bwc.write_command(request_id, msg, to_send)
271-
client._receive_cluster_time(result, s)
272-
results.append((idx_offset, result))
273-
if self.ordered and "writeErrors" in result:
274-
break
275-
idx_offset += len(to_send)
276-
277-
_merge_command(run, full_result, results)
278-
279-
# We're supposed to continue if errors are
280-
# at the write concern level (e.g. wtimeout)
281-
if self.ordered and full_result['writeErrors']:
282-
break
315+
def retryable_bulk(session, sock_info, retryable):
316+
self._execute_command(
317+
generator, write_concern, session, sock_info, op_id,
318+
retryable, full_result)
319+
320+
client = self.collection.database.client
321+
with client._tmp_session(session) as s:
322+
client._retry_with_session(
323+
self.is_retryable, retryable_bulk, s, self)
283324

284325
if full_result["writeErrors"] or full_result["writeConcernErrors"]:
285326
if full_result['writeErrors']:
@@ -309,6 +350,12 @@ def execute_insert_no_results(self, sock_info, run, op_id, acknowledged):
309350
def execute_no_results(self, sock_info, generator):
310351
"""Execute all operations, returning no results (w=0).
311352
"""
353+
if self.uses_collation:
354+
raise ConfigurationError(
355+
'Collation is unsupported for unacknowledged writes.')
356+
if self.uses_array_filters:
357+
raise ConfigurationError(
358+
'arrayFilters is unsupported for unacknowledged writes.')
312359
# Cannot have both unacknowledged write and bypass document validation.
313360
if self.bypass_doc_val and sock_info.max_wire_version >= 4:
314361
raise OperationFailure("Cannot set bypass_document_validation with"
@@ -378,25 +425,11 @@ def execute(self, write_concern, session):
378425
generator = self.gen_unordered()
379426

380427
client = self.collection.database.client
381-
with client._socket_for_writes() as sock_info:
382-
if sock_info.max_wire_version < 5 and self.uses_collation:
383-
raise ConfigurationError(
384-
'Must be connected to MongoDB 3.4+ to use a collation.')
385-
if sock_info.max_wire_version < 6 and self.uses_array_filters:
386-
raise ConfigurationError(
387-
'Must be connected to MongoDB 3.6+ to use arrayFilters.')
388-
if not write_concern.acknowledged:
389-
if self.uses_collation:
390-
raise ConfigurationError(
391-
'Collation is unsupported for unacknowledged writes.')
392-
if self.uses_array_filters:
393-
raise ConfigurationError(
394-
'arrayFilters is unsupported for unacknowledged '
395-
'writes.')
428+
if not write_concern.acknowledged:
429+
with client._socket_for_writes() as sock_info:
396430
self.execute_no_results(sock_info, generator)
397-
else:
398-
return self.execute_command(
399-
sock_info, generator, write_concern, session)
431+
else:
432+
return self.execute_command(generator, write_concern, session)
400433

401434

402435
class BulkUpsertOperation(object):

pymongo/collection.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -738,8 +738,7 @@ def gen():
738738

739739
blk = _Bulk(self, ordered, bypass_document_validation)
740740
blk.ops = [doc for doc in gen()]
741-
with self.__database.client._tmp_session(session) as s:
742-
blk.execute(self.write_concern.document, session=s)
741+
blk.execute(self.write_concern.document, session=session)
743742
return InsertManyResult(inserted_ids, self.write_concern.acknowledged)
744743

745744
def _update(self, sock_info, criteria, document, upsert=False,

0 commit comments

Comments
 (0)