Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Next Next commit
grandpa: stricter gossip message filtering
  • Loading branch information
andresilva committed Nov 6, 2019
commit 6d319cd775905b09e56d1fee0a2094afd3254726
78 changes: 76 additions & 2 deletions core/finality-grandpa/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ use substrate_telemetry::{telemetry, CONSENSUS_DEBUG};
use log::{trace, debug, warn};
use futures::prelude::*;
use futures::sync::mpsc;
use rand::Rng;

use crate::{environment, CatchUp, CompactCommit, SignedMessage};
use super::{cost, benefit, Round, SetId};
Expand Down Expand Up @@ -483,6 +484,14 @@ impl<N: Ord> Peers<N> {
fn peer<'a>(&'a self, who: &PeerId) -> Option<&'a PeerInfo<N>> {
self.inner.get(who)
}

fn authorities(&self) -> usize {
self.inner.iter().filter(|(_, info)| info.roles.is_authority()).count()
}

fn non_authorities(&self) -> usize {
self.inner.iter().filter(|(_, info)| !info.roles.is_authority()).count()
}
}

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -980,6 +989,64 @@ impl<Block: BlockT> Inner<Block> {

(true, report)
}

/// The initial logic for filtering messages follows the given state
/// transitions.
///
/// - State 0: not allowed to anyone (only if our local node is not an authority)
/// - State 1: allowed to random `sqrt(authorities)`
/// - State 2: allowed to all authorities
/// - State 3: allowed to random `sqrt(non-authorities)`
/// - State 4: allowed to all non-authorities
///
/// Transitions will be triggered on repropagation attempts by the
/// underlying gossip layer, which should happen every 30 seconds.
fn message_allowed<N>(&self, peer: &PeerInfo<N>, mut previous_attempts: usize) -> bool {
if !self.config.is_authority && previous_attempts == 0 {
// non-authority nodes don't gossip any messages right away. we
// assume that authorities (and sentries) are strongly connected, so
// it should be unnecessary for non-authorities to gossip all
// messages right away.
return false;
}

if !self.config.is_authority {
// since the node is not an authority we skipped the initial attempt
// to gossip the message, therefore we decrement `previous_attempts`
// so that the state machine below works the same way it does for
// authority nodes.
previous_attempts -= 1;
}

if peer.roles.is_authority() {
// the target node is an authority, on the first attempt we start by
// sending the message to only `sqrt(authorities)`.
if previous_attempts == 0 {
let authorities = self.peers.authorities() as f64;
let p = authorities.sqrt() / authorities;
rand::thread_rng().gen_bool(p)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not exactly same as selecting exactly sqrt random peers and sending to them. I think it is still fine here though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, unfortunately there's no easy way to do this with the current interface, should be doable after refactoring consensus gossip.

} else {
// otherwise we already went through the step above, so
// we won't filter the message and send it to all
// authorities for whom it is polite to do so
true
}
} else {
// the node is not an authority so we apply stricter filters
if previous_attempts >= 3 {
// if we previously tried to send this message 3 (or more)
// times, then it is allowed to be sent to all peers.
true
} else if previous_attempts == 2 {
// otherwise we only send it to `sqrt(non-authorities)`.
let non_authorities = self.peers.non_authorities() as f64;
let p = non_authorities.sqrt() / non_authorities;
rand::thread_rng().gen_bool(p)
} else {
false
}
}
}
}

/// A validator for GRANDPA gossip messages.
Expand Down Expand Up @@ -1183,6 +1250,13 @@ impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block>
Some(x) => x,
};

if let MessageIntent::Broadcast { previous_attempts } = intent {
// early return if the message isn't allowed at this stage.
if !inner.message_allowed(peer, previous_attempts) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This filter is currently applied before the GRANDPA politeness filter, should we do it afterwards instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is quicker, isn't it?

return false;
}
}

// if the topic is not something we're keeping at the moment,
// do not send.
let (maybe_round, set_id) = match inner.live_topics.topic_info(&topic) {
Expand All @@ -1209,8 +1283,8 @@ impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block>
Ok(GossipMessage::Commit(full)) => {
// we only broadcast our best commit and only if it's
// better than last received by peer.
Some(full.message.target_number) == our_best_commit
&& Some(full.message.target_number) > peer_best_commit
Some(full.message.target_number) == our_best_commit &&
Some(full.message.target_number) > peer_best_commit
}
Ok(GossipMessage::Neighbor(_)) => false,
Ok(GossipMessage::CatchUpRequest(_)) => false,
Expand Down
71 changes: 58 additions & 13 deletions core/network/src/protocol/consensus_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ const UNREGISTERED_TOPIC_REPUTATION_CHANGE: i32 = -(1 << 10);

struct PeerConsensus<H> {
known_messages: HashSet<H>,
filtered_messages: HashMap<H, usize>,
roles: Roles,
}

Expand Down Expand Up @@ -105,8 +106,12 @@ pub enum MessageRecipient {
/// The reason for sending out the message.
#[derive(Eq, PartialEq, Copy, Clone)]
pub enum MessageIntent {
/// Requested broadcast
Broadcast,
/// Requested broadcast.
Broadcast {
/// How many times this message was previously filtered by the gossip
/// validator when trying to propagate to a given peer.
previous_attempts: usize
},
/// Requested broadcast to all peers.
ForcedBroadcast,
/// Periodic rebroadcast of all messages to all peers.
Expand All @@ -123,6 +128,12 @@ pub enum ValidationResult<H> {
Discard,
}

impl MessageIntent {
fn broadcast() -> MessageIntent {
MessageIntent::Broadcast { previous_attempts: 0 }
}
}

/// Validation context. Allows reacting to incoming messages by sending out further messages.
pub trait ValidatorContext<B: BlockT> {
/// Broadcast all messages with given topic to peers that do not have it yet.
Expand Down Expand Up @@ -196,28 +207,42 @@ fn propagate<'a, B: BlockT, I>(

for (message_hash, topic, message) in messages {
for (id, ref mut peer) in peers.iter_mut() {
let previous_attempts = peer.filtered_messages
.get(&message_hash)
.cloned()
.unwrap_or(0);

let intent = match intent {
MessageIntent::Broadcast =>
MessageIntent::Broadcast { .. } =>
if peer.known_messages.contains(&message_hash) {
continue
continue;
} else {
MessageIntent::Broadcast
MessageIntent::Broadcast { previous_attempts }
},
MessageIntent::PeriodicRebroadcast =>
if peer.known_messages.contains(&message_hash) {
MessageIntent::PeriodicRebroadcast
} else {
// peer doesn't know message, so the logic should treat it as an
// initial broadcast.
MessageIntent::Broadcast
MessageIntent::Broadcast { previous_attempts }
},
other => other,
};

if !message_allowed(id, intent, &topic, &message) {
continue
let count = peer.filtered_messages
.entry(message_hash.clone())
.or_insert(0);

*count += 1;

continue;
}

peer.filtered_messages.remove(message_hash);
peer.known_messages.insert(message_hash.clone());

trace!(target: "gossip", "Propagating to {}: {:?}", id, message);
protocol.send_consensus(id.clone(), message.clone());
}
Expand Down Expand Up @@ -310,6 +335,7 @@ impl<B: BlockT> ConsensusGossip<B> {
trace!(target:"gossip", "Registering {:?} {}", roles, who);
self.peers.insert(who.clone(), PeerConsensus {
known_messages: HashSet::new(),
filtered_messages: HashMap::new(),
roles,
});
for (engine_id, v) in self.validators.clone() {
Expand Down Expand Up @@ -379,7 +405,7 @@ impl<B: BlockT> ConsensusGossip<B> {
.filter_map(|entry|
if entry.topic == topic { Some((&entry.message_hash, &entry.topic, &entry.message)) } else { None }
);
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() };
propagate(protocol, messages, intent, &mut self.peers, &self.validators);
}

Expand Down Expand Up @@ -527,17 +553,36 @@ impl<B: BlockT> ConsensusGossip<B> {
Some(validator) => validator.message_allowed(),
};

let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };

if let Some(ref mut peer) = self.peers.get_mut(who) {
for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) {
let intent = if force {
MessageIntent::ForcedBroadcast
} else {
let previous_attempts = peer.filtered_messages
.get(&entry.message_hash)
.cloned()
.unwrap_or(0);

MessageIntent::Broadcast { previous_attempts }
};

if !force && peer.known_messages.contains(&entry.message_hash) {
continue
continue;
}

if !message_allowed(who, intent, &entry.topic, &entry.message.data) {
continue
let count = peer.filtered_messages
.entry(entry.message_hash)
.or_insert(0);

*count += 1;

continue;
}

peer.filtered_messages.remove(&entry.message_hash);
peer.known_messages.insert(entry.message_hash.clone());

trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message);
protocol.send_consensus(who.clone(), ConsensusMessage {
engine_id: engine_id.clone(),
Expand All @@ -557,7 +602,7 @@ impl<B: BlockT> ConsensusGossip<B> {
) {
let message_hash = HashFor::<B>::hash(&message.data);
self.register_message_hashed(message_hash, topic, message.clone(), None);
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() };
propagate(protocol, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators);
}

Expand Down