Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
sc-consensus-beefy: report BEEFY gossip peer reputation changes
  • Loading branch information
acatangiu committed Apr 11, 2023
commit d17887b9d8ac7884609b1178646d0350f8f13aec
20 changes: 15 additions & 5 deletions client/consensus/beefy/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ use sp_runtime::traits::{Block, Hash, Header, NumberFor};
use codec::{Decode, Encode};
use log::{debug, trace};
use parking_lot::{Mutex, RwLock};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use wasm_timer::Instant;

use crate::{
communication::{benefit, cost, peers::KnownPeers},
communication::{
benefit, cost,
peers::{KnownPeers, PeerReport},
},
justification::{
proof_block_num_and_set_id, verify_with_validator_set, BeefyVersionedFinalityProof,
},
Expand Down Expand Up @@ -231,20 +235,26 @@ where
gossip_filter: RwLock<Filter<B>>,
next_rebroadcast: Mutex<Instant>,
known_peers: Arc<Mutex<KnownPeers<B>>>,
report_sender: TracingUnboundedSender<PeerReport>,
}

impl<B> GossipValidator<B>
where
B: Block,
{
pub fn new(known_peers: Arc<Mutex<KnownPeers<B>>>) -> GossipValidator<B> {
GossipValidator {
pub(crate) fn new(
known_peers: Arc<Mutex<KnownPeers<B>>>,
) -> (GossipValidator<B>, TracingUnboundedReceiver<PeerReport>) {
let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 10_000);
let val = GossipValidator {
votes_topic: votes_topic::<B>(),
justifs_topic: proofs_topic::<B>(),
gossip_filter: RwLock::new(Filter::new()),
next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER),
known_peers,
}
report_sender: tx,
};
(val, rx)
}

/// Update gossip validator filter.
Expand All @@ -256,7 +266,7 @@ where
}

fn report(&self, who: PeerId, cost_benefit: ReputationChange) {
// TODO
let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit });
}

fn validate_vote(
Expand Down
8 changes: 7 additions & 1 deletion client/consensus/beefy/src/communication/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@
// TODO (issue #12296): replace this naive peer tracking with generic one that infers data
// from multiple network protocols.

use sc_network::PeerId;
use sc_network::{PeerId, ReputationChange};
use sp_runtime::traits::{Block, NumberFor, Zero};
use std::collections::{HashMap, VecDeque};

/// Report specifying a reputation change for a given peer.
pub(crate) struct PeerReport {
pub who: PeerId,
pub cost_benefit: ReputationChange,
}

struct PeerData<B: Block> {
last_voted_on: NumberFor<B>,
}
Expand Down
8 changes: 5 additions & 3 deletions client/consensus/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,10 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
// Default votes filter is to discard everything.
// Validator is updated later with correct starting round and set id.
let gossip_validator =
Arc::new(communication::gossip::GossipValidator::new(known_peers.clone()));
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
let (gossip_validator, gossip_report_stream) =
communication::gossip::GossipValidator::new(known_peers.clone());
let gossip_validator = Arc::new(gossip_validator);
let mut gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
gossip_protocol_name,
Expand Down Expand Up @@ -303,6 +304,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
key_store: key_store.into(),
gossip_engine,
gossip_validator,
gossip_report_stream,
on_demand_justifications,
links,
metrics,
Expand Down
19 changes: 18 additions & 1 deletion client/consensus/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use crate::{
communication::{
gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage, GossipValidator},
peers::PeerReport,
request_response::outgoing_requests_engine::OnDemandJustificationsEngine,
},
error::Error,
Expand All @@ -34,7 +35,7 @@ use futures::{stream::Fuse, FutureExt, StreamExt};
use log::{debug, error, info, log_enabled, trace, warn};
use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend};
use sc_network_gossip::GossipEngine;
use sc_utils::notification::NotificationReceiver;
use sc_utils::{mpsc::TracingUnboundedReceiver, notification::NotificationReceiver};
use sp_api::{BlockId, ProvideRuntimeApi};
use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
use sp_consensus::SyncOracle;
Expand Down Expand Up @@ -263,6 +264,7 @@ pub(crate) struct WorkerParams<B: Block, BE, P, R, S> {
pub key_store: BeefyKeystore,
pub gossip_engine: GossipEngine<B>,
pub gossip_validator: Arc<GossipValidator<B>>,
pub gossip_report_stream: TracingUnboundedReceiver<PeerReport>,
pub on_demand_justifications: OnDemandJustificationsEngine<B>,
pub links: BeefyVoterLinks<B>,
pub metrics: Option<VoterMetrics>,
Expand Down Expand Up @@ -320,6 +322,7 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
// communication
gossip_engine: GossipEngine<B>,
gossip_validator: Arc<GossipValidator<B>>,
gossip_report_stream: TracingUnboundedReceiver<PeerReport>,
on_demand_justifications: OnDemandJustificationsEngine<B>,

// channels
Expand Down Expand Up @@ -359,6 +362,7 @@ where
sync,
gossip_engine,
gossip_validator,
gossip_report_stream,
on_demand_justifications,
links,
metrics,
Expand All @@ -373,6 +377,7 @@ where
key_store,
gossip_engine,
gossip_validator,
gossip_report_stream,
on_demand_justifications,
links,
metrics,
Expand Down Expand Up @@ -849,7 +854,12 @@ where
// Act on changed 'state'.
self.process_new_state();

// Mutable reference used to drive the gossip engine.
let mut gossip_engine = &mut self.gossip_engine;
// Use temp val and report after async section,
// to avoid having to Mutex-wrap `gossip_engine`.
let mut gossip_report: Option<PeerReport> = None;

// Wait for, and handle external events.
// The branches below only change 'state', actual voting happens afterwards,
// based on the new resulting 'state'.
Expand Down Expand Up @@ -918,6 +928,13 @@ where
return;
}
},
// Process peer reports.
report = self.gossip_report_stream.next() => {
gossip_report = report;
},
}
if let Some(PeerReport { who, cost_benefit }) = gossip_report {
self.gossip_engine.report(who, cost_benefit);
}
}
}
Expand Down