Skip to content

Commit d96a44b

Browse files
committed
Ensure the driver can connect when invalid peer hosts are in system.peers
1 parent 6ca04f9 commit d96a44b

File tree

3 files changed: 71 additions (+), 23 deletions (−)

CHANGELOG.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
 ======
 Not released
 
+Features
+--------
+* Ensure the driver can connect when invalid peer hosts are in system.peers (PYTHON-1260)
+
 Others
 ------
 * Drop Python 3.4 support (PYTHON-1220)

cassandra/cluster.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3788,12 +3788,14 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
             # any new nodes, so we need this additional check. (See PYTHON-90)
             should_rebuild_token_map = force_token_rebuild or self._cluster.metadata.partitioner is None
             for row in peers_result:
+                if not self._is_valid_peer(row):
+                    log.warning(
+                        "Found an invalid row for peer (%s). Ignoring host." %
+                        _NodeInfo.get_broadcast_rpc_address(row))
+                    continue
+
                 endpoint = self._cluster.endpoint_factory.create(row)
 
-                tokens = row.get("tokens", None)
-                if 'tokens' in row and not tokens:  # it was selected, but empty
-                    log.warning("Excluding host (%s) with no tokens in system.peers table of %s." % (endpoint, connection.endpoint))
-                    continue
                 if endpoint in found_hosts:
                     log.warning("Found multiple hosts with the same endpoint (%s). Excluding peer %s", endpoint, row.get("peer"))
                     continue
@@ -3820,6 +3822,7 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
                 host.dse_workload = row.get("workload")
                 host.dse_workloads = row.get("workloads")
 
+                tokens = row.get("tokens", None)
                 if partitioner and tokens and self._token_meta_enabled:
                     token_map[host] = tokens
 
@@ -3834,6 +3837,12 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
             log.debug("[control connection] Rebuilding token map due to topology changes")
             self._cluster.metadata.rebuild_token_map(partitioner, token_map)
 
+    @staticmethod
+    def _is_valid_peer(row):
+        return bool(_NodeInfo.get_broadcast_rpc_address(row) and row.get("host_id") and
+                    row.get("data_center") and row.get("rack") and
+                    ('tokens' not in row or row.get('tokens')))
+
     def _update_location_info(self, host, datacenter, rack):
         if host.datacenter == datacenter and host.rack == rack:
             return False

tests/unit/test_control_connection.py

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,15 @@ def __init__(self):
         ]
 
         self.peer_results = [
-            ["rpc_address", "peer", "schema_version", "data_center", "rack", "tokens"],
-            [["192.168.1.1", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"]],
-             ["192.168.1.2", "10.0.0.2", "a", "dc1", "rack1", ["2", "102", "202"]]]
+            ["rpc_address", "peer", "schema_version", "data_center", "rack", "tokens", "host_id"],
+            [["192.168.1.1", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"], "uuid1"],
+             ["192.168.1.2", "10.0.0.2", "a", "dc1", "rack1", ["2", "102", "202"], "uuid2"]]
         ]
 
         self.peer_results_v2 = [
-            ["native_address", "native_port", "peer", "peer_port", "schema_version", "data_center", "rack", "tokens"],
-            [["192.168.1.1", 9042, "10.0.0.1", 7042, "a", "dc1", "rack1", ["1", "101", "201"]],
-             ["192.168.1.2", 9042, "10.0.0.2", 7040, "a", "dc1", "rack1", ["2", "102", "202"]]]
+            ["native_address", "native_port", "peer", "peer_port", "schema_version", "data_center", "rack", "tokens", "host_id"],
+            [["192.168.1.1", 9042, "10.0.0.1", 7042, "a", "dc1", "rack1", ["1", "101", "201"], "uuid1"],
+             ["192.168.1.2", 9042, "10.0.0.2", 7040, "a", "dc1", "rack1", ["2", "102", "202"], "uuid2"]]
         ]
         self.wait_for_responses = Mock(return_value=_node_meta_results(self.local_results, self.peer_results))

@@ -155,18 +155,18 @@ def sleep(self, amount):
 class ControlConnectionTest(unittest.TestCase):
 
     _matching_schema_preloaded_results = _node_meta_results(
-        local_results=(["schema_version", "cluster_name", "data_center", "rack", "partitioner", "release_version", "tokens"],
-                       [["a", "foocluster", "dc1", "rack1", "Murmur3Partitioner", "2.2.0", ["0", "100", "200"]]]),
-        peer_results=(["rpc_address", "peer", "schema_version", "data_center", "rack", "tokens"],
-                      [["192.168.1.1", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"]],
-                       ["192.168.1.2", "10.0.0.2", "a", "dc1", "rack1", ["2", "102", "202"]]]))
+        local_results=(["schema_version", "cluster_name", "data_center", "rack", "partitioner", "release_version", "tokens", "host_id"],
+                       [["a", "foocluster", "dc1", "rack1", "Murmur3Partitioner", "2.2.0", ["0", "100", "200"], "uuid1"]]),
+        peer_results=(["rpc_address", "peer", "schema_version", "data_center", "rack", "tokens", "host_id"],
+                      [["192.168.1.1", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"], "uuid2"],
+                       ["192.168.1.2", "10.0.0.2", "a", "dc1", "rack1", ["2", "102", "202"], "uuid3"]]))
 
     _nonmatching_schema_preloaded_results = _node_meta_results(
-        local_results=(["schema_version", "cluster_name", "data_center", "rack", "partitioner", "release_version", "tokens"],
-                       [["a", "foocluster", "dc1", "rack1", "Murmur3Partitioner", "2.2.0", ["0", "100", "200"]]]),
-        peer_results=(["rpc_address", "peer", "schema_version", "data_center", "rack", "tokens"],
-                      [["192.168.1.1", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"]],
-                       ["192.168.1.2", "10.0.0.2", "b", "dc1", "rack1", ["2", "102", "202"]]]))
+        local_results=(["schema_version", "cluster_name", "data_center", "rack", "partitioner", "release_version", "tokens", "host_id"],
+                       [["a", "foocluster", "dc1", "rack1", "Murmur3Partitioner", "2.2.0", ["0", "100", "200"], "uuid1"]]),
+        peer_results=(["rpc_address", "peer", "schema_version", "data_center", "rack", "tokens", "host_id"],
+                      [["192.168.1.1", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"], "uuid2"],
+                       ["192.168.1.2", "10.0.0.2", "b", "dc1", "rack1", ["2", "102", "202"], "uuid3"]]))
 
     def setUp(self):
         self.cluster = MockCluster()
@@ -275,6 +275,40 @@ def test_refresh_nodes_and_tokens(self):
 
         self.assertEqual(self.connection.wait_for_responses.call_count, 1)
 
+    def test_refresh_nodes_and_tokens_with_invalid_peers(self):
+        def refresh_and_validate_added_hosts():
+            self.connection.wait_for_responses = Mock(return_value=_node_meta_results(
+                self.connection.local_results, self.connection.peer_results))
+            self.control_connection.refresh_node_list_and_token_map()
+            self.assertEqual(1, len(self.cluster.added_hosts))  # only one valid peer found
+
+        # peersV1
+        del self.connection.peer_results[:]
+        self.connection.peer_results.extend([
+            ["rpc_address", "peer", "schema_version", "data_center", "rack", "tokens", "host_id"],
+            [["192.168.1.3", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"], 'uuid5'],
+             # all others are invalid
+             [None, None, "a", "dc1", "rack1", ["1", "101", "201"], 'uuid1'],
+             ["192.168.1.7", "10.0.0.1", "a", None, "rack1", ["1", "101", "201"], 'uuid2'],
+             ["192.168.1.6", "10.0.0.1", "a", "dc1", None, ["1", "101", "201"], 'uuid3'],
+             ["192.168.1.5", "10.0.0.1", "a", "dc1", "rack1", None, 'uuid4'],
+             ["192.168.1.4", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"], None]]])
+        refresh_and_validate_added_hosts()
+
+        # peersV2
+        del self.cluster.added_hosts[:]
+        del self.connection.peer_results[:]
+        self.connection.peer_results.extend([
+            ["native_address", "native_port", "peer", "peer_port", "schema_version", "data_center", "rack", "tokens", "host_id"],
+            [["192.168.1.4", 9042, "10.0.0.1", 7042, "a", "dc1", "rack1", ["1", "101", "201"], "uuid1"],
+             # all others are invalid
+             [None, 9042, None, 7040, "a", "dc1", "rack1", ["2", "102", "202"], "uuid2"],
+             ["192.168.1.5", 9042, "10.0.0.2", 7040, "a", None, "rack1", ["2", "102", "202"], "uuid2"],
+             ["192.168.1.5", 9042, "10.0.0.2", 7040, "a", "dc1", None, ["2", "102", "202"], "uuid2"],
+             ["192.168.1.5", 9042, "10.0.0.2", 7040, "a", "dc1", "rack1", None, "uuid2"],
+             ["192.168.1.5", 9042, "10.0.0.2", 7040, "a", "dc1", "rack1", ["2", "102", "202"], None]]])
+        refresh_and_validate_added_hosts()
+
     def test_refresh_nodes_and_tokens_uses_preloaded_results_if_given(self):
         """
         refresh_nodes_and_tokens uses preloaded results if given for shared table queries
def test_refresh_nodes_and_tokens_uses_preloaded_results_if_given(self):
279313
"""
280314
refresh_nodes_and_tokens uses preloaded results if given for shared table queries
@@ -311,14 +345,15 @@ def test_refresh_nodes_and_tokens_no_partitioner(self):
 
     def test_refresh_nodes_and_tokens_add_host(self):
         self.connection.peer_results[1].append(
-            ["192.168.1.3", "10.0.0.3", "a", "dc1", "rack1", ["3", "103", "203"]]
+            ["192.168.1.3", "10.0.0.3", "a", "dc1", "rack1", ["3", "103", "203"], "uuid3"]
         )
         self.cluster.scheduler.schedule = lambda delay, f, *args, **kwargs: f(*args, **kwargs)
         self.control_connection.refresh_node_list_and_token_map()
         self.assertEqual(1, len(self.cluster.added_hosts))
         self.assertEqual(self.cluster.added_hosts[0].address, "192.168.1.3")
         self.assertEqual(self.cluster.added_hosts[0].datacenter, "dc1")
         self.assertEqual(self.cluster.added_hosts[0].rack, "rack1")
+        self.assertEqual(self.cluster.added_hosts[0].host_id, "uuid3")
 
     def test_refresh_nodes_and_tokens_remove_host(self):
         del self.connection.peer_results[1][1]
@@ -482,7 +517,7 @@ def test_refresh_nodes_and_tokens_add_host_detects_port(self):
         del self.connection.peer_results[:]
         self.connection.peer_results.extend(self.connection.peer_results_v2)
         self.connection.peer_results[1].append(
-            ["192.168.1.3", 555, "10.0.0.3", 666, "a", "dc1", "rack1", ["3", "103", "203"]]
+            ["192.168.1.3", 555, "10.0.0.3", 666, "a", "dc1", "rack1", ["3", "103", "203"], "uuid3"]
         )
         self.connection.wait_for_responses = Mock(return_value=_node_meta_results(
             self.connection.local_results, self.connection.peer_results))
@@ -502,7 +537,7 @@ def test_refresh_nodes_and_tokens_add_host_detects_invalid_port(self):
         del self.connection.peer_results[:]
         self.connection.peer_results.extend(self.connection.peer_results_v2)
         self.connection.peer_results[1].append(
-            ["192.168.1.3", -1, "10.0.0.3", 0, "a", "dc1", "rack1", ["3", "103", "203"]]
+            ["192.168.1.3", -1, "10.0.0.3", 0, "a", "dc1", "rack1", ["3", "103", "203"], "uuid3"]
         )
         self.connection.wait_for_responses = Mock(return_value=_node_meta_results(
             self.connection.local_results, self.connection.peer_results))

0 commit comments

Comments
 (0)