Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
client/beefy: remove high-freq network events from main loop
Network events are many and very frequent, remove the net-event-stream
from the main voter loop and drastically reduce BEEFY voter task
'wakeups'.

Instead have the `GossipValidator` track known peers as it already
has callbacks for that coming from `GossipEngine`.

Signed-off-by: acatangiu <[email protected]>
  • Loading branch information
acatangiu committed Nov 22, 2022
commit c7884561be27b92a6f0de08f35ece8655cef98e5
4 changes: 4 additions & 0 deletions client/beefy/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ impl<B> Validator<B> for GossipValidator<B>
where
B: Block,
{
fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<B>, who: &PeerId) {
self.known_peers.lock().remove(who);
}

fn validate(
&self,
_context: &mut dyn ValidatorContext<B>,
Expand Down
12 changes: 2 additions & 10 deletions client/beefy/src/communication/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ impl<B: Block> KnownPeers<B> {
Self { live: HashMap::new() }
}

/// Add new connected `peer`.
pub fn add_new(&mut self, peer: PeerId) {
self.live.entry(peer).or_default();
}

/// Note vote round number for `peer`.
pub fn note_vote_for(&mut self, peer: PeerId, round: NumberFor<B>) {
let data = self.live.entry(peer).or_default();
Expand Down Expand Up @@ -87,16 +82,13 @@ mod tests {
let mut peers = KnownPeers::<sc_network_test::Block>::new();
assert!(peers.live.is_empty());

// Alice and Bob new connected peers.
peers.add_new(alice);
peers.add_new(bob);
// 'Tracked' Bob seen voting for 5.
peers.note_vote_for(bob, 5);
// Previously unseen Charlie now seen voting for 10.
peers.note_vote_for(charlie, 10);

assert_eq!(peers.live.len(), 3);
assert!(peers.contains(&alice));
assert_eq!(peers.live.len(), 2);
assert!(!peers.contains(&alice));
assert!(peers.contains(&bob));
assert!(peers.contains(&charlie));

Expand Down
4 changes: 2 additions & 2 deletions client/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,12 @@ where
None,
);

// The `GossipValidator` adds and removes known peers based on valid votes and network events.
let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(),
runtime.clone(),
justifications_protocol_name,
known_peers.clone(),
known_peers,
);

let metrics =
Expand Down Expand Up @@ -286,7 +287,6 @@ where
payload_provider,
network,
key_store: key_store.into(),
known_peers,
gossip_engine,
gossip_validator,
on_demand_justifications,
Expand Down
92 changes: 27 additions & 65 deletions client/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,31 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
fmt::Debug,
marker::PhantomData,
sync::Arc,
use crate::{
communication::{
gossip::{topic, GossipValidator},
request_response::outgoing_requests_engine::OnDemandJustificationsEngine,
},
error::Error,
justification::BeefyVersionedFinalityProof,
keystore::BeefyKeystore,
metric_inc, metric_set,
metrics::Metrics,
round::Rounds,
BeefyVoterLinks,
};
use beefy_primitives::{
crypto::{AuthorityId, Signature},
BeefyApi, Commitment, ConsensusLog, MmrRootHash, Payload, PayloadProvider, SignedCommitment,
ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
};

use codec::{Codec, Decode, Encode};
use futures::{stream::Fuse, FutureExt, StreamExt};
use log::{debug, error, info, log_enabled, trace, warn};
use parking_lot::Mutex;

use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend};
use sc_network_common::{
protocol::event::Event as NetEvent,
service::{NetworkEventStream, NetworkRequest},
};
use sc_network_common::service::{NetworkEventStream, NetworkRequest};
use sc_network_gossip::GossipEngine;

use sc_utils::notification::NotificationReceiver;
use sp_api::{BlockId, ProvideRuntimeApi};
use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
use sp_consensus::SyncOracle;
Expand All @@ -44,26 +50,11 @@ use sp_runtime::{
traits::{Block, Header, NumberFor, Zero},
SaturatedConversion,
};

use beefy_primitives::{
crypto::{AuthorityId, Signature},
BeefyApi, Commitment, ConsensusLog, MmrRootHash, Payload, PayloadProvider, SignedCommitment,
ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
};
use sc_utils::notification::NotificationReceiver;

use crate::{
communication::{
gossip::{topic, GossipValidator},
request_response::outgoing_requests_engine::OnDemandJustificationsEngine,
},
error::Error,
justification::BeefyVersionedFinalityProof,
keystore::BeefyKeystore,
metric_inc, metric_set,
metrics::Metrics,
round::Rounds,
BeefyVoterLinks, KnownPeers,
use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
fmt::Debug,
marker::PhantomData,
sync::Arc,
};

pub(crate) enum RoundAction {
Expand Down Expand Up @@ -253,7 +244,6 @@ pub(crate) struct WorkerParams<B: Block, BE, P, R, N> {
pub payload_provider: P,
pub network: N,
pub key_store: BeefyKeystore,
pub known_peers: Arc<Mutex<KnownPeers<B>>>,
pub gossip_engine: GossipEngine<B>,
pub gossip_validator: Arc<GossipValidator<B>>,
pub on_demand_justifications: OnDemandJustificationsEngine<B, R>,
Expand Down Expand Up @@ -305,7 +295,6 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, R, N> {
key_store: BeefyKeystore,

// communication
known_peers: Arc<Mutex<KnownPeers<B>>>,
gossip_engine: GossipEngine<B>,
gossip_validator: Arc<GossipValidator<B>>,
on_demand_justifications: OnDemandJustificationsEngine<B, R>,
Expand Down Expand Up @@ -349,7 +338,6 @@ where
gossip_engine,
gossip_validator,
on_demand_justifications,
known_peers,
links,
metrics,
persisted_state,
Expand All @@ -359,7 +347,6 @@ where
backend,
payload_provider,
network,
known_peers,
key_store,
gossip_engine,
gossip_validator,
Expand Down Expand Up @@ -794,7 +781,6 @@ where
) {
info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block());

let mut network_events = self.network.event_stream("network-gossip").fuse();
let mut votes = Box::pin(
self.gossip_engine
.messages_for(topic::<B>())
Expand Down Expand Up @@ -837,15 +823,6 @@ where
error!(target: "beefy", "🥩 Gossip engine has terminated, closing worker.");
return;
},
// Keep track of connected peers.
net_event = network_events.next() => {
if let Some(net_event) = net_event {
self.handle_network_event(net_event);
} else {
error!(target: "beefy", "🥩 Network events stream terminated, closing worker.");
return;
}
},
// Process finality notifications first since these drive the voter.
notification = finality_notifications.next() => {
if let Some(notification) = notification {
Expand Down Expand Up @@ -895,20 +872,6 @@ where
}
}
}

/// Update known peers based on network events.
fn handle_network_event(&mut self, event: NetEvent) {
match event {
NetEvent::SyncConnected { remote } => {
self.known_peers.lock().add_new(remote);
},
NetEvent::SyncDisconnected { remote } => {
self.known_peers.lock().remove(&remote);
},
// We don't care about other events.
_ => (),
}
}
}

/// Scan the `header` digest log for a BEEFY validator set change. Return either the new
Expand Down Expand Up @@ -976,11 +939,11 @@ pub(crate) mod tests {
create_beefy_keystore, get_beefy_streams, make_beefy_ids, two_validators::TestApi,
BeefyPeer, BeefyTestNet,
},
BeefyRPCLinks,
BeefyRPCLinks, KnownPeers,
};

use beefy_primitives::{known_payloads, mmr::MmrRootProvider};
use futures::{executor::block_on, future::poll_fn, task::Poll};
use parking_lot::Mutex;
use sc_client_api::{Backend as BackendT, HeaderBackend};
use sc_network::NetworkService;
use sc_network_test::TestNetFactory;
Expand Down Expand Up @@ -1058,7 +1021,7 @@ pub(crate) mod tests {
network.clone(),
api.clone(),
"/beefy/justifs/1".into(),
known_peers.clone(),
known_peers,
);
let at = BlockId::number(Zero::zero());
let genesis_header = backend.blockchain().expect_header(at).unwrap();
Expand All @@ -1074,7 +1037,6 @@ pub(crate) mod tests {
backend,
payload_provider,
key_store: Some(keystore).into(),
known_peers,
links,
gossip_engine,
gossip_validator,
Expand Down