Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions core/network-libp2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behav
for addr in &info.listen_addrs {
self.discovery.kademlia.add_connected_address(&peer_id, addr.clone());
}
self.custom_protocols.add_discovered_node(&peer_id);
self.custom_protocols.add_discovered_nodes(Some(peer_id.clone()));
self.events.push(BehaviourOut::Identified { peer_id, info });
}
IdentifyEvent::Error { .. } => {}
Expand All @@ -272,7 +272,7 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<KademliaOut> for Behavio
match out {
KademliaOut::Discovered { .. } => {}
KademliaOut::KBucketAdded { peer_id, .. } => {
self.custom_protocols.add_discovered_node(&peer_id);
self.custom_protocols.add_discovered_nodes(Some(peer_id));
}
KademliaOut::FindNodeResult { key, closer_peers } => {
trace!(target: "sub-libp2p", "Libp2p => Query for {:?} yielded {:?} results",
Expand Down Expand Up @@ -303,9 +303,7 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<MdnsEvent> for Behaviour
fn inject_event(&mut self, event: MdnsEvent) {
match event {
MdnsEvent::Discovered(list) => {
for (peer_id, _) in list {
self.custom_protocols.add_discovered_node(&peer_id);
}
self.custom_protocols.add_discovered_nodes(list.into_iter().map(|(peer_id, _)| peer_id));
},
MdnsEvent::Expired(_) => {}
}
Expand Down
8 changes: 5 additions & 3 deletions core/network-libp2p/src/custom_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,11 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
}

/// Indicates to the peerset that we have discovered new addresses for a given node.
pub fn add_discovered_node(&mut self, peer_id: &PeerId) {
debug!(target: "sub-libp2p", "PSM <= Discovered({:?})", peer_id);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure what to do about this debug!. I could move print it in Peerset::discovered if needed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be nice to have the logging both in sub-libp2p and in the peerset.
I guess we could have a custom iterator that does the same as Iterator::map but also implements Drop and prints the elements that are ignored?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for suggestion. I realised that we can just print all items inside .map combinator. I fixed it in the latest commit

self.peerset.discovered(peer_id.clone())
pub fn add_discovered_nodes<I: IntoIterator<Item = PeerId>>(&mut self, peer_ids: I) {
self.peerset.discovered(peer_ids.into_iter().map(|peer_id| {
debug!(target: "sub-libp2p", "PSM <= Discovered({:?})", peer_id);
peer_id
}));
}

/// Returns the state of the peerset manager, for debugging purposes.
Expand Down
32 changes: 22 additions & 10 deletions core/peerset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,11 @@ impl Peerset {
/// Because of concurrency issues, it is acceptable to call `incoming` with a `PeerId` the
/// peerset is already connected to, in which case it must not answer.
pub fn incoming(&mut self, peer_id: PeerId, index: IncomingIndex) {
trace!("Incoming {}\nin_slots={:?}\nout_slots={:?}", peer_id, self.data.in_slots, self.data.out_slots);
trace!(
target: "peerset",
"Incoming {:?}\nin_slots={:?}\nout_slots={:?}",
peer_id, self.data.in_slots, self.data.out_slots
);
// if `reserved_only` is set, but this peer is not a part of our discovered list,
// a) it is not reserved, so we reject the connection
// b) we are already connected to it, so we reject the connection
Expand Down Expand Up @@ -417,7 +421,11 @@ impl Peerset {
/// Must only be called after the PSM has either generated a `Connect` message with this
/// `PeerId`, or accepted an incoming connection with this `PeerId`.
pub fn dropped(&mut self, peer_id: PeerId) {
trace!("Dropping {}\nin_slots={:?}\nout_slots={:?}", peer_id, self.data.in_slots, self.data.out_slots);
trace!(
target: "peerset",
"Dropping {:?}\nin_slots={:?}\nout_slots={:?}",
peer_id, self.data.in_slots, self.data.out_slots
);
// Automatically connect back if reserved.
if self.data.in_slots.is_connected_and_reserved(&peer_id) || self.data.out_slots.is_connected_and_reserved(&peer_id) {
self.message_queue.push_back(Message::Connect(peer_id));
Expand All @@ -434,16 +442,20 @@ impl Peerset {
self.alloc_slots();
}

/// Adds a discovered peer id to the PSM.
/// Adds discovered peer ids to the PSM.
///
/// > **Note**: There is no equivalent "expired" message, meaning that it is the responsibility
/// > of the PSM to remove `PeerId`s that fail to dial too often.
pub fn discovered(&mut self, peer_id: PeerId) {
if self.data.in_slots.contains(&peer_id) || self.data.out_slots.contains(&peer_id) {
return;
pub fn discovered<I: IntoIterator<Item = PeerId>>(&mut self, peer_ids: I) {
for peer_id in peer_ids {
if !self.data.in_slots.contains(&peer_id) && !self.data.out_slots.contains(&peer_id) && !self.data.discovered.contains(&peer_id) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this additional check in data.discovered, so we can print peer_id without cloning it

trace!(target: "peerset", "Discovered new peer: {:?}", peer_id);
self.data.discovered.add_peer(peer_id, SlotType::Common);
} else {
trace!(target: "peerset", "Discovered known peer: {:?}", peer_id);
}
}

self.data.discovered.add_peer(peer_id, SlotType::Common);
self.alloc_slots();
}

Expand Down Expand Up @@ -721,9 +733,9 @@ mod tests {
};

let (mut peerset, _handle) = Peerset::from_config(config);
peerset.discovered(discovered.clone());
peerset.discovered(discovered.clone());
peerset.discovered(discovered2);
peerset.discovered(Some(discovered.clone()));
peerset.discovered(Some(discovered.clone()));
peerset.discovered(Some(discovered2));

assert_messages(peerset, vec![
Message::Connect(bootnode),
Expand Down