Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c86b760
Introduction of PeersetHandle
debris Mar 26, 2019
97430fb
integrate PeersetHandle with the rest of the codebase
debris Mar 26, 2019
12788ca
Merge branch 'master' into peerset-changes
debris Mar 26, 2019
939cc30
fix compilation errors
debris Mar 26, 2019
284fc7e
Merge branch 'master' into peerset-changes
debris Mar 28, 2019
5a04f53
more tests for peerset, fixed overwriting bug in add_reserved_peer
debris Mar 28, 2019
5037c8e
Merge branch 'master' into peerset-changes
debris Mar 29, 2019
119bff4
Merge branch 'master' into peerset-changes
debris Apr 1, 2019
141284e
Slots data structure and bugfixes for peerset
debris Apr 1, 2019
e849837
bend to pressure
debris Apr 1, 2019
9ecb800
Merge branch 'master' into peerset-changes
debris Apr 2, 2019
a74b69c
updated lru-cache to 0.1.2 and updated linked-hash-map to 0.5.2
debris Apr 2, 2019
94dc0f7
peerset discovered list is now a LinkedHashMap
debris Apr 2, 2019
1a3d85f
Merge branch 'master' into peerset-changes
debris Apr 2, 2019
832233a
fix review suggestions
debris Apr 2, 2019
48a7abf
split back Peerset and PeersetHandle
debris Apr 2, 2019
700f73b
test for Peerset::discovered
debris Apr 2, 2019
169e606
applied review suggestions
debris Apr 2, 2019
913f221
fixes to peerset::incoming
debris Apr 2, 2019
21033d7
Merge branch 'master' into peerset-changes
debris Apr 4, 2019
6bc7e14
peerset disconnects are all instantaneous
debris Apr 4, 2019
a92825f
instantaneous drop in peerset finished
debris Apr 4, 2019
ce5b4b2
Peerset::set_reserved_only can also reconnect nodes
debris Apr 4, 2019
a1fb988
Peerset scores cache uses lru-cache
debris Apr 4, 2019
8b20252
remove redundant function call and comment from Peerset::on_set_reser…
debris Apr 4, 2019
310cef2
add_peer returns SlotState enum
debris Apr 4, 2019
3e2340d
apply review suggestions
debris Apr 4, 2019
de2525b
is_reserved -> is_connected_and_reserved
debris Apr 4, 2019
494cca9
Merge branch 'master' into peerset-changes
debris Apr 4, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 11 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/network-libp2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
local_public_key: PublicKey,
protocol: RegisteredProtocol<TMessage>,
known_addresses: Vec<(PeerId, Multiaddr)>,
peerset: substrate_peerset::PeersetMut,
peerset: substrate_peerset::Peerset,
enable_mdns: bool,
) -> Self {
let identify = {
Expand Down
16 changes: 8 additions & 8 deletions core/network-libp2p/src/custom_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct CustomProto<TMessage, TSubstream> {
protocol: RegisteredProtocol<TMessage>,

/// Receiver for instructions about who to connect to or disconnect from.
peerset: substrate_peerset::PeersetMut,
peerset: substrate_peerset::Peerset,

/// List of peers in our state.
peers: FnvHashMap<PeerId, PeerState>,
Expand Down Expand Up @@ -175,7 +175,7 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
/// Creates a `CustomProtos`.
pub fn new(
protocol: RegisteredProtocol<TMessage>,
peerset: substrate_peerset::PeersetMut,
peerset: substrate_peerset::Peerset,
) -> Self {
CustomProto {
protocol,
Expand Down Expand Up @@ -213,7 +213,7 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
// DisabledPendingEnable => Disabled.
PeerState::DisabledPendingEnable { open, connected_point, timer } => {
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id);
self.peerset.dropped(peer_id);
self.peerset.dropped(peer_id.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just out of curiosity, why do you think it's better to clone on the call site? My preference would be to pass a reference and clone only if needed. In that case it doesn't really matter, since we know that current implementation requires ownership anyway, but IMHO passing reference is more future proof (shall the impl change).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just out of curiosity, why do you think it's better to clone on the call site?

No, but we always need a value, cause we send it over the channel. That's why I changed the function signature

let banned_until = Some(if let Some(ban) = ban {
cmp::max(timer.deadline(), Instant::now() + ban)
} else {
Expand All @@ -225,7 +225,7 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
// Enabled => Disabled.
PeerState::Enabled { open, connected_point } => {
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id);
self.peerset.dropped(peer_id);
self.peerset.dropped(peer_id.clone());
debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", peer_id);
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
Expand Down Expand Up @@ -484,7 +484,7 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
debug!(target: "sub-libp2p", "PSM => Accept({:?}, {:?}): Obsolete incoming,
sending back dropped", index, incoming.peer_id);
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", incoming.peer_id);
self.peerset.dropped(&incoming.peer_id);
self.peerset.dropped(incoming.peer_id.clone());
return
}

Expand Down Expand Up @@ -662,7 +662,7 @@ where
debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}): Was disabled \
(through {:?}) but pending enable", peer_id, endpoint);
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id);
self.peerset.dropped(peer_id);
self.peerset.dropped(peer_id.clone());
self.peers.insert(peer_id.clone(), PeerState::Banned { until: timer.deadline() });
if open {
debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id);
Expand All @@ -679,7 +679,7 @@ where
debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}): Was enabled \
(through {:?})", peer_id, endpoint);
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id);
self.peerset.dropped(peer_id);
self.peerset.dropped(peer_id.clone());

if open {
debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id);
Expand Down Expand Up @@ -730,7 +730,7 @@ where
until: Instant::now() + Duration::from_secs(5)
};
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id);
self.peerset.dropped(peer_id)
self.peerset.dropped(peer_id.clone())
},

// We can still get dial failures even if we are already connected to the node,
Expand Down
8 changes: 4 additions & 4 deletions core/network-libp2p/src/service_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use std::time::Duration;
pub fn start_service<TMessage>(
config: NetworkConfiguration,
registered_custom: RegisteredProtocol<TMessage>,
) -> Result<(Service<TMessage>, Arc<substrate_peerset::Peerset>), IoError>
) -> Result<(Service<TMessage>, substrate_peerset::PeersetHandle), IoError>
where TMessage: CustomMessage + Send + 'static {

if let Some(ref path) = config.net_config_path {
Expand Down Expand Up @@ -72,7 +72,7 @@ where TMessage: CustomMessage + Send + 'static {
}

// Build the peerset.
let (peerset, peerset_receiver) = substrate_peerset::Peerset::from_config(substrate_peerset::PeersetConfig {
let (peerset, peerset_handle) = substrate_peerset::Peerset::from_config(substrate_peerset::PeersetConfig {
in_peers: config.in_peers,
out_peers: config.out_peers,
bootnodes,
Expand All @@ -88,7 +88,7 @@ where TMessage: CustomMessage + Send + 'static {
// Build the swarm.
let (mut swarm, bandwidth) = {
let user_agent = format!("{} ({})", config.client_version, config.node_name);
let behaviour = Behaviour::new(user_agent, local_public, registered_custom, known_addresses, peerset_receiver, config.enable_mdns);
let behaviour = Behaviour::new(user_agent, local_public, registered_custom, known_addresses, peerset, config.enable_mdns);
let (transport, bandwidth) = transport::build_transport(local_identity);
(Swarm::new(transport, behaviour, local_peer_id.clone()), bandwidth)
};
Expand Down Expand Up @@ -116,7 +116,7 @@ where TMessage: CustomMessage + Send + 'static {
injected_events: Vec::new(),
};

Ok((service, peerset))
Ok((service, peerset_handle))
}

/// Event produced by the service.
Expand Down
8 changes: 4 additions & 4 deletions core/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use parking_lot::{Mutex, RwLock};
use network_libp2p::{ProtocolId, NetworkConfiguration, Severity};
use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, ServiceEvent as NetworkServiceEvent};
use network_libp2p::{multiaddr, RegisteredProtocol, NetworkState};
use peerset::Peerset;
use peerset::PeersetHandle;
use consensus::import_queue::{ImportQueue, Link};
use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId};

Expand Down Expand Up @@ -143,7 +143,7 @@ pub struct Service<B: BlockT + 'static, S: NetworkSpecialization<B>> {
network: Arc<Mutex<NetworkService<Message<B>>>>,
/// Peerset manager (PSM); manages the reputation of nodes and indicates the network which
/// nodes it should be connected to or not.
peerset: Arc<Peerset>,
peerset: PeersetHandle,
/// Protocol sender
protocol_sender: Sender<ProtocolMsg<B, S>>,
/// Sender for messages to the background service task, and handle for the background thread.
Expand Down Expand Up @@ -360,7 +360,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> ManageNetwork for Service
}

fn remove_reserved_peer(&self, peer: PeerId) {
self.peerset.remove_reserved_peer(&peer);
self.peerset.remove_reserved_peer(peer);
}

fn add_reserved_peer(&self, peer: String) -> Result<(), String> {
Expand Down Expand Up @@ -474,7 +474,7 @@ fn start_thread<B: BlockT + 'static>(
network_port: NetworkPort<B>,
config: NetworkConfiguration,
registered: RegisteredProtocol<Message<B>>,
) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc<Mutex<NetworkService<Message<B>>>>, Arc<Peerset>), Error> {
) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc<Mutex<NetworkService<Message<B>>>>, PeersetHandle), Error> {
// Start the main service.
let (service, peerset) = match start_service(config, registered) {
Ok((service, peerset)) => (Arc::new(Mutex::new(service)), peerset),
Expand Down
8 changes: 2 additions & 6 deletions core/peerset/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,9 @@ authors = ["Parity Technologies <[email protected]>"]
edition = "2018"

[dependencies]
fnv = "1.0"
futures = "0.1"
libp2p = { version = "0.6.0", default-features = false }
linked-hash-map = "0.5"
log = "0.4"
parking_lot = "0.7"
rand = "0.6"
serde = "1.0.70"
serde_derive = "1.0.70"
lru-cache = "0.1.2"
serde_json = "1.0.24"
tokio-io = "0.1"
Loading