diff --git a/Cargo.lock b/Cargo.lock index 1b57cd34bbc21..29886a032feab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1653,12 +1653,7 @@ dependencies = [ [[package]] name = "linked-hash-map" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" - -[[package]] -name = "linked-hash-map" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -1666,7 +1661,7 @@ name = "linked_hash_set" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1696,10 +1691,10 @@ dependencies = [ [[package]] name = "lru-cache" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "linked-hash-map 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3710,7 +3705,7 @@ dependencies = [ "kvdb-memorydb 0.1.0 (git+https://github.com/paritytech/parity-common?rev=b0317f649ab2c665b7987b8475878fc4d2e1f81d)", "kvdb-rocksdb 0.1.4 (git+https://github.com/paritytech/parity-common?rev=b0317f649ab2c665b7987b8475878fc4d2e1f81d)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 1.0.0", @@ -3952,10 +3947,10 @@ dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "fork-tree 1.0.0", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", - "linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "linked_hash_set 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4037,16 +4032,12 @@ dependencies = [ name = "substrate-peerset" version = "1.0.0" dependencies = [ - "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", + "lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.38 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -5264,13 +5255,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum libp2p-yamux 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0dbb8d08cb536a964727e77b868a026c6d92993f08e387d49163565575a478d9" "checksum librocksdb-sys 5.14.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b9024327233e7fac7982440f73301c00046d438c5b1011e8f4e394226ce19007" "checksum libsecp256k1 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "688e8d65e495567c2c35ea0001b26b9debf0b4ea11f8cccc954233b75fc3428a" -"checksum linked-hash-map 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7860ec297f7008ff7a1e3382d7f7e1dcd69efc94751a2284bafc3d013c2aa939" -"checksum linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "70fb39025bc7cdd76305867c4eccf2f2dcf6e9a57f5b21a93e1c2d86cd03ec9e" +"checksum linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ae91b68aebc4ddb91978b11a1b02ddd8602a05ec19002801c5666000e05e0f83" "checksum linked_hash_set 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3c7c91c4c7bbeb4f2f7c4e5be11e6a05bd6830bc37249c47ce1ad86ad453ff9c" "checksum lock_api 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "62ebf1391f6acad60e5c8b43706dde4582df75c06698ab44511d15016bc2442c" "checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" "checksum log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c84ec4b527950aa83a329754b01dbe3f58361d1c5efacd1f6d68c494d08a17c6" -"checksum lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4d06ff7ff06f729ce5f4e227876cb88d10bc59cd4ae1e09fbb2bde15c850dc21" +"checksum lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c" "checksum make-cmd 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a8ca8afbe8af1785e09636acb5a41e08a765f5f0340568716c18a8700ba3c0d3" "checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" "checksum memchr 2.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e1dd4eaac298c32ce07eb6ed9242eda7d82955b9170b7d6db59b2e02cc63fcb8" diff --git a/core/network-libp2p/src/behaviour.rs b/core/network-libp2p/src/behaviour.rs index f93665ce765ce..e00fba5219950 100644 --- a/core/network-libp2p/src/behaviour.rs +++ b/core/network-libp2p/src/behaviour.rs @@ -58,7 +58,7 @@ impl Behaviour { local_public_key: PublicKey, protocol: RegisteredProtocol, known_addresses: Vec<(PeerId, Multiaddr)>, - peerset: substrate_peerset::PeersetMut, + peerset: substrate_peerset::Peerset, enable_mdns: bool, ) -> Self { let identify = { diff --git a/core/network-libp2p/src/custom_proto/behaviour.rs b/core/network-libp2p/src/custom_proto/behaviour.rs index 91dd6b29bc2db..8e8870e941ba6 100644 --- a/core/network-libp2p/src/custom_proto/behaviour.rs +++ b/core/network-libp2p/src/custom_proto/behaviour.rs @@ -31,7 +31,7 @@ pub struct CustomProto { protocol: RegisteredProtocol, /// Receiver for instructions about who to connect to or disconnect from. - peerset: substrate_peerset::PeersetMut, + peerset: substrate_peerset::Peerset, /// List of peers in our state. peers: FnvHashMap, @@ -175,7 +175,7 @@ impl CustomProto { /// Creates a `CustomProtos`. pub fn new( protocol: RegisteredProtocol, - peerset: substrate_peerset::PeersetMut, + peerset: substrate_peerset::Peerset, ) -> Self { CustomProto { protocol, @@ -213,7 +213,7 @@ impl CustomProto { // DisabledPendingEnable => Disabled. PeerState::DisabledPendingEnable { open, connected_point, timer } => { debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); - self.peerset.dropped(peer_id); + self.peerset.dropped(peer_id.clone()); let banned_until = Some(if let Some(ban) = ban { cmp::max(timer.deadline(), Instant::now() + ban) } else { @@ -225,7 +225,7 @@ impl CustomProto { // Enabled => Disabled. PeerState::Enabled { open, connected_point } => { debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); - self.peerset.dropped(peer_id); + self.peerset.dropped(peer_id.clone()); debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", peer_id); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), @@ -484,7 +484,7 @@ impl CustomProto { debug!(target: "sub-libp2p", "PSM => Accept({:?}, {:?}): Obsolete incoming, sending back dropped", index, incoming.peer_id); debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", incoming.peer_id); - self.peerset.dropped(&incoming.peer_id); + self.peerset.dropped(incoming.peer_id.clone()); return } @@ -662,7 +662,7 @@ where debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}): Was disabled \ (through {:?}) but pending enable", peer_id, endpoint); debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); - self.peerset.dropped(peer_id); + self.peerset.dropped(peer_id.clone()); self.peers.insert(peer_id.clone(), PeerState::Banned { until: timer.deadline() }); if open { debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); @@ -679,7 +679,7 @@ where debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}): Was enabled \ (through {:?})", peer_id, endpoint); debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); - self.peerset.dropped(peer_id); + self.peerset.dropped(peer_id.clone()); if open { debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); @@ -730,7 +730,7 @@ where until: Instant::now() + Duration::from_secs(5) }; debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); - self.peerset.dropped(peer_id) + self.peerset.dropped(peer_id.clone()) }, // We can still get dial failures even if we are already connected to the node, diff --git a/core/network-libp2p/src/service_task.rs b/core/network-libp2p/src/service_task.rs index 37c4b05eaa3b9..d9d65e6af86d5 100644 --- a/core/network-libp2p/src/service_task.rs +++ b/core/network-libp2p/src/service_task.rs @@ -38,7 +38,7 @@ use std::time::Duration; pub fn start_service( config: NetworkConfiguration, registered_custom: RegisteredProtocol, -) -> Result<(Service, Arc), IoError> +) -> Result<(Service, substrate_peerset::PeersetHandle), IoError> where TMessage: CustomMessage + Send + 'static { if let Some(ref path) = config.net_config_path { @@ -72,7 +72,7 @@ where TMessage: CustomMessage + Send + 'static { } // Build the peerset. - let (peerset, peerset_receiver) = substrate_peerset::Peerset::from_config(substrate_peerset::PeersetConfig { + let (peerset, peerset_handle) = substrate_peerset::Peerset::from_config(substrate_peerset::PeersetConfig { in_peers: config.in_peers, out_peers: config.out_peers, bootnodes, @@ -88,7 +88,7 @@ where TMessage: CustomMessage + Send + 'static { // Build the swarm. let (mut swarm, bandwidth) = { let user_agent = format!("{} ({})", config.client_version, config.node_name); - let behaviour = Behaviour::new(user_agent, local_public, registered_custom, known_addresses, peerset_receiver, config.enable_mdns); + let behaviour = Behaviour::new(user_agent, local_public, registered_custom, known_addresses, peerset, config.enable_mdns); let (transport, bandwidth) = transport::build_transport(local_identity); (Swarm::new(transport, behaviour, local_peer_id.clone()), bandwidth) }; @@ -116,7 +116,7 @@ where TMessage: CustomMessage + Send + 'static { injected_events: Vec::new(), }; - Ok((service, peerset)) + Ok((service, peerset_handle)) } /// Event produced by the service. diff --git a/core/network/src/service.rs b/core/network/src/service.rs index 8ca4f05259042..597f361d2bf04 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -25,7 +25,7 @@ use parking_lot::{Mutex, RwLock}; use network_libp2p::{ProtocolId, NetworkConfiguration, Severity}; use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, ServiceEvent as NetworkServiceEvent}; use network_libp2p::{multiaddr, RegisteredProtocol, NetworkState}; -use peerset::Peerset; +use peerset::PeersetHandle; use consensus::import_queue::{ImportQueue, Link}; use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; @@ -143,7 +143,7 @@ pub struct Service> { network: Arc>>>, /// Peerset manager (PSM); manages the reputation of nodes and indicates the network which /// nodes it should be connected to or not. - peerset: Arc, + peerset: PeersetHandle, /// Protocol sender protocol_sender: Sender>, /// Sender for messages to the background service task, and handle for the background thread. @@ -360,7 +360,7 @@ impl> ManageNetwork for Service } fn remove_reserved_peer(&self, peer: PeerId) { - self.peerset.remove_reserved_peer(&peer); + self.peerset.remove_reserved_peer(peer); } fn add_reserved_peer(&self, peer: String) -> Result<(), String> { @@ -474,7 +474,7 @@ fn start_thread( network_port: NetworkPort, config: NetworkConfiguration, registered: RegisteredProtocol>, -) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc>>>, Arc), Error> { +) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc>>>, PeersetHandle), Error> { // Start the main service. let (service, peerset) = match start_service(config, registered) { Ok((service, peerset)) => (Arc::new(Mutex::new(service)), peerset), diff --git a/core/peerset/Cargo.toml b/core/peerset/Cargo.toml index 12f27d84efe09..1b505682d85cc 100644 --- a/core/peerset/Cargo.toml +++ b/core/peerset/Cargo.toml @@ -8,13 +8,9 @@ authors = ["Parity Technologies "] edition = "2018" [dependencies] -fnv = "1.0" futures = "0.1" libp2p = { version = "0.6.0", default-features = false } +linked-hash-map = "0.5" log = "0.4" -parking_lot = "0.7" -rand = "0.6" -serde = "1.0.70" -serde_derive = "1.0.70" +lru-cache = "0.1.2" serde_json = "1.0.24" -tokio-io = "0.1" diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index f49cbd6f12da0..9c6389a139c95 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -17,39 +17,138 @@ //! Peer Set Manager (PSM). Contains the strategy for choosing which nodes the network should be //! connected to. -use std::collections::HashSet; -use futures::{prelude::*, sync::mpsc}; +mod slots; + +use std::collections::VecDeque; +use futures::{prelude::*, sync::mpsc, try_ready}; use libp2p::PeerId; -use parking_lot::Mutex; -use std::sync::Arc; +use linked_hash_map::LinkedHashMap; use log::trace; - +use lru_cache::LruCache; +use slots::{SlotType, SlotState, Slots}; pub use serde_json::Value; -/// Shared part of the peer set manager (PSM). Distributed around the code. -pub struct Peerset { - tx: mpsc::UnboundedSender, - inner: Mutex, +const PEERSET_SCORES_CACHE_SIZE: usize = 1000; + +/// FIFO-ordered list of nodes that we know exist, but we are not connected to. +#[derive(Debug, Default)] +struct Discovered { + /// Nodes we should connect to first. + reserved: LinkedHashMap, + /// All remaining nodes. + common: LinkedHashMap, +} + +impl Discovered { + /// Returns true if we already know given node. + fn contains(&self, peer_id: &PeerId) -> bool { + self.reserved.contains_key(peer_id) || self.common.contains_key(peer_id) + } + + /// Returns true if given node is reserved. + fn is_reserved(&self, peer_id: &PeerId) -> bool { + self.reserved.contains_key(peer_id) + } + + /// Adds new peer of a given type. + fn add_peer(&mut self, peer_id: PeerId, slot_type: SlotType) { + if !self.contains(&peer_id) { + match slot_type { + SlotType::Common => self.common.insert(peer_id, ()), + SlotType::Reserved => self.reserved.insert(peer_id, ()), + }; + } + } + + /// Pops the oldest peer from the list. + fn pop_peer(&mut self, reserved_only: bool) -> Option<(PeerId, SlotType)> { + if let Some((peer_id, _)) = self.reserved.pop_front() { + return Some((peer_id, SlotType::Reserved)); + } + + if reserved_only { + return None; + } + + self.common.pop_front() + .map(|(peer_id, _)| (peer_id, SlotType::Common)) + } + + /// Marks the node as not reserved. + fn mark_not_reserved(&mut self, peer_id: &PeerId) { + if let Some(_) = self.reserved.remove(peer_id) { + self.common.insert(peer_id.clone(), ()); + } + } + + /// Removes the node from the list. + fn remove_peer(&mut self, peer_id: &PeerId) { + self.reserved.remove(peer_id); + self.common.remove(peer_id); + } } -struct Inner { - /// List of nodes that we know exist but we are not connected to. +#[derive(Debug)] +struct PeersetData { + /// List of nodes that we know exist, but we are not connected to. /// Elements in this list must never be in `out_slots` or `in_slots`. - discovered: Vec, - /// List of reserved nodes. - reserved: HashSet, + discovered: Discovered, /// If true, we only accept reserved nodes. reserved_only: bool, - /// Node slots for outgoing connections. Each slot contains either `None` if the node is free, - /// or `Some` if it is assigned to a peer. - out_slots: Vec>, - /// Node slots for incoming connections. Each slot contains either `None` if the node is free, - /// or `Some` if it is assigned to a peer. - in_slots: Vec>, + /// Node slots for outgoing connections. + out_slots: Slots, + /// Node slots for incoming connections. + in_slots: Slots, + /// List of node scores. + scores: LruCache, } -/// Message that can be sent by the peer set manager (PSM). #[derive(Debug)] +enum Action { + AddReservedPeer(PeerId), + RemoveReservedPeer(PeerId), + SetReservedOnly(bool), + ReportPeer(PeerId, i32), +} + +/// Shared handle to the peer set manager (PSM). Distributed around the code. +#[derive(Debug, Clone)] +pub struct PeersetHandle { + tx: mpsc::UnboundedSender, +} + +impl PeersetHandle { + /// Adds a new reserved peer. The peerset will make an effort to always remain connected to + /// this peer. + /// + /// Has no effect if the node was already a reserved peer. + /// + /// > **Note**: Keep in mind that the networking has to know an address for this node, + /// > otherwise it will not be able to connect to it. + pub fn add_reserved_peer(&self, peer_id: PeerId) { + let _ = self.tx.unbounded_send(Action::AddReservedPeer(peer_id)); + } + + /// Remove a previously-added reserved peer. + /// + /// Has no effect if the node was not a reserved peer. + pub fn remove_reserved_peer(&self, peer_id: PeerId) { + let _ = self.tx.unbounded_send(Action::RemoveReservedPeer(peer_id)); + } + + /// Sets whether or not the peerset only has connections . + pub fn set_reserved_only(&self, reserved: bool) { + let _ = self.tx.unbounded_send(Action::SetReservedOnly(reserved)); + } + + /// Reports an adjustment to the reputation of the given peer. + pub fn report_peer(&self, peer_id: PeerId, score_diff: i32) { + let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff)); + } +} + +/// Message that can be sent by the peer set manager (PSM). +#[derive(Debug, PartialEq)] pub enum Message { /// Request to open a connection to the given peer. From the point of view of the PSM, we are /// immediately connected. @@ -104,130 +203,158 @@ pub struct PeersetConfig { /// /// Implements the `Stream` trait and can be polled for messages. The `Stream` never ends and never /// errors. -pub struct PeersetMut { - parent: Arc, - rx: mpsc::UnboundedReceiver, +#[derive(Debug)] +pub struct Peerset { + data: PeersetData, + rx: mpsc::UnboundedReceiver, + message_queue: VecDeque, } impl Peerset { /// Builds a new peerset from the given configuration. - pub fn from_config(config: PeersetConfig) -> (Arc, PeersetMut) { + pub fn from_config(config: PeersetConfig) -> (Peerset, PeersetHandle) { let (tx, rx) = mpsc::unbounded(); - let mut inner = Inner { - discovered: config.bootnodes.into_iter().collect(), - reserved: Default::default(), + let data = PeersetData { + discovered: Default::default(), reserved_only: config.reserved_only, - out_slots: (0 .. config.out_peers).map(|_| None).collect(), - in_slots: (0 .. config.in_peers).map(|_| None).collect(), + out_slots: Slots::new(config.out_peers), + in_slots: Slots::new(config.in_peers), + scores: LruCache::new(PEERSET_SCORES_CACHE_SIZE), }; - alloc_slots(&mut inner, &tx); - - let peerset = Arc::new(Peerset { + let handle = PeersetHandle { tx, - inner: Mutex::new(inner), - }); + }; - let rx = PeersetMut { - parent: peerset.clone(), + let mut peerset = Peerset { + data, rx, + message_queue: VecDeque::new(), }; - for reserved in config.reserved_nodes { - peerset.add_reserved_peer(reserved); + for peer_id in config.reserved_nodes { + peerset.data.discovered.add_peer(peer_id, SlotType::Reserved); } - (peerset, rx) - } - - /// Adds a new reserved peer. The peerset will make an effort to always remain connected to - /// this peer. - /// - /// Has no effect if the node was already a reserved peer. - /// - /// > **Note**: Keep in mind that the networking has to know an address for this node, - /// > otherwise it will not be able to connect to it. - pub fn add_reserved_peer(&self, peer_id: PeerId) { - let mut inner = self.inner.lock(); - if !inner.reserved.insert(peer_id.clone()) { - // Immediately return if this peer was already in the list. - return; + for peer_id in config.bootnodes { + peerset.data.discovered.add_peer(peer_id, SlotType::Common); } + peerset.alloc_slots(); + (peerset, handle) + } + + fn on_add_reserved_peer(&mut self, peer_id: PeerId) { // Nothing more to do if we're already connected. - if inner.out_slots.iter().chain(inner.in_slots.iter()).any(|s| s.as_ref() == Some(&peer_id)) { + if self.data.in_slots.contains(&peer_id) { + self.data.in_slots.mark_reserved(&peer_id); return; } - // Assign a slot for this reserved peer. - if let Some(pos) = inner.out_slots.iter().position(|s| s.as_ref().map(|n| !inner.reserved.contains(n)).unwrap_or(true)) { - let _ = self.tx.unbounded_send(Message::Connect(peer_id.clone())); - inner.out_slots[pos] = Some(peer_id); - - } else { - // All slots are filled with reserved peers. - if inner.discovered.iter().all(|p| *p != peer_id) { - inner.discovered.push(peer_id); + match self.data.out_slots.add_peer(peer_id, SlotType::Reserved) { + SlotState::Added(peer_id) => { + // reserved node may have been previously stored as normal node in discovered list + self.data.discovered.remove_peer(&peer_id); + + // notify that connection has been made + self.message_queue.push_back(Message::Connect(peer_id)); + return; + }, + SlotState::Swaped { removed, added } => { + // reserved node may have been previously stored as normal node in discovered list + self.data.discovered.remove_peer(&added); + // let's add the peer we disconnected from to the discovered list again + self.data.discovered.add_peer(removed.clone(), SlotType::Common); + // swap connections + self.message_queue.push_back(Message::Drop(removed)); + self.message_queue.push_back(Message::Connect(added)); + } + SlotState::AlreadyConnected(_) | SlotState::Upgraded(_) => { + return; + } + SlotState::MaxConnections(peer_id) => { + self.data.discovered.add_peer(peer_id, SlotType::Reserved); + return; } } } - /// Remove a previously-added reserved peer. - /// - /// Has no effect if the node was not a reserved peer. - pub fn remove_reserved_peer(&self, peer_id: &PeerId) { - let mut inner = self.inner.lock(); - inner.reserved.remove(peer_id); + fn on_remove_reserved_peer(&mut self, peer_id: PeerId) { + self.data.in_slots.mark_not_reserved(&peer_id); + self.data.out_slots.mark_not_reserved(&peer_id); + self.data.discovered.mark_not_reserved(&peer_id); + if self.data.reserved_only { + if self.data.in_slots.clear_slot(&peer_id) || self.data.out_slots.clear_slot(&peer_id) { + // insert peer back into discovered list + self.data.discovered.add_peer(peer_id.clone(), SlotType::Common); + self.message_queue.push_back(Message::Drop(peer_id)); + // call alloc_slots again, cause we may have some reserved peers in discovered list + // waiting for the slot that was just cleared + self.alloc_slots(); + } + } } - /// Sets whether or not the peerset only has connections . - pub fn set_reserved_only(&self, reserved_only: bool) { - let mut inner = self.inner.lock(); - let inner = &mut *inner; // Fixes a borrowing issue. - inner.reserved_only = reserved_only; - + fn on_set_reserved_only(&mut self, reserved_only: bool) { // Disconnect non-reserved nodes. - if reserved_only { - for slot in inner.out_slots.iter_mut().chain(inner.in_slots.iter_mut()) { - if let Some(peer) = slot.as_ref() { - if inner.reserved.contains(peer) { - continue; - } - - let _ = self.tx.unbounded_send(Message::Drop(peer.clone())); - } - - *slot = None; + self.data.reserved_only = reserved_only; + if self.data.reserved_only { + for peer_id in self.data.in_slots.clear_common_slots().into_iter().chain(self.data.out_slots.clear_common_slots().into_iter()) { + // insert peer back into discovered list + self.data.discovered.add_peer(peer_id.clone(), SlotType::Common); + self.message_queue.push_back(Message::Drop(peer_id)); } + } else { + self.alloc_slots(); } } - /// Reports an adjustement to the reputation of the given peer. - pub fn report_peer(&self, _peer_id: &PeerId, _score_diff: i32) { - // This is not implemented in this dummy implementation. - } -} - -fn alloc_slots(inner: &mut Inner, tx: &mpsc::UnboundedSender) { - if inner.reserved_only { - return; - } + fn on_report_peer(&mut self, peer_id: PeerId, score_diff: i32) { + let score = match self.data.scores.get_mut(&peer_id) { + Some(score) => { + *score = score.saturating_add(score_diff); + *score + }, + None => { + self.data.scores.insert(peer_id.clone(), score_diff); + score_diff + } + }; - for slot in inner.out_slots.iter_mut() { - if slot.is_some() { - continue; + if score < 0 { + // peer will be removed from `in_slots` or `out_slots` in `on_dropped` method + if self.data.in_slots.contains(&peer_id) || self.data.out_slots.contains(&peer_id) { + self.data.in_slots.clear_slot(&peer_id); + self.data.out_slots.clear_slot(&peer_id); + self.message_queue.push_back(Message::Drop(peer_id)); + } } + } - if !inner.discovered.is_empty() { - let elem = inner.discovered.remove(0); - *slot = Some(elem.clone()); - let _ = tx.unbounded_send(Message::Connect(elem)); + fn alloc_slots(&mut self) { + while let Some((peer_id, slot_type)) = self.data.discovered.pop_peer(self.data.reserved_only) { + match self.data.out_slots.add_peer(peer_id, slot_type) { + SlotState::Added(peer_id) => { + self.message_queue.push_back(Message::Connect(peer_id)); + }, + SlotState::Swaped { removed, added } => { + // insert peer back into discovered list + self.data.discovered.add_peer(removed.clone(), SlotType::Common); + self.message_queue.push_back(Message::Drop(removed)); + self.message_queue.push_back(Message::Connect(added)); + } + SlotState::Upgraded(_) | SlotState::AlreadyConnected(_) => { + // TODO: we should never reach this point + }, + SlotState::MaxConnections(peer_id) => { + self.data.discovered.add_peer(peer_id, slot_type); + break; + }, + } } } -} -impl PeersetMut { /// Indicate that we received an incoming connection. Must be answered either with /// a corresponding `Accept` or `Reject`, except if we were already connected to this peer. /// @@ -237,21 +364,51 @@ impl PeersetMut { /// /// Because of concurrency issues, it is acceptable to call `incoming` with a `PeerId` the /// peerset is already connected to, in which case it must not answer. - pub fn incoming(&self, peer_id: PeerId, index: IncomingIndex) { - let mut inner = self.parent.inner.lock(); - trace!("Incoming {}\nin_slots={:?}\nout_slots={:?}", peer_id, inner.in_slots, inner.out_slots); - if inner.out_slots.iter().chain(inner.in_slots.iter()).any(|s| s.as_ref() == Some(&peer_id)) { - return + pub fn incoming(&mut self, peer_id: PeerId, index: IncomingIndex) { + trace!("Incoming {}\nin_slots={:?}\nout_slots={:?}", peer_id, self.data.in_slots, self.data.out_slots); + // if `reserved_only` is set, but this peer is not a part of our discovered list, + // a) it is not reserved, so we reject the connection + // b) we are already connected to it, so we reject the connection + if self.data.reserved_only && !self.data.discovered.is_reserved(&peer_id) { + self.message_queue.push_back(Message::Reject(index)); + return; + } + + // check if we are already connected to this peer + if self.data.out_slots.contains(&peer_id) { + // we are already connected. in this case we do not answer + return; } - if let Some(pos) = inner.in_slots.iter().position(|s| s.is_none()) { - inner.in_slots[pos] = Some(peer_id); - let _ = self.parent.tx.unbounded_send(Message::Accept(index)); + let slot_type = if self.data.reserved_only { + SlotType::Reserved } else { - if inner.discovered.iter().all(|p| *p != peer_id) { - inner.discovered.push(peer_id); - } - let _ = self.parent.tx.unbounded_send(Message::Reject(index)); + SlotType::Common + }; + + match self.data.in_slots.add_peer(peer_id, slot_type) { + SlotState::Added(peer_id) => { + // reserved node may have been previously stored as normal node in discovered list + self.data.discovered.remove_peer(&peer_id); + self.message_queue.push_back(Message::Accept(index)); + return; + }, + SlotState::Swaped { removed, added } => { + // reserved node may have been previously stored as normal node in discovered list + self.data.discovered.remove_peer(&added); + // swap connections. + self.message_queue.push_back(Message::Drop(removed)); + self.message_queue.push_back(Message::Accept(index)); + }, + SlotState::AlreadyConnected(_) | SlotState::Upgraded(_) => { + // we are already connected. in this case we do not answer + return; + }, + SlotState::MaxConnections(peer_id) => { + self.data.discovered.add_peer(peer_id, slot_type); + self.message_queue.push_back(Message::Reject(index)); + return; + }, } } @@ -259,48 +416,35 @@ impl PeersetMut { /// /// Must only be called after the PSM has either generated a `Connect` message with this /// `PeerId`, or accepted an incoming connection with this `PeerId`. - pub fn dropped(&self, peer_id: &PeerId) { - let mut inner = self.parent.inner.lock(); - let inner = &mut *inner; // Fixes a borrowing issue. - trace!("Dropping {}\nin_slots={:?}\nout_slots={:?}", peer_id, inner.in_slots, inner.out_slots); - + pub fn dropped(&mut self, peer_id: PeerId) { + trace!("Dropping {}\nin_slots={:?}\nout_slots={:?}", peer_id, self.data.in_slots, self.data.out_slots); // Automatically connect back if reserved. - if inner.reserved.contains(peer_id) { - let _ = self.parent.tx.unbounded_send(Message::Connect(peer_id.clone())); - return + if self.data.in_slots.is_connected_and_reserved(&peer_id) || self.data.out_slots.is_connected_and_reserved(&peer_id) { + self.message_queue.push_back(Message::Connect(peer_id)); + return; } // Otherwise, free the slot. - for slot in inner.out_slots.iter_mut().chain(inner.in_slots.iter_mut()) { - if slot.as_ref() == Some(peer_id) { - *slot = None; - break; - } - } + self.data.in_slots.clear_slot(&peer_id); + self.data.out_slots.clear_slot(&peer_id); // Note: in this dummy implementation we consider that peers never expire. As soon as we // are disconnected from a peer, we try again. - if inner.discovered.iter().all(|p| p != peer_id) { - inner.discovered.push(peer_id.clone()); - } - alloc_slots(inner, &self.parent.tx); + self.data.discovered.add_peer(peer_id, SlotType::Common); + self.alloc_slots(); } /// Adds a discovered peer id to the PSM. /// /// > **Note**: There is no equivalent "expired" message, meaning that it is the responsibility /// > of the PSM to remove `PeerId`s that fail to dial too often. - pub fn discovered(&self, peer_id: PeerId) { - let mut inner = self.parent.inner.lock(); - - if inner.out_slots.iter().chain(inner.in_slots.iter()).any(|p| p.as_ref() == Some(&peer_id)) { + pub fn discovered(&mut self, peer_id: PeerId) { + if self.data.in_slots.contains(&peer_id) || self.data.out_slots.contains(&peer_id) { return; } - if inner.discovered.iter().all(|p| *p != peer_id) { - inner.discovered.push(peer_id); - } - alloc_slots(&mut inner, &self.parent.tx); + self.data.discovered.add_peer(peer_id, SlotType::Common); + self.alloc_slots(); } /// Produces a JSON object containing the state of the peerset manager, for debugging purposes. @@ -309,11 +453,281 @@ impl PeersetMut { } } -impl Stream for PeersetMut { +impl Stream for Peerset { type Item = Message; type Error = (); fn poll(&mut self) -> Poll, Self::Error> { - self.rx.poll() + loop { + if let Some(message) = self.message_queue.pop_front() { + return Ok(Async::Ready(Some(message))); + } + match try_ready!(self.rx.poll()) { + None => return Ok(Async::Ready(None)), + Some(action) => match action { + Action::AddReservedPeer(peer_id) => self.on_add_reserved_peer(peer_id), + Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(peer_id), + Action::SetReservedOnly(reserved) => self.on_set_reserved_only(reserved), + Action::ReportPeer(peer_id, score_diff) => self.on_report_peer(peer_id, score_diff), + } + } + } + } +} + +#[cfg(test)] +mod tests { + use libp2p::PeerId; + use futures::prelude::*; + use super::{PeersetConfig, Peerset, Message, IncomingIndex}; + + fn assert_messages(mut peerset: Peerset, messages: Vec) -> Peerset { + for expected_message in messages { + let (message, p) = next_message(peerset).expect("expected message"); + assert_eq!(message, expected_message); + peerset = p; + } + assert!(peerset.message_queue.is_empty()); + peerset + } + + fn next_message(peerset: Peerset) -> Result<(Message, Peerset), ()> { + let (next, peerset) = peerset.into_future() + .wait() + .map_err(|_| ())?; + let message = next.ok_or_else(|| ())?; + Ok((message, peerset)) + } + + #[test] + fn test_peerset_from_config_with_bootnodes() { + let bootnode = PeerId::random(); + let bootnode2 = PeerId::random(); + let config = PeersetConfig { + in_peers: 0, + out_peers: 2, + bootnodes: vec![bootnode.clone(), bootnode2.clone()], + reserved_only: false, + reserved_nodes: Vec::new(), + }; + + let (peerset, _handle) = Peerset::from_config(config); + + assert_messages(peerset, vec![ + Message::Connect(bootnode), + Message::Connect(bootnode2), + ]); + } + + #[test] + fn test_peerset_from_config_with_reserved_nodes() { + let bootnode = PeerId::random(); + let bootnode2 = PeerId::random(); + let reserved_peer = PeerId::random(); + let reserved_peer2 = PeerId::random(); + let config = PeersetConfig { + in_peers: 0, + out_peers: 3, + bootnodes: vec![bootnode.clone(), bootnode2.clone()], + reserved_only: false, + reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()], + }; + + let (peerset, _handle) = Peerset::from_config(config); + + assert_messages(peerset, vec![ + Message::Connect(reserved_peer), + Message::Connect(reserved_peer2), + Message::Connect(bootnode) + ]); + } + + #[test] + fn test_peerset_add_reserved_peer() { + let bootnode = PeerId::random(); + let reserved_peer = PeerId::random(); + let reserved_peer2 = PeerId::random(); + let config = PeersetConfig { + in_peers: 0, + out_peers: 2, + bootnodes: vec![bootnode], + reserved_only: true, + reserved_nodes: Vec::new(), + }; + + let (peerset, handle) = Peerset::from_config(config); + handle.add_reserved_peer(reserved_peer.clone()); + handle.add_reserved_peer(reserved_peer2.clone()); + + assert_messages(peerset, vec![ + Message::Connect(reserved_peer), + Message::Connect(reserved_peer2) + ]); + } + + #[test] + fn test_peerset_remove_reserved_peer() { + let reserved_peer = PeerId::random(); + let reserved_peer2 = PeerId::random(); + let config = PeersetConfig { + in_peers: 0, + out_peers: 2, + bootnodes: vec![], + reserved_only: false, + reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()], + }; + + let (peerset, handle) = Peerset::from_config(config); + handle.remove_reserved_peer(reserved_peer.clone()); + + let peerset = assert_messages(peerset, vec![ + Message::Connect(reserved_peer.clone()), + Message::Connect(reserved_peer2.clone()), + ]); + + handle.set_reserved_only(true); + handle.remove_reserved_peer(reserved_peer2.clone()); + + assert_messages(peerset, vec![ + Message::Drop(reserved_peer), + Message::Drop(reserved_peer2), + ]); + } + + #[test] + fn test_peerset_set_reserved_only() { + let bootnode = PeerId::random(); + let bootnode2 = PeerId::random(); + let reserved_peer = PeerId::random(); + let reserved_peer2 = PeerId::random(); + let config = PeersetConfig { + in_peers: 0, + out_peers: 4, + bootnodes: vec![bootnode.clone(), bootnode2.clone()], + reserved_only: false, + reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()], + }; + + let (peerset, handle) = Peerset::from_config(config); + handle.set_reserved_only(true); + handle.set_reserved_only(false); + + assert_messages(peerset, vec![ + Message::Connect(reserved_peer), + Message::Connect(reserved_peer2), + Message::Connect(bootnode.clone()), + Message::Connect(bootnode2.clone()), + Message::Drop(bootnode.clone()), + Message::Drop(bootnode2.clone()), + Message::Connect(bootnode), + Message::Connect(bootnode2), + ]); + } + + #[test] + fn test_peerset_report_peer() { + let bootnode = PeerId::random(); + let bootnode2 = PeerId::random(); + let config = PeersetConfig { + in_peers: 0, + out_peers: 1, + bootnodes: vec![bootnode.clone(), bootnode2.clone()], + reserved_only: false, + reserved_nodes: Vec::new(), + }; + + let (peerset, handle) = Peerset::from_config(config); + handle.report_peer(bootnode2, -1); + handle.report_peer(bootnode.clone(), -1); + + assert_messages(peerset, vec![ + Message::Connect(bootnode.clone()), + Message::Drop(bootnode) + ]); + } + + #[test] + fn test_peerset_incoming() { + let bootnode = PeerId::random(); + let incoming = PeerId::random(); + let incoming2 = PeerId::random(); + let incoming3 = PeerId::random(); + let ii = IncomingIndex(1); + let ii2 = IncomingIndex(2); + let ii3 = IncomingIndex(3); + let ii4 = IncomingIndex(3); + let config = PeersetConfig { + in_peers: 2, + out_peers: 1, + bootnodes: vec![bootnode.clone()], + reserved_only: false, + reserved_nodes: Vec::new(), + }; + + let (mut peerset, _handle) = Peerset::from_config(config); + peerset.incoming(incoming.clone(), ii); + peerset.incoming(incoming.clone(), ii4); + peerset.incoming(incoming2.clone(), ii2); + peerset.incoming(incoming3.clone(), ii3); + + assert_messages(peerset, vec![ + Message::Connect(bootnode.clone()), + Message::Accept(ii), + Message::Accept(ii2), + Message::Reject(ii3), + ]); + } + + #[test] + fn test_peerset_dropped() { + let bootnode = PeerId::random(); + let bootnode2 = PeerId::random(); + let reserved_peer = PeerId::random(); + let config = PeersetConfig { + in_peers: 0, + out_peers: 2, + bootnodes: vec![bootnode.clone(), bootnode2.clone()], + reserved_only: false, + reserved_nodes: vec![reserved_peer.clone()], + }; + + let (peerset, _handle) = Peerset::from_config(config); + + let mut peerset = assert_messages(peerset, vec![ + Message::Connect(reserved_peer.clone()), + Message::Connect(bootnode.clone()), + ]); + + peerset.dropped(reserved_peer.clone()); + peerset.dropped(bootnode); + + let _peerset = assert_messages(peerset, vec![ + Message::Connect(reserved_peer), + Message::Connect(bootnode2), + ]); + } + + #[test] + fn test_peerset_discovered() { + let bootnode = PeerId::random(); + let discovered = PeerId::random(); + let discovered2 = PeerId::random(); + let config = PeersetConfig { + in_peers: 0, + out_peers: 2, + bootnodes: vec![bootnode.clone()], + reserved_only: false, + reserved_nodes: vec![], + }; + + let (mut peerset, _handle) = Peerset::from_config(config); + peerset.discovered(discovered.clone()); + peerset.discovered(discovered.clone()); + peerset.discovered(discovered2); + + assert_messages(peerset, vec![ + Message::Connect(bootnode), + Message::Connect(discovered), + ]); } } diff --git a/core/peerset/src/slots.rs b/core/peerset/src/slots.rs new file mode 100644 index 0000000000000..7fa655d7ff608 --- /dev/null +++ b/core/peerset/src/slots.rs @@ -0,0 +1,148 @@ +// Copyright 2018-2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use std::mem; +use libp2p::PeerId; +use linked_hash_map::LinkedHashMap; + +/// Describes the nature of connection with a given peer. +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum SlotType { + /// Reserved peer is a peer we should always stay connected to. + Reserved, + /// Common peer is a type of peer that we stay connected to only if it's + /// useful for us. + Common, +} + +/// Descibes the result of `add_peer` action. +pub enum SlotState { + /// Returned when `add_peer` successfully adds a peer to the slot. + Added(PeerId), + /// Returned we already have a connection to a given peer, but it is upgraded from + /// `Common` to `Reserved`. + Upgraded(PeerId), + /// Returned when we should removed a common peer to make space for a reserved peer. + Swaped { + /// Peer we should disconnect from. + removed: PeerId, + /// Peer we should connect to. + added: PeerId, + }, + /// Error returned when we are already connected to this peer. + AlreadyConnected(PeerId), + /// Error returned when max number of connections has been already established. + MaxConnections(PeerId), +} + +/// Contains all information about group of slots. +#[derive(Debug)] +pub struct Slots { + max_slots: usize, + /// Nodes and their type. We use `LinkedHashMap` to make this data structure more predictable + slots: LinkedHashMap, +} + +impl Slots { + /// Creates a group of slots with a limited size. + pub fn new(max_slots: u32) -> Self { + let max_slots = max_slots as usize; + Slots { + max_slots, + slots: LinkedHashMap::with_capacity(max_slots), + } + } + + /// Returns true if one of the slots contains given peer. + pub fn contains(&self, peer_id: &PeerId) -> bool { + self.slots.contains_key(peer_id) + } + + /// Tries to find a slot for a given peer and returns `SlotState`. + pub fn add_peer(&mut self, peer_id: PeerId, slot_type: SlotType) -> SlotState { + if let Some(st) = self.slots.get_mut(&peer_id) { + if *st == SlotType::Common && slot_type == SlotType::Reserved { + *st = SlotType::Reserved; + return SlotState::Upgraded(peer_id); + } else { + return SlotState::AlreadyConnected(peer_id); + } + } + + if self.slots.len() == self.max_slots { + if let SlotType::Reserved = slot_type { + // if we are trying to insert a reserved peer, but we all of our slots are full, + // we need to remove one of the existing common connections + let to_remove = self.slots.iter() + .find(|(_, &slot_type)| slot_type == SlotType::Common) + .map(|(to_remove, _)| to_remove) + .cloned(); + + if let Some(to_remove) = to_remove { + self.slots.remove(&to_remove); + self.slots.insert(peer_id.clone(), slot_type); + + return SlotState::Swaped { + removed: to_remove, + added: peer_id, + }; + } + } + return SlotState::MaxConnections(peer_id); + } + + self.slots.insert(peer_id.clone(), slot_type); + SlotState::Added(peer_id) + } + + pub fn clear_common_slots(&mut self) -> Vec { + let slots = mem::replace(&mut self.slots, LinkedHashMap::with_capacity(self.max_slots)); + let mut common_peers = Vec::new(); + for (peer_id, slot_type) in slots { + match slot_type { + SlotType::Common => { + common_peers.push(peer_id); + }, + SlotType::Reserved => { + self.slots.insert(peer_id, slot_type); + }, + } + } + common_peers + } + + pub fn mark_reserved(&mut self, peer_id: &PeerId) { + if let Some(slot_type) = self.slots.get_mut(peer_id) { + *slot_type = SlotType::Reserved; + } + } + + pub fn mark_not_reserved(&mut self, peer_id: &PeerId) { + if let Some(slot_type) = self.slots.get_mut(peer_id) { + *slot_type = SlotType::Common; + } + } + + pub fn clear_slot(&mut self, peer_id: &PeerId) -> bool { + self.slots.remove(peer_id).is_some() + } + + pub fn is_connected_and_reserved(&self, peer_id: &PeerId) -> bool { + self.slots.get(peer_id) + .map(|slot_type| *slot_type == SlotType::Reserved) + .unwrap_or_else(|| false) + } +}