Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions core/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,9 @@ fn start_thread<B: BlockT + 'static>(
let (close_tx, close_rx) = oneshot::channel();
let service_clone = service.clone();
let mut runtime = RuntimeBuilder::new().name_prefix("libp2p-").build()?;
let peerset_clone = peerset.clone();
let thread = thread::Builder::new().name("network".to_string()).spawn(move || {
let fut = run_thread(protocol_sender, service_clone, network_port)
let fut = run_thread(protocol_sender, service_clone, network_port, peerset_clone)
.select(close_rx.then(|_| Ok(())))
.map(|(val, _)| val)
.map_err(|(err,_ )| err);
Expand All @@ -509,6 +510,7 @@ fn run_thread<B: BlockT + 'static>(
protocol_sender: Sender<FromNetworkMsg<B>>,
network_service: Arc<Mutex<NetworkService<Message<B>>>>,
network_port: NetworkPort<B>,
peerset: PeersetHandle,
) -> impl Future<Item = (), Error = io::Error> {

let network_service_2 = network_service.clone();
Expand All @@ -532,9 +534,8 @@ fn run_thread<B: BlockT + 'static>(
match severity {
Severity::Bad(message) => {
info!(target: "sync", "Banning {:?} because {:?}", who, message);
warn!(target: "sync", "Banning a node is a deprecated mechanism that \
should be removed");
network_service_2.lock().drop_node(&who)
network_service_2.lock().drop_node(&who);
peerset.report_peer(who, i32::min_value());
},
Severity::Useless(message) => {
debug!(target: "sync", "Dropping {:?} because {:?}", who, message);
Expand Down
4 changes: 4 additions & 0 deletions core/peerset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ impl Peerset {
self.data.discovered.remove_peer(&peer_id);

// notify that connection has been made
trace!(target: "peerset", "Connecting to new reserved peer {}", peer_id);
self.message_queue.push_back(Message::Connect(peer_id));
return;
},
Expand All @@ -209,6 +210,7 @@ impl Peerset {
// let's add the peer we disconnected from to the discovered list again
self.data.discovered.add_peer(removed.clone(), SlotType::Common);
// swap connections
trace!(target: "peerset", "Connecting to new reserved peer {}, dropping {}", added, removed);
self.message_queue.push_back(Message::Drop(removed));
self.message_queue.push_back(Message::Connect(added));
}
Expand Down Expand Up @@ -278,10 +280,12 @@ impl Peerset {
while let Some((peer_id, slot_type)) = self.data.discovered.pop_most_important_peer(self.data.reserved_only) {
match self.data.out_slots.add_peer(peer_id, slot_type) {
SlotState::Added(peer_id) => {
trace!(target: "peerset", "Connecting to new peer {}", peer_id);
self.message_queue.push_back(Message::Connect(peer_id));
},
SlotState::Swaped { removed, added } => {
// insert peer back into discovered list
trace!(target: "peerset", "Connecting to new peer {}, dropping {}", added, removed);
self.data.discovered.add_peer(removed.clone(), SlotType::Common);
self.message_queue.push_back(Message::Drop(removed));
self.message_queue.push_back(Message::Connect(added));
Expand Down