Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion node/core/chain-selection/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ pub(super) trait Backend {
/// Load the stagnant list at the given timestamp.
fn load_stagnant_at(&self, timestamp: Timestamp) -> Result<Vec<Hash>, Error>;
/// Load all stagnant lists up to and including the given Unix timestamp
/// in ascending order.
/// in ascending order. Stop fetching stagnant entries upon reaching `max_elements`.
fn load_stagnant_at_up_to(
&self,
up_to: Timestamp,
max_elements: usize,
) -> Result<Vec<(Timestamp, Vec<Hash>)>, Error>;
/// Load the earliest kept block number.
fn load_first_block_number(&self) -> Result<Option<BlockNumber>, Error>;
Expand Down
25 changes: 18 additions & 7 deletions node/core/chain-selection/src/db_backend/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ impl Backend for DbBackend {
fn load_stagnant_at_up_to(
&self,
up_to: crate::Timestamp,
max_elements: usize,
) -> Result<Vec<(crate::Timestamp, Vec<Hash>)>, Error> {
let stagnant_at_iter =
self.inner.iter_with_prefix(self.config.col_data, &STAGNANT_AT_PREFIX[..]);
Expand All @@ -240,7 +241,9 @@ impl Backend for DbBackend {
_ => None,
}
})
.take_while(|(at, _)| *at <= up_to.into())
.enumerate()
.take_while(|(idx, (at, _))| *at <= up_to.into() && *idx < max_elements)
.map(|(_, v)| v)
.collect::<Vec<_>>();

Ok(val)
Expand Down Expand Up @@ -528,7 +531,10 @@ mod tests {
let mut backend = DbBackend::new(db, config);

// Prove that it's cheap
assert!(backend.load_stagnant_at_up_to(Timestamp::max_value()).unwrap().is_empty());
assert!(backend
.load_stagnant_at_up_to(Timestamp::max_value(), usize::MAX)
.unwrap()
.is_empty());

backend
.write(vec![
Expand All @@ -539,7 +545,7 @@ mod tests {
.unwrap();

assert_eq!(
backend.load_stagnant_at_up_to(Timestamp::max_value()).unwrap(),
backend.load_stagnant_at_up_to(Timestamp::max_value(), usize::MAX).unwrap(),
vec![
(2, vec![Hash::repeat_byte(1)]),
(5, vec![Hash::repeat_byte(2)]),
Expand All @@ -548,7 +554,7 @@ mod tests {
);

assert_eq!(
backend.load_stagnant_at_up_to(10).unwrap(),
backend.load_stagnant_at_up_to(10, usize::MAX).unwrap(),
vec![
(2, vec![Hash::repeat_byte(1)]),
(5, vec![Hash::repeat_byte(2)]),
Expand All @@ -557,21 +563,26 @@ mod tests {
);

assert_eq!(
backend.load_stagnant_at_up_to(9).unwrap(),
backend.load_stagnant_at_up_to(9, usize::MAX).unwrap(),
vec![(2, vec![Hash::repeat_byte(1)]), (5, vec![Hash::repeat_byte(2)]),]
);

assert_eq!(
backend.load_stagnant_at_up_to(9, 1).unwrap(),
vec![(2, vec![Hash::repeat_byte(1)]),]
);

backend.write(vec![BackendWriteOp::DeleteStagnantAt(2)]).unwrap();

assert_eq!(
backend.load_stagnant_at_up_to(5).unwrap(),
backend.load_stagnant_at_up_to(5, usize::MAX).unwrap(),
vec![(5, vec![Hash::repeat_byte(2)]),]
);

backend.write(vec![BackendWriteOp::WriteStagnantAt(5, vec![])]).unwrap();

assert_eq!(
backend.load_stagnant_at_up_to(10).unwrap(),
backend.load_stagnant_at_up_to(10, usize::MAX).unwrap(),
vec![(10, vec![Hash::repeat_byte(3)]),]
);
}
Expand Down
12 changes: 9 additions & 3 deletions node/core/chain-selection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Timestamp = u64;
// If a block isn't approved in 120 seconds, nodes will abandon it
// and begin building on another chain.
const STAGNANT_TIMEOUT: Timestamp = 120;
// Maximum number of stagnant entries cleaned during one `STAGNANT_TIMEOUT` iteration
const MAX_STAGNANT_ENTRIES: usize = 1000;

#[derive(Debug, Clone)]
enum Approval {
Expand Down Expand Up @@ -435,7 +437,7 @@ where
}
}
_ = stagnant_check_stream.next().fuse() => {
detect_stagnant(backend, clock.timestamp_now())?;
detect_stagnant(backend, clock.timestamp_now(), MAX_STAGNANT_ENTRIES)?;
}
}
}
Expand Down Expand Up @@ -637,9 +639,13 @@ fn handle_approved_block(backend: &mut impl Backend, approved_block: Hash) -> Re
backend.write(ops)
}

fn detect_stagnant(backend: &mut impl Backend, now: Timestamp) -> Result<(), Error> {
fn detect_stagnant(
backend: &mut impl Backend,
now: Timestamp,
max_elements: usize,
) -> Result<(), Error> {
let ops = {
let overlay = tree::detect_stagnant(&*backend, now)?;
let overlay = tree::detect_stagnant(&*backend, now, max_elements)?;

overlay.into_write_ops()
};
Expand Down
5 changes: 4 additions & 1 deletion node/core/chain-selection/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,16 @@ impl Backend for TestBackend {
fn load_stagnant_at_up_to(
&self,
up_to: Timestamp,
max_elements: usize,
) -> Result<Vec<(Timestamp, Vec<Hash>)>, Error> {
Ok(self
.inner
.lock()
.stagnant_at
.range(..=up_to)
.map(|(t, v)| (*t, v.clone()))
.enumerate()
.take_while(|(idx, _)| *idx < max_elements)
.map(|(_, (t, v))| (*t, v.clone()))
.collect())
}
fn load_first_block_number(&self) -> Result<Option<BlockNumber>, Error> {
Expand Down
14 changes: 13 additions & 1 deletion node/core/chain-selection/src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,12 +534,15 @@ pub(super) fn approve_block(
pub(super) fn detect_stagnant<'a, B: 'a + Backend>(
backend: &'a B,
up_to: Timestamp,
max_elements: usize,
) -> Result<OverlayedBackend<'a, B>, Error> {
let stagnant_up_to = backend.load_stagnant_at_up_to(up_to)?;
let stagnant_up_to = backend.load_stagnant_at_up_to(up_to, max_elements)?;
let mut backend = OverlayedBackend::new(backend);

// As this is in ascending order, only the earliest stagnant
// blocks will involve heavy viability propagations.
gum::debug!(target: LOG_TARGET, ?up_to, "Loaded {} stagnant entries", stagnant_up_to.len());

for (timestamp, maybe_stagnant) in stagnant_up_to {
backend.delete_stagnant_at(timestamp);

Expand All @@ -550,12 +553,21 @@ pub(super) fn detect_stagnant<'a, B: 'a + Backend>(
entry.viability.approval = Approval::Stagnant;
}
let is_viable = entry.viability.is_viable();
gum::trace!(
target: LOG_TARGET,
?block_hash,
?was_viable,
is_viable,
"Found existing stagnant entry"
);

if was_viable && !is_viable {
propagate_viability_update(&mut backend, entry)?;
} else {
backend.write_block_entry(entry);
}
} else {
gum::trace!(target: LOG_TARGET, ?block_hash, "Found non-existing stagnant entry");
}
}
}
Expand Down