Skip to content

Commit bed1ab0

Browse files
author
A. Jesse Jiryu Davis
committed
Unpin member when read preference changes PYTHON-431
1 parent 554d1d9 commit bed1ab0

File tree

3 files changed

+88
-19
lines changed

3 files changed

+88
-19
lines changed

pymongo/mongo_replica_set_client.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -770,18 +770,24 @@ def __update_pools(self):
770770
def __schedule_refresh(self):
771771
self.__monitor.schedule_refresh()
772772

773-
def __pin_host(self, host):
773+
def __pin_host(self, host, mode, tag_sets, latency):
774774
# After first successful read in a request, continue reading from same
775775
# member until read preferences change, host goes down, or
776776
# end_request(). This offers a small assurance that reads won't jump
777777
# around in time.
778778
self.__threadlocal.host = host
779+
self.__threadlocal.read_preference = (mode, tag_sets, latency)
780+
781+
def __keep_pinned_host(self, mode, tag_sets, latency):
782+
# If read preferences have changed, return False
783+
return getattr(self.__threadlocal, 'read_preference', None) == (
784+
mode, tag_sets, latency)
779785

780786
def __pinned_host(self):
781787
return getattr(self.__threadlocal, 'host', None)
782788

783789
def __unpin_host(self):
784-
self.__threadlocal.host = None
790+
self.__threadlocal.host = self.__threadlocal.read_preference = None
785791

786792
def __reset_pinned_hosts(self):
787793
if self.__use_greenlets:
@@ -1158,7 +1164,7 @@ def _send_message_with_response(self, msg, _connection_to_use=None,
11581164
mode = ReadPreference.PRIMARY
11591165
tag_sets = [{}]
11601166

1161-
secondary_acceptable_latency_ms = kwargs.get(
1167+
latency = kwargs.get(
11621168
'secondary_acceptable_latency_ms',
11631169
self.secondary_acceptable_latency_ms)
11641170

@@ -1176,12 +1182,18 @@ def _send_message_with_response(self, msg, _connection_to_use=None,
11761182
self.disconnect()
11771183
raise
11781184

1185+
# To provide some monotonic consistency, we use the same member as
1186+
# long as this thread is in a request and all reads use the same
1187+
# mode, tags, and latency. The member gets unpinned if pref changes,
1188+
# if member changes state, if we detect a failover and call
1189+
# __reset_pinned_hosts(), or if this thread calls end_request().
11791190
errors = []
11801191
pinned_member = self.__members.get(self.__pinned_host())
11811192
if (pinned_member
11821193
and pinned_member.matches_mode(mode)
11831194
and pinned_member.matches_tag_sets(tag_sets)
11841195
and pinned_member.up
1196+
and self.__keep_pinned_host(mode, tag_sets, latency)
11851197
):
11861198
try:
11871199
return (
@@ -1204,7 +1216,7 @@ def _send_message_with_response(self, msg, _connection_to_use=None,
12041216
members=members,
12051217
mode=mode,
12061218
tag_sets=tag_sets,
1207-
latency=secondary_acceptable_latency_ms)
1219+
latency=latency)
12081220

12091221
if not member:
12101222
# Ran out of members to try
@@ -1218,7 +1230,8 @@ def _send_message_with_response(self, msg, _connection_to_use=None,
12181230
# Success
12191231
if self.in_request():
12201232
# Keep reading from this member in this thread / greenlet
1221-
self.__pin_host(member.host)
1233+
# unless read preference changes
1234+
self.__pin_host(member.host, mode, tag_sets, latency)
12221235
return member.host, response
12231236
except AutoReconnect, why:
12241237
errors.append(str(why))

test/high_availability/test_ha.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
# a replica set. Thus each method asserts everything we want to assert for a
2020
# given replica-set configuration.
2121

22+
import itertools
2223
import time
2324
import unittest
2425

@@ -28,10 +29,12 @@
2829

2930
from pymongo import (ReplicaSetConnection,
3031
ReadPreference)
31-
from pymongo.mongo_replica_set_client import Member, Monitor
32+
from pymongo.mongo_replica_set_client import (
33+
Member, Monitor, MongoReplicaSetClient)
3234
from pymongo.mongo_client import _partition_node
3335
from pymongo.connection import Connection
3436
from pymongo.errors import AutoReconnect, OperationFailure, ConnectionFailure
37+
from pymongo.read_preferences import modes
3538

3639
from test import utils
3740
from test.utils import one
@@ -726,6 +729,63 @@ def unpartition_node(node):
726729

727730
self.clear_ping_times()
728731

732+
def test_pinning(self):
733+
# To make the code terser, copy modes into local scope
734+
PRIMARY = ReadPreference.PRIMARY
735+
PRIMARY_PREFERRED = ReadPreference.PRIMARY_PREFERRED
736+
SECONDARY = ReadPreference.SECONDARY
737+
SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED
738+
NEAREST = ReadPreference.NEAREST
739+
740+
c = MongoReplicaSetClient(
741+
self.seed, replicaSet=self.name, use_greenlets=use_greenlets,
742+
auto_start_request=True)
743+
744+
# Verify that changing the mode unpins the member. We'll try it for
745+
# every relevant change of mode.
746+
for mode0, mode1 in itertools.permutations(
747+
(PRIMARY, SECONDARY, SECONDARY_PREFERRED, NEAREST), 2
748+
):
749+
# Try reading and then changing modes and reading again, see if we
750+
# read from a different host
751+
for _ in range(1000):
752+
# pin to this host
753+
host = utils.read_from_which_host(c, mode0)
754+
# unpin?
755+
new_host = utils.read_from_which_host(c, mode1)
756+
if host != new_host:
757+
# Reading with a different mode unpinned, hooray!
758+
break
759+
else:
760+
self.fail(
761+
"Changing from mode %s to mode %s never unpinned" % (
762+
modes[mode0], modes[mode1]))
763+
764+
# Now verify changing the tag_sets unpins the member.
765+
tags0 = [{'a': 'a'}, {}]
766+
tags1 = [{'a': 'x'}, {}]
767+
for _ in range(10000):
768+
host = utils.read_from_which_host(c, NEAREST, tags0)
769+
new_host = utils.read_from_which_host(c, NEAREST, tags1)
770+
if host != new_host:
771+
break
772+
else:
773+
self.fail(
774+
"Changing from tags %s to tags %s never unpinned" % (
775+
tags0, tags1))
776+
777+
# Finally, verify changing the secondary_acceptable_latency_ms unpins
778+
# the member.
779+
for _ in range(10000):
780+
host = utils.read_from_which_host(c, SECONDARY, None, 15)
781+
new_host = utils.read_from_which_host(c, SECONDARY, None, 20)
782+
if host != new_host:
783+
break
784+
else:
785+
self.fail(
786+
"Changing secondary_acceptable_latency_ms from 15 to 20"
787+
" never unpinned")
788+
729789
def tearDown(self):
730790
self.c.close()
731791
ha_tools.kill_all_members()

test/test_replica_set_connection.py

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -905,23 +905,19 @@ def test_pinned_member(self):
905905
self.assertTrue(host in conn.secondaries)
906906
assertReadFrom(self, conn, host, ReadPreference.SECONDARY)
907907

908-
# Oddly, we expect PRIMARY_PREFERRED to keep reading from secondary,
909-
# since the secondary is pinned and "matches" the preference.
910-
assertReadFrom(self, conn, host, ReadPreference.PRIMARY_PREFERRED)
911-
912-
# Repin
913-
primary = read_from_which_host(conn, ReadPreference.PRIMARY)
914-
self.assertEqual(conn.primary, primary)
915-
assertReadFrom(self, conn, primary, ReadPreference.NEAREST)
908+
# Changing any part of read preference (mode, tag_sets, latency)
909+
# unpins the current host and pins to a new one
910+
primary = conn.primary
916911
assertReadFrom(self, conn, primary, ReadPreference.PRIMARY_PREFERRED)
917912

918-
# Since the we're pinned to primary we still use it
919-
assertReadFrom(self, conn, primary, ReadPreference.SECONDARY_PREFERRED)
913+
host = read_from_which_host(conn, ReadPreference.NEAREST)
914+
assertReadFrom(self, conn, host, ReadPreference.NEAREST)
920915

921-
# Repin again
922-
host = read_from_which_host(conn, ReadPreference.SECONDARY)
916+
assertReadFrom(self, conn, primary, ReadPreference.PRIMARY_PREFERRED)
917+
918+
host = read_from_which_host(conn, ReadPreference.SECONDARY_PREFERRED)
923919
self.assertTrue(host in conn.secondaries)
924-
assertReadFrom(self, conn, host, ReadPreference.SECONDARY)
920+
assertReadFrom(self, conn, host, ReadPreference.SECONDARY_PREFERRED)
925921

926922
# Unpin
927923
conn.end_request()

0 commit comments

Comments
 (0)