Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Some gossip improvements (#1892)
* queue messages in future

* use new gossip API in GRANDPA

* implement message_expired for grandpa

* fix indent
  • Loading branch information
rphmeier committed Feb 28, 2019
commit 584c7e28ef8ce06ff16cbb0ddcb5855aca11e115
85 changes: 51 additions & 34 deletions core/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,42 @@ struct TopicTracker {
set_id: u64,
}

impl TopicTracker {
fn is_expired(&self, round: u64, set_id: u64) -> bool {
if set_id < self.set_id {
trace!(target: "afg", "Expired: Message with expired set_id {} (ours {})", set_id, self.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_set_id";
"set_id" => ?set_id, "ours" => ?self.set_id
);
return true;
} else if set_id == self.set_id + 1 {
// allow a few first rounds of future set.
if round > MESSAGE_ROUND_TOLERANCE {
trace!(target: "afg", "Expired: Message too far in the future set, round {} (ours set_id {})", round, self.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_msg_too_far_in_future_set";
"round" => ?round, "ours" => ?self.set_id
);
return true;
}
} else if set_id == self.set_id {
if round < self.min_live_round.saturating_sub(MESSAGE_ROUND_TOLERANCE) {
trace!(target: "afg", "Expired: Message round is out of bounds {} (ours {}-{})", round, self.min_live_round, self.max_round);
telemetry!(CONSENSUS_TRACE; "afg.msg_round_oob";
"round" => ?round, "our_min_live_round" => ?self.min_live_round, "our_max_round" => ?self.max_round
);
return true;
}
} else {
trace!(target: "afg", "Expired: Message in invalid future set {} (ours {})", set_id, self.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_msg_in_invalid_future_set";
"set_id" => ?set_id, "ours" => ?self.set_id
);
return true;
}
false
}
}

struct GossipValidator<Block: BlockT> {
rounds: parking_lot::RwLock<TopicTracker>,
_marker: ::std::marker::PhantomData<Block>,
Expand Down Expand Up @@ -324,38 +360,7 @@ impl<Block: BlockT> GossipValidator<Block> {
}

fn is_expired(&self, round: u64, set_id: u64) -> bool {
let rounds = self.rounds.read();
if set_id < rounds.set_id {
trace!(target: "afg", "Expired: Message with expired set_id {} (ours {})", set_id, rounds.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_set_id";
"set_id" => ?set_id, "ours" => ?rounds.set_id
);
return true;
} else if set_id == rounds.set_id + 1 {
// allow a few first rounds of future set.
if round > MESSAGE_ROUND_TOLERANCE {
trace!(target: "afg", "Expired: Message too far in the future set, round {} (ours set_id {})", round, rounds.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_msg_too_far_in_future_set";
"round" => ?round, "ours" => ?rounds.set_id
);
return true;
}
} else if set_id == rounds.set_id {
if round < rounds.min_live_round.saturating_sub(MESSAGE_ROUND_TOLERANCE) {
trace!(target: "afg", "Expired: Message round is out of bounds {} (ours {}-{})", round, rounds.min_live_round, rounds.max_round);
telemetry!(CONSENSUS_TRACE; "afg.msg_round_oob";
"round" => ?round, "our_min_live_round" => ?rounds.min_live_round, "our_max_round" => ?rounds.max_round
);
return true;
}
} else {
trace!(target: "afg", "Expired: Message in invalid future set {} (ours {})", set_id, rounds.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_msg_in_invalid_future_set";
"set_id" => ?set_id, "ours" => ?rounds.set_id
);
return true;
}
false
self.rounds.read().is_expired(round, set_id)
}

fn validate_round_message(&self, full: VoteOrPrecommitMessage<Block>)
Expand Down Expand Up @@ -431,6 +436,18 @@ impl<Block: BlockT> network_gossip::Validator<Block::Hash> for GossipValidator<B
}
}
}

fn message_expired<'a>(&'a self) -> Box<FnMut(Block::Hash, &[u8]) -> bool + 'a> {
let rounds = self.rounds.read();
Box::new(move |_topic, mut data| {
match GossipMessage::<Block>::decode(&mut data) {
None => true,
Some(GossipMessage::Commit(full)) => rounds.is_expired(full.round, full.set_id),
Some(GossipMessage::VoteOrPrecommit(full)) =>
rounds.is_expired(full.round, full.set_id),
}
})
}
}

/// A handle to the network. This is generally implemented by providing some
Expand Down Expand Up @@ -506,7 +523,7 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B
self.validator.note_round(round, set_id);
let (tx, rx) = sync::oneshot::channel();
self.service.with_gossip(move |gossip, _| {
let inner_rx = gossip.messages_for(message_topic::<B>(round, set_id));
let inner_rx = gossip.messages_for(GRANDPA_ENGINE_ID, message_topic::<B>(round, set_id));
let _ = tx.send(inner_rx);
});
NetworkStream { outer: rx, inner: None }
Expand All @@ -531,7 +548,7 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B
self.validator.note_set(set_id);
let (tx, rx) = sync::oneshot::channel();
self.service.with_gossip(move |gossip, _| {
let inner_rx = gossip.messages_for(commit_topic::<B>(set_id));
let inner_rx = gossip.messages_for(GRANDPA_ENGINE_ID, commit_topic::<B>(set_id));
let _ = tx.send(inner_rx);
});
NetworkStream { outer: rx, inner: None }
Expand Down
10 changes: 8 additions & 2 deletions core/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ impl Network<Block> for MessageRouting {
self.validator.note_round(round, set_id);
let inner = self.inner.lock();
let peer = inner.peer(self.peer_id);
let messages = peer.consensus_gossip_messages_for(make_topic(round, set_id));
let messages = peer.consensus_gossip_messages_for(
GRANDPA_ENGINE_ID,
make_topic(round, set_id),
);

let messages = messages.map_err(
move |_| panic!("Messages for round {} dropped too early", round)
Expand Down Expand Up @@ -212,7 +215,10 @@ impl Network<Block> for MessageRouting {
self.validator.note_set(set_id);
let inner = self.inner.lock();
let peer = inner.peer(self.peer_id);
let messages = peer.consensus_gossip_messages_for(make_commit_topic(set_id));
let messages = peer.consensus_gossip_messages_for(
GRANDPA_ENGINE_ID,
make_commit_topic(set_id),
);

let messages = messages.map_err(
move |_| panic!("Commit messages for set {} dropped too early", set_id)
Expand Down
Loading