Skip to content

Commit 6b3f522

Browse files
committed
Replica set testing scripts PYTHON-196
1 parent 8a30993 commit 6b3f522

File tree

2 files changed

+371
-0
lines changed

2 files changed

+371
-0
lines changed

test/replica/replset_tools.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
import os
2+
import random
3+
import shutil
4+
import signal
5+
import socket
6+
import subprocess
7+
import sys
8+
import time
9+
10+
import pymongo
11+
12+
home = os.environ.get('HOME')
13+
default_dbpath = os.path.join(home, 'data', 'pymongo_replica_set')
14+
dbpath = os.environ.get('DBPATH', default_dbpath)
15+
default_logpath = os.path.join(home, 'log', 'pymongo_replica_set')
16+
logpath = os.environ.get('LOGPATH', default_logpath)
17+
hostname = socket.gethostname()
18+
port = int(os.environ.get('DBPORT', 27017))
19+
mongod = os.environ.get('MONGOD', 'mongod')
20+
set_name = os.environ.get('SETNAME', 'repl0')
21+
22+
nodes = {}
23+
24+
def kill_members(members, sig):
25+
for member in members:
26+
try:
27+
proc = nodes[member]['proc']
28+
# Not sure if cygwin makes sense here...
29+
if sys.platform in ('win32', 'cygwin'):
30+
os.kill(proc.pid, signal.CTRL_C_EVENT)
31+
else:
32+
os.kill(proc.pid, sig)
33+
except OSError:
34+
pass # already dead
35+
36+
def kill_all_members():
37+
kill_members(nodes.keys(), 2)
38+
39+
def wait_for(proc, port):
40+
trys = 0
41+
while proc.poll() is None and trys < 40: # ~10 seconds
42+
trys += 1
43+
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
44+
try:
45+
try:
46+
s.connect((hostname, port))
47+
return True
48+
except (IOError, socket.error):
49+
time.sleep(0.25)
50+
finally:
51+
s.close()
52+
53+
kill_all_members()
54+
return False
55+
56+
def start_replica_set(num_members=3, with_arbiter=True, fresh=True):
57+
if fresh:
58+
if os.path.exists(dbpath):
59+
shutil.rmtree(dbpath)
60+
members = []
61+
for i in xrange(num_members):
62+
cur_port = port + i
63+
host = '%s:%d' % (hostname, cur_port)
64+
members.append({'_id': i, 'host': host})
65+
path = os.path.join(dbpath, 'db' + str(i))
66+
if not os.path.exists(path):
67+
os.makedirs(path)
68+
member_logpath = os.path.join(logpath, 'db' + str(i) + '.log')
69+
if not os.path.exists(os.path.dirname(member_logpath)):
70+
os.makedirs(os.path.dirname(member_logpath))
71+
cmd = [mongod,
72+
'--dbpath', path,
73+
'--port', str(cur_port),
74+
'--replSet', set_name,
75+
'--journal', '--oplogSize', '1',
76+
'--logpath', member_logpath]
77+
proc = subprocess.Popen(cmd,
78+
stdout=subprocess.PIPE,
79+
stderr=subprocess.STDOUT)
80+
nodes[host] = {'proc': proc, 'cmd': cmd}
81+
res = wait_for(proc, cur_port)
82+
if not res:
83+
return None
84+
if with_arbiter:
85+
members[-1]['arbiterOnly'] = True
86+
config = {'_id': set_name, 'members': members}
87+
primary = members[0]['host']
88+
c = pymongo.Connection(primary)
89+
try:
90+
c.admin.command('replSetInitiate', config)
91+
except:
92+
# Already initialized from a previous run?
93+
pass
94+
95+
# Wait for all members to come online
96+
expected_secondaries = num_members - 1
97+
expected_arbiters = 0
98+
if with_arbiter:
99+
expected_secondaries -= 1
100+
expected_arbiters = 1
101+
while True:
102+
time.sleep(2)
103+
try:
104+
if (len(get_primary()) == 1 and
105+
len(get_secondaries()) == expected_secondaries and
106+
len(get_arbiters()) == expected_arbiters):
107+
break
108+
except pymongo.errors.AutoReconnect:
109+
# Keep waiting
110+
pass
111+
return primary, set_name
112+
113+
def get_members_in_state(state):
114+
c = pymongo.Connection(nodes.keys(), slave_okay=True)
115+
status = c.admin.command('replSetGetStatus')
116+
members = status['members']
117+
return [k['name'] for k in members if k['state'] == state]
118+
119+
def get_primary():
120+
return get_members_in_state(1)
121+
122+
def get_random_secondary():
123+
secondaries = get_members_in_state(2)
124+
if len(secondaries):
125+
return [secondaries[random.randrange(0, len(secondaries))]]
126+
return secondaries
127+
128+
def get_secondaries():
129+
return get_members_in_state(2)
130+
131+
def get_arbiters():
132+
return get_members_in_state(7)
133+
134+
def kill_primary(sig=2):
135+
primary = get_primary()
136+
kill_members(primary, sig)
137+
return primary
138+
139+
def kill_secondary(sig=2):
140+
secondary = get_random_secondary()
141+
kill_members(secondary, sig)
142+
return secondary
143+
144+
def kill_all_secondaries(sig=2):
145+
secondaries = get_secondaries()
146+
kill_members(secondaries, sig)
147+
return secondaries
148+
149+
def stepdown_primary():
150+
primary = get_primary()
151+
if primary:
152+
c = pymongo.Connection(primary)
153+
c.admin.command('replSetStepDown', 10)
154+
155+
def restart_members(members):
156+
restarted = []
157+
for member in members:
158+
cmd = nodes[member]['cmd']
159+
proc = subprocess.Popen(cmd,
160+
stdout=subprocess.PIPE,
161+
stderr=subprocess.STDOUT)
162+
nodes[member]['proc'] = proc
163+
res = wait_for(proc, int(member.split(':')[1]))
164+
if res:
165+
restarted.append(member)
166+
return restarted
167+

test/replica/test_replica_set.py

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
import time
2+
import unittest
3+
4+
import replset_tools
5+
6+
from nose.plugins.skip import SkipTest
7+
8+
from pymongo import (Connection,
9+
ReplicaSetConnection,
10+
ReadPreference)
11+
from pymongo.errors import AutoReconnect
12+
13+
14+
class TestReadPreference(unittest.TestCase):
15+
16+
def setUp(self):
17+
res = replset_tools.start_replica_set(num_members=3,
18+
with_arbiter=True)
19+
self.seed, self.name = res
20+
21+
def test_read_preference(self):
22+
c = ReplicaSetConnection(self.seed, replicaSet=self.name)
23+
self.assertTrue(bool(len(c.secondaries)))
24+
db = c.pymongo_test
25+
db.test.remove({}, safe=True, w=len(c.secondaries))
26+
27+
# Force replication...
28+
db.test.insert({'foo': 'bar'}, safe=True, w=len(c.secondaries))
29+
30+
# Test PRIMARY
31+
for _ in xrange(10):
32+
cursor = db.test.find()
33+
cursor.next()
34+
self.assertEqual(cursor._Cursor__connection_id, c.primary)
35+
36+
# Test SECONDARY with a secondary
37+
db.read_preference = ReadPreference.SECONDARY
38+
for _ in xrange(10):
39+
cursor = db.test.find()
40+
cursor.next()
41+
self.assertTrue(cursor._Cursor__connection_id in c.secondaries)
42+
43+
# Test SECONDARY_ONLY with a secondary
44+
db.read_preference = ReadPreference.SECONDARY_ONLY
45+
for _ in xrange(10):
46+
cursor = db.test.find()
47+
cursor.next()
48+
self.assertTrue(cursor._Cursor__connection_id in c.secondaries)
49+
50+
# Test SECONDARY with no secondary
51+
killed = replset_tools.kill_all_secondaries()
52+
db.read_preference = ReadPreference.SECONDARY
53+
for _ in xrange(10):
54+
cursor = db.test.find()
55+
cursor.next()
56+
self.assertEqual(cursor._Cursor__connection_id, c.primary)
57+
58+
# Test SECONDARY_ONLY with no secondary
59+
db.read_preference = ReadPreference.SECONDARY_ONLY
60+
for _ in xrange(10):
61+
cursor = db.test.find()
62+
self.assertRaises(AutoReconnect, cursor.next)
63+
64+
def tearDown(self):
65+
replset_tools.kill_all_members()
66+
67+
class TestHealthMonitor(unittest.TestCase):
68+
69+
def setUp(self):
70+
res = replset_tools.start_replica_set(num_members=3,
71+
with_arbiter=False)
72+
self.seed, self.name = res
73+
74+
def test_primary_failure(self):
75+
c = ReplicaSetConnection(self.seed, replicaSet=self.name)
76+
self.assertTrue(bool(len(c.secondaries)))
77+
primary = c.primary
78+
secondaries = c.secondaries
79+
80+
def primary_changed():
81+
for i in xrange(20):
82+
if c.primary != primary:
83+
return True
84+
time.sleep(1)
85+
return False
86+
87+
killed = replset_tools.kill_primary()
88+
self.assertTrue(bool(len(killed)))
89+
self.assertTrue(primary_changed())
90+
self.assertTrue(secondaries != c.secondaries)
91+
92+
def test_secondary_failure(self):
93+
c = ReplicaSetConnection(self.seed, replicaSet=self.name)
94+
self.assertTrue(bool(len(c.secondaries)))
95+
primary = c.primary
96+
secondaries = c.secondaries
97+
98+
def readers_changed():
99+
for i in xrange(20):
100+
if c.secondaries != secondaries:
101+
return True
102+
time.sleep(1)
103+
return False
104+
105+
killed = replset_tools.kill_secondary()
106+
self.assertTrue(bool(len(killed)))
107+
self.assertEqual(primary, c.primary)
108+
self.assertTrue(readers_changed())
109+
secondaries = c.secondaries
110+
111+
replset_tools.restart_members(killed)
112+
self.assertEqual(primary, c.primary)
113+
self.assertTrue(readers_changed())
114+
115+
def test_primary_stepdown(self):
116+
c = ReplicaSetConnection(self.seed, replicaSet=self.name)
117+
self.assertTrue(bool(len(c.secondaries)))
118+
primary = c.primary
119+
secondaries = c.secondaries
120+
121+
def primary_changed():
122+
for i in xrange(20):
123+
if c.primary != primary:
124+
return True
125+
time.sleep(1)
126+
return False
127+
128+
replset_tools.stepdown_primary()
129+
self.assertTrue(primary_changed())
130+
self.assertTrue(secondaries != c.secondaries)
131+
132+
def tearDown(self):
133+
replset_tools.kill_all_members()
134+
135+
class TestWritesWithFailover(unittest.TestCase):
136+
137+
def setUp(self):
138+
res = replset_tools.start_replica_set(num_members=3,
139+
with_arbiter=False)
140+
self.seed, self.name = res
141+
142+
def test_writes_with_failover(self):
143+
c = ReplicaSetConnection(self.seed, replicaSet=self.name)
144+
primary = c.primary
145+
db = c.pymongo_test
146+
db.test.remove({}, safe=True, w=len(c.secondaries))
147+
db.test.insert({'foo': 'bar'}, safe=True, w=len(c.secondaries))
148+
self.assertEqual('bar', db.test.find_one()['foo'])
149+
150+
def try_write():
151+
for i in xrange(20):
152+
try:
153+
db.test.insert({'bar': 'baz'}, safe=True)
154+
return True
155+
except AutoReconnect:
156+
time.sleep(1)
157+
return False
158+
159+
killed = replset_tools.kill_primary(9)
160+
self.assertTrue(bool(len(killed)))
161+
self.assertTrue(try_write())
162+
self.assertTrue(primary != c.primary)
163+
self.assertEqual('baz', db.test.find_one({'bar': 'baz'})['bar'])
164+
165+
def tearDown(self):
166+
replset_tools.kill_all_members()
167+
168+
class TestReadWithFailover(unittest.TestCase):
169+
170+
def setUp(self):
171+
res = replset_tools.start_replica_set(num_members=3,
172+
with_arbiter=False)
173+
self.seed, self.name = res
174+
175+
def test_read_with_failover(self):
176+
c = ReplicaSetConnection(self.seed, replicaSet=self.name)
177+
self.assertTrue(bool(len(c.secondaries)))
178+
179+
def iter_cursor(cursor):
180+
for doc in cursor:
181+
pass
182+
return True
183+
184+
db = c.pymongo_test
185+
db.test.remove({}, safe=True, w=len(c.secondaries))
186+
# Force replication
187+
db.test.insert([{'foo': i} for i in xrange(10)],
188+
safe=True, w=len(c.secondaries))
189+
self.assertEqual(10, db.test.count())
190+
191+
db.read_preference = ReadPreference.SECONDARY
192+
cursor = db.test.find().batch_size(5)
193+
cursor.next()
194+
self.assertEqual(5, cursor._Cursor__retrieved)
195+
killed = replset_tools.kill_primary()
196+
# Primary failure shouldn't interrupt the cursor
197+
self.assertTrue(iter_cursor(cursor))
198+
self.assertEqual(10, cursor._Cursor__retrieved)
199+
200+
def tearDown(self):
201+
replset_tools.kill_all_members()
202+
203+
if __name__ == '__main__':
204+
unittest.main()

0 commit comments

Comments
 (0)