Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/api/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ pub struct ImportSummary<Block: BlockT> {
/// Contains information about the block that just got finalized,
/// including tree heads that became stale at the moment of finalization.
pub struct FinalizeSummary<Block: BlockT> {
/// Last finalized block header.
pub header: Block::Header,
/// Blocks that were finalized.
/// The last entry is the one that has been explicitly finalized.
pub finalized: Vec<Block::Hash>,
Expand Down
55 changes: 54 additions & 1 deletion client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use sp_runtime::{
};
use std::{collections::HashSet, convert::TryFrom, fmt, sync::Arc};

use crate::{blockchain::Info, notifications::StorageEventStream};
use crate::{blockchain::Info, notifications::StorageEventStream, FinalizeSummary, ImportSummary};

use sc_transaction_pool_api::ChainEvent;
use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_blockchain;
Expand Down Expand Up @@ -76,6 +77,34 @@ pub trait BlockchainEvents<Block: BlockT> {
) -> sp_blockchain::Result<StorageEventStream<Block::Hash>>;
}

/// List of operations to be performed on storage aux data.
/// First tuple element is the encoded data key.
/// Second tuple element is the encoded optional data to write.
/// If `None`, the key and the associated data are deleted from storage.
pub type AuxDataOperations = Vec<(Vec<u8>, Option<Vec<u8>>)>;

/// Callback invoked before committing the operations created during block import.
/// This gives the opportunity to perform auxiliary pre-commit actions and optionally
/// enqueue further storage write operations to be atomically performed on commit.
pub type OnImportAction<Block> =
Box<dyn (Fn(&BlockImportNotification<Block>) -> AuxDataOperations) + Send>;

/// Callback invoked before committing the operations created during block finalization.
/// This gives the opportunity to perform auxiliary pre-commit actions and optionally
/// enqueue further storage write operations to be atomically performed on commit.
pub type OnFinalityAction<Block> =
Box<dyn (Fn(&FinalityNotification<Block>) -> AuxDataOperations) + Send>;

/// Interface to perform auxiliary actions before committing a block import or
/// finality operation.
pub trait PreCommitActions<Block: BlockT> {
/// Actions to be performed on block import.
fn register_import_action(&self, op: OnImportAction<Block>);

/// Actions to be performed on block finalization.
fn register_finality_action(&self, op: OnFinalityAction<Block>);
}

/// Interface for fetching block data.
pub trait BlockBackend<Block: BlockT> {
/// Get block body by ID. Returns `None` if the body is not stored.
Expand Down Expand Up @@ -300,3 +329,27 @@ impl<B: BlockT> From<FinalityNotification<B>> for ChainEvent<B> {
Self::Finalized { hash: n.hash, tree_route: n.tree_route }
}
}

impl<B: BlockT> From<FinalizeSummary<B>> for FinalityNotification<B> {
fn from(mut summary: FinalizeSummary<B>) -> Self {
let hash = summary.finalized.pop().unwrap_or_default();
FinalityNotification {
hash,
header: summary.header,
tree_route: Arc::new(summary.finalized),
stale_heads: Arc::new(summary.stale_heads),
}
}
}

impl<B: BlockT> From<ImportSummary<B>> for BlockImportNotification<B> {
fn from(summary: ImportSummary<B>) -> Self {
BlockImportNotification {
hash: summary.hash,
origin: summary.origin,
header: summary.header,
is_new_best: summary.is_new_best,
tree_route: summary.tree_route.map(Arc::new),
}
}
}
105 changes: 91 additions & 14 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,16 @@
#![forbid(unsafe_code)]
#![warn(missing_docs)]

use std::{borrow::Cow, collections::HashMap, convert::TryInto, pin::Pin, sync::Arc, u64};
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
convert::TryInto,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use codec::{Decode, Encode};
use futures::{
Expand All @@ -82,7 +91,10 @@ use prometheus_endpoint::Registry;
use retain_mut::RetainMut;
use schnorrkel::SignatureError;

use sc_client_api::{backend::AuxStore, BlockchainEvents, ProvideUncles, UsageProvider};
use sc_client_api::{
backend::AuxStore, AuxDataOperations, BlockchainEvents, FinalityNotification, PreCommitActions,
ProvideUncles, UsageProvider,
};
use sc_consensus::{
block_import::{
BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
Expand All @@ -98,7 +110,7 @@ use sc_consensus_slots::{
SlotInfo, StorageChanges,
};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
use sp_api::{ApiExt, NumberFor, ProvideRuntimeApi};
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_application_crypto::AppKey;
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata, Result as ClientResult};
Expand All @@ -113,7 +125,7 @@ use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvid
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::{
generic::{BlockId, OpaqueDigestItemId},
traits::{Block as BlockT, Header, Zero},
traits::{Block as BlockT, Header, NumberFor, One, SaturatedConversion, Saturating, Zero},
DigestItem,
};

Expand Down Expand Up @@ -458,6 +470,7 @@ where
C: ProvideRuntimeApi<B>
+ ProvideUncles<B>
+ BlockchainEvents<B>
+ PreCommitActions<B>
+ HeaderBackend<B>
+ HeaderMetadata<B, Error = ClientError>
+ Send
Expand Down Expand Up @@ -501,7 +514,8 @@ where
};

info!(target: "babe", "👶 Starting BABE Authorship worker");
let inner = sc_consensus_slots::start_slot_worker(

let slot_worker = sc_consensus_slots::start_slot_worker(
babe_link.config.slot_duration(),
select_chain,
worker,
Expand All @@ -515,13 +529,69 @@ where
let answer_requests =
answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes.clone());

let inner = future::select(Box::pin(slot_worker), Box::pin(answer_requests));
Ok(BabeWorker {
inner: Box::pin(future::join(inner, answer_requests).map(|_| ())),
inner: Box::pin(inner.map(|_| ())),
slot_notification_sinks,
handle: BabeWorkerHandle(worker_tx),
})
}

// Remove obsolete block's weight data by leveraging finality notifications.
// This includes data for all finalized blocks (excluding the most recent one)
// and all stale branches.
fn aux_storage_cleanup<C: HeaderMetadata<Block>, Block: BlockT>(
client: &C,
notification: &FinalityNotification<Block>,
) -> AuxDataOperations {
let mut aux_keys = HashSet::new();

// Cleans data for finalized block's ancestors down to, and including, the previously
// finalized one.

let first_new_finalized = notification.tree_route.get(0).unwrap_or(&notification.hash);
match client.header_metadata(*first_new_finalized) {
Ok(meta) => {
aux_keys.insert(aux_schema::block_weight_key(meta.parent));
},
Err(err) => {
warn!(target: "babe", "header lookup fail while cleaning data for block {}: {}", first_new_finalized.to_string(), err.to_string());
},
}

aux_keys.extend(notification.tree_route.iter().map(aux_schema::block_weight_key));

// Cleans data for stale branches.

// A safenet in case of malformed notification.
let height_limit = notification.header.number().saturating_sub(
notification.tree_route.len().saturated_into::<NumberFor<Block>>() + One::one(),
);
for head in notification.stale_heads.iter() {
let mut hash = *head;
// Insert stale blocks hashes until canonical chain is not reached.
// Soon or late we should hit an element already present within the `aux_keys` set.
while aux_keys.insert(aux_schema::block_weight_key(hash)) {
match client.header_metadata(hash) {
Ok(meta) => {
// This should never happen and must be considered a bug.
if meta.number <= height_limit {
warn!(target: "babe", "unexpected canonical chain state or malformed finality notification");
break
}
hash = meta.parent;
},
Err(err) => {
warn!(target: "babe", "header lookup fail while cleaning data for block {}: {}", head.to_string(), err.to_string());
break
},
}
}
}

aux_keys.into_iter().map(|val| (val, None)).collect()
}

async fn answer_requests<B: BlockT, C>(
mut request_rx: Receiver<BabeRequest<B>>,
config: Config,
Expand Down Expand Up @@ -604,7 +674,7 @@ impl<B: BlockT> BabeWorkerHandle<B> {
/// Worker for Babe which implements `Future<Output=()>`. This must be polled.
#[must_use]
pub struct BabeWorker<B: BlockT> {
inner: Pin<Box<dyn futures::Future<Output = ()> + Send + 'static>>,
inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
slot_notification_sinks: SlotNotificationSinks<B>,
handle: BabeWorkerHandle<B>,
}
Expand All @@ -628,13 +698,10 @@ impl<B: BlockT> BabeWorker<B> {
}
}

impl<B: BlockT> futures::Future for BabeWorker<B> {
impl<B: BlockT> Future for BabeWorker<B> {
type Output = ();

fn poll(
mut self: Pin<&mut Self>,
cx: &mut futures::task::Context,
) -> futures::task::Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.inner.as_mut().poll(cx)
}
}
Expand Down Expand Up @@ -857,7 +924,7 @@ where
self.telemetry.clone()
}

fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> std::time::Duration {
fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> Duration {
let parent_slot = find_pre_digest::<B>(&slot_info.chain_head).ok().map(|d| d.slot());

sc_consensus_slots::proposing_remaining_duration(
Expand Down Expand Up @@ -1683,7 +1750,11 @@ pub fn block_import<Client, Block: BlockT, I>(
client: Arc<Client>,
) -> ClientResult<(BabeBlockImport<Block, Client, I>, BabeLink<Block>)>
where
Client: AuxStore + HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
Client: AuxStore
+ HeaderBackend<Block>
+ HeaderMetadata<Block, Error = sp_blockchain::Error>
+ PreCommitActions<Block>
+ 'static,
{
let epoch_changes =
aux_schema::load_epoch_changes::<Block, _>(&*client, &config.genesis_config)?;
Expand All @@ -1694,6 +1765,12 @@ where
// startup rather than waiting until importing the next epoch change block.
prune_finalized(client.clone(), &mut epoch_changes.shared_data())?;

let client_clone = client.clone();
let on_finality = move |summary: &FinalityNotification<Block>| {
aux_storage_cleanup(client_clone.as_ref(), summary)
};
client.register_finality_action(Box::new(on_finality));

let import = BabeBlockImport::new(client, epoch_changes, wrapped_block_import, config);

Ok((import, link))
Expand Down
Loading