Skip to content

Commit 4a9c65e

Browse files
committed
PYTHON-1329 - OP_MSG unacknowledged bulk writes
1 parent f8883df commit 4a9c65e

File tree

3 files changed

+134
-40
lines changed

3 files changed

+134
-40
lines changed

pymongo/_cmessagemodule.c

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1238,36 +1238,44 @@ _batched_op_msg(
12381238
while ((doc = PyIter_Next(iterator)) != NULL) {
12391239
int cur_doc_begin = buffer_get_position(buffer);
12401240
int cur_size;
1241-
int enough_data = 0;
1242-
int enough_documents = 0;
1241+
int doc_too_large = 0;
1242+
int unacked_doc_too_large = 0;
12431243
if (!write_dict(state->_cbson, buffer, doc, check_keys,
12441244
&options, 1)) {
12451245
goto cmditerfail;
12461246
}
1247-
/* We have enough data, return this batch. */
1248-
enough_data = (buffer_get_position(buffer) > max_message_size);
1249-
enough_documents = (idx >= max_write_batch_size);
1250-
if (enough_data || enough_documents) {
1251-
cur_size = buffer_get_position(buffer) - cur_doc_begin;
1252-
1253-
/* This single document is too large for the message. */
1254-
if (!idx) {
1255-
if (op == _INSERT) {
1256-
_set_document_too_large(cur_size, max_bson_size);
1257-
} else {
1258-
PyObject* DocumentTooLarge = _error("DocumentTooLarge");
1259-
if (DocumentTooLarge) {
1260-
/*
1261-
* There's nothing intelligent we can say
1262-
* about size for update and remove.
1263-
*/
1264-
PyErr_SetString(DocumentTooLarge,
1265-
"operation document too large");
1266-
Py_DECREF(DocumentTooLarge);
1267-
}
1247+
cur_size = buffer_get_position(buffer) - cur_doc_begin;
1248+
1249+
/* Does the first document exceed max_message_size? */
1250+
doc_too_large = (idx == 0 && (buffer_get_position(buffer) > max_message_size));
1251+
/* When OP_MSG is used unacknowledged we have to check
1252+
* document size client side or applications won't be notified.
1253+
* Otherwise we let the server deal with documents that are too large
1254+
* since ordered=False causes those documents to be skipped instead of
1255+
* halting the bulk write operation.
1256+
* */
1257+
unacked_doc_too_large = (!ack && cur_size > max_bson_size);
1258+
if (doc_too_large || unacked_doc_too_large) {
1259+
if (op == _INSERT) {
1260+
_set_document_too_large(cur_size, max_bson_size);
1261+
} else {
1262+
PyObject* DocumentTooLarge = _error("DocumentTooLarge");
1263+
if (DocumentTooLarge) {
1264+
/*
1265+
* There's nothing intelligent we can say
1266+
* about size for update and delete.
1267+
*/
1268+
PyErr_Format(
1269+
DocumentTooLarge,
1270+
"%s command document too large",
1271+
(op == _UPDATE) ? "update": "delete");
1272+
Py_DECREF(DocumentTooLarge);
12681273
}
1269-
goto cmditerfail;
12701274
}
1275+
goto cmditerfail;
1276+
}
1277+
/* We have enough data, return this batch. */
1278+
if (buffer_get_position(buffer) > max_message_size) {
12711279
/*
12721280
* Roll the existing buffer back to the beginning
12731281
* of the last document encoded.
@@ -1280,6 +1288,10 @@ _batched_op_msg(
12801288
}
12811289
Py_CLEAR(doc);
12821290
idx += 1;
1291+
/* We have enough documents, return this batch. */
1292+
if (idx == max_write_batch_size) {
1293+
break;
1294+
}
12831295
}
12841296
Py_DECREF(iterator);
12851297

@@ -1580,10 +1592,12 @@ _batched_write_command(
15801592
if (DocumentTooLarge) {
15811593
/*
15821594
* There's nothing intelligent we can say
1583-
* about size for update and remove.
1595+
* about size for update and delete.
15841596
*/
1585-
PyErr_SetString(DocumentTooLarge,
1586-
"command document too large");
1597+
PyErr_Format(
1598+
DocumentTooLarge,
1599+
"%s command document too large",
1600+
(op == _UPDATE) ? "update": "delete");
15871601
Py_DECREF(DocumentTooLarge);
15881602
}
15891603
}

pymongo/bulk.py

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,68 @@ def execute_insert_no_results(self, sock_info, run, op_id, acknowledged):
358358
self.collection.full_name, run.ops, True, acknowledged, concern,
359359
not self.ordered, self.collection.codec_options, bwc)
360360

361+
def execute_op_msg_no_results(self, sock_info, generator):
    """Execute write commands with OP_MSG and w=0 writeConcern, unordered.
    """
    dbname = self.collection.database.name
    client = self.collection.database.client
    listeners = client._event_listeners
    op_id = _randint()

    if not self.current_run:
        self.current_run = next(generator)
    run = self.current_run

    while run:
        command = SON([(_COMMANDS[run.op_type], self.collection.name),
                       ('ordered', False),
                       ('writeConcern', {'w': 0})])
        bwc = _BulkWriteContext(dbname, command, sock_info, op_id,
                                listeners, None)

        while run.idx_offset < len(run.ops):
            # Pack as many of the remaining ops as will fit into a
            # single OP_MSG.
            check_keys = run.op_type == _INSERT
            remaining = islice(run.ops, run.idx_offset, None)
            request_id, msg, to_send = _do_bulk_write_command(
                self.namespace, run.op_type, command, remaining,
                check_keys, self.collection.codec_options, bwc)
            if not to_send:
                raise InvalidOperation("cannot do an empty bulk write")
            run.idx_offset += len(to_send)
            # Though this isn't strictly a "legacy" write, the helper
            # publishes command events and sends our message without
            # waiting for a server response. Pass 0 for max_doc_size
            # to disable size checking here; document sizes are
            # checked while the documents are encoded to BSON.
            bwc.legacy_write(request_id, msg, 0, False, to_send)
        self.current_run = run = next(generator, None)
397+
398+
def execute_command_no_results(self, sock_info, generator):
    """Execute write commands with OP_MSG and w=0 WriteConcern, ordered.
    """
    initial_result = {
        "writeErrors": [],
        "writeConcernErrors": [],
        "nInserted": 0,
        "nUpserted": 0,
        "nMatched": 0,
        "nModified": 0,
        "nRemoved": 0,
        "upserted": [],
    }
    # Ordered bulk writes have to be acknowledged so that we stop
    # processing at the first error, even when the application
    # specified unacknowledged writeConcern.
    write_concern = WriteConcern()
    op_id = _randint()
    try:
        self._execute_command(
            generator, write_concern, None,
            sock_info, op_id, False, initial_result)
    except OperationFailure:
        # The caller requested w=0, so per-write failures are not
        # reported back; the error only serves to halt the run.
        pass
422+
361423
def execute_no_results(self, sock_info, generator):
362424
"""Execute all operations, returning no results (w=0).
363425
"""
@@ -367,10 +429,17 @@ def execute_no_results(self, sock_info, generator):
367429
if self.uses_array_filters:
368430
raise ConfigurationError(
369431
'arrayFilters is unsupported for unacknowledged writes.')
370-
# Cannot have both unacknowledged write and bypass document validation.
432+
# Cannot have both unacknowledged writes and bypass document validation.
371433
if self.bypass_doc_val and sock_info.max_wire_version >= 4:
372434
raise OperationFailure("Cannot set bypass_document_validation with"
373435
" unacknowledged write concern")
436+
437+
# OP_MSG
438+
if sock_info.max_wire_version > 5:
439+
if self.ordered:
440+
return self.execute_command_no_results(sock_info, generator)
441+
return self.execute_op_msg_no_results(sock_info, generator)
442+
374443
coll = self.collection
375444
# If ordered is True we have to send GLE or use write
376445
# commands so we can abort on the first error.

pymongo/message.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -957,8 +957,8 @@ def _raise_document_too_large(operation, doc_size, max_size):
957957
" bytes." % (doc_size, max_size))
958958
else:
959959
# There's nothing intelligent we can say
960-
# about size for update and remove
961-
raise DocumentTooLarge("command document too large")
960+
# about size for update and delete
961+
raise DocumentTooLarge("%r command document too large" % (operation,))
962962

963963

964964
def _do_batched_insert(collection_name, docs, check_keys,
@@ -1089,18 +1089,29 @@ def _batched_op_msg_impl(
10891089
for doc in docs:
10901090
# Encode the current operation
10911091
value = _dict_to_bson(doc, check_keys, opts)
1092-
# Is there enough room to add this document?
1093-
enough_data = (buf.tell() + len(value)) >= max_message_size
1094-
enough_documents = (idx >= max_write_batch_size)
1095-
if enough_data or enough_documents:
1096-
if not idx:
1097-
write_op = "insert" if operation == _INSERT else None
1098-
_raise_document_too_large(
1099-
write_op, len(value), max_bson_size)
1092+
doc_length = len(value)
1093+
new_message_size = buf.tell() + doc_length
1094+
# Does first document exceed max_message_size?
1095+
doc_too_large = (idx == 0 and (new_message_size > max_message_size))
1096+
# When OP_MSG is used unacknowledged we have to check
1097+
# document size client side or applications won't be notified.
1098+
# Otherwise we let the server deal with documents that are too large
1099+
# since ordered=False causes those documents to be skipped instead of
1100+
# halting the bulk write operation.
1101+
unacked_doc_too_large = (not ack and (doc_length > max_bson_size))
1102+
if doc_too_large or unacked_doc_too_large:
1103+
write_op = list(_FIELD_MAP.keys())[operation]
1104+
_raise_document_too_large(
1105+
write_op, len(value), max_bson_size)
1106+
# We have enough data, return this batch.
1107+
if new_message_size > max_message_size:
11001108
break
11011109
buf.write(value)
11021110
to_send.append(doc)
11031111
idx += 1
1112+
# We have enough documents, return this batch.
1113+
if idx == max_write_batch_size:
1114+
break
11041115

11051116
# Write type 1 section size
11061117
length = buf.tell()
@@ -1305,7 +1316,7 @@ def _batched_write_command_impl(
13051316
enough_documents = (idx >= max_write_batch_size)
13061317
if enough_data or enough_documents:
13071318
if not idx:
1308-
write_op = "insert" if operation == _INSERT else None
1319+
write_op = list(_FIELD_MAP.keys())[operation]
13091320
_raise_document_too_large(
13101321
write_op, len(value), max_bson_size)
13111322
break
@@ -1434,7 +1445,7 @@ def __init__(self, flags, payload_document):
14341445
self.payload_document = payload_document
14351446

14361447
def raw_response(self, cursor_id=None):
    # Raw access is not provided for OP_MSG replies; use
    # unpack_response instead.
    raise NotImplementedError
14381449

14391450
def unpack_response(self, cursor_id=None,
14401451
codec_options=_UNICODE_REPLACE_CODEC_OPTIONS):

0 commit comments

Comments
 (0)