diff --git a/node/core/chain-selection/src/backend.rs b/node/core/chain-selection/src/backend.rs index 71b6797d3526..6c5396a5c64d 100644 --- a/node/core/chain-selection/src/backend.rs +++ b/node/core/chain-selection/src/backend.rs @@ -46,10 +46,11 @@ pub(super) trait Backend { /// Load the stagnant list at the given timestamp. fn load_stagnant_at(&self, timestamp: Timestamp) -> Result, Error>; /// Load all stagnant lists up to and including the given Unix timestamp - /// in ascending order. + /// in ascending order. Stop fetching stagnant entries upon reaching `max_elements`. fn load_stagnant_at_up_to( &self, up_to: Timestamp, + max_elements: usize, ) -> Result)>, Error>; /// Load the earliest kept block number. fn load_first_block_number(&self) -> Result, Error>; diff --git a/node/core/chain-selection/src/db_backend/v1.rs b/node/core/chain-selection/src/db_backend/v1.rs index 8b4591f5c3a9..db117ff945df 100644 --- a/node/core/chain-selection/src/db_backend/v1.rs +++ b/node/core/chain-selection/src/db_backend/v1.rs @@ -229,6 +229,7 @@ impl Backend for DbBackend { fn load_stagnant_at_up_to( &self, up_to: crate::Timestamp, + max_elements: usize, ) -> Result)>, Error> { let stagnant_at_iter = self.inner.iter_with_prefix(self.config.col_data, &STAGNANT_AT_PREFIX[..]); @@ -240,7 +241,9 @@ impl Backend for DbBackend { _ => None, } }) - .take_while(|(at, _)| *at <= up_to.into()) + .enumerate() + .take_while(|(idx, (at, _))| *at <= up_to.into() && *idx < max_elements) + .map(|(_, v)| v) .collect::>(); Ok(val) @@ -528,7 +531,10 @@ mod tests { let mut backend = DbBackend::new(db, config); // Prove that it's cheap - assert!(backend.load_stagnant_at_up_to(Timestamp::max_value()).unwrap().is_empty()); + assert!(backend + .load_stagnant_at_up_to(Timestamp::max_value(), usize::MAX) + .unwrap() + .is_empty()); backend .write(vec![ @@ -539,7 +545,7 @@ mod tests { .unwrap(); assert_eq!( - backend.load_stagnant_at_up_to(Timestamp::max_value()).unwrap(), + backend.load_stagnant_at_up_to(Timestamp::max_value(), usize::MAX).unwrap(), vec![ (2, vec![Hash::repeat_byte(1)]), (5, vec![Hash::repeat_byte(2)]), @@ -548,7 +554,7 @@ mod tests { ); assert_eq!( - backend.load_stagnant_at_up_to(10).unwrap(), + backend.load_stagnant_at_up_to(10, usize::MAX).unwrap(), vec![ (2, vec![Hash::repeat_byte(1)]), (5, vec![Hash::repeat_byte(2)]), @@ -557,21 +563,26 @@ mod tests { ); assert_eq!( - backend.load_stagnant_at_up_to(9).unwrap(), + backend.load_stagnant_at_up_to(9, usize::MAX).unwrap(), vec![(2, vec![Hash::repeat_byte(1)]), (5, vec![Hash::repeat_byte(2)]),] ); + assert_eq!( + backend.load_stagnant_at_up_to(9, 1).unwrap(), + vec![(2, vec![Hash::repeat_byte(1)]),] + ); + backend.write(vec![BackendWriteOp::DeleteStagnantAt(2)]).unwrap(); assert_eq!( - backend.load_stagnant_at_up_to(5).unwrap(), + backend.load_stagnant_at_up_to(5, usize::MAX).unwrap(), vec![(5, vec![Hash::repeat_byte(2)]),] ); backend.write(vec![BackendWriteOp::WriteStagnantAt(5, vec![])]).unwrap(); assert_eq!( - backend.load_stagnant_at_up_to(10).unwrap(), + backend.load_stagnant_at_up_to(10, usize::MAX).unwrap(), vec![(10, vec![Hash::repeat_byte(3)]),] ); } diff --git a/node/core/chain-selection/src/lib.rs b/node/core/chain-selection/src/lib.rs index a8646d874d8f..be6509e54a29 100644 --- a/node/core/chain-selection/src/lib.rs +++ b/node/core/chain-selection/src/lib.rs @@ -50,6 +50,8 @@ type Timestamp = u64; // If a block isn't approved in 120 seconds, nodes will abandon it // and begin building on another chain. const STAGNANT_TIMEOUT: Timestamp = 120; +// Maximum number of stagnant entries cleaned during one `STAGNANT_TIMEOUT` iteration +const MAX_STAGNANT_ENTRIES: usize = 1000; #[derive(Debug, Clone)] enum Approval { @@ -435,7 +437,7 @@ where } } _ = stagnant_check_stream.next().fuse() => { - detect_stagnant(backend, clock.timestamp_now())?; + detect_stagnant(backend, clock.timestamp_now(), MAX_STAGNANT_ENTRIES)?; } } } @@ -637,9 +639,13 @@ fn handle_approved_block(backend: &mut impl Backend, approved_block: Hash) -> Re backend.write(ops) } -fn detect_stagnant(backend: &mut impl Backend, now: Timestamp) -> Result<(), Error> { +fn detect_stagnant( + backend: &mut impl Backend, + now: Timestamp, + max_elements: usize, +) -> Result<(), Error> { let ops = { - let overlay = tree::detect_stagnant(&*backend, now)?; + let overlay = tree::detect_stagnant(&*backend, now, max_elements)?; overlay.into_write_ops() }; diff --git a/node/core/chain-selection/src/tests.rs b/node/core/chain-selection/src/tests.rs index 0b8947a200cf..20c4700dff57 100644 --- a/node/core/chain-selection/src/tests.rs +++ b/node/core/chain-selection/src/tests.rs @@ -139,13 +139,16 @@ impl Backend for TestBackend { fn load_stagnant_at_up_to( &self, up_to: Timestamp, + max_elements: usize, ) -> Result)>, Error> { Ok(self .inner .lock() .stagnant_at .range(..=up_to) - .map(|(t, v)| (*t, v.clone())) + .enumerate() + .take_while(|(idx, _)| *idx < max_elements) + .map(|(_, (t, v))| (*t, v.clone())) .collect()) } fn load_first_block_number(&self) -> Result, Error> { diff --git a/node/core/chain-selection/src/tree.rs b/node/core/chain-selection/src/tree.rs index d6f19b792a75..5edb6748934d 100644 --- a/node/core/chain-selection/src/tree.rs +++ b/node/core/chain-selection/src/tree.rs @@ -534,12 +534,28 @@ pub(super) fn approve_block( pub(super) fn detect_stagnant<'a, B: 'a + Backend>( backend: &'a B, up_to: Timestamp, + max_elements: usize, ) -> Result, Error> { - let stagnant_up_to = backend.load_stagnant_at_up_to(up_to)?; + let stagnant_up_to = backend.load_stagnant_at_up_to(up_to, max_elements)?; let mut backend = OverlayedBackend::new(backend); + let (min_ts, max_ts) = match stagnant_up_to.len() { + 0 => (0 as Timestamp, 0 as Timestamp), + 1 => (stagnant_up_to[0].0, stagnant_up_to[0].0), + n => (stagnant_up_to[0].0, stagnant_up_to[n - 1].0), + }; + // As this is in ascending order, only the earliest stagnant // blocks will involve heavy viability propagations. + gum::debug!( + target: LOG_TARGET, + ?up_to, + ?min_ts, + ?max_ts, + "Prepared {} stagnant entries for pruning", + stagnant_up_to.len() + ); + for (timestamp, maybe_stagnant) in stagnant_up_to { backend.delete_stagnant_at(timestamp); @@ -550,12 +566,27 @@ pub(super) fn detect_stagnant<'a, B: 'a + Backend>( entry.viability.approval = Approval::Stagnant; } let is_viable = entry.viability.is_viable(); + gum::trace!( + target: LOG_TARGET, + ?block_hash, + ?timestamp, + ?was_viable, + ?is_viable, + "Found existing stagnant entry" + ); if was_viable && !is_viable { propagate_viability_update(&mut backend, entry)?; } else { backend.write_block_entry(entry); } + } else { + gum::trace!( + target: LOG_TARGET, + ?block_hash, + ?timestamp, + "Found non-existing stagnant entry" + ); } } }