Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions core/consensus/babe/src/epoch_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,23 +189,35 @@ impl<Hash, Number> EpochChanges<Hash, Number> where
EpochChanges { inner: ForkTree::new() }
}

/// Prune out finalized epochs, except for the ancestor of the finalized block.
/// Prune out finalized epochs, except for the ancestor of the finalized
/// block. The given slot should be the slot number at which the finalized
/// block was authored.
pub fn prune_finalized<D: IsDescendentOfBuilder<Hash>>(
&mut self,
descendent_of_builder: D,
_hash: &Hash,
_number: Number,
hash: &Hash,
number: Number,
slot: SlotNumber,
) -> Result<(), fork_tree::Error<D::Error>> {
let _is_descendent_of = descendent_of_builder
let is_descendent_of = descendent_of_builder
.build_is_descendent_of(None);

// TODO:
// https://github.com/paritytech/substrate/issues/3651
//
let predicate = |epoch: &PersistedEpoch| match *epoch {
PersistedEpoch::Genesis(_, ref epoch_1) =>
slot >= epoch_1.end_slot(),
PersistedEpoch::Regular(ref epoch_n) =>
slot >= epoch_n.end_slot(),
};

// prune any epochs which could not be _live_ as of the children of the
// finalized block.
// i.e. re-root the fork tree to the oldest ancestor of (hash, number)
// where epoch.end_slot() >= slot(hash)
// finalized block, i.e. re-root the fork tree to the oldest ancestor of
// (hash, number) where epoch.end_slot() >= finalized_slot
self.inner.prune(
hash,
&number,
&is_descendent_of,
&predicate,
)?;

Ok(())
}
Expand Down Expand Up @@ -299,6 +311,12 @@ impl<Hash, Number> EpochChanges<Hash, Number> where
Err(e) => Err(e),
}
}

/// Return the inner fork tree, useful for testing purposes.
#[cfg(test)]
pub fn tree(&self) -> &ForkTree<Hash, Number, PersistedEpoch> {
&self.inner
}
}

/// Type alias to produce the epoch-changes tree from a block type.
Expand Down
78 changes: 43 additions & 35 deletions core/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,24 +260,6 @@ pub fn start_babe<B, C, SC, E, I, SO, Error>(BabeParams {
&inherent_data_providers,
)?;

let epoch_changes = babe_link.epoch_changes.clone();
let pruning_task = client.finality_notification_stream()
.for_each(move |notification| {
// TODO: supply is-descendent-of and maybe write to disk _now_
// as opposed to waiting for the next epoch?
let res = epoch_changes.lock().prune_finalized(
descendent_query(&*client),
&notification.hash,
*notification.header.number(),
);

if let Err(e) = res {
babe_err!("Could not prune expired epoch changes: {:?}", e);
}

future::ready(())
});

babe_info!("Starting BABE Authorship worker");
let slot_worker = slots::start_slot_worker(
config.0,
Expand All @@ -286,9 +268,9 @@ pub fn start_babe<B, C, SC, E, I, SO, Error>(BabeParams {
sync_oracle,
inherent_data_providers,
babe_link.time_source,
).map(|_| ());
);

Ok(future::select(slot_worker, pruning_task).map(|_| Ok::<(), ()>(())).compat())
Ok(slot_worker.map(|_| Ok::<(), ()>(())).compat())
}

struct BabeWorker<B: BlockT, C, E, I, SO> {
Expand Down Expand Up @@ -1280,6 +1262,8 @@ impl<B, E, Block, I, RA, PRA> BlockImport<Block> for BabeBlockImport<B, E, Block
// this way we can revert it if there's any error
let mut old_epoch_changes = None;

let info = self.client.info().chain;

if let Some(next_epoch_descriptor) = next_epoch_digest {
let next_epoch = epoch.increment(next_epoch_descriptor);

Expand All @@ -1289,21 +1273,48 @@ impl<B, E, Block, I, RA, PRA> BlockImport<Block> for BabeBlockImport<B, E, Block
epoch.as_ref().epoch_index, hash, slot_number, epoch.as_ref().start_slot);
babe_info!("Next epoch starts at slot {}", next_epoch.as_ref().start_slot);

// track the epoch change in the fork tree
let res = epoch_changes.import(
descendent_query(&*self.client),
hash,
number,
*block.header.parent_hash(),
next_epoch,
);
// prune the tree of epochs not part of the finalized chain or
// that are not live anymore, and then track the given epoch change
// in the tree.
// NOTE: it is important that these operations are done in this
// order, otherwise if pruning after import the `is_descendent_of`
// used by pruning may not know about the block that is being
// imported.
let prune_and_import = || {
let finalized_slot = {
let finalized_header = self.client.header(&BlockId::Hash(info.finalized_hash))
.map_err(|e| ConsensusError::ClientImport(format!("{:?}", e)))?
.expect("best finalized hash was given by client; \
finalized headers must exist in db; qed");

find_pre_digest::<Block>(&finalized_header)
.expect("finalized header must be valid; \
valid blocks have a pre-digest; qed")
.slot_number()
};

epoch_changes.prune_finalized(
descendent_query(&*self.client),
&info.finalized_hash,
info.finalized_number,
finalized_slot,
).map_err(|e| ConsensusError::ClientImport(format!("{:?}", e)))?;

epoch_changes.import(
descendent_query(&*self.client),
hash,
number,
*block.header.parent_hash(),
next_epoch,
).map_err(|e| ConsensusError::ClientImport(format!("{:?}", e)))?;

Ok(())
};

if let Err(e) = res {
let err = ConsensusError::ClientImport(format!("{:?}", e));
if let Err(e) = prune_and_import() {
babe_err!("Failed to launch next epoch: {:?}", e);
*epoch_changes = old_epoch_changes.expect("set `Some` above and not taken; qed");
return Err(err);
return Err(e);
}

crate::aux_schema::write_epoch_changes::<Block, _, _>(
Expand All @@ -1326,10 +1337,7 @@ impl<B, E, Block, I, RA, PRA> BlockImport<Block> for BabeBlockImport<B, E, Block
// more primary blocks), if there's a tie we go with the longest
// chain.
block.fork_choice = {
let (last_best, last_best_number) = {
let info = self.client.info().chain;
(info.best_hash, info.best_number)
};
let (last_best, last_best_number) = (info.best_hash, info.best_number);

let last_best_weight = if &last_best == block.header.parent_hash() {
// the parent=genesis case is already covered for loading parent weight,
Expand Down
161 changes: 161 additions & 0 deletions core/consensus/babe/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ impl DummyProposer {
&BlockId::Hash(self.parent_hash),
pre_digests,
).unwrap();

let mut block = match block_builder.bake().map_err(|e| e.into()) {
Ok(b) => b,
Err(e) => return future::ready(Err(e)),
Expand Down Expand Up @@ -595,3 +596,163 @@ fn importing_block_one_sets_genesis_epoch() {
).unwrap().unwrap().into_inner();
assert_eq!(epoch_for_second_block, genesis_epoch);
}

#[test]
fn importing_epoch_change_block_prunes_tree() {
use client::backend::Finalizer;

let mut net = BabeTestNet::new(1);

let peer = net.peer(0);
let data = peer.data.as_ref().expect("babe link set up during initialization");

let client = peer.client().as_full().expect("Only full clients are used in tests").clone();
let mut block_import = data.block_import.lock().take().expect("import set up during init");
let epoch_changes = data.link.epoch_changes.clone();

// This is just boilerplate code for proposing and importing a valid BABE
// block that's built on top of the given parent. The proposer takes care
// of producing epoch change digests according to the epoch duration (which
// is set to 6 slots in the test runtime).
let mut propose_and_import_block = |parent_header| {
let mut environ = DummyFactory {
client: client.clone(),
config: data.link.config.clone(),
epoch_changes: data.link.epoch_changes.clone(),
mutator: Arc::new(|_, _| ()),
};

let mut proposer = environ.init(&parent_header).unwrap();
let parent_pre_digest = find_pre_digest::<Block>(&parent_header).unwrap();

let pre_digest = sr_primitives::generic::Digest {
logs: vec![
Item::babe_pre_digest(
BabePreDigest::Secondary {
authority_index: 0,
slot_number: parent_pre_digest.slot_number() + 1,
},
),
],
};

let mut block = futures::executor::block_on(proposer.propose_with(pre_digest)).unwrap();

let seal = {
// sign the pre-sealed hash of the block and then
// add it to a digest item.
let pair = AuthorityPair::from_seed(&[1; 32]);
let pre_hash = block.header.hash();
let signature = pair.sign(pre_hash.as_ref());
Item::babe_seal(signature)
};

let post_hash = {
block.header.digest_mut().push(seal.clone());
let h = block.header.hash();
block.header.digest_mut().pop();
h
};

let next_epoch_digest =
find_next_epoch_digest::<Block>(&block.header).unwrap();

let import_result = block_import.import_block(
BlockImportParams {
origin: BlockOrigin::Own,
header: block.header,
justification: None,
post_digests: vec![seal],
body: Some(block.extrinsics),
finalized: false,
auxiliary: Vec::new(),
fork_choice: ForkChoiceStrategy::LongestChain,
},
Default::default(),
).unwrap();

match import_result {
ImportResult::Imported(_) => {},
_ => panic!("expected block to be imported"),
}

(post_hash, next_epoch_digest)
};

let mut propose_and_import_blocks = |parent_id, n| {
let mut hashes = Vec::new();
let mut parent_header = client.header(&parent_id).unwrap().unwrap();

for _ in 0..n {
let (block_hash, _) = propose_and_import_block(parent_header);
hashes.push(block_hash);
parent_header = client.header(&BlockId::Hash(block_hash)).unwrap().unwrap();
}

hashes
};

// This is the block tree that we're going to use in this test. Each node
// represents an epoch change block, the epoch duration is 6 slots.
//
// *---- F (#7)
// / *------ G (#19) - H (#25)
// / /
// A (#1) - B (#7) - C (#13) - D (#19) - E (#25)
// \
// *------ I (#25)

// Create and import the canon chain and keep track of fork blocks (A, C, D)
// from the diagram above.
let canon_hashes = propose_and_import_blocks(BlockId::Number(0), 30);

// Create the forks
let fork_1 = propose_and_import_blocks(BlockId::Hash(canon_hashes[0]), 10);
let fork_2 = propose_and_import_blocks(BlockId::Hash(canon_hashes[12]), 15);
let fork_3 = propose_and_import_blocks(BlockId::Hash(canon_hashes[18]), 10);

// We should be tracking a total of 9 epochs in the fork tree
assert_eq!(
epoch_changes.lock().tree().iter().count(),
9,
);

// And only one root
assert_eq!(
epoch_changes.lock().tree().roots().count(),
1,
);

// We finalize block #13 from the canon chain, so on the next epoch
// change the tree should be pruned, to not contain F (#7).
client.finalize_block(BlockId::Hash(canon_hashes[12]), None, false).unwrap();
propose_and_import_blocks(BlockId::Hash(client.info().chain.best_hash), 7);

// at this point no hashes from the first fork must exist on the tree
assert!(
!epoch_changes.lock().tree().iter().map(|(h, _, _)| h).any(|h| fork_1.contains(h)),
);

// but the epoch changes from the other forks must still exist
assert!(
epoch_changes.lock().tree().iter().map(|(h, _, _)| h).any(|h| fork_2.contains(h))
);

assert!(
epoch_changes.lock().tree().iter().map(|(h, _, _)| h).any(|h| fork_3.contains(h)),
);

// finalizing block #25 from the canon chain should prune out the second fork
client.finalize_block(BlockId::Hash(canon_hashes[24]), None, false).unwrap();
propose_and_import_blocks(BlockId::Hash(client.info().chain.best_hash), 8);

// at this point no hashes from the second fork must exist on the tree
assert!(
!epoch_changes.lock().tree().iter().map(|(h, _, _)| h).any(|h| fork_2.contains(h)),
);

// while epoch changes from the last fork should still exist
assert!(
epoch_changes.lock().tree().iter().map(|(h, _, _)| h).any(|h| fork_3.contains(h)),
);
}
Loading