108 changes: 97 additions & 11 deletions client/consensus/babe/src/lib.rs
@@ -67,7 +67,14 @@
#![warn(missing_docs)]

use std::{
borrow::Cow, collections::HashMap, convert::TryInto, pin::Pin, sync::Arc, time::Duration, u64,
borrow::Cow,
collections::{HashMap, HashSet},
convert::TryInto,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use codec::{Decode, Encode};
@@ -84,7 +91,9 @@ use prometheus_endpoint::Registry;
use retain_mut::RetainMut;
use schnorrkel::SignatureError;

use sc_client_api::{backend::AuxStore, BlockchainEvents, ProvideUncles, UsageProvider};
use sc_client_api::{
backend::AuxStore, BlockchainEvents, FinalityNotifications, ProvideUncles, UsageProvider,
};
use sc_consensus::{
block_import::{
BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
@@ -115,7 +124,7 @@ use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvid
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::{
generic::{BlockId, OpaqueDigestItemId},
traits::{Block as BlockT, Header, Zero},
traits::{Block as BlockT, Header, One, Saturating, Zero},
DigestItem,
};

@@ -467,6 +476,7 @@ where
+ BlockchainEvents<B>
+ HeaderBackend<B>
+ HeaderMetadata<B, Error = ClientError>
+ AuxStore
+ Send
+ Sync
+ 'static,
@@ -521,14 +531,93 @@ where
let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);

let answer_requests =
answer_requests(worker_rx, config.0, client, babe_link.epoch_changes.clone());
answer_requests(worker_rx, config.0, client.clone(), babe_link.epoch_changes.clone());

let finality_notifications = client.finality_notification_stream();
let clean_weights = aux_storage_cleanup(client, finality_notifications);

// TODO: PROPOSAL
// The inner futures are "never-ending" tasks.
// Maybe we should replace `join` with `select` to exit from the worker as
// soon as one of its inner futures terminates.
// `join` keeps polling the other futures even after one of them has finished.
Ok(BabeWorker {
inner: Box::pin(future::join(inner, answer_requests).map(|_| ())),
inner: Box::pin(future::join3(inner, answer_requests, clean_weights).map(|_| ())),
slot_notification_sinks,
handle: BabeWorkerHandle(worker_tx),
})
}
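
// A sketch (not part of this change) of the `select` alternative mentioned in
// the TODO above: `future::select_all` resolves as soon as the first of the
// boxed futures completes, so the worker would terminate together with the
// first finished task instead of polling the survivors forever. The `tasks`
// binding is illustrative only:
//
// let tasks: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = vec![
//     Box::pin(inner.map(|_| ())),
//     Box::pin(answer_requests),
//     Box::pin(clean_weights),
// ];
// let inner = Box::pin(future::select_all(tasks).map(|_| ()));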

// Removes obsolete block weight data by leveraging finality notifications.
// This includes the data for all finalized blocks (excluding the most recent
// one) and for all stale branches.
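//
// E.g. with the tree below (the shape used by the tests), finalizing A3
// deletes the weight entries of A1 and A2 (the newly finalized chain, except
// A3 itself) and of the stale branch B3, B4, while A3, A4, C4 and C5 are kept:
//
//               /--- B3 --- B4
// G --- A1 --- A2 --- A3 --- A4
//                       \--- C4 --- C5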
async fn aux_storage_cleanup<B: BlockT, C>(
client: Arc<C>,
mut finality_notifications: FinalityNotifications<B>,
) where
C: HeaderBackend<B> + AuxStore,
{
while let Some(notification) = finality_notifications.next().await {
let mut aux_keys = HashSet::new();
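// Blocks at or below this height are part of the already-finalized chain;
// the stale-branch walk below uses it as a sanity lower bound.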
let mut height_limit = Zero::zero();

// Cleans the data for the finalized block's ancestors down to, and including,
// the previously finalized one.

let first_new_finalized = notification.tree_route.get(0).unwrap_or(&notification.hash);
match client.header(BlockId::Hash(*first_new_finalized)) {
Ok(Some(header)) => {
aux_keys.insert(aux_schema::block_weight_key(header.parent_hash()));
height_limit = header.number().saturating_sub(One::one());
},
Ok(None) => {
warn!(target: "babe", "header lookup fail while cleaning data for block {}", first_new_finalized.to_string());
},
Err(err) => {
warn!(target: "babe", "header lookup fail while cleaning data for block {}: {}", first_new_finalized.to_string(), err.to_string());
},
}

for hash in notification.tree_route.iter() {
aux_keys.insert(aux_schema::block_weight_key(hash));
}

// Cleans data for stale branches.

for head in notification.stale_heads.iter() {
let mut hash = *head;
// Insert the stale block hashes until the canonical chain is reached.
// Sooner or later we should hit an element already present within the `aux_keys` set.
while aux_keys.insert(aux_schema::block_weight_key(hash)) {
match client.header(BlockId::Hash(hash)) {
Ok(Some(header)) => {
// A fallback in case of malformed notification.
// This should never happen and must be considered a bug.
if header.number().le(&height_limit) {
warn!(target: "babe", "unexpected canonical chain state or malformed finality notification");
break
}
hash = *header.parent_hash();
},
Ok(None) => {
warn!(target: "babe", "header lookup fail while cleaning data for block {}", hash.to_string());
break
},
Err(err) => {
warn!(target: "babe", "header lookup fail while cleaning data for block {}: {}", head.to_string(), err.to_string());
break
},
}
}
}

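// `insert_aux` with an empty insert list and the collected keys as the
// delete list removes all the entries in a single batch.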
let aux_keys: Vec<_> = aux_keys.iter().map(|val| val.as_slice()).collect();
if let Err(err) = client.insert_aux(&[], aux_keys.iter()) {
warn!(target: "babe", " Error cleaning up blocks data: {}", err.to_string());
}
}
}

async fn answer_requests<B: BlockT, C>(
mut request_rx: Receiver<BabeRequest<B>>,
genesis_config: sc_consensus_slots::SlotDuration<BabeGenesisConfiguration>,
@@ -611,7 +700,7 @@ impl<B: BlockT> BabeWorkerHandle<B> {
/// Worker for Babe which implements `Future<Output=()>`. This must be polled.
#[must_use]
pub struct BabeWorker<B: BlockT> {
inner: Pin<Box<dyn futures::Future<Output = ()> + Send + 'static>>,
inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
slot_notification_sinks: SlotNotificationSinks<B>,
handle: BabeWorkerHandle<B>,
}
@@ -635,13 +724,10 @@ impl<B: BlockT> BabeWorker<B> {
}
}

impl<B: BlockT> futures::Future for BabeWorker<B> {
impl<B: BlockT> Future for BabeWorker<B> {
type Output = ();

fn poll(
mut self: Pin<&mut Self>,
cx: &mut futures::task::Context,
) -> futures::task::Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.inner.as_mut().poll(cx)
}
}
140 changes: 108 additions & 32 deletions client/consensus/babe/src/tests.rs
@@ -28,7 +28,7 @@ use log::debug;
use rand::RngCore;
use rand_chacha::{rand_core::SeedableRng, ChaChaRng};
use sc_block_builder::{BlockBuilder, BlockBuilderProvider};
use sc_client_api::{backend::TransactionFor, BlockchainEvents};
use sc_client_api::{backend::TransactionFor, BlockchainEvents, Finalizer};
use sc_consensus::{BoxBlockImport, BoxJustificationImport};
use sc_consensus_slots::BackoffAuthoringOnFinalizedHeadLagging;
use sc_keystore::LocalKeystore;
@@ -608,8 +608,8 @@ fn propose_and_import_block<Transaction: Send + 'static>(
slot: Option<Slot>,
proposer_factory: &mut DummyFactory,
block_import: &mut BoxBlockImport<TestBlock, Transaction>,
) -> sp_core::H256 {
let mut proposer = futures::executor::block_on(proposer_factory.init(parent)).unwrap();
) -> Hash {
let mut proposer = block_on(proposer_factory.init(parent)).unwrap();

let slot = slot.unwrap_or_else(|| {
let parent_pre_digest = find_pre_digest::<TestBlock>(parent).unwrap();
@@ -625,7 +625,7 @@

let parent_hash = parent.hash();

let mut block = futures::executor::block_on(proposer.propose_with(pre_digest)).unwrap().block;
let mut block = block_on(proposer.propose_with(pre_digest)).unwrap().block;

let epoch_descriptor = proposer_factory
.epoch_changes
@@ -673,6 +673,29 @@ fn propose_and_import_block<Transaction: Send + 'static>(
post_hash
}

// Propose and import `n` valid BABE blocks that are built on top of the given parent.
// The proposer takes care of producing epoch change digests according to the epoch
// duration (which is set to 6 slots in the test runtime).
fn propose_and_import_blocks<Transaction: Send + 'static>(
client: &PeersFullClient,
proposer_factory: &mut DummyFactory,
block_import: &mut BoxBlockImport<TestBlock, Transaction>,
parent_id: BlockId<TestBlock>,
n: usize,
) -> Vec<Hash> {
let mut hashes = Vec::with_capacity(n);
let mut parent_header = client.header(&parent_id).unwrap().unwrap();

for _ in 0..n {
let block_hash =
propose_and_import_block(&parent_header, None, proposer_factory, block_import);
hashes.push(block_hash);
parent_header = client.header(&BlockId::Hash(block_hash)).unwrap().unwrap();
}

hashes
}

#[test]
fn importing_block_one_sets_genesis_epoch() {
let mut net = BabeTestNet::new(1);
@@ -714,8 +737,6 @@ fn importing_block_one_sets_genesis_epoch() {

#[test]
fn importing_epoch_change_block_prunes_tree() {
use sc_client_api::Finalizer;

let mut net = BabeTestNet::new(1);

let peer = net.peer(0);
@@ -732,26 +753,8 @@
mutator: Arc::new(|_, _| ()),
};

// This is just boilerplate code for proposing and importing n valid BABE
// blocks that are built on top of the given parent. The proposer takes care
// of producing epoch change digests according to the epoch duration (which
// is set to 6 slots in the test runtime).
let mut propose_and_import_blocks = |parent_id, n| {
let mut hashes = Vec::new();
let mut parent_header = client.header(&parent_id).unwrap().unwrap();

for _ in 0..n {
let block_hash = propose_and_import_block(
&parent_header,
None,
&mut proposer_factory,
&mut block_import,
);
hashes.push(block_hash);
parent_header = client.header(&BlockId::Hash(block_hash)).unwrap().unwrap();
}

hashes
let mut propose_and_import_blocks_wrap = |parent_id, n| {
propose_and_import_blocks(&client, &mut proposer_factory, &mut block_import, parent_id, n)
};

// This is the block tree that we're going to use in this test. Each node
Expand All @@ -766,12 +769,12 @@ fn importing_epoch_change_block_prunes_tree() {

// Create and import the canon chain and keep track of fork blocks (A, C, D)
// from the diagram above.
let canon_hashes = propose_and_import_blocks(BlockId::Number(0), 30);
let canon_hashes = propose_and_import_blocks_wrap(BlockId::Number(0), 30);

// Create the forks
let fork_1 = propose_and_import_blocks(BlockId::Hash(canon_hashes[0]), 10);
let fork_2 = propose_and_import_blocks(BlockId::Hash(canon_hashes[12]), 15);
let fork_3 = propose_and_import_blocks(BlockId::Hash(canon_hashes[18]), 10);
let fork_1 = propose_and_import_blocks_wrap(BlockId::Hash(canon_hashes[0]), 10);
let fork_2 = propose_and_import_blocks_wrap(BlockId::Hash(canon_hashes[12]), 15);
let fork_3 = propose_and_import_blocks_wrap(BlockId::Hash(canon_hashes[18]), 10);

// We should be tracking a total of 9 epochs in the fork tree
assert_eq!(epoch_changes.shared_data().tree().iter().count(), 9);
@@ -782,7 +785,7 @@
// We finalize block #13 from the canon chain, so on the next epoch
// change the tree should be pruned, to not contain F (#7).
client.finalize_block(BlockId::Hash(canon_hashes[12]), None, false).unwrap();
propose_and_import_blocks(BlockId::Hash(client.chain_info().best_hash), 7);
propose_and_import_blocks_wrap(BlockId::Hash(client.chain_info().best_hash), 7);

// at this point no hashes from the first fork must exist on the tree
assert!(!epoch_changes
@@ -809,7 +812,7 @@

// finalizing block #25 from the canon chain should prune out the second fork
client.finalize_block(BlockId::Hash(canon_hashes[24]), None, false).unwrap();
propose_and_import_blocks(BlockId::Hash(client.chain_info().best_hash), 8);
propose_and_import_blocks_wrap(BlockId::Hash(client.chain_info().best_hash), 8);

// at this point no hashes from the second fork must exist on the tree
assert!(!epoch_changes
@@ -894,3 +897,76 @@ fn babe_transcript_generation_match() {
};
debug_assert!(test(orig_transcript) == test(transcript_from_data(new_transcript)));
}

#[test]
fn obsolete_blocks_aux_data_cleanup() {
let mut net = BabeTestNet::new(1);

let peer = net.peer(0);
let data = peer.data.as_ref().expect("babe link set up during initialization");
let client = peer.client().as_client();

let mut proposer_factory = DummyFactory {
client: client.clone(),
config: data.link.config.clone(),
epoch_changes: data.link.epoch_changes.clone(),
mutator: Arc::new(|_, _| ()),
};

let mut block_import = data.block_import.lock().take().expect("import set up during init");

let mut propose_and_import_blocks_wrap = |parent_id, n| {
propose_and_import_blocks(&client, &mut proposer_factory, &mut block_import, parent_id, n)
};

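// Check that the presence of the weight entry for each of the given hashes
// matches the expectation.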
let aux_data_check = |hashes: &[Hash], expected: bool| {
hashes.iter().all(|hash| {
aux_schema::load_block_weight(&*peer.client().as_backend(), hash)
.unwrap()
.is_some() == expected
})
};

// Create the following test scenario:
//
// /-----B3 --- B4 ( < fork2 )
// G --- A1 --- A2 --- A3 --- A4 ( < fork1 )
// \-----C4 --- C5 ( < fork3 )

let fork1_hashes = propose_and_import_blocks_wrap(BlockId::Number(0), 4);
let fork2_hashes = propose_and_import_blocks_wrap(BlockId::Number(2), 2);
let fork3_hashes = propose_and_import_blocks_wrap(BlockId::Number(3), 2);

// Check that aux data is present for all but the genesis block.
assert!(aux_data_check(&[client.chain_info().genesis_hash], false));
assert!(aux_data_check(&fork1_hashes, true));
assert!(aux_data_check(&fork2_hashes, true));
assert!(aux_data_check(&fork3_hashes, true));

// Trigger cleanup. The order of these actions is important:
// 1. Get the finality notification stream to create a sink entry within the client.
// 2. Finalize a block (A3). This will send finality notifications to the subscribers.
// 3. Forcibly close the finality notification sinks to allow the `aux_storage_cleanup`
// future to exit as soon as the outstanding finality messages are consumed.

let finality_notifications = client.finality_notification_stream();

client.finalize_block(BlockId::Number(3), None, true).unwrap();

block_on(async {
for mut sink in client.finality_notification_sinks().lock().iter() {
let _ = sink.close().await;
}
// Consume the finalization messages left in the stream.
aux_storage_cleanup(client.clone(), finality_notifications).await;
});

// Wiped: A1, A2
assert!(aux_data_check(&fork1_hashes[..2], false));
// Present: A3, A4
assert!(aux_data_check(&fork1_hashes[2..], true));
// Wiped: B3, B4
assert!(aux_data_check(&fork2_hashes, false));
// Present: C4, C5
assert!(aux_data_check(&fork3_hashes, true));
}