Skip to content

Commit f61b0e4

Browse files
committed
PYTHON-684 - Ignore wnote/jnote from legacy servers.
Stop unnecessarily raising OperationFailure in the Bulk API when a pre-2.6 server returns a result with a wnote or jnote field.
1 parent 7d55d77 commit f61b0e4

File tree

2 files changed

+104
-121
lines changed

2 files changed

+104
-121
lines changed

pymongo/bulk.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,16 +82,6 @@ def _make_error(index, code, errmsg, operation):
8282
def _merge_legacy(run, full_result, result, index):
8383
"""Merge a result from a legacy opcode into the full results.
8484
"""
85-
# MongoDB 2.6 returns {'ok': 0, 'code': 2, ...} if the j write
86-
# concern option is used with --nojournal or w > 1 is used with
87-
# a standalone mongod instance. Raise immediately here for
88-
# consistency when talking to older servers. Since these are
89-
# configuration errors related to write concern the entire batch
90-
# will fail.
91-
note = result.get("jnote", result.get("wnote"))
92-
if note:
93-
raise OperationFailure(note, _BAD_VALUE, result)
94-
9585
affected = result.get('n', 0)
9686

9787
errmsg = result.get("errmsg", result.get("err", ""))

test/test_bulk.py

Lines changed: 104 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -903,30 +903,58 @@ def test_fsync_and_j(self):
903903
OperationFailure,
904904
batch.execute, {'fsync': True, 'j': True})
905905

906-
def test_j_without_journal(self):
907-
client = self.coll.database.connection
908-
if not server_started_with_nojournal(client):
909-
raise SkipTest("Need mongod started with --nojournal")
906+
def test_write_concern_failure_ordered(self):
907+
if not self.is_repl:
908+
raise SkipTest("Need a replica set to test.")
910909

911-
# Using j=True without journaling is a hard failure.
910+
# Ensure we don't raise on wnote.
912911
batch = self.coll.initialize_ordered_bulk_op()
913-
batch.insert({})
914-
self.assertRaises(OperationFailure, batch.execute, {'j': True})
915-
916-
def test_write_concern_failure_ordered(self):
912+
batch.find({"something": "that does not exist"}).remove()
913+
self.assertTrue(batch.execute({"w": self.w}))
917914

918915
batch = self.coll.initialize_ordered_bulk_op()
919916
batch.insert({'a': 1})
920917
batch.insert({'a': 2})
921918

922-
# Using w > 1 with no replication is a hard failure.
923-
if not self.is_repl:
924-
self.assertRaises(OperationFailure,
925-
batch.execute, {'w': 5, 'wtimeout': 1})
926-
927919
# Replication wtimeout is a 'soft' error.
928920
# It shouldn't stop batch processing.
921+
try:
922+
batch.execute({'w': self.w + 1, 'wtimeout': 1})
923+
except BulkWriteError, exc:
924+
result = exc.details
925+
self.assertEqual(exc.code, 65)
929926
else:
927+
self.fail("Error not raised")
928+
929+
self.assertEqualResponse(
930+
{'nMatched': 0,
931+
'nModified': 0,
932+
'nUpserted': 0,
933+
'nInserted': 2,
934+
'nRemoved': 0,
935+
'upserted': [],
936+
'writeErrors': []},
937+
result)
938+
939+
# When talking to legacy servers there will be a
940+
# write concern error for each operation.
941+
self.assertTrue(len(result['writeConcernErrors']) > 0)
942+
943+
failed = result['writeConcernErrors'][0]
944+
self.assertEqual(64, failed['code'])
945+
self.assertTrue(isinstance(failed['errmsg'], basestring))
946+
947+
self.coll.remove()
948+
self.coll.ensure_index('a', unique=True)
949+
950+
# Fail due to write concern support as well
951+
# as duplicate key error on ordered batch.
952+
try:
953+
batch = self.coll.initialize_ordered_bulk_op()
954+
batch.insert({'a': 1})
955+
batch.find({'a': 3}).upsert().replace_one({'b': 1})
956+
batch.insert({'a': 1})
957+
batch.insert({'a': 2})
930958
try:
931959
batch.execute({'w': self.w + 1, 'wtimeout': 1})
932960
except BulkWriteError, exc:
@@ -938,74 +966,66 @@ def test_write_concern_failure_ordered(self):
938966
self.assertEqualResponse(
939967
{'nMatched': 0,
940968
'nModified': 0,
941-
'nUpserted': 0,
942-
'nInserted': 2,
969+
'nUpserted': 1,
970+
'nInserted': 1,
943971
'nRemoved': 0,
944-
'upserted': [],
945-
'writeErrors': []},
972+
'upserted': [{'index': 1, '_id': '...'}],
973+
'writeErrors': [
974+
{'index': 2,
975+
'code': 11000,
976+
'errmsg': '...',
977+
'op': {'_id': '...', 'a': 1}}]},
946978
result)
947979

948-
# When talking to legacy servers there will be a
949-
# write concern error for each operation.
950-
self.assertTrue(len(result['writeConcernErrors']) > 0)
951-
952-
failed = result['writeConcernErrors'][0]
953-
self.assertEqual(64, failed['code'])
954-
self.assertTrue(isinstance(failed['errmsg'], basestring))
955-
956-
self.coll.remove()
957-
self.coll.ensure_index('a', unique=True)
958-
959-
# Fail due to write concern support as well
960-
# as duplicate key error on ordered batch.
961-
try:
962-
batch = self.coll.initialize_ordered_bulk_op()
963-
batch.insert({'a': 1})
964-
batch.find({'a': 3}).upsert().replace_one({'b': 1})
965-
batch.insert({'a': 1})
966-
batch.insert({'a': 2})
967-
try:
968-
batch.execute({'w': self.w + 1, 'wtimeout': 1})
969-
except BulkWriteError, exc:
970-
result = exc.details
971-
self.assertEqual(exc.code, 65)
972-
else:
973-
self.fail("Error not raised")
974-
975-
self.assertEqualResponse(
976-
{'nMatched': 0,
977-
'nModified': 0,
978-
'nUpserted': 1,
979-
'nInserted': 1,
980-
'nRemoved': 0,
981-
'upserted': [{'index': 1, '_id': '...'}],
982-
'writeErrors': [
983-
{'index': 2,
984-
'code': 11000,
985-
'errmsg': '...',
986-
'op': {'_id': '...', 'a': 1}}]},
987-
result)
988-
989-
self.assertEqual(2, len(result['writeConcernErrors']))
990-
failed = result['writeErrors'][0]
991-
self.assertTrue("duplicate" in failed['errmsg'])
992-
finally:
993-
self.coll.drop_index([('a', 1)])
980+
self.assertEqual(2, len(result['writeConcernErrors']))
981+
failed = result['writeErrors'][0]
982+
self.assertTrue("duplicate" in failed['errmsg'])
983+
finally:
984+
self.coll.drop_index([('a', 1)])
994985

995986
def test_write_concern_failure_unordered(self):
987+
if not self.is_repl:
988+
raise SkipTest("Need a replica set to test.")
989+
990+
# Ensure we don't raise on wnote.
991+
batch = self.coll.initialize_ordered_bulk_op()
992+
batch.find({"something": "that does not exist"}).remove()
993+
self.assertTrue(batch.execute({"w": self.w}))
996994

997995
batch = self.coll.initialize_unordered_bulk_op()
998996
batch.insert({'a': 1})
999997
batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3, 'b': 1}})
1000998
batch.insert({'a': 2})
1001999

1002-
# Using w > 1 with no replication is a hard failure.
1003-
if not self.is_repl:
1004-
self.assertRaises(OperationFailure,
1005-
batch.execute, {'w': 5, 'wtimeout': 1})
10061000
# Replication wtimeout is a 'soft' error.
10071001
# It shouldn't stop batch processing.
1002+
try:
1003+
batch.execute({'w': self.w + 1, 'wtimeout': 1})
1004+
except BulkWriteError, exc:
1005+
result = exc.details
1006+
self.assertEqual(exc.code, 65)
10081007
else:
1008+
self.fail("Error not raised")
1009+
1010+
self.assertEqual(2, result['nInserted'])
1011+
self.assertEqual(1, result['nUpserted'])
1012+
self.assertEqual(0, len(result['writeErrors']))
1013+
# When talking to legacy servers there will be a
1014+
# write concern error for each operation.
1015+
self.assertTrue(len(result['writeConcernErrors']) > 1)
1016+
1017+
self.coll.remove()
1018+
self.coll.ensure_index('a', unique=True)
1019+
1020+
# Fail due to write concern support as well
1021+
# as duplicate key error on unordered batch.
1022+
try:
1023+
batch = self.coll.initialize_unordered_bulk_op()
1024+
batch.insert({'a': 1})
1025+
batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3,
1026+
'b': 1}})
1027+
batch.insert({'a': 1})
1028+
batch.insert({'a': 2})
10091029
try:
10101030
batch.execute({'w': self.w + 1, 'wtimeout': 1})
10111031
except BulkWriteError, exc:
@@ -1016,54 +1036,27 @@ def test_write_concern_failure_unordered(self):
10161036

10171037
self.assertEqual(2, result['nInserted'])
10181038
self.assertEqual(1, result['nUpserted'])
1019-
self.assertEqual(0, len(result['writeErrors']))
1039+
self.assertEqual(1, len(result['writeErrors']))
10201040
# When talking to legacy servers there will be a
10211041
# write concern error for each operation.
10221042
self.assertTrue(len(result['writeConcernErrors']) > 1)
10231043

1024-
self.coll.remove()
1025-
self.coll.ensure_index('a', unique=True)
1044+
failed = result['writeErrors'][0]
1045+
self.assertEqual(2, failed['index'])
1046+
self.assertEqual(11000, failed['code'])
1047+
self.assertTrue(isinstance(failed['errmsg'], basestring))
1048+
self.assertEqual(1, failed['op']['a'])
10261049

1027-
# Fail due to write concern support as well
1028-
# as duplicate key error on unordered batch.
1029-
try:
1030-
batch = self.coll.initialize_unordered_bulk_op()
1031-
batch.insert({'a': 1})
1032-
batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3,
1033-
'b': 1}})
1034-
batch.insert({'a': 1})
1035-
batch.insert({'a': 2})
1036-
try:
1037-
batch.execute({'w': self.w + 1, 'wtimeout': 1})
1038-
except BulkWriteError, exc:
1039-
result = exc.details
1040-
self.assertEqual(exc.code, 65)
1041-
else:
1042-
self.fail("Error not raised")
1043-
1044-
self.assertEqual(2, result['nInserted'])
1045-
self.assertEqual(1, result['nUpserted'])
1046-
self.assertEqual(1, len(result['writeErrors']))
1047-
# When talking to legacy servers there will be a
1048-
# write concern error for each operation.
1049-
self.assertTrue(len(result['writeConcernErrors']) > 1)
1050-
1051-
failed = result['writeErrors'][0]
1052-
self.assertEqual(2, failed['index'])
1053-
self.assertEqual(11000, failed['code'])
1054-
self.assertTrue(isinstance(failed['errmsg'], basestring))
1055-
self.assertEqual(1, failed['op']['a'])
1056-
1057-
failed = result['writeConcernErrors'][0]
1058-
self.assertEqual(64, failed['code'])
1059-
self.assertTrue(isinstance(failed['errmsg'], basestring))
1060-
1061-
upserts = result['upserted']
1062-
self.assertEqual(1, len(upserts))
1063-
self.assertEqual(1, upserts[0]['index'])
1064-
self.assertTrue(upserts[0].get('_id'))
1065-
finally:
1066-
self.coll.drop_index([('a', 1)])
1050+
failed = result['writeConcernErrors'][0]
1051+
self.assertEqual(64, failed['code'])
1052+
self.assertTrue(isinstance(failed['errmsg'], basestring))
1053+
1054+
upserts = result['upserted']
1055+
self.assertEqual(1, len(upserts))
1056+
self.assertEqual(1, upserts[0]['index'])
1057+
self.assertTrue(upserts[0].get('_id'))
1058+
finally:
1059+
self.coll.drop_index([('a', 1)])
10671060

10681061

10691062
class TestBulkNoResults(BulkTestBase):

0 commit comments

Comments
 (0)