@@ -52,11 +52,15 @@ use wasm_timer::Instant;
5252
5353pub use crate :: query:: QueryStats ;
5454
55- /// Network behaviour that handles Kademlia.
55+ /// `Kademlia` is a `NetworkBehaviour` that implements the libp2p
56+ /// Kademlia protocol.
5657pub struct Kademlia < TStore > {
5758 /// The Kademlia routing table.
5859 kbuckets : KBucketsTable < kbucket:: Key < PeerId > , Addresses > ,
5960
61+ /// The k-bucket insertion strategy.
62+ kbucket_inserts : KademliaBucketInserts ,
63+
6064 /// Configuration of the wire protocol.
6165 protocol_config : KademliaProtocolConfig ,
6266
@@ -92,6 +96,30 @@ pub struct Kademlia<TStore> {
9296 store : TStore ,
9397}
9498
99+ /// The configurable strategies for the insertion of peers
100+ /// and their addresses into the k-buckets of the Kademlia
101+ /// routing table.
102+ #[ derive( Copy , Clone , Debug , PartialEq , Eq ) ]
103+ pub enum KademliaBucketInserts {
104+ /// Whenever a connection to a peer is established as a
105+ /// result of a dialing attempt and that peer is not yet
106+ /// in the routing table, it is inserted as long as there
107+ /// is a free slot in the corresponding k-bucket. If the
108+ /// k-bucket is full but still has a free pending slot,
109+ /// it may be inserted into the routing table at a later time if an unresponsive
110+ /// disconnected peer is evicted from the bucket.
111+ OnConnected ,
112+ /// New peers and addresses are only added to the routing table via
113+ /// explicit calls to [`Kademlia::add_address`].
114+ ///
115+ /// > **Note**: Even though peers can only get into the
116+ /// > routing table as a result of [`Kademlia::add_address`],
117+ /// > routing table entries are still updated as peers
118+ /// > connect and disconnect (i.e. the order of the entries
119+ /// > as well as the network addresses).
120+ Manual ,
121+ }
122+
95123/// The configuration for the `Kademlia` behaviour.
96124///
97125/// The configuration is consumed by [`Kademlia::new`].
@@ -106,6 +134,7 @@ pub struct KademliaConfig {
106134 provider_record_ttl : Option < Duration > ,
107135 provider_publication_interval : Option < Duration > ,
108136 connection_idle_timeout : Duration ,
137+ kbucket_inserts : KademliaBucketInserts ,
109138}
110139
111140impl Default for KademliaConfig {
@@ -120,6 +149,7 @@ impl Default for KademliaConfig {
120149 provider_publication_interval : Some ( Duration :: from_secs ( 12 * 60 * 60 ) ) ,
121150 provider_record_ttl : Some ( Duration :: from_secs ( 24 * 60 * 60 ) ) ,
122151 connection_idle_timeout : Duration :: from_secs ( 10 ) ,
152+ kbucket_inserts : KademliaBucketInserts :: OnConnected ,
123153 }
124154 }
125155}
@@ -275,6 +305,12 @@ impl KademliaConfig {
275305 self . protocol_config . set_max_packet_size ( size) ;
276306 self
277307 }
308+
309+ /// Sets the k-bucket insertion strategy for the Kademlia routing table.
310+ pub fn set_kbucket_inserts ( & mut self , inserts : KademliaBucketInserts ) -> & mut Self {
311+ self . kbucket_inserts = inserts;
312+ self
313+ }
278314}
279315
280316impl < TStore > Kademlia < TStore >
@@ -312,6 +348,7 @@ where
312348 Kademlia {
313349 store,
314350 kbuckets : KBucketsTable :: new ( local_key, config. kbucket_pending_timeout ) ,
351+ kbucket_inserts : config. kbucket_inserts ,
315352 protocol_config : config. protocol_config ,
316353 queued_events : VecDeque :: with_capacity ( config. query_config . replication_factor . get ( ) ) ,
317354 queries : QueryPool :: new ( config. query_config ) ,
@@ -381,7 +418,7 @@ where
381418 ///
382419 /// If the routing table has been updated as a result of this operation,
383420 /// a [`KademliaEvent::RoutingUpdated`] event is emitted.
384- pub fn add_address ( & mut self , peer : & PeerId , address : Multiaddr ) {
421+ pub fn add_address ( & mut self , peer : & PeerId , address : Multiaddr ) -> RoutingUpdate {
385422 let key = kbucket:: Key :: new ( peer. clone ( ) ) ;
386423 match self . kbuckets . entry ( & key) {
387424 kbucket:: Entry :: Present ( mut entry, _) => {
@@ -394,9 +431,11 @@ where
394431 }
395432 ) )
396433 }
434+ RoutingUpdate :: Success
397435 }
398436 kbucket:: Entry :: Pending ( mut entry, _) => {
399437 entry. value ( ) . insert ( address) ;
438+ RoutingUpdate :: Pending
400439 }
401440 kbucket:: Entry :: Absent ( entry) => {
402441 let addresses = Addresses :: new ( address) ;
@@ -415,26 +454,97 @@ where
415454 old_peer : None ,
416455 }
417456 ) ) ;
457+ RoutingUpdate :: Success
418458 } ,
419459 kbucket:: InsertResult :: Full => {
420- debug ! ( "Bucket full. Peer not added to routing table: {}" , peer)
460+ debug ! ( "Bucket full. Peer not added to routing table: {}" , peer) ;
461+ RoutingUpdate :: Failed
421462 } ,
422463 kbucket:: InsertResult :: Pending { disconnected } => {
423464 self . queued_events . push_back ( NetworkBehaviourAction :: DialPeer {
424465 peer_id : disconnected. into_preimage ( ) ,
425466 condition : DialPeerCondition :: Disconnected
426- } )
467+ } ) ;
468+ RoutingUpdate :: Pending
427469 } ,
428470 }
429471 } ,
430- kbucket:: Entry :: SelfEntry => { } ,
472+ kbucket:: Entry :: SelfEntry => RoutingUpdate :: Failed ,
431473 }
432474 }
433475
434- /// Returns an iterator over all peer IDs of nodes currently contained in a bucket
435- /// of the Kademlia routing table.
436- pub fn kbuckets_entries ( & mut self ) -> impl Iterator < Item = & PeerId > {
437- self . kbuckets . iter ( ) . map ( |entry| entry. node . key . preimage ( ) )
476+ /// Removes an address of a peer from the routing table.
477+ ///
478+ /// If the given address is the last address of the peer in the
479+ /// routing table, the peer is removed from the routing table
480+ /// and `Some` is returned with a view of the removed entry.
481+ /// The same applies if the peer is currently pending insertion
482+ /// into the routing table.
483+ ///
484+ /// If the given peer or address is not in the routing table,
485+ /// this is a no-op.
486+ pub fn remove_address ( & mut self , peer : & PeerId , address : & Multiaddr )
487+ -> Option < kbucket:: EntryView < kbucket:: Key < PeerId > , Addresses > >
488+ {
489+ let key = kbucket:: Key :: new ( peer. clone ( ) ) ;
490+ match self . kbuckets . entry ( & key) {
491+ kbucket:: Entry :: Present ( mut entry, _) => {
492+ if entry. value ( ) . remove ( address) . is_err ( ) {
493+ Some ( entry. remove ( ) ) // it is the last address, thus remove the peer.
494+ } else {
495+ None
496+ }
497+ }
498+ kbucket:: Entry :: Pending ( mut entry, _) => {
499+ if entry. value ( ) . remove ( address) . is_err ( ) {
500+ Some ( entry. remove ( ) ) // it is the last address, thus remove the peer.
501+ } else {
502+ None
503+ }
504+ }
505+ kbucket:: Entry :: Absent ( ..) | kbucket:: Entry :: SelfEntry => {
506+ None
507+ }
508+ }
509+ }
510+
511+ /// Removes a peer from the routing table.
512+ ///
513+ /// Returns `None` if the peer was not in the routing table,
514+ /// not even pending insertion.
515+ pub fn remove_peer ( & mut self , peer : & PeerId )
516+ -> Option < kbucket:: EntryView < kbucket:: Key < PeerId > , Addresses > >
517+ {
518+ let key = kbucket:: Key :: new ( peer. clone ( ) ) ;
519+ match self . kbuckets . entry ( & key) {
520+ kbucket:: Entry :: Present ( entry, _) => {
521+ Some ( entry. remove ( ) )
522+ }
523+ kbucket:: Entry :: Pending ( entry, _) => {
524+ Some ( entry. remove ( ) )
525+ }
526+ kbucket:: Entry :: Absent ( ..) | kbucket:: Entry :: SelfEntry => {
527+ None
528+ }
529+ }
530+ }
531+
532+ /// Returns an iterator over all non-empty buckets in the routing table.
533+ pub fn kbuckets ( & mut self )
534+ -> impl Iterator < Item = kbucket:: KBucketRef < kbucket:: Key < PeerId > , Addresses > >
535+ {
536+ self . kbuckets . iter ( ) . filter ( |b| !b. is_empty ( ) )
537+ }
538+
539+ /// Returns the k-bucket for the distance to the given key.
540+ ///
541+ /// Returns `None` if the given key refers to the local key.
542+ pub fn kbucket < K > ( & mut self , key : K )
543+ -> Option < kbucket:: KBucketRef < kbucket:: Key < PeerId > , Addresses > >
544+ where
545+ K : Borrow < [ u8 ] > + Clone
546+ {
547+ self . kbuckets . bucket ( & kbucket:: Key :: new ( key) )
438548 }
439549
440550 /// Initiates an iterative query for the closest peers to the given key.
@@ -723,7 +833,7 @@ where
723833 self . queries . add_iter_closest ( target. clone ( ) , peers, inner) ;
724834 }
725835
726- /// Updates the connection status of a peer in the Kademlia routing table .
836+ /// Updates the routing table with a new connection status and address of a peer .
727837 fn connection_updated ( & mut self , peer : PeerId , address : Option < Multiaddr > , new_status : NodeStatus ) {
728838 let key = kbucket:: Key :: new ( peer. clone ( ) ) ;
729839 match self . kbuckets . entry ( & key) {
@@ -755,9 +865,22 @@ where
755865
756866 kbucket:: Entry :: Absent ( entry) => {
757867 // Only connected nodes with a known address are newly inserted.
758- if new_status == NodeStatus :: Connected {
759- if let Some ( address) = address {
760- let addresses = Addresses :: new ( address) ;
868+ if new_status != NodeStatus :: Connected {
869+ return
870+ }
871+ match ( address, self . kbucket_inserts ) {
872+ ( None , _) => {
873+ self . queued_events . push_back ( NetworkBehaviourAction :: GenerateEvent (
874+ KademliaEvent :: UnroutablePeer { peer }
875+ ) ) ;
876+ }
877+ ( Some ( a) , KademliaBucketInserts :: Manual ) => {
878+ self . queued_events . push_back ( NetworkBehaviourAction :: GenerateEvent (
879+ KademliaEvent :: RoutablePeer { peer, address : a }
880+ ) ) ;
881+ }
882+ ( Some ( a) , KademliaBucketInserts :: OnConnected ) => {
883+ let addresses = Addresses :: new ( a) ;
761884 match entry. insert ( addresses. clone ( ) , new_status) {
762885 kbucket:: InsertResult :: Inserted => {
763886 let event = KademliaEvent :: RoutingUpdated {
@@ -769,20 +892,24 @@ where
769892 NetworkBehaviourAction :: GenerateEvent ( event) ) ;
770893 } ,
771894 kbucket:: InsertResult :: Full => {
772- debug ! ( "Bucket full. Peer not added to routing table: {}" , peer)
895+ debug ! ( "Bucket full. Peer not added to routing table: {}" , peer) ;
896+ let address = addresses. first ( ) . clone ( ) ;
897+ self . queued_events . push_back ( NetworkBehaviourAction :: GenerateEvent (
898+ KademliaEvent :: RoutablePeer { peer, address }
899+ ) ) ;
773900 } ,
774901 kbucket:: InsertResult :: Pending { disconnected } => {
775902 debug_assert ! ( !self . connected_peers. contains( disconnected. preimage( ) ) ) ;
903+ let address = addresses. first ( ) . clone ( ) ;
904+ self . queued_events . push_back ( NetworkBehaviourAction :: GenerateEvent (
905+ KademliaEvent :: PendingRoutablePeer { peer, address }
906+ ) ) ;
776907 self . queued_events . push_back ( NetworkBehaviourAction :: DialPeer {
777908 peer_id : disconnected. into_preimage ( ) ,
778909 condition : DialPeerCondition :: Disconnected
779910 } )
780911 } ,
781912 }
782- } else {
783- self . queued_events . push_back ( NetworkBehaviourAction :: GenerateEvent (
784- KademliaEvent :: UnroutablePeer { peer }
785- ) ) ;
786913 }
787914 }
788915 } ,
@@ -806,8 +933,8 @@ where
806933 // a bucket refresh should be performed for every bucket farther away than
807934 // the first non-empty bucket (which are most likely no more than the last
808935 // few, i.e. farthest, buckets).
809- self . kbuckets . buckets ( )
810- . skip_while ( |b| b. num_entries ( ) == 0 )
936+ self . kbuckets . iter ( )
937+ . skip_while ( |b| b. is_empty ( ) )
811938 . skip ( 1 ) // Skip the bucket with the closest neighbour.
812939 . map ( |b| {
813940 // Try to find a key that falls into the bucket. While such keys can
@@ -1770,10 +1897,42 @@ pub enum KademliaEvent {
17701897
17711898 /// A peer has connected for whom no listen address is known.
17721899 ///
1773- /// If the peer is to be added to the local node's routing table, a known
1900+ /// If the peer is to be added to the routing table, a known
17741901 /// listen address for the peer must be provided via [`Kademlia::add_address`].
17751902 UnroutablePeer {
17761903 peer : PeerId
1904+ } ,
1905+
1906+ /// A connection to a peer has been established for whom a listen address
1907+ /// is known but the peer has not been added to the routing table either
1908+ /// because [`KademliaBucketInserts::Manual`] is configured or because
1909+ /// the corresponding bucket is full.
1910+ ///
1911+ /// If the peer is to be included in the routing table, it must
1912+ /// must be explicitly added via [`Kademlia::add_address`], possibly after
1913+ /// removing another peer.
1914+ ///
1915+ /// See [`Kademlia::kbucket`] for insight into the contents of
1916+ /// the k-bucket of `peer`.
1917+ RoutablePeer {
1918+ peer : PeerId ,
1919+ address : Multiaddr ,
1920+ } ,
1921+
1922+ /// A connection to a peer has been established for whom a listen address
1923+ /// is known but the peer is only pending insertion into the routing table
1924+ /// if the least-recently disconnected peer is unresponsive, i.e. the peer
1925+ /// may not make it into the routing table.
1926+ ///
1927+ /// If the peer is to be unconditionally included in the routing table,
1928+ /// it should be explicitly added via [`Kademlia::add_address`] after
1929+ /// removing another peer.
1930+ ///
1931+ /// See [`Kademlia::kbucket`] for insight into the contents of
1932+ /// the k-bucket of `peer`.
1933+ PendingRoutablePeer {
1934+ peer : PeerId ,
1935+ address : Multiaddr ,
17771936 }
17781937}
17791938
@@ -2294,3 +2453,22 @@ impl fmt::Display for NoKnownPeers {
22942453}
22952454
22962455impl std:: error:: Error for NoKnownPeers { }
2456+
2457+ /// The possible outcomes of [`Kademlia::add_address`].
2458+ pub enum RoutingUpdate {
2459+ /// The given peer and address has been added to the routing
2460+ /// table.
2461+ Success ,
2462+ /// The peer and address is pending insertion into
2463+ /// the routing table, if a disconnected peer fails
2464+ /// to respond. If the given peer and address ends up
2465+ /// in the routing table, [`KademliaEvent::RoutingUpdated`]
2466+ /// is eventually emitted.
2467+ Pending ,
2468+ /// The routing table update failed, either because the
2469+ /// corresponding bucket for the peer is full and the
2470+ /// pending slot(s) are occupied, or because the given
2471+ /// peer ID is deemed invalid (e.g. refers to the local
2472+ /// peer ID).
2473+ Failed ,
2474+ }
0 commit comments