This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 17 commits
2 changes: 2 additions & 0 deletions client/api/src/backend.rs
@@ -75,6 +75,8 @@ pub struct ImportSummary<Block: BlockT> {
/// Contains information about the block that just got finalized,
/// including tree heads that became stale at the moment of finalization.
pub struct FinalizeSummary<Block: BlockT> {
    /// Last finalized block header.
    pub header: Block::Header,
    /// Blocks that were finalized.
    /// The last entry is the one that has been explicitly finalized.
    pub finalized: Vec<Block::Hash>,
55 changes: 54 additions & 1 deletion client/api/src/client.rs
@@ -27,7 +27,8 @@ use sp_runtime::{
};
use std::{collections::HashSet, convert::TryFrom, fmt, sync::Arc};

use crate::{blockchain::Info, notifications::StorageEventStream};
use crate::{blockchain::Info, notifications::StorageEventStream, FinalizeSummary, ImportSummary};

use sc_transaction_pool_api::ChainEvent;
use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_blockchain;
@@ -76,6 +77,34 @@ pub trait BlockchainEvents<Block: BlockT> {
    ) -> sp_blockchain::Result<StorageEventStream<Block::Hash>>;
}

/// List of operations to be performed on storage aux data.
/// First tuple element is the encoded data key.
/// Second tuple element is the encoded optional data to write.
/// If `None`, the key and the associated data are deleted from storage.
pub type AuxDataOperations = Vec<(Vec<u8>, Option<Vec<u8>>)>;
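
For example, a minimal operations list that writes one aux entry and deletes another could look like this (the keys and values are purely illustrative, not part of this diff):

// Hypothetical aux data operations: one write, one deletion.
let ops: AuxDataOperations = vec![
    (b"example:key".to_vec(), Some(b"example value".to_vec())), // write
    (b"example:stale_key".to_vec(), None),                      // delete
];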

/// Callback invoked before committing the operations created during block import.
/// This gives the opportunity to perform auxiliary pre-commit actions and optionally
/// enqueue further storage write operations to be atomically performed on commit.
pub type OnImportAction<Block> =
    Box<dyn (Fn(&BlockImportNotification<Block>) -> AuxDataOperations) + Send>;

/// Callback invoked before committing the operations created during block finalization.
/// This gives the opportunity to perform auxiliary pre-commit actions and optionally
/// enqueue further storage write operations to be atomically performed on commit.
pub type OnFinalityAction<Block> =
    Box<dyn (Fn(&FinalityNotification<Block>) -> AuxDataOperations) + Send>;

/// Interface to perform auxiliary actions before committing a block import or
/// finality operation.
pub trait PreCommitActions<Block: BlockT> {
    /// Actions to be performed on block import.
    fn register_import_action(&self, op: OnImportAction<Block>);

    /// Actions to be performed on block finalization.
    fn register_finality_action(&self, op: OnFinalityAction<Block>);
}
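
As a usage sketch (not part of this diff), a component holding a client that implements `PreCommitActions` could register its callbacks roughly as follows; the aux keys are illustrative assumptions:

use codec::Encode;
use sc_client_api::{
    AuxDataOperations, BlockImportNotification, FinalityNotification, PreCommitActions,
};
use sp_runtime::traits::Block as BlockT;

// Hypothetical registration helper: persist the latest imported and finalized
// hashes under illustrative aux keys, atomically with the respective commits.
fn register_example_actions<Block: BlockT, C: PreCommitActions<Block>>(client: &C) {
    client.register_import_action(Box::new(
        |notif: &BlockImportNotification<Block>| -> AuxDataOperations {
            vec![(b"example:last_imported".to_vec(), Some(notif.hash.encode()))]
        },
    ));
    client.register_finality_action(Box::new(
        |notif: &FinalityNotification<Block>| -> AuxDataOperations {
            vec![
                (b"example:last_finalized".to_vec(), Some(notif.hash.encode())),
                // A `None` value deletes the key on commit.
                (b"example:last_imported".to_vec(), None),
            ]
        },
    ));
}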

/// Interface for fetching block data.
pub trait BlockBackend<Block: BlockT> {
    /// Get block body by ID. Returns `None` if the body is not stored.
@@ -300,3 +329,27 @@ impl<B: BlockT> From<FinalityNotification<B>> for ChainEvent<B> {
        Self::Finalized { hash: n.hash, tree_route: n.tree_route }
    }
}

impl<B: BlockT> From<FinalizeSummary<B>> for FinalityNotification<B> {
    fn from(mut summary: FinalizeSummary<B>) -> Self {
        let hash = summary.finalized.pop().unwrap_or_default();
        FinalityNotification {
            hash,
            header: summary.header,
            tree_route: Arc::new(summary.finalized),
            stale_heads: Arc::new(summary.stale_heads),
        }
    }
}

impl<B: BlockT> From<ImportSummary<B>> for BlockImportNotification<B> {
    fn from(summary: ImportSummary<B>) -> Self {
        BlockImportNotification {
            hash: summary.hash,
            origin: summary.origin,
            header: summary.header,
            is_new_best: summary.is_new_best,
            tree_route: summary.tree_route.map(Arc::new),
        }
    }
}
98 changes: 86 additions & 12 deletions client/consensus/babe/src/lib.rs
@@ -67,7 +67,14 @@
#![warn(missing_docs)]

use std::{
    borrow::Cow, collections::HashMap, convert::TryInto, pin::Pin, sync::Arc, time::Duration, u64,
    borrow::Cow,
    collections::{HashMap, HashSet},
    convert::TryInto,
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::Duration,
};

use codec::{Decode, Encode};
@@ -84,7 +91,10 @@ use prometheus_endpoint::Registry;
use retain_mut::RetainMut;
use schnorrkel::SignatureError;

use sc_client_api::{backend::AuxStore, BlockchainEvents, ProvideUncles, UsageProvider};
use sc_client_api::{
    backend::AuxStore, AuxDataOperations, BlockchainEvents, FinalityNotification, PreCommitActions,
    ProvideUncles, UsageProvider,
};
use sc_consensus::{
    block_import::{
        BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
@@ -115,7 +125,7 @@ use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvid
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::{
    generic::{BlockId, OpaqueDigestItemId},
    traits::{Block as BlockT, Header, Zero},
    traits::{Block as BlockT, Header, One, Saturating, Zero},
    DigestItem,
};

@@ -465,6 +475,7 @@ where
    C: ProvideRuntimeApi<B>
        + ProvideUncles<B>
        + BlockchainEvents<B>
        + PreCommitActions<B>
        + HeaderBackend<B>
        + HeaderMetadata<B, Error = ClientError>
        + Send
@@ -509,7 +520,7 @@ where
    };

    info!(target: "babe", "👶 Starting BABE Authorship worker");
    let inner = sc_consensus_slots::start_slot_worker(
    let slot_worker = sc_consensus_slots::start_slot_worker(
        config.0.clone(),
        select_chain,
        worker,
@@ -522,13 +533,69 @@ where

    let answer_requests =
        answer_requests(worker_rx, config.0, client, babe_link.epoch_changes.clone());

    let inner = future::select(Box::pin(slot_worker), Box::pin(answer_requests));
    Ok(BabeWorker {
        inner: Box::pin(future::join(inner, answer_requests).map(|_| ())),
        inner: Box::pin(inner.map(|_| ())),
        slot_notification_sinks,
        handle: BabeWorkerHandle(worker_tx),
    })
}

// Removes obsolete block weight data by leveraging finality notifications.
// This includes data for all finalized blocks (excluding the most recent one)
// and all stale branches.
fn aux_storage_cleanup<C: HeaderMetadata<Block>, Block: BlockT>(
    client: &C,
    notification: &FinalityNotification<Block>,
) -> AuxDataOperations {
    let mut aux_keys = HashSet::new();
    let mut height_limit = Zero::zero();

    // Cleans data for the finalized block's ancestors down to, and including, the previously
    // finalized one.

    let first_new_finalized = notification.tree_route.get(0).unwrap_or(&notification.hash);
    match client.header_metadata(*first_new_finalized) {
        Ok(meta) => {
            aux_keys.insert(aux_schema::block_weight_key(meta.parent));
            height_limit = meta.number.saturating_sub(One::one());
        },
        Err(err) => {
            warn!(target: "babe", "header lookup failed while cleaning data for block {}: {}", first_new_finalized.to_string(), err.to_string());
        },
    }

    aux_keys.extend(notification.tree_route.iter().map(aux_schema::block_weight_key));

    // Cleans data for stale branches.

    for head in notification.stale_heads.iter() {
        let mut hash = *head;
        // Insert stale block hashes until the canonical chain is reached.
        // Sooner or later we should hit an element already present within the `aux_keys` set.
        while aux_keys.insert(aux_schema::block_weight_key(hash)) {
            match client.header_metadata(hash) {
                Ok(meta) => {
                    // A fallback in case of a malformed notification.
                    // This should never happen and must be considered a bug.
                    if meta.number <= height_limit {
                        warn!(target: "babe", "unexpected canonical chain state or malformed finality notification");
                        break
                    }
                    hash = meta.parent;
                },
                Err(err) => {
                    warn!(target: "babe", "header lookup failed while cleaning data for block {}: {}", head.to_string(), err.to_string());
                    break
                },
            }
        }
    }

    aux_keys.into_iter().map(|val| (val, None)).collect()
}
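
The pairs returned above are plain aux-storage operations: every collected weight key is mapped to `None`, i.e. scheduled for deletion on commit. As a rough sketch (not part of this PR), a component with direct access to an `AuxStore` could apply such a list as follows; the helper name `apply_aux_ops` is illustrative:

use sc_client_api::{backend::AuxStore, AuxDataOperations};

// Illustrative helper: split the (key, Option<value>) pairs into the insert and
// delete sets expected by `AuxStore::insert_aux` and commit them in one call.
fn apply_aux_ops<A: AuxStore>(store: &A, ops: AuxDataOperations) -> sp_blockchain::Result<()> {
    let mut inserts: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
    let mut deletes: Vec<Vec<u8>> = Vec::new();
    for (key, maybe_value) in ops {
        match maybe_value {
            Some(value) => inserts.push((key, value)),
            None => deletes.push(key),
        }
    }
    let insert_refs: Vec<(&[u8], &[u8])> =
        inserts.iter().map(|(k, v)| (k.as_slice(), v.as_slice())).collect();
    let delete_refs: Vec<&[u8]> = deletes.iter().map(|k| k.as_slice()).collect();
    store.insert_aux(insert_refs.iter(), delete_refs.iter())
}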

async fn answer_requests<B: BlockT, C>(
    mut request_rx: Receiver<BabeRequest<B>>,
    genesis_config: sc_consensus_slots::SlotDuration<BabeGenesisConfiguration>,
@@ -611,7 +678,7 @@ impl<B: BlockT> BabeWorkerHandle<B> {
/// Worker for Babe which implements `Future<Output=()>`. This must be polled.
#[must_use]
pub struct BabeWorker<B: BlockT> {
    inner: Pin<Box<dyn futures::Future<Output = ()> + Send + 'static>>,
    inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
    slot_notification_sinks: SlotNotificationSinks<B>,
    handle: BabeWorkerHandle<B>,
}
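
Since the worker is just a pollable `Future<Output = ()>`, service code typically hands it to an essential task handle. A hedged sketch along the lines of the usual Substrate node wiring; the `babe_worker` and `task_manager` bindings are assumed context, not part of this diff:

// `babe_worker` is the future returned by `start_babe`; `task_manager` is the
// node's `sc_service::TaskManager`. Spawning it as an essential task ties the
// node's lifetime to the authorship worker.
task_manager.spawn_essential_handle().spawn_blocking(
    "babe-proposer",
    Some("block-authoring"),
    babe_worker,
);
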
@@ -635,13 +702,10 @@ impl<B: BlockT> BabeWorker<B> {
    }
}

impl<B: BlockT> futures::Future for BabeWorker<B> {
impl<B: BlockT> Future for BabeWorker<B> {
    type Output = ();

    fn poll(
        mut self: Pin<&mut Self>,
        cx: &mut futures::task::Context,
    ) -> futures::task::Poll<Self::Output> {
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        self.inner.as_mut().poll(cx)
    }
}
@@ -1682,7 +1746,11 @@ pub fn block_import<Client, Block: BlockT, I>(
    client: Arc<Client>,
) -> ClientResult<(BabeBlockImport<Block, Client, I>, BabeLink<Block>)>
where
    Client: AuxStore + HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
    Client: AuxStore
        + HeaderBackend<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + PreCommitActions<Block>
        + 'static,
{
    let epoch_changes = aux_schema::load_epoch_changes::<Block, _>(&*client, &config)?;
    let link = BabeLink { epoch_changes: epoch_changes.clone(), config: config.clone() };
@@ -1692,6 +1760,12 @@ where
    // startup rather than waiting until importing the next epoch change block.
    prune_finalized(client.clone(), &mut epoch_changes.shared_data())?;

    let client_clone = client.clone();
    let on_finality = move |summary: &FinalityNotification<Block>| {
        aux_storage_cleanup(client_clone.as_ref(), summary)
    };
    client.register_finality_action(Box::new(on_finality));

    let import = BabeBlockImport::new(client, epoch_changes, wrapped_block_import, config);

    Ok((import, link))