diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 3d5bb8a329af4..ecf50dc14634b 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -26,7 +26,7 @@ use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}; use node_primitives::Block; use node_runtime::RuntimeApi; use sc_service::{ - config::{Role, Configuration}, error::{Error as ServiceError}, + config::{Configuration}, error::{Error as ServiceError}, RpcHandlers, TaskManager, }; use sp_inherents::InherentDataProviders; @@ -258,21 +258,10 @@ pub fn new_full_base( } // Spawn authority discovery module. - if matches!(role, Role::Authority{..} | Role::Sentry {..}) { - let (sentries, authority_discovery_role) = match role { - sc_service::config::Role::Authority { ref sentry_nodes } => ( - sentry_nodes.clone(), - sc_authority_discovery::Role::Authority ( - keystore_container.keystore(), - ), - ), - sc_service::config::Role::Sentry {..} => ( - vec![], - sc_authority_discovery::Role::Sentry, - ), - _ => unreachable!("Due to outer matches! constraint; qed.") - }; - + if role.is_authority() { + let authority_discovery_role = sc_authority_discovery::Role::PublishAndDiscover( + keystore_container.keystore(), + ); let dht_event_stream = network.event_stream("authority-discovery") .filter_map(|e| async move { match e { Event::Dht(e) => Some(e), @@ -281,7 +270,6 @@ pub fn new_full_base( let (authority_discovery_worker, _service) = sc_authority_discovery::new_worker_and_service( client.clone(), network.clone(), - sentries, Box::pin(dht_event_stream), authority_discovery_role, prometheus_registry.clone(), diff --git a/client/authority-discovery/src/lib.rs b/client/authority-discovery/src/lib.rs index 42cf120d70f8a..2d789d1e6a083 100644 --- a/client/authority-discovery/src/lib.rs +++ b/client/authority-discovery/src/lib.rs @@ -32,7 +32,7 @@ use futures::channel::{mpsc, oneshot}; use futures::Stream; use sc_client_api::blockchain::HeaderBackend; -use sc_network::{config::MultiaddrWithPeerId, DhtEvent, Multiaddr, PeerId}; +use sc_network::{DhtEvent, Multiaddr, PeerId}; use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId}; use sp_runtime::traits::Block as BlockT; use sp_api::ProvideRuntimeApi; @@ -44,10 +44,11 @@ mod tests; mod worker; /// Create a new authority discovery [`Worker`] and [`Service`]. +/// +/// See the struct documentation of each for more details. pub fn new_worker_and_service( client: Arc, network: Arc, - sentry_nodes: Vec, dht_event_rx: DhtEventStream, role: Role, prometheus_registry: Option, @@ -62,7 +63,7 @@ where let (to_worker, from_service) = mpsc::channel(0); let worker = Worker::new( - from_service, client, network, sentry_nodes, dht_event_rx, role, prometheus_registry, + from_service, client, network, dht_event_rx, role, prometheus_registry, ); let service = Service::new(to_worker); diff --git a/client/authority-discovery/src/service.rs b/client/authority-discovery/src/service.rs index ed0205d262fc6..7eabeb3daf52e 100644 --- a/client/authority-discovery/src/service.rs +++ b/client/authority-discovery/src/service.rs @@ -43,12 +43,12 @@ impl Service { /// Returns `None` if no entry was present or connection to the /// [`crate::Worker`] failed. /// - /// [`Multiaddr`]s returned always include a [`PeerId`] via a - /// [`libp2p::core::multiaddr:Protocol::P2p`] component. [`Multiaddr`]s - /// might differ in their [`PeerId`], e.g. when each [`Multiaddr`] - /// represents a different sentry node. This might change once support for - /// sentry nodes is removed (see - /// https://github.com/paritytech/substrate/issues/6845). + /// Note: [`Multiaddr`]s returned always include a [`PeerId`] via a + /// [`libp2p::core::multiaddr:Protocol::P2p`] component. Equality of + /// [`PeerId`]s across [`Multiaddr`]s returned by a single call is not + /// enforced today, given that there are still authorities out there + /// publishing the addresses of their sentry nodes on the DHT. In the future + /// this guarantee can be provided. pub async fn get_addresses_by_authority_id(&mut self, authority: AuthorityId) -> Option> { let (tx, rx) = oneshot::channel(); diff --git a/client/authority-discovery/src/tests.rs b/client/authority-discovery/src/tests.rs index 88aad0af0696b..414ffc1e3f394 100644 --- a/client/authority-discovery/src/tests.rs +++ b/client/authority-discovery/src/tests.rs @@ -55,9 +55,8 @@ fn get_addresses_and_authority_id() { let (mut worker, mut service) = new_worker_and_service( test_api, network.clone(), - vec![], Box::pin(dht_event_rx), - Role::Authority(key_store.into()), + Role::PublishAndDiscover(key_store.into()), None, ); worker.inject_addresses(remote_authority_id.clone(), vec![remote_addr.clone()]); diff --git a/client/authority-discovery/src/worker.rs b/client/authority-discovery/src/worker.rs index f204b3adf9bb5..1a0a59f8c49ff 100644 --- a/client/authority-discovery/src/worker.rs +++ b/client/authority-discovery/src/worker.rs @@ -29,7 +29,6 @@ use futures_timer::Delay; use addr_cache::AddrCache; use async_trait::async_trait; use codec::Decode; -use either::Either; use libp2p::{core::multiaddr, multihash::Multihash}; use log::{debug, error, log_enabled}; use prometheus_endpoint::{Counter, CounterVec, Gauge, Opts, U64, register}; @@ -37,7 +36,6 @@ use prost::Message; use rand::{seq::SliceRandom, thread_rng}; use sc_client_api::blockchain::HeaderBackend; use sc_network::{ - config::MultiaddrWithPeerId, DhtEvent, ExHashT, Multiaddr, @@ -73,68 +71,47 @@ const MAX_ADDRESSES_PER_AUTHORITY: usize = 10; /// Maximum number of in-flight DHT lookups at any given point in time. const MAX_IN_FLIGHT_LOOKUPS: usize = 8; -/// Role an authority discovery module can run as. +/// Role an authority discovery [`Worker`] can run as. pub enum Role { - /// Actual authority as well as a reference to its key store. - Authority(Arc), - /// Sentry node that guards an authority. - /// - /// No reference to its key store needed, as sentry nodes don't have an identity to sign - /// addresses with in the first place. - Sentry, + /// Publish own addresses and discover addresses of others. + PublishAndDiscover(Arc), + /// Discover addresses of others. + Discover, } -/// A [`Worker`] makes a given authority discoverable and discovers other -/// authorities. -/// -/// The [`Worker`] implements the Future trait. By -/// polling [`Worker`] an authority: + +/// An authority discovery [`Worker`] can publish the local node's addresses as well as discover +/// those of other nodes via a Kademlia DHT. /// -/// 1. **Makes itself discoverable** +/// When constructed with [`Role::PublishAndDiscover`] a [`Worker`] will /// -/// 1. Retrieves its external addresses (including peer id) or the ones of -/// its sentry nodes. +/// 1. Retrieve its external addresses (including peer id). /// -/// 2. Signs the above. +/// 2. Get the list of keys owned by the local node participating in the current authority set. /// -/// 3. Puts the signature and the addresses on the libp2p Kademlia DHT. +/// 3. Sign the addresses with the keys. /// +/// 4. Put addresses and signature as a record with the authority id as a key on a Kademlia DHT. /// -/// 2. **Discovers other authorities** +/// When constructed with either [`Role::PublishAndDiscover`] or [`Role::Publish`] a [`Worker`] will /// -/// 1. Retrieves the current and next set of authorities. +/// 1. Retrieve the current and next set of authorities. /// -/// 2. Starts DHT queries for the ids of the authorities. +/// 2. Start DHT queries for the ids of the authorities. /// -/// 3. Validates the signatures of the retrieved key value pairs. +/// 3. Validate the signatures of the retrieved key value pairs. /// -/// 4. Adds the retrieved external addresses as priority nodes to the -/// peerset. +/// 4. Add the retrieved external addresses as priority nodes to the +/// network peerset. /// -/// When run as a sentry node, the [`Worker`] does not publish -/// any addresses to the DHT but still discovers validators and sentry nodes of -/// validators, i.e. only step 2 (Discovers other authorities) is executed. -pub struct Worker -where - Block: BlockT + 'static, - Network: NetworkProvider, - Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, - >::Api: AuthorityDiscoveryApi, -{ - /// Channel receiver for messages send by an [`Service`]. +/// 5. Allow querying of the collected addresses via the [`crate::Service`]. +pub struct Worker { + /// Channel receiver for messages send by a [`Service`]. from_service: Fuse>, client: Arc, network: Arc, - /// List of sentry node public addresses. - // - // There are 3 states: - // - None: No addresses were specified. - // - Some(vec![]): Addresses were specified, but none could be parsed as proper - // Multiaddresses. - // - Some(vec![a, b, c, ...]): Valid addresses were specified. - sentry_nodes: Option>, /// Channel we receive Dht events on. dht_event_rx: DhtEventStream, @@ -169,15 +146,11 @@ where AuthorityDiscoveryApi, DhtEventStream: Stream + Unpin, { - /// Return a new [`Worker`]. - /// - /// Note: When specifying `sentry_nodes` this module will not advertise the public addresses of - /// the node itself but only the public addresses of its sentry nodes. + /// Construct a [`Worker`]. pub(crate) fn new( from_service: mpsc::Receiver, client: Arc, network: Arc, - sentry_nodes: Vec, dht_event_rx: DhtEventStream, role: Role, prometheus_registry: Option, @@ -207,12 +180,6 @@ where query_interval_duration, ); - let sentry_nodes = if !sentry_nodes.is_empty() { - Some(sentry_nodes.into_iter().map(|ma| ma.concat()).collect::>()) - } else { - None - }; - let addr_cache = AddrCache::new(); let metrics = match prometheus_registry { @@ -232,7 +199,6 @@ where from_service: from_service.fuse(), client, network, - sentry_nodes, dht_event_rx, publish_interval, query_interval, @@ -313,33 +279,23 @@ where } fn addresses_to_publish(&self) -> impl ExactSizeIterator { - match &self.sentry_nodes { - Some(addrs) => Either::Left(addrs.clone().into_iter()), - None => { - let peer_id: Multihash = self.network.local_peer_id().into(); - Either::Right( - self.network.external_addresses() - .into_iter() - .map(move |a| { - if a.iter().any(|p| matches!(p, multiaddr::Protocol::P2p(_))) { - a - } else { - a.with(multiaddr::Protocol::P2p(peer_id.clone())) - } - }), - ) - } - } + let peer_id: Multihash = self.network.local_peer_id().into(); + self.network.external_addresses() + .into_iter() + .map(move |a| { + if a.iter().any(|p| matches!(p, multiaddr::Protocol::P2p(_))) { + a + } else { + a.with(multiaddr::Protocol::P2p(peer_id.clone())) + } + }) } - /// Publish either our own or if specified the public addresses of our sentry nodes. + /// Publish own public addresses. async fn publish_ext_addresses(&mut self) -> Result<()> { let key_store = match &self.role { - Role::Authority(key_store) => key_store, - // Only authority nodes can put addresses (their own or the ones of their sentry nodes) - // on the Dht. Sentry nodes don't have a known identity to authenticate such addresses, - // thus `publish_ext_addresses` becomes a no-op. - Role::Sentry => return Ok(()), + Role::PublishAndDiscover(key_store) => key_store, + Role::Discover => return Ok(()), }; let addresses = self.addresses_to_publish(); @@ -394,12 +350,12 @@ where let id = BlockId::hash(self.client.info().best_hash); let local_keys = match &self.role { - Role::Authority(key_store) => { + Role::PublishAndDiscover(key_store) => { key_store.sr25519_public_keys( key_types::AUTHORITY_DISCOVERY ).await.into_iter().collect::>() }, - Role::Sentry => HashSet::new(), + Role::Discover => HashSet::new(), }; let mut authorities = self @@ -798,13 +754,7 @@ impl Metrics { // Helper functions for unit testing. #[cfg(test)] -impl Worker -where - Block: BlockT + 'static, - Network: NetworkProvider, - Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, - >::Api: AuthorityDiscoveryApi, -{ +impl Worker { pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec) { self.addr_cache.insert(authority, addresses); } diff --git a/client/authority-discovery/src/worker/tests.rs b/client/authority-discovery/src/worker/tests.rs index 98177f45729db..ef78735a9b12b 100644 --- a/client/authority-discovery/src/worker/tests.rs +++ b/client/authority-discovery/src/worker/tests.rs @@ -303,9 +303,8 @@ fn new_registers_metrics() { from_service, test_api, network.clone(), - vec![], Box::pin(dht_event_rx), - Role::Authority(key_store.into()), + Role::PublishAndDiscover(key_store.into()), Some(registry.clone()), ); @@ -332,9 +331,8 @@ fn triggers_dht_get_query() { from_service, test_api, network.clone(), - vec![], Box::pin(dht_event_rx), - Role::Authority(key_store.into()), + Role::PublishAndDiscover(key_store.into()), None, ); @@ -381,9 +379,8 @@ fn publish_discover_cycle() { from_service, test_api, network.clone(), - vec![], Box::pin(dht_event_rx), - Role::Authority(key_store.into()), + Role::PublishAndDiscover(key_store.into()), None, ); @@ -412,9 +409,8 @@ fn publish_discover_cycle() { from_service, test_api, network.clone(), - vec![], Box::pin(dht_event_rx), - Role::Authority(key_store.into()), + Role::PublishAndDiscover(key_store.into()), None, ); @@ -442,6 +438,7 @@ fn publish_discover_cycle() { pool.run(); } + /// Don't terminate when sender side of service channel is dropped. Terminate when network event /// stream terminates. #[test] @@ -458,9 +455,8 @@ fn terminate_when_event_stream_terminates() { from_service, test_api, network.clone(), - vec![], Box::pin(dht_event_rx), - Role::Authority(key_store.into()), + Role::PublishAndDiscover(key_store.into()), None, ).run(); futures::pin_mut!(worker); @@ -485,7 +481,8 @@ fn terminate_when_event_stream_terminates() { "Expect the authority discovery module to terminate once the \ sending side of the dht event channel is closed.", ); - });} + }); +} #[test] fn dont_stop_polling_dht_event_stream_after_bogus_event() { @@ -520,9 +517,8 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() { from_service, test_api, network.clone(), - vec![], Box::pin(dht_event_rx), - Role::Authority(Arc::new(key_store)), + Role::PublishAndDiscover(Arc::new(key_store)), None, ); @@ -569,79 +565,6 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() { }); } -/// In the scenario of a validator publishing the address of its sentry node to -/// the DHT, said sentry node should not add its own Multiaddr to the -/// peerset "authority" priority group. -#[test] -fn never_add_own_address_to_priority_group() { - let validator_key_store = KeyStore::new(); - let validator_public = block_on(validator_key_store - .sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)) - .unwrap(); - - let sentry_network: Arc = Arc::new(Default::default()); - - let sentry_multiaddr = { - let peer_id = sentry_network.local_peer_id(); - let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:2/tcp/30333".parse().unwrap(); - - address.with(multiaddr::Protocol::P2p(peer_id.into())) - }; - - // Address of some other sentry node of `validator`. - let random_multiaddr = { - let peer_id = PeerId::random(); - let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap(); - - address.with(multiaddr::Protocol::P2p( - peer_id.into(), - )) - }; - - let dht_event = block_on(build_dht_event( - vec![sentry_multiaddr, random_multiaddr.clone()], - validator_public.into(), - &validator_key_store, - )); - - let (_dht_event_tx, dht_event_rx) = channel(1); - let sentry_test_api = Arc::new(TestApi { - // Make sure the sentry node identifies its validator as an authority. - authorities: vec![validator_public.into()], - }); - - let (_to_worker, from_service) = mpsc::channel(0); - let mut sentry_worker = Worker::new( - from_service, - sentry_test_api, - sentry_network.clone(), - vec![], - Box::pin(dht_event_rx), - Role::Sentry, - None, - ); - - block_on(sentry_worker.refill_pending_lookups_queue()).unwrap(); - sentry_worker.start_new_lookups(); - - sentry_worker.handle_dht_value_found_event(vec![dht_event]).unwrap(); - block_on(sentry_worker.set_priority_group()).unwrap(); - - assert_eq!( - sentry_network.set_priority_group_call.lock().unwrap().len(), 1, - "Expect authority discovery to set the priority set.", - ); - - assert_eq!( - sentry_network.set_priority_group_call.lock().unwrap()[0], - ( - "authorities".to_string(), - HashSet::from_iter(vec![random_multiaddr.clone()].into_iter(),) - ), - "Expect authority discovery to only add `random_multiaddr`." - ); -} - #[test] fn limit_number_of_addresses_added_to_cache_per_authority() { let remote_key_store = KeyStore::new(); @@ -670,9 +593,8 @@ fn limit_number_of_addresses_added_to_cache_per_authority() { from_service, Arc::new(TestApi { authorities: vec![remote_public.into()] }), Arc::new(TestNetwork::default()), - vec![], Box::pin(dht_event_rx), - Role::Sentry, + Role::Discover, None, ); @@ -713,7 +635,6 @@ fn do_not_cache_addresses_without_peer_id() { let (_dht_event_tx, dht_event_rx) = channel(1); let local_test_api = Arc::new(TestApi { - // Make sure the sentry node identifies its validator as an authority. authorities: vec![remote_public.into()], }); let local_network: Arc = Arc::new(Default::default()); @@ -724,9 +645,8 @@ fn do_not_cache_addresses_without_peer_id() { from_service, local_test_api, local_network.clone(), - vec![], Box::pin(dht_event_rx), - Role::Authority(Arc::new(local_key_store)), + Role::PublishAndDiscover(Arc::new(local_key_store)), None, ); @@ -759,9 +679,8 @@ fn addresses_to_publish_adds_p2p() { authorities: vec![], }), network.clone(), - vec![], Box::pin(dht_event_rx), - Role::Authority(Arc::new(KeyStore::new())), + Role::PublishAndDiscover(Arc::new(KeyStore::new())), Some(prometheus_endpoint::Registry::new()), ); @@ -794,9 +713,8 @@ fn addresses_to_publish_respects_existing_p2p_protocol() { authorities: vec![], }), network.clone(), - vec![], Box::pin(dht_event_rx), - Role::Authority(Arc::new(KeyStore::new())), + Role::PublishAndDiscover(Arc::new(KeyStore::new())), Some(prometheus_endpoint::Registry::new()), ); @@ -836,9 +754,8 @@ fn lookup_throttling() { from_service, Arc::new(TestApi { authorities: remote_public_keys.clone() }), network.clone(), - vec![], dht_event_rx.boxed(), - Role::Sentry, + Role::Discover, Some(default_registry().clone()), );