Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
6 changes: 3 additions & 3 deletions client/consensus/beefy/src/aux_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use sp_runtime::traits::Block as BlockT;
const VERSION_KEY: &[u8] = b"beefy_auxschema_version";
const WORKER_STATE_KEY: &[u8] = b"beefy_voter_state";

const CURRENT_VERSION: u32 = 2;
const CURRENT_VERSION: u32 = 3;

pub(crate) fn write_current_version<BE: AuxStore>(backend: &BE) -> ClientResult<()> {
info!(target: LOG_TARGET, "🥩 write aux schema version {:?}", CURRENT_VERSION);
Expand Down Expand Up @@ -63,8 +63,8 @@ where

match version {
None => (),
Some(1) => (), // version 1 is totally obsolete and should be simply ignored
Some(2) => return load_decode::<_, PersistedState<B>>(backend, WORKER_STATE_KEY),
Some(1) | Some(2) => (), // versions 1 & 2 are obsolete and should be simply ignored
Some(3) => return load_decode::<_, PersistedState<B>>(backend, WORKER_STATE_KEY),
other =>
return Err(ClientError::Backend(format!("Unsupported BEEFY DB version: {:?}", other))),
}
Expand Down
204 changes: 81 additions & 123 deletions client/consensus/beefy/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ use wasm_timer::Instant;
use crate::{communication::peers::KnownPeers, keystore::BeefyKeystore, LOG_TARGET};
use sp_consensus_beefy::{
crypto::{Public, Signature},
VoteMessage,
ValidatorSetId, VoteMessage,
};

// Timeout for rebroadcasting messages.
const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5);
#[cfg(not(test))]
const REBROADCAST_AFTER: Duration = Duration::from_secs(60);
#[cfg(test)]
const REBROADCAST_AFTER: Duration = Duration::from_secs(5);

/// Gossip engine messages topic
pub(crate) fn topic<B: Block>() -> B::Hash
Expand All @@ -45,45 +48,51 @@ where
<<B::Header as Header>::Hashing as Hash>::hash(b"beefy")
}

#[derive(Debug)]
pub(crate) struct GossipVoteFilter<B: Block> {
pub start: NumberFor<B>,
pub end: NumberFor<B>,
pub validator_set_id: ValidatorSetId,
}

/// A type that represents hash of the message.
pub type MessageHash = [u8; 8];

struct KnownVotes<B: Block> {
last_done: Option<NumberFor<B>>,
struct VotesFilter<B: Block> {
filter: Option<GossipVoteFilter<B>>,
live: BTreeMap<NumberFor<B>, fnv::FnvHashSet<MessageHash>>,
}

impl<B: Block> KnownVotes<B> {
impl<B: Block> VotesFilter<B> {
pub fn new() -> Self {
Self { last_done: None, live: BTreeMap::new() }
}

/// Create new round votes set if not already present.
fn insert(&mut self, round: NumberFor<B>) {
self.live.entry(round).or_default();
Self { filter: None, live: BTreeMap::new() }
}

/// Remove `round` and older from live set, update `last_done` accordingly.
fn conclude(&mut self, round: NumberFor<B>) {
self.live.retain(|&number, _| number > round);
self.last_done = self.last_done.max(Some(round));
/// Update filter to new `start` and `set_id`.
fn update(&mut self, filter: GossipVoteFilter<B>) {
self.live.retain(|&round, _| round >= filter.start && round <= filter.end);
self.filter = Some(filter);
}

/// Return true if `round` is newer than previously concluded rounds.
/// Return true if `round` is >= than `max(session_start, best_beefy)`,
/// and vote set id matches session set id.
///
/// Latest concluded round is still considered alive to allow proper gossiping for it.
fn is_live(&self, round: &NumberFor<B>) -> bool {
Some(*round) >= self.last_done
fn is_live(&self, round: NumberFor<B>, set_id: ValidatorSetId) -> bool {
self.filter
.as_ref()
.map(|f| set_id == f.validator_set_id && round >= f.start && round <= f.end)
.unwrap_or(false)
}

/// Add new _known_ `hash` to the round's known votes.
fn add_known(&mut self, round: &NumberFor<B>, hash: MessageHash) {
self.live.get_mut(round).map(|known| known.insert(hash));
fn add_known(&mut self, round: NumberFor<B>, hash: MessageHash) {
self.live.entry(round).or_default().insert(hash);
}

/// Check if `hash` is already part of round's known votes.
fn is_known(&self, round: &NumberFor<B>, hash: &MessageHash) -> bool {
self.live.get(round).map(|known| known.contains(hash)).unwrap_or(false)
fn is_known(&self, round: NumberFor<B>, hash: &MessageHash) -> bool {
self.live.get(&round).map(|known| known.contains(hash)).unwrap_or(false)
}
}

Expand All @@ -100,7 +109,7 @@ where
B: Block,
{
topic: B::Hash,
known_votes: RwLock<KnownVotes<B>>,
votes_filter: RwLock<VotesFilter<B>>,
next_rebroadcast: Mutex<Instant>,
known_peers: Arc<Mutex<KnownPeers<B>>>,
}
Expand All @@ -112,26 +121,18 @@ where
pub fn new(known_peers: Arc<Mutex<KnownPeers<B>>>) -> GossipValidator<B> {
GossipValidator {
topic: topic::<B>(),
known_votes: RwLock::new(KnownVotes::new()),
votes_filter: RwLock::new(VotesFilter::new()),
next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER),
known_peers,
}
}

/// Note a voting round.
///
/// Noting round will track gossiped votes for `round`.
pub(crate) fn note_round(&self, round: NumberFor<B>) {
debug!(target: LOG_TARGET, "🥩 About to note gossip round #{}", round);
self.known_votes.write().insert(round);
}

/// Conclude a voting round.
/// Update gossip validator filter.
///
/// This can be called once round is complete so we stop gossiping for it.
pub(crate) fn conclude_round(&self, round: NumberFor<B>) {
debug!(target: LOG_TARGET, "🥩 About to drop gossip round #{}", round);
self.known_votes.write().conclude(round);
/// Only votes for `set_id` and rounds `start <= round <= end` will be accepted.
pub(crate) fn update_filter(&self, filter: GossipVoteFilter<B>) {
debug!(target: LOG_TARGET, "🥩 New gossip filter {:?}", filter);
self.votes_filter.write().update(filter);
}
}

Expand All @@ -152,25 +153,26 @@ where
if let Ok(msg) = VoteMessage::<NumberFor<B>, Public, Signature>::decode(&mut data) {
let msg_hash = twox_64(data);
let round = msg.commitment.block_number;
let set_id = msg.commitment.validator_set_id;
self.known_peers.lock().note_vote_for(*sender, round);

// Verify general usefulness of the message.
// We are going to discard old votes right away (without verification)
// Also we keep track of already received votes to avoid verifying duplicates.
{
let known_votes = self.known_votes.read();
let known_votes = self.votes_filter.read();

if !known_votes.is_live(&round) {
if !known_votes.is_live(round, set_id) {
return ValidationResult::Discard
}

if known_votes.is_known(&round, &msg_hash) {
if known_votes.is_known(round, &msg_hash) {
return ValidationResult::ProcessAndKeep(self.topic)
}
}

if BeefyKeystore::verify(&msg.id, &msg.signature, &msg.commitment.encode()) {
self.known_votes.write().add_known(&round, msg_hash);
self.known_peers.lock().note_vote_for(*sender, round);
self.votes_filter.write().add_known(round, msg_hash);
return ValidationResult::ProcessAndKeep(self.topic)
} else {
// TODO: report peer
Expand All @@ -185,15 +187,16 @@ where
}

fn message_expired<'a>(&'a self) -> Box<dyn FnMut(B::Hash, &[u8]) -> bool + 'a> {
let known_votes = self.known_votes.read();
let known_votes = self.votes_filter.read();
Box::new(move |_topic, mut data| {
let msg = match VoteMessage::<NumberFor<B>, Public, Signature>::decode(&mut data) {
Ok(vote) => vote,
Err(_) => return true,
};

let round = msg.commitment.block_number;
let expired = !known_votes.is_live(&round);
let set_id = msg.commitment.validator_set_id;
let expired = !known_votes.is_live(round, set_id);

trace!(target: LOG_TARGET, "🥩 Message for round #{} expired: {}", round, expired);

Expand All @@ -208,14 +211,15 @@ where
let now = Instant::now();
let mut next_rebroadcast = self.next_rebroadcast.lock();
if now >= *next_rebroadcast {
trace!(target: LOG_TARGET, "🥩 Gossip rebroadcast");
*next_rebroadcast = now + REBROADCAST_AFTER;
true
} else {
false
}
};

let known_votes = self.known_votes.read();
let known_votes = self.votes_filter.read();
Box::new(move |_who, intent, _topic, mut data| {
if let MessageIntent::PeriodicRebroadcast = intent {
return do_rebroadcast
Expand All @@ -227,7 +231,8 @@ where
};

let round = msg.commitment.block_number;
let allowed = known_votes.is_live(&round);
let set_id = msg.commitment.validator_set_id;
let allowed = known_votes.is_live(round, set_id);

trace!(target: LOG_TARGET, "🥩 Message for round #{} allowed: {}", round, allowed);

Expand All @@ -252,81 +257,35 @@ mod tests {

#[test]
fn known_votes_insert_remove() {
let mut kv = KnownVotes::<Block>::new();
let mut kv = VotesFilter::<Block>::new();
let msg_hash = twox_64(b"data");

kv.insert(1);
kv.insert(1);
kv.insert(2);
kv.add_known(1, msg_hash);
kv.add_known(1, msg_hash);
kv.add_known(2, msg_hash);
assert_eq!(kv.live.len(), 2);

let mut kv = KnownVotes::<Block>::new();
kv.insert(1);
kv.insert(2);
kv.insert(3);
kv.add_known(3, msg_hash);
assert!(kv.is_known(3, &msg_hash));
assert!(!kv.is_known(3, &twox_64(b"other")));
assert!(!kv.is_known(4, &msg_hash));
assert_eq!(kv.live.len(), 3);

assert!(kv.last_done.is_none());
kv.conclude(2);
assert_eq!(kv.live.len(), 1);
assert!(!kv.live.contains_key(&2));
assert_eq!(kv.last_done, Some(2));
assert!(kv.filter.is_none());
assert!(!kv.is_live(1, 1));

kv.conclude(1);
assert_eq!(kv.last_done, Some(2));
kv.update(GossipVoteFilter { start: 3, end: 10, validator_set_id: 1 });
assert_eq!(kv.live.len(), 1);
assert!(kv.live.contains_key(&3));
assert!(!kv.is_live(2, 1));
assert!(kv.is_live(3, 1));
assert!(kv.is_live(4, 1));
assert!(!kv.is_live(4, 2));

kv.conclude(3);
assert_eq!(kv.last_done, Some(3));
kv.update(GossipVoteFilter { start: 5, end: 10, validator_set_id: 2 });
assert!(kv.live.is_empty());
}

#[test]
fn note_and_drop_round_works() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));

gv.note_round(1u64);

assert!(gv.known_votes.read().is_live(&1u64));

gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);

assert_eq!(gv.known_votes.read().live.len(), 4);

gv.conclude_round(7u64);

let votes = gv.known_votes.read();

// rounds 1 and 3 are outdated, don't gossip anymore
assert!(!votes.is_live(&1u64));
assert!(!votes.is_live(&3u64));
// latest concluded round is still gossiped
assert!(votes.is_live(&7u64));
// round 10 is alive and in-progress
assert!(votes.is_live(&10u64));
}

#[test]
fn note_same_round_twice() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));

gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);

assert_eq!(gv.known_votes.read().live.len(), 3);

// note round #7 again -> should not change anything
gv.note_round(7u64);

let votes = gv.known_votes.read();

assert_eq!(votes.live.len(), 3);

assert!(votes.is_live(&3u64));
assert!(votes.is_live(&7u64));
assert!(votes.is_live(&10u64));
}

struct TestContext;
impl<B: sp_runtime::traits::Block> ValidatorContext<B> for TestContext {
fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) {
Expand Down Expand Up @@ -368,21 +327,18 @@ mod tests {
#[test]
fn should_avoid_verifying_signatures_twice() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 });
let sender = sc_network::PeerId::random();
let mut context = TestContext;

let vote = dummy_vote(3);

gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);

// first time the cache should be populated
let res = gv.validate(&mut context, &sender, &vote.encode());

assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
assert_eq!(
gv.known_votes.read().live.get(&vote.commitment.block_number).map(|x| x.len()),
gv.votes_filter.read().live.get(&vote.commitment.block_number).map(|x| x.len()),
Some(1)
);

Expand All @@ -392,9 +348,11 @@ mod tests {
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));

// next we should quickly reject if the round is not live
gv.conclude_round(7_u64);
gv.update_filter(GossipVoteFilter { start: 7, end: 10, validator_set_id: 0 });

assert!(!gv.known_votes.read().is_live(&vote.commitment.block_number));
let number = vote.commitment.block_number;
let set_id = vote.commitment.validator_set_id;
assert!(!gv.votes_filter.read().is_live(number, set_id));

let res = gv.validate(&mut context, &sender, &vote.encode());

Expand All @@ -404,14 +362,13 @@ mod tests {
#[test]
fn messages_allowed_and_expired() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 });
let sender = sc_network::PeerId::random();
let topic = Default::default();
let intent = MessageIntent::Broadcast;

// note round 2 and 3, then conclude 2
gv.note_round(2u64);
gv.note_round(3u64);
gv.conclude_round(2u64);
// conclude 2
gv.update_filter(GossipVoteFilter { start: 2, end: 10, validator_set_id: 0 });
let mut allowed = gv.message_allowed();
let mut expired = gv.message_expired();

Expand Down Expand Up @@ -447,6 +404,7 @@ mod tests {
#[test]
fn messages_rebroadcast() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 });
let sender = sc_network::PeerId::random();
let topic = Default::default();

Expand Down
Loading