Skip to content

Commit ecab1c9

Browse files
author
Luke Lovett
committed
PYTHON-1090, PYTHON-1098 - Use sane codec options when reading write responses.
When reading a write response from the server, we now use the 'replace' unicode_decode_error_handler and 'dict' as the document_class.
1 parent 8df6b7c commit ecab1c9

File tree

5 files changed

+122
-22
lines changed

5 files changed

+122
-22
lines changed

pymongo/bulk.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,10 @@ class _Bulk(object):
203203
def __init__(self, collection, ordered, bypass_document_validation):
204204
"""Initialize a _Bulk instance.
205205
"""
206-
self.collection = collection
206+
self.collection = collection.with_options(
207+
codec_options=collection.codec_options._replace(
208+
unicode_decode_error_handler='replace',
209+
document_class=dict))
207210
self.ordered = ordered
208211
self.ops = []
209212
self.name = "%s.%s" % (collection.database.name, collection.name)

pymongo/collection.py

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ def __init__(self, database, name, create=False, codec_options=None,
163163
if create or kwargs:
164164
self.__create(kwargs)
165165

166+
self.__write_response_codec_options = self.codec_options._replace(
167+
unicode_decode_error_handler='replace',
168+
document_class=dict)
169+
166170
def _socket_for_reads(self):
167171
return self.__database.client._socket_for_reads(self.read_preference)
168172

@@ -506,17 +510,19 @@ def _insert_one(
506510
if bypass_doc_val and sock_info.max_wire_version >= 4:
507511
command['bypassDocumentValidation'] = True
508512
# Insert command.
509-
result = sock_info.command(self.__database.name,
510-
command,
511-
codec_options=self.codec_options,
512-
check_keys=check_keys)
513+
result = sock_info.command(
514+
self.__database.name,
515+
command,
516+
codec_options=self.__write_response_codec_options,
517+
check_keys=check_keys)
513518
_check_write_command_response([(0, result)])
514519
else:
515520
# Legacy OP_INSERT.
516521
self._legacy_write(
517522
sock_info, 'insert', command, acknowledged, op_id,
518523
bypass_doc_val, message.insert, self.__full_name, [doc],
519-
check_keys, acknowledged, concern, False, self.codec_options)
524+
check_keys, acknowledged, concern, False,
525+
self.__write_response_codec_options)
520526
if not isinstance(doc, RawBSONDocument):
521527
return doc.get('_id')
522528

@@ -575,13 +581,13 @@ def gen():
575581
# Batched insert command.
576582
results = message._do_batched_write_command(
577583
self.database.name + ".$cmd", message._INSERT, command,
578-
gen(), check_keys, self.codec_options, bwc)
584+
gen(), check_keys, self.__write_response_codec_options, bwc)
579585
_check_write_command_response(results)
580586
else:
581587
# Legacy batched OP_INSERT.
582588
message._do_batched_insert(self.__full_name, gen(), check_keys,
583589
acknowledged, concern, not ordered,
584-
self.codec_options, bwc)
590+
self.__write_response_codec_options, bwc)
585591
return ids
586592

587593
def insert_one(self, document, bypass_document_validation=False):
@@ -704,9 +710,10 @@ def _update(self, sock_info, criteria, document, upsert=False,
704710

705711
# The command result has to be published for APM unmodified
706712
# so we make a shallow copy here before adding updatedExisting.
707-
result = sock_info.command(self.__database.name,
708-
command,
709-
codec_options=self.codec_options).copy()
713+
result = sock_info.command(
714+
self.__database.name,
715+
command,
716+
codec_options=self.__write_response_codec_options).copy()
710717
_check_write_command_response([(0, result)])
711718
# Add the updatedExisting field for compatibility.
712719
if result.get('n') and 'upserted' not in result:
@@ -725,7 +732,7 @@ def _update(self, sock_info, criteria, document, upsert=False,
725732
sock_info, 'update', command, acknowledged, op_id,
726733
bypass_doc_val, message.update, self.__full_name, upsert,
727734
multi, criteria, document, acknowledged, concern, check_keys,
728-
self.codec_options)
735+
self.__write_response_codec_options)
729736

730737
def replace_one(self, filter, replacement, upsert=False,
731738
bypass_document_validation=False):
@@ -911,17 +918,19 @@ def _delete(
911918

912919
if sock_info.max_wire_version > 1 and acknowledged:
913920
# Delete command.
914-
result = sock_info.command(self.__database.name,
915-
command,
916-
codec_options=self.codec_options)
921+
result = sock_info.command(
922+
self.__database.name,
923+
command,
924+
codec_options=self.__write_response_codec_options)
917925
_check_write_command_response([(0, result)])
918926
return result
919927
else:
920928
# Legacy OP_DELETE.
921929
return self._legacy_write(
922930
sock_info, 'delete', command, acknowledged, op_id,
923931
False, message.delete, self.__full_name, criteria,
924-
acknowledged, concern, self.codec_options, int(not multi))
932+
acknowledged, concern, self.__write_response_codec_options,
933+
int(not multi))
925934

926935
def delete_one(self, filter):
927936
"""Delete a single document matching the filter.
@@ -1192,11 +1201,11 @@ def parallel_scan(self, num_cursors):
11921201
def _count(self, cmd):
11931202
"""Internal count helper."""
11941203
with self._socket_for_reads() as (sock_info, slave_ok):
1195-
res = self._command(sock_info, cmd, slave_ok,
1196-
allowable_errors=["ns missing"],
1197-
codec_options=self.codec_options._replace(
1198-
document_class=dict),
1199-
read_concern=self.read_concern)
1204+
res = self._command(
1205+
sock_info, cmd, slave_ok,
1206+
allowable_errors=["ns missing"],
1207+
codec_options=self.__write_response_codec_options,
1208+
read_concern=self.read_concern)
12001209
if res.get("errmsg", "") == "ns missing":
12011210
return 0
12021211
return int(res["n"])

pymongo/helpers.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838

3939
_UUNDER = u("_")
4040

41+
_UNICODE_REPLACE_CODEC_OPTIONS = CodecOptions(
42+
unicode_decode_error_handler='replace')
43+
4144

4245
def _gen_index_name(keys):
4346
"""Generate an index name from the set of fields it is over."""
@@ -87,7 +90,9 @@ def _index_document(index_list):
8790
return index
8891

8992

90-
def _unpack_response(response, cursor_id=None, codec_options=CodecOptions()):
93+
def _unpack_response(response,
94+
cursor_id=None,
95+
codec_options=_UNICODE_REPLACE_CODEC_OPTIONS):
9196
"""Unpack a response from the database.
9297
9398
Check the response for errors and unpack, returning a dictionary

test/test_collection.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1116,6 +1116,75 @@ def test_duplicate_key_error(self):
11161116
self.assertIsNotNone(context.exception.details)
11171117
self.assertEqual(1, db.test.count())
11181118

1119+
def test_write_error_text_handling(self):
1120+
db = self.db
1121+
db.drop_collection("test")
1122+
1123+
db.test.create_index("text", unique=True)
1124+
1125+
# Test workaround for SERVER-24007
1126+
data = (b'a\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1127+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1128+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1129+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1130+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1131+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1132+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1133+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1134+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1135+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1136+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1137+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1138+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1139+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1140+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1141+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1142+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1143+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1144+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1145+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1146+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1147+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1148+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1149+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1150+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1151+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1152+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83'
1153+
b'\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83')
1154+
1155+
text = data.decode("utf8")
1156+
db.test.insert_one({"text": text})
1157+
1158+
# Should raise DuplicateKeyError, not InvalidBSON
1159+
self.assertRaises(DuplicateKeyError,
1160+
db.test.insert_one,
1161+
{"text": text})
1162+
1163+
self.assertRaises(DuplicateKeyError,
1164+
db.test.insert,
1165+
{"text": text})
1166+
1167+
self.assertRaises(DuplicateKeyError,
1168+
db.test.insert,
1169+
[{"text": text}])
1170+
1171+
self.assertRaises(DuplicateKeyError,
1172+
db.test.replace_one,
1173+
{"_id": ObjectId()},
1174+
{"text": text},
1175+
upsert=True)
1176+
1177+
self.assertRaises(DuplicateKeyError,
1178+
db.test.update,
1179+
{"_id": ObjectId()},
1180+
{"text": text},
1181+
upsert=True)
1182+
1183+
# Should raise BulkWriteError, not InvalidBSON
1184+
self.assertRaises(BulkWriteError,
1185+
db.test.insert_many,
1186+
[{"text": text}])
1187+
11191188
def test_wtimeout(self):
11201189
# Ensure setting wtimeout doesn't disable write concern altogether.
11211190
# See SERVER-12596.

test/test_raw_bson.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,3 +111,17 @@ def test_raw_bson_document_embedded(self):
111111
uuid_representation=JAVA_LEGACY)).find_one()
112112
self.assertEqual(rbd['embedded'][0]['_id'],
113113
result['embedded'][0]['_id'])
114+
115+
@client_context.require_connection
116+
def test_write_response_raw_bson(self):
117+
coll = self.client.get_database(
118+
'pymongo_test',
119+
codec_options=CodecOptions(document_class=RawBSONDocument)).test_raw
120+
121+
# No Exceptions raised while handling write response.
122+
coll.insert_one(self.document)
123+
coll.delete_one(self.document)
124+
coll.insert_many([self.document])
125+
coll.delete_many(self.document)
126+
coll.update_one(self.document, {'$set': {'a': 'b'}}, upsert=True)
127+
coll.update_many(self.document, {'$set': {'b': 'c'}})

0 commit comments

Comments
 (0)