Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 116 additions & 44 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ use state_processing::{
per_block_processing::{errors::AttestationValidationError, is_merge_transition_complete},
per_slot_processing,
state_advance::{complete_state_advance, partial_state_advance},
BlockSignatureStrategy, SigVerifiedOp,
BlockSignatureStrategy, SigVerifiedOp, VerifyBlockRoot,
};
use std::borrow::Cow;
use std::cmp::Ordering;
Expand Down Expand Up @@ -488,7 +488,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn forwards_iter_block_roots(
&self,
start_slot: Slot,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
let oldest_block_slot = self.store.get_oldest_block_slot();
if start_slot < oldest_block_slot {
return Err(Error::HistoricalBlockError(
Expand All @@ -501,8 +501,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let local_head = self.head()?;

let iter = HotColdDB::forwards_block_roots_iterator(
self.store.clone(),
let iter = self.store.forwards_block_roots_iterator(
start_slot,
local_head.beacon_state,
local_head.beacon_block_root,
Expand All @@ -512,6 +511,43 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(iter.map(|result| result.map_err(Into::into)))
}

/// Even more efficient variant of `forwards_iter_block_roots` that will avoid cloning the head
/// state if it isn't required for the requested range of blocks.
pub fn forwards_iter_block_roots_until(
&self,
start_slot: Slot,
end_slot: Slot,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
let oldest_block_slot = self.store.get_oldest_block_slot();
if start_slot < oldest_block_slot {
return Err(Error::HistoricalBlockError(
HistoricalBlockError::BlockOutOfRange {
slot: start_slot,
oldest_block_slot,
},
));
}

self.with_head(move |head| {
let iter = self.store.forwards_block_roots_iterator_until(
start_slot,
end_slot,
|| {
(
head.beacon_state.clone_with_only_committee_caches(),
head.beacon_block_root,
)
},
&self.spec,
)?;
Ok(iter
.map(|result| result.map_err(Into::into))
.take_while(move |result| {
result.as_ref().map_or(true, |(_, slot)| *slot <= end_slot)
}))
})
}

/// Traverse backwards from `block_root` to find the block roots of its ancestors.
///
/// ## Notes
Expand All @@ -524,14 +560,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn rev_iter_block_roots_from(
&self,
block_root: Hash256,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
let block = self
.get_block(&block_root)?
.ok_or(Error::MissingBeaconBlock(block_root))?;
let state = self
.get_state(&block.state_root(), Some(block.slot()))?
.ok_or_else(|| Error::MissingBeaconState(block.state_root()))?;
let iter = BlockRootsIterator::owned(self.store.clone(), state);
let iter = BlockRootsIterator::owned(&self.store, state);
Ok(std::iter::once(Ok((block_root, block.slot())))
.chain(iter)
.map(|result| result.map_err(|e| e.into())))
Expand Down Expand Up @@ -618,12 +654,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// - As this iterator starts at the `head` of the chain (viz., the best block), the first slot
/// returned may be earlier than the wall-clock slot.
pub fn rev_iter_state_roots_from<'a>(
&self,
&'a self,
state_root: Hash256,
state: &'a BeaconState<T::EthSpec>,
) -> impl Iterator<Item = Result<(Hash256, Slot), Error>> + 'a {
std::iter::once(Ok((state_root, state.slot())))
.chain(StateRootsIterator::new(self.store.clone(), state))
.chain(StateRootsIterator::new(&self.store, state))
.map(|result| result.map_err(Into::into))
}

Expand All @@ -637,11 +673,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn forwards_iter_state_roots(
&self,
start_slot: Slot,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
let local_head = self.head()?;

let iter = HotColdDB::forwards_state_roots_iterator(
self.store.clone(),
let iter = self.store.forwards_state_roots_iterator(
start_slot,
local_head.beacon_state_root(),
local_head.beacon_state,
Expand All @@ -651,6 +686,36 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(iter.map(|result| result.map_err(Into::into)))
}

/// Super-efficient forwards state roots iterator that avoids cloning the head if the state
/// roots lie entirely within the freezer database.
///
/// The iterator returned will include roots for `start_slot..=end_slot`, i.e. it
/// is endpoint inclusive.
pub fn forwards_iter_state_roots_until(
&self,
start_slot: Slot,
end_slot: Slot,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
self.with_head(move |head| {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice this holds a read-lock on the head until the iteration completes. I don't have an idea on how long freezer roots iteration takes, I'm wondering what your thoughts are on hogging the head lock?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good catch.

It's ok in the case where the iteration starts in the frozen root section of the DB, because we clone the head and return the iterator (which is lazy) here:

let continuation_data =
if end_slot.map_or(false, |end_slot| end_slot < latest_restore_point_slot) {
Box::new(None)
} else {
Box::new(Some(get_state()))
};
PreFinalization {
iter,
continuation_data,
}

However in the case where we start after the last_restore_point_slot we eagerly iterate backwards from the head while holding the lock, which is not ideal:

let (end_state, end_state_root) = get_state();
PostFinalization {
iter: SimpleForwardsStateRootsIterator::new(
store,
start_slot,
end_state,
end_state_root,
)?,
}

I'll add a variant to the hybrid iterator that represents the "between state" of just holding the state before we start iterating backwards. Then we can preserve the property that the iterator is always quick to construct

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've implemented this in b892bb7 and ended up consolidating the block root and state root iterators while I was there.

let iter = self.store.forwards_state_roots_iterator_until(
start_slot,
end_slot,
|| {
(
head.beacon_state.clone_with_only_committee_caches(),
head.beacon_state_root(),
)
},
&self.spec,
)?;
Ok(iter
.map(|result| result.map_err(Into::into))
.take_while(move |result| {
result.as_ref().map_or(true, |(_, slot)| *slot <= end_slot)
}))
})
}

/// Returns the block at the given slot, if any. Only returns blocks in the canonical chain.
///
/// Use the `skips` parameter to define the behaviour when `request_slot` is a skipped slot.
Expand Down Expand Up @@ -708,18 +773,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(Some(root));
}

process_results(self.forwards_iter_state_roots(request_slot)?, |mut iter| {
if let Some((root, slot)) = iter.next() {
if slot == request_slot {
Ok(Some(root))
process_results(
self.forwards_iter_state_roots_until(request_slot, request_slot)?,
|mut iter| {
if let Some((root, slot)) = iter.next() {
if slot == request_slot {
Ok(Some(root))
} else {
// Sanity check.
Err(Error::InconsistentForwardsIter { request_slot, slot })
}
} else {
// Sanity check.
Err(Error::InconsistentForwardsIter { request_slot, slot })
Ok(None)
}
} else {
Ok(None)
}
})?
},
)?
}

/// Returns the block root at the given slot, if any. Only returns roots in the canonical chain.
Expand Down Expand Up @@ -790,11 +858,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(root_opt);
}

if let Some(((prev_root, _), (curr_root, curr_slot))) =
process_results(self.forwards_iter_block_roots(prev_slot)?, |iter| {
iter.tuple_windows().next()
})?
{
if let Some(((prev_root, _), (curr_root, curr_slot))) = process_results(
self.forwards_iter_block_roots_until(prev_slot, request_slot)?,
|iter| iter.tuple_windows().next(),
)? {
// Sanity check.
if curr_slot != request_slot {
return Err(Error::InconsistentForwardsIter {
Expand Down Expand Up @@ -842,18 +909,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(Some(root));
}

process_results(self.forwards_iter_block_roots(request_slot)?, |mut iter| {
if let Some((root, slot)) = iter.next() {
if slot == request_slot {
Ok(Some(root))
process_results(
self.forwards_iter_block_roots_until(request_slot, request_slot)?,
|mut iter| {
if let Some((root, slot)) = iter.next() {
if slot == request_slot {
Ok(Some(root))
} else {
// Sanity check.
Err(Error::InconsistentForwardsIter { request_slot, slot })
}
} else {
// Sanity check.
Err(Error::InconsistentForwardsIter { request_slot, slot })
Ok(None)
}
} else {
Ok(None)
}
})?
},
)?
}

/// Returns the block at the given root, if any.
Expand Down Expand Up @@ -1112,12 +1182,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(state)
}
Ordering::Less => {
let state_root = process_results(self.forwards_iter_state_roots(slot)?, |iter| {
iter.take_while(|(_, current_slot)| *current_slot >= slot)
.find(|(_, current_slot)| *current_slot == slot)
.map(|(root, _slot)| root)
})?
.ok_or(Error::NoStateForSlot(slot))?;
let state_root =
process_results(self.forwards_iter_state_roots_until(slot, slot)?, |iter| {
iter.take_while(|(_, current_slot)| *current_slot >= slot)
.find(|(_, current_slot)| *current_slot == slot)
.map(|(root, _slot)| root)
})?
.ok_or(Error::NoStateForSlot(slot))?;

Ok(self
.get_state(&state_root, Some(slot))?
Expand Down Expand Up @@ -1256,7 +1327,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
beacon_block_root: Hash256,
state: &BeaconState<T::EthSpec>,
) -> Result<Option<Hash256>, Error> {
let iter = BlockRootsIterator::new(self.store.clone(), state);
let iter = BlockRootsIterator::new(&self.store, state);
let iter_with_head = std::iter::once(Ok((beacon_block_root, state.slot())))
.chain(iter)
.map(|result| result.map_err(|e| e.into()));
Expand Down Expand Up @@ -2983,6 +3054,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&block,
None,
BlockSignatureStrategy::VerifyRandao,
VerifyBlockRoot::True,
&self.spec,
)?;
drop(process_timer);
Expand Down Expand Up @@ -3324,7 +3396,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.epoch
.start_slot(T::EthSpec::slots_per_epoch());
let new_finalized_state_root = process_results(
StateRootsIterator::new(self.store.clone(), &head.beacon_state),
StateRootsIterator::new(&self.store, &head.beacon_state),
|mut iter| {
iter.find_map(|(state_root, slot)| {
if slot == new_finalized_slot {
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use state_processing::{
block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError},
per_block_processing, per_slot_processing,
state_advance::partial_state_advance,
BlockProcessingError, BlockSignatureStrategy, SlotProcessingError,
BlockProcessingError, BlockSignatureStrategy, SlotProcessingError, VerifyBlockRoot,
};
use std::borrow::Cow;
use std::fs;
Expand Down Expand Up @@ -1185,6 +1185,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
Some(block_root),
// Signatures were verified earlier in this function.
BlockSignatureStrategy::NoVerification,
VerifyBlockRoot::True,
&chain.spec,
) {
match err {
Expand Down
4 changes: 3 additions & 1 deletion beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use state_processing::{
},
signature_sets::Error as SignatureSetError,
state_advance::Error as StateAdvanceError,
BlockProcessingError, SlotProcessingError,
BlockProcessingError, BlockReplayError, SlotProcessingError,
};
use std::time::Duration;
use task_executor::ShutdownReason;
Expand Down Expand Up @@ -86,6 +86,7 @@ pub enum BeaconChainError {
ValidatorPubkeyCacheIncomplete(usize),
SignatureSetError(SignatureSetError),
BlockSignatureVerifierError(state_processing::block_signature_verifier::Error),
BlockReplayError(BlockReplayError),
DuplicateValidatorPublicKey,
ValidatorPubkeyCacheFileError(String),
ValidatorIndexUnknown(usize),
Expand Down Expand Up @@ -160,6 +161,7 @@ easy_from_to!(ArithError, BeaconChainError);
easy_from_to!(ForkChoiceStoreError, BeaconChainError);
easy_from_to!(HistoricalBlockError, BeaconChainError);
easy_from_to!(StateAdvanceError, BeaconChainError);
easy_from_to!(BlockReplayError, BeaconChainError);

#[derive(Debug)]
pub enum BlockProductionError {
Expand Down
5 changes: 4 additions & 1 deletion beacon_node/beacon_chain/src/fork_revert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use fork_choice::{ForkChoice, PayloadVerificationStatus};
use itertools::process_results;
use slog::{info, warn, Logger};
use state_processing::state_advance::complete_state_advance;
use state_processing::{per_block_processing, per_block_processing::BlockSignatureStrategy};
use state_processing::{
per_block_processing, per_block_processing::BlockSignatureStrategy, VerifyBlockRoot,
};
use std::sync::Arc;
use std::time::Duration;
use store::{iter::ParentRootBlockIterator, HotColdDB, ItemStore};
Expand Down Expand Up @@ -161,6 +163,7 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
&block,
None,
BlockSignatureStrategy::NoVerification,
VerifyBlockRoot::True,
spec,
)
.map_err(|e| format!("Error replaying block: {:?}", e))?;
Expand Down
14 changes: 6 additions & 8 deletions beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
new_finalized_slot,
(new_finalized_block_hash, new_finalized_state_hash),
)))
.chain(
RootsIterator::new(store.clone(), new_finalized_state).map(|res| {
res.map(|(block_root, state_root, slot)| {
(slot, (block_root.into(), state_root.into()))
})
}),
)
.chain(RootsIterator::new(&store, new_finalized_state).map(|res| {
res.map(|(block_root, state_root, slot)| {
(slot, (block_root.into(), state_root.into()))
})
}))
.take_while(|res| {
res.as_ref()
.map_or(true, |(slot, _)| *slot >= old_finalized_slot)
Expand Down Expand Up @@ -416,7 +414,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho

// Iterate backwards from this head, staging blocks and states for deletion.
let iter = std::iter::once(Ok((head_hash, head_state_root, head_slot)))
.chain(RootsIterator::from_block(store.clone(), head_hash)?);
.chain(RootsIterator::from_block(&store, head_hash)?);

for maybe_tuple in iter {
let (block_root, state_root, slot) = maybe_tuple?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ fn map_relevant_epochs_to_roots<T: BeaconChainTypes>(

// Iterate backwards from the given `head_root` and `head_slot` and find the block root at each epoch.
let mut iter = std::iter::once(Ok((head_root, head_slot)))
.chain(BlockRootsIterator::from_block(db, head_root).map_err(|e| format!("{:?}", e))?);
.chain(BlockRootsIterator::from_block(&db, head_root).map_err(|e| format!("{:?}", e))?);
let mut roots_by_epoch = HashMap::new();
for epoch in relevant_epochs {
let start_slot = epoch.start_slot(T::EthSpec::slots_per_epoch());
Expand Down
Loading