Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions core/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,9 @@ fn start_thread<B: BlockT + 'static>(
let (close_tx, close_rx) = oneshot::channel();
let service_clone = service.clone();
let mut runtime = RuntimeBuilder::new().name_prefix("libp2p-").build()?;
let peerset_clone = peerset.clone();
let thread = thread::Builder::new().name("network".to_string()).spawn(move || {
let fut = run_thread(protocol_sender, service_clone, network_port)
let fut = run_thread(protocol_sender, service_clone, network_port, peerset_clone)
.select(close_rx.then(|_| Ok(())))
.map(|(val, _)| val)
.map_err(|(err,_ )| err);
Expand All @@ -509,6 +510,7 @@ fn run_thread<B: BlockT + 'static>(
protocol_sender: Sender<FromNetworkMsg<B>>,
network_service: Arc<Mutex<NetworkService<Message<B>>>>,
network_port: NetworkPort<B>,
peerset: PeersetHandle,
) -> impl Future<Item = (), Error = io::Error> {

let network_service_2 = network_service.clone();
Expand All @@ -532,9 +534,9 @@ fn run_thread<B: BlockT + 'static>(
match severity {
Severity::Bad(message) => {
info!(target: "sync", "Banning {:?} because {:?}", who, message);
warn!(target: "sync", "Banning a node is a deprecated mechanism that \
should be removed");
network_service_2.lock().drop_node(&who)
network_service_2.lock().drop_node(&who);
// temporary: make sure the peer gets dropped from the peerset
peerset.report_peer(who, i32::min_value());
},
Severity::Useless(message) => {
debug!(target: "sync", "Dropping {:?} because {:?}", who, message);
Expand Down
4 changes: 4 additions & 0 deletions core/peerset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ impl Peerset {
self.data.discovered.remove_peer(&peer_id);

// notify that connection has been made
trace!(target: "peerset", "Connecting to new reserved peer {}", peer_id);
self.message_queue.push_back(Message::Connect(peer_id));
return;
},
Expand All @@ -209,6 +210,7 @@ impl Peerset {
// let's add the peer we disconnected from to the discovered list again
self.data.discovered.add_peer(removed.clone(), SlotType::Common);
// swap connections
trace!(target: "peerset", "Connecting to new reserved peer {}, dropping {}", added, removed);
self.message_queue.push_back(Message::Drop(removed));
self.message_queue.push_back(Message::Connect(added));
}
Expand Down Expand Up @@ -278,10 +280,12 @@ impl Peerset {
while let Some((peer_id, slot_type)) = self.data.discovered.pop_most_important_peer(self.data.reserved_only) {
match self.data.out_slots.add_peer(peer_id, slot_type) {
SlotState::Added(peer_id) => {
trace!(target: "peerset", "Connecting to new peer {}", peer_id);
self.message_queue.push_back(Message::Connect(peer_id));
},
SlotState::Swaped { removed, added } => {
// insert peer back into discovered list
trace!(target: "peerset", "Connecting to new peer {}, dropping {}", added, removed);
self.data.discovered.add_peer(removed.clone(), SlotType::Common);
self.message_queue.push_back(Message::Drop(removed));
self.message_queue.push_back(Message::Connect(added));
Expand Down