Skip to content

Commit 65a41da

Browse files
author
Mike Dirolf
committed
always use master for commands in MasterSlaveConnection instances
1 parent 6461bdc commit 65a41da

File tree

6 files changed

+41
-12
lines changed

6 files changed

+41
-12
lines changed

pymongo/collection.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ def remove(self, spec_or_object_id):
261261
self._send_message(2006, _ZERO + bson.BSON.from_dict(spec))
262262

263263
def find_one(self, spec_or_object_id=None, fields=None, slave_okay=None,
264-
_sock=None):
264+
_sock=None, _must_use_master=False):
265265
"""Get a single object from the database.
266266
267267
Raises TypeError if the argument is of an improper type. Returns a
@@ -286,7 +286,8 @@ def find_one(self, spec_or_object_id=None, fields=None, slave_okay=None,
286286
spec = SON({"_id": spec})
287287

288288
for result in self.find(spec, limit=-1, fields=fields,
289-
slave_okay=slave_okay, _sock=_sock):
289+
slave_okay=slave_okay, _sock=_sock,
290+
_must_use_master=_must_use_master):
290291
return result
291292
return None
292293

@@ -309,7 +310,7 @@ def _fields_list_to_dict(self, fields):
309310

310311
def find(self, spec=None, fields=None, skip=0, limit=0,
311312
slave_okay=None, timeout=True, snapshot=False,
312-
_sock=None):
313+
_sock=None, _must_use_master=False):
313314
"""Query the database.
314315
315316
The `spec` argument is a prototype document that all results must
@@ -376,7 +377,7 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
376377
fields = self._fields_list_to_dict(fields)
377378

378379
return Cursor(self, spec, fields, skip, limit, slave_okay, timeout, snapshot,
379-
_sock=_sock)
380+
_sock=_sock, _must_use_master=_must_use_master)
380381

381382
def count(self):
382383
"""Get the number of documents in this collection.

pymongo/connection.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,9 @@ def receive(length):
509509

510510
__hack_socket_lock = threading.Lock()
511511

512-
def _receive_message(self, operation, data, _sock=None):
512+
# we just ignore _must_use_master here: it's only relavant for
513+
# MasterSlaveConnection instances.
514+
def _receive_message(self, operation, data, _sock=None, _must_use_master=False):
513515
"""Receive a message from Mongo.
514516
515517
Sends the given message and returns the response.

pymongo/cursor.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class Cursor(object):
3737
"""
3838

3939
def __init__(self, collection, spec, fields, skip, limit, slave_okay,
40-
timeout, snapshot=False, _sock=None):
40+
timeout, snapshot=False, _sock=None, _must_use_master=False):
4141
"""Create a new cursor.
4242
4343
Should not be called directly by application developers.
@@ -54,6 +54,7 @@ def __init__(self, collection, spec, fields, skip, limit, slave_okay,
5454
self.__explain = False
5555
self.__hint = None
5656
self.__socket = _sock
57+
self.__must_use_master = _must_use_master
5758

5859
self.__data = []
5960
self.__id = None
@@ -290,7 +291,8 @@ def _refresh(self):
290291

291292
def send_message(operation, message):
292293
db = self.__collection.database()
293-
kwargs = {"_sock": self.__socket}
294+
kwargs = {"_sock": self.__socket,
295+
"_must_use_master": self.__must_use_master}
294296
if self.__connection_id is not None:
295297
kwargs["_connection_to_use"] = self.__connection_id
296298

pymongo/database.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,9 @@ def _fix_outgoing(self, son, collection):
182182
def _command(self, command, allowable_errors=[], check=True, sock=None):
183183
"""Issue a DB command.
184184
"""
185-
result = self["$cmd"].find_one(command, _sock=sock)
185+
result = self["$cmd"].find_one(command, _sock=sock,
186+
_must_use_master=True)
187+
186188
if check and result["ok"] != 1:
187189
if result["errmsg"] in allowable_errors:
188190
return result
@@ -193,7 +195,7 @@ def _command(self, command, allowable_errors=[], check=True, sock=None):
193195
def collection_names(self):
194196
"""Get a list of all the collection names in this database.
195197
"""
196-
results = self["system.namespaces"].find()
198+
results = self["system.namespaces"].find(_must_use_master=True)
197199
names = [r["name"] for r in results]
198200
names = [n[len(self.__name) + 1:] for n in names
199201
if n.startswith(self.__name + ".")]

pymongo/master_slave_connection.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ def _send_message(self, operation, data, _connection_to_use=None):
114114
# that getmore operations can be sent to the same instance on which
115115
# the cursor actually resides...
116116
def _receive_message(self, operation, data,
117-
_sock=None, _connection_to_use=None):
117+
_sock=None, _connection_to_use=None, _must_use_master=False):
118118
"""Receive a message from Mongo.
119119
120120
Sends the given message and returns a (connection_id, response) pair.
@@ -135,7 +135,10 @@ def _receive_message(self, operation, data,
135135
# for now just load-balance randomly among slaves only...
136136
connection_id = random.randrange(0, len(self.__slaves))
137137

138-
if self.__in_request or connection_id == -1:
138+
# _must_use_master is set for commands, which must be sent to the
139+
# master instance. any queries in a request must be sent to the
140+
# master since that is where writes go.
141+
if _must_use_master or self.__in_request or connection_id == -1:
139142
return (-1, self.__master._receive_message(operation, data, _sock))
140143

141144
return (connection_id,

test/test_master_slave_connection.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
from nose.plugins.skip import SkipTest
2424

2525
from pymongo.errors import ConnectionFailure, InvalidName
26+
from pymongo.errors import CollectionInvalid, OperationFailure
2627
from pymongo.database import Database
2728
from pymongo.connection import Connection
29+
from pymongo.collection import Collection
2830
from pymongo.master_slave_connection import MasterSlaveConnection
2931

3032

@@ -145,6 +147,23 @@ def test_insert_find_one_no_slaves(self):
145147
count += 1
146148
self.failIf(count)
147149

150+
# This was failing because commands were being sent to the slaves
151+
def test_create_collection(self):
152+
self.connection.drop_database('pymongo_test')
153+
154+
collection = self.db.create_collection('test')
155+
self.assert_(isinstance(collection, Collection))
156+
157+
self.assertRaises(CollectionInvalid, self.db.create_collection, 'test')
158+
159+
# Believe this was failing for the same reason...
160+
def test_unique_index(self):
161+
self.connection.drop_database('pymongo_test')
162+
self.db.test.create_index('username', unique=True)
163+
164+
self.db.test.save({'username': 'mike'}, safe=True)
165+
self.assertRaises(OperationFailure, self.db.test.save, {'username': 'mike'}, safe=True)
166+
148167
# NOTE this test is non-deterministic, but I expect
149168
# some failures unless the db is pulling instantaneously...
150169
def test_insert_find_one_with_slaves(self):
@@ -170,7 +189,7 @@ def test_insert_find_one_with_pause(self):
170189

171190
self.db.test.remove({})
172191
self.db.test.insert({"x": 5586})
173-
time.sleep(6)
192+
time.sleep(7)
174193
for _ in range(10):
175194
try:
176195
if 5586 != self.db.test.find_one()["x"]:

0 commit comments

Comments
 (0)