Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Pump the gossip engine while waiting for the BEEFY runtime pallet
This fixes a memory leak when the BEEFY gadget is turned on, but
the runtime doesn't actually use BEEFY.
  • Loading branch information
koute committed Jun 17, 2022
commit b5d36a6bc57c7e0965d0871b194f4e65c26fc15e
46 changes: 27 additions & 19 deletions client/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ use std::{
};

use codec::{Codec, Decode, Encode};
use futures::{future, FutureExt, StreamExt};
use futures::{FutureExt, StreamExt};
use log::{debug, error, info, log_enabled, trace, warn};
use parking_lot::Mutex;

use sc_client_api::{Backend, FinalityNotification, FinalityNotifications};
use sc_network_gossip::GossipEngine;
Expand Down Expand Up @@ -80,7 +79,7 @@ pub(crate) struct BeefyWorker<B: Block, BE, C, R, SO> {
runtime: Arc<R>,
key_store: BeefyKeystore,
signed_commitment_sender: BeefySignedCommitmentSender<B>,
gossip_engine: Arc<Mutex<GossipEngine<B>>>,
gossip_engine: GossipEngine<B>,
gossip_validator: Arc<GossipValidator<B>>,
/// Min delta in block numbers between two blocks, BEEFY should vote on
min_block_delta: u32,
Expand Down Expand Up @@ -143,7 +142,7 @@ where
runtime,
key_store,
signed_commitment_sender,
gossip_engine: Arc::new(Mutex::new(gossip_engine)),
gossip_engine,
gossip_validator,
// always target at least one block better than current best beefy
min_block_delta: min_block_delta.max(1),
Expand Down Expand Up @@ -471,15 +470,21 @@ where
true,
);

self.gossip_engine.lock().gossip_message(topic::<B>(), encoded_message, false);
self.gossip_engine.gossip_message(topic::<B>(), encoded_message, false);
}

/// Wait for BEEFY runtime pallet to be available.
async fn wait_for_runtime_pallet(&mut self) {
self.client
.finality_notification_stream()
.take_while(|notif| {
let at = BlockId::hash(notif.header.hash());
let gossip_engine = &mut self.gossip_engine;
let mut finality_stream = self.client.finality_notification_stream();
loop {
futures::select! {
notif = finality_stream.next().fuse() => {
let notif = match notif {
Some(notif) => notif,
None => break
};
let at = BlockId::hash(notif.header.hash());
if let Some(active) = self.runtime.runtime_api().validator_set(&at).ok().flatten() {
if active.id() == GENESIS_AUTHORITY_SET_ID {
// When starting from genesis, there is no session boundary digest.
Expand All @@ -490,16 +495,19 @@ where
// worker won't vote until it witnesses a session change.
// Once we'll implement 'initial sync' (catch-up), the worker will be able to
// start voting right away.
self.handle_finality_notification(notif);
future::ready(false)
self.handle_finality_notification(&notif);
break
} else {
trace!(target: "beefy", "🥩 Finality notification: {:?}", notif);
debug!(target: "beefy", "🥩 Waiting for BEEFY pallet to become available...");
future::ready(true)
}
})
.for_each(|_| future::ready(()))
.await;
},
_ = gossip_engine.fuse() => {
break
}
}
}

// get a new stream that provides _new_ notifications (from here on out)
self.finality_notifications = self.client.finality_notification_stream();
}
Expand All @@ -512,7 +520,7 @@ where
info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block_header.number());
self.wait_for_runtime_pallet().await;

let mut votes = Box::pin(self.gossip_engine.lock().messages_for(topic::<B>()).filter_map(
let mut votes = Box::pin(self.gossip_engine.messages_for(topic::<B>()).filter_map(
|notification| async move {
trace!(target: "beefy", "🥩 Got vote message: {:?}", notification);

Expand All @@ -529,11 +537,11 @@ where
futures_timer::Delay::new(Duration::from_secs(5)).await;
}

let engine = self.gossip_engine.clone();
let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx));
let gossip_engine = &mut self.gossip_engine;
let finality_notifications = &mut self.finality_notifications;

futures::select! {
notification = self.finality_notifications.next().fuse() => {
notification = finality_notifications.next().fuse() => {
if let Some(notification) = notification {
self.handle_finality_notification(&notification);
} else {
Expand Down