diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 0c1f54b1864..540d4b55a28 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,5 +1,14 @@ # 0.25.0 [unreleased] +- Upon newly established connections, delay routing table + updates until after the configured protocol name has + been confirmed by the connection handler, i.e. until + after at least one substream has been successfully + negotiated. In configurations with different protocol names, + this avoids undesirable nodes being included in the + local routing table at least temporarily. + [PR 1821](https://github.com/libp2p/rust-libp2p/pull/1821). + - Update dependencies. # 0.24.0 [2020-10-16] diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 2dc3ddb0e84..e55d1e4d654 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -24,7 +24,13 @@ mod test; use crate::K_VALUE; use crate::addresses::Addresses; -use crate::handler::{KademliaHandler, KademliaHandlerConfig, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn}; +use crate::handler::{ + KademliaHandlerProto, + KademliaHandlerConfig, + KademliaRequestId, + KademliaHandlerEvent, + KademliaHandlerIn +}; use crate::jobs::*; use crate::kbucket::{self, KBucketsTable, NodeStatus}; use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer}; @@ -38,7 +44,6 @@ use libp2p_swarm::{ NetworkBehaviourAction, NotifyHandler, PollParameters, - ProtocolsHandler }; use log::{info, debug, warn}; use smallvec::SmallVec; @@ -1428,11 +1433,11 @@ where for<'a> TStore: RecordStore<'a>, TStore: Send + 'static, { - type ProtocolsHandler = KademliaHandler; + type ProtocolsHandler = KademliaHandlerProto; type OutEvent = KademliaEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { - KademliaHandler::new(KademliaHandlerConfig { + KademliaHandlerProto::new(KademliaHandlerConfig { protocol_config: self.protocol_config.clone(), allow_listening: true, 
idle_timeout: self.connection_idle_timeout, @@ -1462,17 +1467,11 @@ where peer_addrs } - fn inject_connection_established(&mut self, peer: &PeerId, _: &ConnectionId, endpoint: &ConnectedPoint) { - // The remote's address can only be put into the routing table, - // and thus shared with other nodes, if the local node is the dialer, - // since the remote address on an inbound connection is specific to - // that connection (e.g. typically the TCP port numbers). - let address = match endpoint { - ConnectedPoint::Dialer { address } => Some(address.clone()), - ConnectedPoint::Listener { .. } => None, - }; - - self.connection_updated(peer.clone(), address, NodeStatus::Connected); + fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) { + // When a connection is established, we don't know yet whether the + // remote supports the configured protocol name. Only once a connection + // handler reports [`KademliaHandlerEvent::ProtocolConfirmed`] do we + // update the local routing table. } fn inject_connected(&mut self, peer: &PeerId) { @@ -1606,6 +1605,19 @@ where event: KademliaHandlerEvent ) { match event { + KademliaHandlerEvent::ProtocolConfirmed { endpoint } => { + debug_assert!(self.connected_peers.contains(&source)); + // The remote's address can only be put into the routing table, + // and thus shared with other nodes, if the local node is the dialer, + // since the remote address on an inbound connection may be specific + // to that connection (e.g. typically the TCP port numbers). + let address = match endpoint { + ConnectedPoint::Dialer { address } => Some(address.clone()), + ConnectedPoint::Listener { .. 
} => None, + }; + self.connection_updated(source, address, NodeStatus::Connected); + } + KademliaHandlerEvent::FindNodeReq { key, request_id } => { let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler { @@ -1804,7 +1816,7 @@ where fn poll(&mut self, cx: &mut Context<'_>, parameters: &mut impl PollParameters) -> Poll< NetworkBehaviourAction< - as ProtocolsHandler>::InEvent, + KademliaHandlerIn, Self::OutEvent, >, > { diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index da0689194df..db81ddf4675 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -1113,10 +1113,27 @@ fn network_behaviour_inject_address_change() { MemoryStore::new(local_peer_id), ); + let endpoint = ConnectedPoint::Dialer { address: old_address.clone() }; + + // Mimic a connection being established. kademlia.inject_connection_established( &remote_peer_id, &connection_id, - &ConnectedPoint::Dialer { address: old_address.clone() }, + &endpoint, + ); + kademlia.inject_connected(&remote_peer_id); + + // At this point the remote is not yet known to support the + // configured protocol name, so the peer is not yet in the + // local routing table and hence no addresses are known. + assert!(kademlia.addresses_of_peer(&remote_peer_id).is_empty()); + + // Mimic the connection handler confirming the protocol for + // the test connection, so that the peer is added to the routing table. 
+ kademlia.inject_event( + remote_peer_id.clone(), + connection_id.clone(), + KademliaHandlerEvent::ProtocolConfirmed { endpoint } ); assert_eq!( diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index e30f1c43b94..a910610eec2 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -25,22 +25,54 @@ use crate::protocol::{ use crate::record::{self, Record}; use futures::prelude::*; use libp2p_swarm::{ - NegotiatedSubstream, + IntoProtocolsHandler, KeepAlive, + NegotiatedSubstream, SubstreamProtocol, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr }; use libp2p_core::{ + ConnectedPoint, + PeerId, either::EitherOutput, upgrade::{self, InboundUpgrade, OutboundUpgrade} }; use log::trace; -use std::{error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration}; +use std::{error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration}; use wasm_timer::Instant; -/// Protocol handler that handles Kademlia communications with the remote. +/// A prototype from which [`KademliaHandler`]s can be constructed. +pub struct KademliaHandlerProto { + config: KademliaHandlerConfig, + _type: PhantomData, +} + +impl KademliaHandlerProto { + pub fn new(config: KademliaHandlerConfig) -> Self { + KademliaHandlerProto { config, _type: PhantomData } + } +} + +impl IntoProtocolsHandler for KademliaHandlerProto { + type Handler = KademliaHandler; + + fn into_handler(self, _: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { + KademliaHandler::new(self.config, endpoint.clone()) + } + + fn inbound_protocol(&self) -> ::InboundProtocol { + if self.config.allow_listening { + upgrade::EitherUpgrade::A(self.config.protocol_config.clone()) + } else { + upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade) + } + } +} + +/// Protocol handler that manages substreams for the Kademlia protocol +/// on a single connection with a peer. 
/// /// The handler will automatically open a Kademlia substream with the remote for each request we /// make. @@ -58,6 +90,27 @@ pub struct KademliaHandler { /// Until when to keep the connection alive. keep_alive: KeepAlive, + + /// The connected endpoint of the connection that the handler + /// is associated with. + endpoint: ConnectedPoint, + + /// The current state of protocol confirmation. + protocol_status: ProtocolStatus, +} + +/// The states of protocol confirmation that a connection +/// handler transitions through. +enum ProtocolStatus { + /// It is as yet unknown whether the remote supports the + /// configured protocol name. + Unconfirmed, + /// The configured protocol name has been confirmed by the remote + /// but has not yet been reported to the `Kademlia` behaviour. + Confirmed, + /// The configured protocol has been confirmed by the remote + /// and the confirmation reported to the `Kademlia` behaviour. + Reported, } /// Configuration of a [`KademliaHandler`]. @@ -135,6 +188,15 @@ impl SubstreamState { /// Event produced by the Kademlia handler. #[derive(Debug)] pub enum KademliaHandlerEvent { + /// The configured protocol name has been confirmed by the peer through + /// a successfully negotiated substream. + /// + /// This event is only emitted once by a handler upon the first + /// successfully negotiated inbound or outbound substream and + /// indicates that the connected peer participates in the Kademlia + /// overlay network identified by the configured protocol name. + ProtocolConfirmed { endpoint: ConnectedPoint }, + /// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes /// returned is not specified, but should be around 20. FindNodeReq { @@ -379,24 +441,20 @@ struct UniqueConnecId(u64); impl KademliaHandler { /// Create a [`KademliaHandler`] using the given configuration. 
- pub fn new(config: KademliaHandlerConfig) -> Self { + pub fn new(config: KademliaHandlerConfig, endpoint: ConnectedPoint) -> Self { let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout); KademliaHandler { config, + endpoint, next_connec_unique_id: UniqueConnecId(0), substreams: Vec::new(), keep_alive, + protocol_status: ProtocolStatus::Unconfirmed, } } } -impl Default for KademliaHandler { - fn default() -> Self { - KademliaHandler::new(Default::default()) - } -} - impl ProtocolsHandler for KademliaHandler where TUserData: Clone + Send + 'static, @@ -423,8 +481,13 @@ where protocol: >::Output, (msg, user_data): Self::OutboundOpenInfo, ) { - self.substreams - .push(SubstreamState::OutPendingSend(protocol, msg, user_data)); + self.substreams.push(SubstreamState::OutPendingSend(protocol, msg, user_data)); + if let ProtocolStatus::Unconfirmed = self.protocol_status { + // Upon the first successfully negotiated substream, we know that the + // remote is configured with the same protocol name and we want + // the behaviour to add this peer to the routing table, if possible. + self.protocol_status = ProtocolStatus::Confirmed; + } } fn inject_fully_negotiated_inbound( @@ -442,8 +505,13 @@ where debug_assert!(self.config.allow_listening); let connec_unique_id = self.next_connec_unique_id; self.next_connec_unique_id.0 += 1; - self.substreams - .push(SubstreamState::InWaitingMessage(connec_unique_id, protocol)); + self.substreams.push(SubstreamState::InWaitingMessage(connec_unique_id, protocol)); + if let ProtocolStatus::Unconfirmed = self.protocol_status { + // Upon the first successfully negotiated substream, we know that the + // remote is configured with the same protocol name and we want + // the behaviour to add this peer to the routing table, if possible. 
+ self.protocol_status = ProtocolStatus::Confirmed; + } } fn inject_event(&mut self, message: KademliaHandlerIn) { @@ -618,6 +686,14 @@ where return Poll::Pending; } + if let ProtocolStatus::Confirmed = self.protocol_status { + self.protocol_status = ProtocolStatus::Reported; + return Poll::Ready(ProtocolsHandlerEvent::Custom( + KademliaHandlerEvent::ProtocolConfirmed { + endpoint: self.endpoint.clone() + })) + } + // We remove each element from `substreams` one by one and add them back. for n in (0..self.substreams.len()).rev() { let mut substream = self.substreams.swap_remove(n);