Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix tests
  • Loading branch information
blacktemplar committed Dec 11, 2020
commit 30a6cd339563ff2bdef3d3971f3c0435f2e6be41
130 changes: 65 additions & 65 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,61 @@ where
}
}

/// Helper function to get a subset of random gossipsub peers for a `topic_hash`
/// filtered by the function `f`. The number of peers to get equals the output of `n_map`
/// that gets as input the number of filtered peers.
fn get_random_peers_dynamic(
topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
peer_protocols: &HashMap<PeerId, PeerKind>,
topic_hash: &TopicHash,
// maps the number of total peers to the number of selected peers
n_map: impl Fn(usize) -> usize,
mut f: impl FnMut(&PeerId) -> bool,
) -> BTreeSet<PeerId> {
let mut gossip_peers = match topic_peers.get(topic_hash) {
// if they exist, filter the peers by `f`
Some(peer_list) => peer_list
.iter()
.cloned()
.filter(|p| {
f(p) && match peer_protocols.get(p) {
Some(PeerKind::Gossipsub) => true,
Some(PeerKind::Gossipsubv1_1) => true,
_ => false,
}
})
.collect(),
None => Vec::new(),
};

// if we have less than needed, return them
let n = n_map(gossip_peers.len());
if gossip_peers.len() <= n {
debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
return gossip_peers.into_iter().collect();
}

// we have more peers than needed, shuffle them and return n of them
let mut rng = thread_rng();
gossip_peers.partial_shuffle(&mut rng, n);

debug!("RANDOM PEERS: Got {:?} peers", n);

gossip_peers.into_iter().take(n).collect()
}

/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
/// filtered by the function `f`.
fn get_random_peers(
topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
peer_protocols: &HashMap<PeerId, PeerKind>,
topic_hash: &TopicHash,
n: usize,
f: impl FnMut(&PeerId) -> bool,
) -> BTreeSet<PeerId> {
get_random_peers_dynamic(topic_peers, peer_protocols, topic_hash, |_| n, f)
}

impl<T, F> Gossipsub<T, F>
where
T: Clone + Into<Vec<u8>> + From<Vec<u8>> + AsRef<[u8]>,
Expand Down Expand Up @@ -593,7 +648,7 @@ where
} else {
// We have no fanout peers, select mesh_n of them and add them to the fanout
let mesh_n = self.config.mesh_n();
let new_peers = Self::get_random_peers(
let new_peers = get_random_peers(
&self.topic_peers,
&self.peer_protocols,
&topic_hash,
Expand Down Expand Up @@ -829,7 +884,7 @@ where
// check if we need to get more peers, which we randomly select
if added_peers.len() < self.config.mesh_n() {
// get the peers
let new_peers = Self::get_random_peers(
let new_peers = get_random_peers(
&self.topic_peers,
&self.peer_protocols,
topic_hash,
Expand Down Expand Up @@ -902,7 +957,7 @@ where

// Select peers for peer exchange
let peers = if do_px {
Self::get_random_peers(
get_random_peers(
&self.topic_peers,
&self.peer_protocols,
&topic_hash,
Expand Down Expand Up @@ -1809,7 +1864,7 @@ where
);
// not enough peers - get mesh_n - current_length more
let desired_peers = self.config.mesh_n() - peers.len();
let peer_list = Self::get_random_peers(
let peer_list = get_random_peers(
topic_peers,
&self.peer_protocols,
topic_hash,
Expand Down Expand Up @@ -1891,7 +1946,7 @@ where
// if we have not enough outbound peers, graft to some new outbound peers
if outbound < self.config.mesh_outbound_min() {
let needed = self.config.mesh_outbound_min() - outbound;
let peer_list = Self::get_random_peers(
let peer_list = get_random_peers(
topic_peers,
&self.peer_protocols,
topic_hash,
Expand Down Expand Up @@ -1948,7 +2003,7 @@ where
// if the median score is below the threshold, select a better peer (if any) and
// GRAFT
if median < thresholds.opportunistic_graft_threshold {
let peer_list = Self::get_random_peers(
let peer_list = get_random_peers(
topic_peers,
&self.peer_protocols,
topic_hash,
Expand Down Expand Up @@ -2032,7 +2087,7 @@ where
);
let needed_peers = self.config.mesh_n() - peers.len();
let explicit_peers = &self.explicit_peers;
let new_peers = Self::get_random_peers(
let new_peers = get_random_peers(
&self.topic_peers,
&self.peer_protocols,
topic_hash,
Expand Down Expand Up @@ -2126,7 +2181,7 @@ where
)
};
// get gossip_lazy random peers
let to_msg_peers = Self::get_random_peers_dynamic(
let to_msg_peers = get_random_peers_dynamic(
&self.topic_peers,
&self.peer_protocols,
&topic_hash,
Expand Down Expand Up @@ -2403,61 +2458,6 @@ where
}
}

/// Helper function to get a subset of random gossipsub peers for a `topic_hash`
/// filtered by the function `f`. The number of peers to get equals the output of `n_map`
/// that gets as input the number of filtered peers.
fn get_random_peers_dynamic(
topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
peer_protocols: &HashMap<PeerId, PeerKind>,
topic_hash: &TopicHash,
// maps the number of total peers to the number of selected peers
n_map: impl Fn(usize) -> usize,
mut f: impl FnMut(&PeerId) -> bool,
) -> BTreeSet<PeerId> {
let mut gossip_peers = match topic_peers.get(topic_hash) {
// if they exist, filter the peers by `f`
Some(peer_list) => peer_list
.iter()
.cloned()
.filter(|p| {
f(p) && match peer_protocols.get(p) {
Some(PeerKind::Gossipsub) => true,
Some(PeerKind::Gossipsubv1_1) => true,
_ => false,
}
})
.collect(),
None => Vec::new(),
};

// if we have less than needed, return them
let n = n_map(gossip_peers.len());
if gossip_peers.len() <= n {
debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
return gossip_peers.into_iter().collect();
}

// we have more peers than needed, shuffle them and return n of them
let mut rng = thread_rng();
gossip_peers.partial_shuffle(&mut rng, n);

debug!("RANDOM PEERS: Got {:?} peers", n);

gossip_peers.into_iter().take(n).collect()
}

/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
/// filtered by the function `f`.
fn get_random_peers(
topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
peer_protocols: &HashMap<PeerId, PeerKind>,
topic_hash: &TopicHash,
n: usize,
f: impl FnMut(&PeerId) -> bool,
) -> BTreeSet<PeerId> {
Self::get_random_peers_dynamic(topic_peers, peer_protocols, topic_hash, |_| n, f)
}

// adds a control action to control_pool
fn control_pool_add(
control_pool: &mut HashMap<PeerId, Vec<GossipsubControlAction>>,
Expand Down Expand Up @@ -3178,7 +3178,7 @@ mod local_test {
.validation_mode(ValidationMode::Permissive)
.build()
.unwrap();
let gs = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap();
let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap();

// Message under the limit should be fine.
let mut rpc = empty_rpc();
Expand Down Expand Up @@ -3226,7 +3226,7 @@ mod local_test {
.validation_mode(ValidationMode::Permissive)
.build()
.unwrap();
let gs = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap();
let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap();

let mut length_codec = unsigned_varint::codec::UviBytes::default();
length_codec.set_max_len(max_transmit_size);
Expand Down
Loading