From 977da03c2d6cb833f9d1de2688b14efb5dadbdb9 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Wed, 25 Jan 2023 15:39:24 +0100 Subject: [PATCH 01/27] Increase delay for pov-recovery --- client/service/src/lib.rs | 5 ++++- test/service/src/lib.rs | 5 +---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 8906ee23181..04dbc64c27d 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -121,7 +121,10 @@ where overseer_handle.clone(), // We want that collators wait at maximum the relay chain slot duration before starting // to recover blocks. - RecoveryDelay { min: core::time::Duration::ZERO, max: relay_chain_slot_duration }, + RecoveryDelay { + min: relay_chain_slot_duration.checked_div(2).expect("2 is larger than 0; qed"), + max: relay_chain_slot_duration, + }, client.clone(), import_queue, relay_chain_interface.clone(), diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index e14b7b7463e..42a6b594dc0 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -385,10 +385,7 @@ where para_id, relay_chain_interface, import_queue: import_queue_service, - // The slot duration is currently used internally only to configure - // the recovery delay of pov-recovery. We don't want to wait for too - // long on the full node to recover, so we reduce this time here. - relay_chain_slot_duration: Duration::from_millis(6), + relay_chain_slot_duration: Duration::from_secs(6), }; start_full_node(params)?; From dbef277e066d050d46670bd0d51f843117095ee3 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Wed, 25 Jan 2023 16:25:25 +0100 Subject: [PATCH 02/27] Update client/service/src/lib.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/service/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 04dbc64c27d..caa1fb39660 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -122,7 +122,7 @@ where // We want that collators wait at maximum the relay chain slot duration before starting // to recover blocks. RecoveryDelay { - min: relay_chain_slot_duration.checked_div(2).expect("2 is larger than 0; qed"), + min: relay_chain_slot_duration / 2, max: relay_chain_slot_duration, }, client.clone(), From 72e76460a44a34d99bdf24f02c0b96b5cf91f4a7 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Wed, 25 Jan 2023 16:28:41 +0100 Subject: [PATCH 03/27] Comment --- client/service/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index caa1fb39660..8b72bd0bd27 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -120,7 +120,8 @@ where let pov_recovery = PoVRecovery::new( overseer_handle.clone(), // We want that collators wait at maximum the relay chain slot duration before starting - // to recover blocks. + // to recover blocks. Additionally, we wait at least half the slot time to give the + // relay chain the chance to increase availability. RecoveryDelay { min: relay_chain_slot_duration / 2, max: relay_chain_slot_duration, From 190b012607f5a9fcb994ecd1ff04f7217d474c98 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Fri, 27 Jan 2023 14:38:32 +0100 Subject: [PATCH 04/27] FMT --- client/service/src/lib.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 8b72bd0bd27..bd43f677e2c 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -122,10 +122,7 @@ where // We want that collators wait at maximum the relay chain slot duration before starting // to recover blocks. Additionally, we wait at least half the slot time to give the // relay chain the chance to increase availability. - RecoveryDelay { - min: relay_chain_slot_duration / 2, - max: relay_chain_slot_duration, - }, + RecoveryDelay { min: relay_chain_slot_duration / 2, max: relay_chain_slot_duration }, client.clone(), import_queue, relay_chain_interface.clone(), From b63782fa614791fb2237371e49d754f57613fb8e Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Tue, 31 Jan 2023 11:23:21 +0100 Subject: [PATCH 05/27] Clear waiting_recovery when block is recovered or recovery failed --- client/pov-recovery/src/lib.rs | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index 2eed968d5f9..d4e194d67b5 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -112,6 +112,8 @@ struct Candidate { block_number: NumberFor, parent_hash: Block::Hash, // Lazy recovery has been submitted. + // Should be true iff a block is either queued to be recovered or + // recovery is currently in progress. waiting_recovery: bool, } @@ -217,8 +219,8 @@ where }); } - /// Handle an imported block. - fn handle_block_imported(&mut self, block_hash: &Block::Hash) { + /// Block is no longer waiting for recovery + fn clear_waiting_recovery(&mut self, block_hash: &Block::Hash) { self.candidates.get_mut(block_hash).map(|candidate| { // Prevents triggering an already enqueued recovery request candidate.waiting_recovery = false; @@ -263,6 +265,7 @@ where Some(data) => data, None => { self.clear_waiting_for_parent(block_hash); + self.clear_waiting_recovery(&block_hash); return }, }; @@ -276,6 +279,7 @@ where tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV"); self.clear_waiting_for_parent(block_hash); + self.clear_waiting_recovery(&block_hash); return }, @@ -290,6 +294,7 @@ where "Failed to decode parachain block data from recovered PoV", ); + self.clear_waiting_recovery(&block_hash); self.clear_waiting_for_parent(block_hash); return @@ -302,12 +307,18 @@ where match self.parachain_client.block_status(parent) { Ok(BlockStatus::Unknown) => { - if self.active_candidate_recovery.is_being_recovered(&parent) { + // If the parent block is currently being recovered or is scheduled to be recovered, we want to wait for the parent. + let parent_scheduled_for_recovery = + self.candidates.get(&parent).map_or(false, |parent| parent.waiting_recovery); + if self.active_candidate_recovery.is_being_recovered(&parent) || + parent_scheduled_for_recovery + { tracing::debug!( target: LOG_TARGET, ?block_hash, parent_hash = ?parent, - "Parent is still being recovered, waiting.", + parent_scheduled_for_recovery, + "Waiting for recovery of parent.", ); self.waiting_for_parent.entry(parent).or_default().push(block); @@ -321,6 +332,7 @@ where ); self.clear_waiting_for_parent(block_hash); + self.clear_waiting_recovery(&block_hash); return } @@ -334,6 +346,7 @@ where ); self.clear_waiting_for_parent(block_hash); + self.clear_waiting_recovery(&block_hash); return }, @@ -480,7 +493,7 @@ where }, imported = imported_blocks.next() => { if let Some(imported) = imported { - self.handle_block_imported(&imported.hash); + self.clear_waiting_recovery(&imported.hash); } else { tracing::debug!(target: LOG_TARGET, "Imported blocks stream ended"); return; From 3c660da431b66fd36083b315a1a8ee1a95103639 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Tue, 31 Jan 2023 12:28:05 +0100 Subject: [PATCH 06/27] Introduce recovery queue that preserved insertion order --- client/pov-recovery/src/lib.rs | 75 +++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 23 deletions(-) diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index d4e194d67b5..4ed5066bcf4 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -117,6 +117,52 @@ struct Candidate { waiting_recovery: bool, } +struct RecoveryQueue { + candidate_queue: VecDeque, + signaling_queue: FuturesUnordered + Send>>>, +} + +impl RecoveryQueue { + pub fn new() -> Self { + return Self { candidate_queue: VecDeque::new(), signaling_queue: FuturesUnordered::new() } + } + + pub fn push_candidate(&mut self, hash: Block::Hash, delay: RecoveryDelay) { + let delay = delay.min + delay.max.saturating_sub(delay.min).mul_f64(thread_rng().gen()); + tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Starting block recovery in {:?} sec", + delay.as_secs(), + ); + self.candidate_queue.push_back(hash); + self.signaling_queue.push( + async move { + Delay::new(delay).await; + } + .boxed(), + ); + } + + pub async fn next_candidate(&mut self) -> Block::Hash { + loop { + if let Some(_) = self.signaling_queue.next().await { + if let Some(hash) = self.candidate_queue.pop_front() { + return hash + } else { + tracing::warn!( + target: LOG_TARGET, + "Recovery was signaled, but no candidate hash available." + ); + futures::pending!() + }; + } else { + futures::pending!() + } + } + } +} + /// Encapsulates the logic of the pov recovery. pub struct PoVRecovery { /// All the pending candidates that we are waiting for to be imported or that need to be @@ -124,9 +170,9 @@ pub struct PoVRecovery { candidates: HashMap>, /// A stream of futures that resolve to hashes of candidates that need to be recovered. /// - /// The candidates to the hashes are stored in `pending_candidates`. If a candidate is not + /// The candidates to the hashes are stored in `candidates`. If a candidate is not /// available anymore in this map, it means that it was already imported. - next_candidate_to_recover: FuturesUnordered + Send>>>, + candidate_recovery_queue: RecoveryQueue, active_candidate_recovery: ActiveCandidateRecovery, /// Blocks that wait that the parent is imported. /// @@ -158,7 +204,7 @@ where ) -> Self { Self { candidates: HashMap::new(), - next_candidate_to_recover: Default::default(), + candidate_recovery_queue: RecoveryQueue::new(), active_candidate_recovery: ActiveCandidateRecovery::new(overseer_handle), recovery_delay, waiting_for_parent: HashMap::new(), @@ -438,22 +484,7 @@ where if do_recover { for hash in to_recover.into_iter().rev() { - let delay = - delay.min + delay.max.saturating_sub(delay.min).mul_f64(thread_rng().gen()); - tracing::debug!( - target: LOG_TARGET, - block_hash = ?hash, - "Starting {:?} block recovery in {:?} sec", - kind, - delay.as_secs(), - ); - self.next_candidate_to_recover.push( - async move { - Delay::new(delay).await; - hash - } - .boxed(), - ); + self.candidate_recovery_queue.push_candidate(hash, delay); } } } @@ -507,10 +538,8 @@ where return; } }, - next_to_recover = self.next_candidate_to_recover.next() => { - if let Some(block_hash) = next_to_recover { - self.recover_candidate(block_hash).await; - } + next_to_recover = self.candidate_recovery_queue.next_candidate().fuse() => { + self.recover_candidate(next_to_recover).await; }, (block_hash, available_data) = self.active_candidate_recovery.wait_for_recovery().fuse() => From 8eaec6dfff4ed90b1167a579cf1be56b5132e220 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Tue, 31 Jan 2023 17:55:31 +0100 Subject: [PATCH 07/27] Better error logs --- .../common/src/parachain_consensus.rs | 33 +++++++++++++------ zombienet/tests/0002-pov_recovery.toml | 6 ++-- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/client/consensus/common/src/parachain_consensus.rs b/client/consensus/common/src/parachain_consensus.rs index 9bd2e144663..3f79dacc83b 100644 --- a/client/consensus/common/src/parachain_consensus.rs +++ b/client/consensus/common/src/parachain_consensus.rs @@ -56,7 +56,7 @@ where Ok(finalized_heads_stream) => finalized_heads_stream, Err(err) => { tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream."); - return + return; }, }; @@ -67,7 +67,7 @@ where h } else { tracing::debug!(target: LOG_TARGET, "Stopping following finalized head."); - return + return; }; let header = match Block::Header::decode(&mut &finalized_head[..]) { @@ -78,7 +78,7 @@ where error = ?err, "Could not decode parachain header while following finalized heads.", ); - continue + continue; }, }; @@ -86,6 +86,11 @@ where // don't finalize the same block multiple times. if parachain.usage_info().chain.finalized_hash != hash { + tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Attempting to finalize header.", + ); if let Err(e) = parachain.finalize_block(hash, None, true) { match e { ClientError::UnknownBlock(_) => tracing::debug!( @@ -170,7 +175,7 @@ async fn follow_new_best( Ok(best_heads_stream) => best_heads_stream.fuse(), Err(err) => { tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream."); - return + return; }, }; @@ -247,12 +252,12 @@ async fn handle_new_block_imported( }; let unset_hash = if notification.header.number() < unset_best_header.number() { - return + return; } else if notification.header.number() == unset_best_header.number() { let unset_hash = unset_best_header.hash(); if unset_hash != notification.hash { - return + return; } else { unset_hash } @@ -266,7 +271,11 @@ async fn handle_new_block_imported( let unset_best_header = unset_best_header_opt .take() .expect("We checked above that the value is set; qed"); - + tracing::debug!( + target: LOG_TARGET, + ?unset_hash, + "Importing block as new best for parachain.", + ); import_block_as_new_best(unset_hash, unset_best_header, parachain).await; }, state => tracing::debug!( @@ -298,7 +307,7 @@ async fn handle_new_best_parachain_head( error = ?err, "Could not decode Parachain header while following best heads.", ); - return + return; }, }; @@ -315,7 +324,11 @@ async fn handle_new_best_parachain_head( match parachain.block_status(hash) { Ok(BlockStatus::InChainWithState) => { unset_best_header.take(); - + tracing::debug!( + target: LOG_TARGET, + ?hash, + "Importing block as new best for parachain.", + ); import_block_as_new_best(hash, parachain_head, parachain).await; }, Ok(BlockStatus::InChainPruned) => { @@ -378,7 +391,7 @@ where "Skipping importing block as new best block, because there already exists a \ best block with an higher number", ); - return + return; } // Make it the new best block diff --git a/zombienet/tests/0002-pov_recovery.toml b/zombienet/tests/0002-pov_recovery.toml index 2e3f8629cab..dbbe69aad4b 100644 --- a/zombienet/tests/0002-pov_recovery.toml +++ b/zombienet/tests/0002-pov_recovery.toml @@ -48,7 +48,7 @@ add_to_genesis = false validator = false # full node image = "{{COL_IMAGE}}" command = "test-parachain" - args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}","--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] + args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}","--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] # run one as a RPC collator who does not produce blocks [[parachains.collators]] @@ -56,7 +56,7 @@ add_to_genesis = false validator = true # collator image = "{{COL_IMAGE}}" command = "test-parachain" - args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug", "--use-null-consensus", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--relay-chain-rpc-url {{'ferdie'|zombie('wsUri')}}", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] + args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--use-null-consensus", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--relay-chain-rpc-url {{'ferdie'|zombie('wsUri')}}", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] # run two as a RPC parachain full node [[parachains.collators]] @@ -64,4 +64,4 @@ add_to_genesis = false validator = false # full node image = "{{COL_IMAGE}}" command = "test-parachain" - args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--relay-chain-rpc-url {{'ferdie'|zombie('wsUri')}}", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] + args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--relay-chain-rpc-url {{'ferdie'|zombie('wsUri')}}", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] From 8da6d8cd58b198fe2d3dd3faaa6ed3daa36c0463 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Wed, 1 Feb 2023 10:01:01 +0100 Subject: [PATCH 08/27] Decrease slot duration --- test/service/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index 42a6b594dc0..69210653025 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -385,7 +385,7 @@ where para_id, relay_chain_interface, import_queue: import_queue_service, - relay_chain_slot_duration: Duration::from_secs(6), + relay_chain_slot_duration: Duration::from_millis(600), }; start_full_node(params)?; From 993dba56f972e48bfeb3edf9a2e927824101a45d Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Thu, 2 Feb 2023 16:31:51 +0100 Subject: [PATCH 09/27] Style improvements --- .../common/src/parachain_consensus.rs | 30 +++++++++---------- zombienet/tests/0002-pov_recovery.toml | 2 +- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/client/consensus/common/src/parachain_consensus.rs b/client/consensus/common/src/parachain_consensus.rs index 3f79dacc83b..a970d737c44 100644 --- a/client/consensus/common/src/parachain_consensus.rs +++ b/client/consensus/common/src/parachain_consensus.rs @@ -56,7 +56,7 @@ where Ok(finalized_heads_stream) => finalized_heads_stream, Err(err) => { tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream."); - return; + return }, }; @@ -67,7 +67,7 @@ where h } else { tracing::debug!(target: LOG_TARGET, "Stopping following finalized head."); - return; + return }; let header = match Block::Header::decode(&mut &finalized_head[..]) { @@ -78,7 +78,7 @@ where error = ?err, "Could not decode parachain header while following finalized heads.", ); - continue; + continue }, }; @@ -171,15 +171,15 @@ async fn follow_new_best( R: RelayChainInterface + Clone, B: Backend, { - let new_best_heads = match new_best_heads(relay_chain, para_id).await { - Ok(best_heads_stream) => best_heads_stream.fuse(), + let new_best_relay_heads = match new_best_heads(relay_chain, para_id).await { + Ok(best_relay_heads_stream) => best_relay_heads_stream.fuse(), Err(err) => { tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream."); - return; + return }, }; - pin_mut!(new_best_heads); + pin_mut!(new_best_relay_heads); let mut imported_blocks = parachain.import_notification_stream().fuse(); // The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain @@ -189,10 +189,10 @@ async fn follow_new_best( loop { select! { - h = new_best_heads.next() => { - match h { - Some(h) => handle_new_best_parachain_head( - h, + relay_header = new_best_relay_heads.next() => { + match relay_header { + Some(relay_header) => handle_new_best_parachain_head( + relay_header, &*parachain, &mut unset_best_header, recovery_chan_tx.as_mut(), @@ -252,12 +252,12 @@ async fn handle_new_block_imported( }; let unset_hash = if notification.header.number() < unset_best_header.number() { - return; + return } else if notification.header.number() == unset_best_header.number() { let unset_hash = unset_best_header.hash(); if unset_hash != notification.hash { - return; + return } else { unset_hash } @@ -307,7 +307,7 @@ async fn handle_new_best_parachain_head( error = ?err, "Could not decode Parachain header while following best heads.", ); - return; + return }, }; @@ -391,7 +391,7 @@ where "Skipping importing block as new best block, because there already exists a \ best block with an higher number", ); - return; + return } // Make it the new best block diff --git a/zombienet/tests/0002-pov_recovery.toml b/zombienet/tests/0002-pov_recovery.toml index dbbe69aad4b..cf9a7332688 100644 --- a/zombienet/tests/0002-pov_recovery.toml +++ b/zombienet/tests/0002-pov_recovery.toml @@ -40,7 +40,7 @@ add_to_genesis = false validator = true # collator image = "{{COL_IMAGE}}" command = "test-parachain" - args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug", "--use-null-consensus", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] + args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--use-null-consensus", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] # run eve as a parachain full node [[parachains.collators]] From bfbae02fcf7b51dcbd8c794d0cc68937984a16fa Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Thu, 2 Feb 2023 18:22:23 +0100 Subject: [PATCH 10/27] Add option to use unordered queue --- client/pov-recovery/src/lib.rs | 101 ++++++++++++++++++++++++++------- 1 file changed, 81 insertions(+), 20 deletions(-) diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index 4ed5066bcf4..be8b928d5c1 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -63,7 +63,7 @@ use futures_timer::Delay; use rand::{thread_rng, Rng}; use std::{ - collections::{HashMap, VecDeque}, + collections::{HashMap, HashSet, VecDeque}, pin::Pin, sync::Arc, time::Duration, @@ -105,6 +105,13 @@ pub struct RecoveryDelay { pub max: Duration, } +impl RecoveryDelay { + /// Produce a randomized duration between `min` and `max`. + fn duration(&self) -> Duration { + self.min + self.max.saturating_sub(self.min).mul_f64(thread_rng().gen()) + } +} + /// Represents an outstanding block candidate. struct Candidate { receipt: CandidateReceipt, @@ -119,7 +126,7 @@ struct Candidate { struct RecoveryQueue { candidate_queue: VecDeque, - signaling_queue: FuturesUnordered + Send>>>, + signaling_queue: FuturesUnordered> + Send>>>, } impl RecoveryQueue { @@ -127,18 +134,39 @@ impl RecoveryQueue { return Self { candidate_queue: VecDeque::new(), signaling_queue: FuturesUnordered::new() } } - pub fn push_candidate(&mut self, hash: Block::Hash, delay: RecoveryDelay) { - let delay = delay.min + delay.max.saturating_sub(delay.min).mul_f64(thread_rng().gen()); + /// Add hash of a block that should be recovered after `delay` has passed. + /// In contrast to [`push_ordered_candidate`] this will start recovering `hash`, + /// ignoring the queue position. + pub fn push_unordered_candidate(&mut self, hash: Block::Hash, delay: Duration) { + tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Adding block to queue and adding new recovery slot in {:?} sec", + delay.as_secs(), + ); + self.signaling_queue.push( + async move { + Delay::new(delay).await; + Some(hash) + } + .boxed(), + ); + } + + /// Add hash of a block that should go to the end of the recovery queue. + /// A new recovery will be signaled after `delay` has passed. + pub fn push_ordered_candidate(&mut self, hash: Block::Hash, delay: Duration) { tracing::debug!( target: LOG_TARGET, block_hash = ?hash, - "Starting block recovery in {:?} sec", + "Adding block to queue and adding new recovery slot in {:?} sec", delay.as_secs(), ); self.candidate_queue.push_back(hash); self.signaling_queue.push( async move { Delay::new(delay).await; + None } .boxed(), ); @@ -146,16 +174,21 @@ impl RecoveryQueue { pub async fn next_candidate(&mut self) -> Block::Hash { loop { - if let Some(_) = self.signaling_queue.next().await { - if let Some(hash) = self.candidate_queue.pop_front() { - return hash - } else { - tracing::warn!( - target: LOG_TARGET, - "Recovery was signaled, but no candidate hash available." - ); - futures::pending!() - }; + if let Some(result) = self.signaling_queue.next().await { + match result { + Some(hash) => return hash, + None => { + if let Some(hash) = self.candidate_queue.pop_front() { + return hash + } else { + tracing::warn!( + target: LOG_TARGET, + "Recovery was signaled, but no candidate hash available." + ); + futures::pending!() + }; + }, + } } else { futures::pending!() } @@ -185,6 +218,8 @@ pub struct PoVRecovery { para_id: ParaId, /// Explicit block recovery requests channel. recovery_chan_rx: Receiver>, + /// Blocks that we are retrying currently + blocks_in_retry: HashSet, } impl PoVRecovery @@ -212,6 +247,7 @@ where parachain_import_queue, relay_chain_interface, para_id, + blocks_in_retry: HashSet::new(), recovery_chan_rx, } } @@ -308,11 +344,36 @@ where available_data: Option, ) { let available_data = match available_data { - Some(data) => data, + Some(data) => { + self.blocks_in_retry.remove(&block_hash); + data + }, None => { - self.clear_waiting_for_parent(block_hash); - self.clear_waiting_recovery(&block_hash); - return + if !self.blocks_in_retry.contains(&block_hash) { + // Retry recovery after 6 seconds. After that time, the + // block should be available. + // TODO: Double check to use relay chain slot duration here. + let duration = Duration::from_secs(6); + tracing::debug!( + target: LOG_TARGET, + ?block_hash, + "Retrying block recovery in {:?} seconds", + duration + ); + self.blocks_in_retry.insert(block_hash); + self.candidate_recovery_queue.push_unordered_candidate(block_hash, duration); + return + } else { + tracing::debug!( + target: LOG_TARGET, + ?block_hash, + "Unable to recover block after retry. Block should be available by now, this is likely a bug.", + ); + self.blocks_in_retry.remove(&block_hash); + self.clear_waiting_for_parent(block_hash); + self.clear_waiting_recovery(&block_hash); + return + } }, }; @@ -484,7 +545,7 @@ where if do_recover { for hash in to_recover.into_iter().rev() { - self.candidate_recovery_queue.push_candidate(hash, delay); + self.candidate_recovery_queue.push_ordered_candidate(hash, delay.duration()); } } } From 1139b80dd355a1d902c6d45ce26d452aa8bdf754 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Fri, 3 Feb 2023 12:57:12 +0100 Subject: [PATCH 11/27] Maintain cache of finalized blocks --- Cargo.lock | 1 + client/consensus/common/Cargo.toml | 1 + .../common/src/parachain_consensus.rs | 158 ++++++++++++------ test/service/src/lib.rs | 2 +- zombienet/tests/0002-pov_recovery.feature | 8 +- 5 files changed, 117 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a1b278d737c..0e76bb66c9b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1781,6 +1781,7 @@ dependencies = [ "polkadot-primitives", "sc-client-api", "sc-consensus", + "schnellru", "sp-blockchain", "sp-consensus", "sp-runtime", diff --git a/client/consensus/common/Cargo.toml b/client/consensus/common/Cargo.toml index c992c263593..a91dcaf5063 100644 --- a/client/consensus/common/Cargo.toml +++ b/client/consensus/common/Cargo.toml @@ -28,6 +28,7 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = cumulus-primitives-core = { path = "../../../primitives/core" } cumulus-relay-chain-interface = { path = "../../relay-chain-interface" } cumulus-client-pov-recovery = { path = "../../pov-recovery" } +schnellru = "0.2.1" [dev-dependencies] futures-timer = "3.0.2" diff --git a/client/consensus/common/src/parachain_consensus.rs b/client/consensus/common/src/parachain_consensus.rs index a970d737c44..6fd50ce8ccb 100644 --- a/client/consensus/common/src/parachain_consensus.rs +++ b/client/consensus/common/src/parachain_consensus.rs @@ -18,6 +18,7 @@ use sc_client_api::{ Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider, }; use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy}; +use schnellru::{ByLength, LruMap}; use sp_blockchain::Error as ClientError; use sp_consensus::{BlockOrigin, BlockStatus}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; @@ -41,6 +42,55 @@ const LOG_TARGET: &str = "cumulus-consensus"; const RECOVERY_DELAY: RecoveryDelay = RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) }; +fn handle_new_finalized_head( + parachain: &Arc

, + finalized_head: Vec, + last_seen_finalized_hashes: &mut LruMap, +) where + Block: BlockT, + B: Backend, + P: Finalizer + UsageProvider + BlockchainEvents, +{ + let header = match Block::Header::decode(&mut &finalized_head[..]) { + Ok(header) => header, + Err(err) => { + tracing::debug!( + target: LOG_TARGET, + error = ?err, + "Could not decode parachain header while following finalized heads.", + ); + return + }, + }; + + let hash = header.hash(); + + last_seen_finalized_hashes.insert(hash, ()); + // don't finalize the same block multiple times. + if parachain.usage_info().chain.finalized_hash != hash { + tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Attempting to finalize header.", + ); + if let Err(e) = parachain.finalize_block(hash, None, true) { + match e { + ClientError::UnknownBlock(_) => tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Could not finalize block because it is unknown.", + ), + _ => tracing::warn!( + target: LOG_TARGET, + error = ?e, + block_hash = ?hash, + "Failed to finalize block", + ), + } + } + } +} + /// Follow the finalized head of the given parachain. /// /// For every finalized block of the relay chain, it will get the included parachain header @@ -48,62 +98,74 @@ const RECOVERY_DELAY: RecoveryDelay = async fn follow_finalized_head(para_id: ParaId, parachain: Arc

, relay_chain: R) where Block: BlockT, - P: Finalizer + UsageProvider, + P: Finalizer + UsageProvider + BlockchainEvents, R: RelayChainInterface + Clone, B: Backend, { let finalized_heads = match finalized_heads(relay_chain, para_id).await { - Ok(finalized_heads_stream) => finalized_heads_stream, + Ok(finalized_heads_stream) => finalized_heads_stream.fuse(), Err(err) => { tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream."); return }, }; + let mut imported_blocks = parachain.import_notification_stream().fuse(); + pin_mut!(finalized_heads); - loop { - let finalized_head = if let Some(h) = finalized_heads.next().await { - h - } else { - tracing::debug!(target: LOG_TARGET, "Stopping following finalized head."); - return - }; + // We use this cache to finalize blocks that are imported late. + // For example, a block that has been recovered via PoV-Recovery + // on a full node can have several minutes delay. With this cache + // we have some "memory" of recently finalized blocks. + let mut last_seen_finalized_hashes = LruMap::new(ByLength::new(40)); - let header = match Block::Header::decode(&mut &finalized_head[..]) { - Ok(header) => header, - Err(err) => { - tracing::debug!( - target: LOG_TARGET, - error = ?err, - "Could not decode parachain header while following finalized heads.", - ); - continue + loop { + select! { + fin = finalized_heads.next() => { + match fin { + Some(finalized_head) => + handle_new_finalized_head(¶chain, finalized_head, &mut last_seen_finalized_hashes), + None => { + tracing::debug!(target: LOG_TARGET, "Stopping following finalized head."); + return + } + } }, - }; - - let hash = header.hash(); - - // don't finalize the same block multiple times. - if parachain.usage_info().chain.finalized_hash != hash { - tracing::debug!( - target: LOG_TARGET, - block_hash = ?hash, - "Attempting to finalize header.", - ); - if let Err(e) = parachain.finalize_block(hash, None, true) { - match e { - ClientError::UnknownBlock(_) => tracing::debug!( - target: LOG_TARGET, - block_hash = ?hash, - "Could not finalize block because it is unknown.", - ), - _ => tracing::warn!( - target: LOG_TARGET, - error = ?e, - block_hash = ?hash, - "Failed to finalize block", - ), + imported = imported_blocks.next() => { + match imported { + Some(imported_block) => { + // When we see a block import that is already finalized, we immediately finalize it. + if last_seen_finalized_hashes.peek(&imported_block.hash).is_some() { + tracing::debug!( + target: LOG_TARGET, + "Setting newly imported block as finalized.", + ); + + if let Err(e) = parachain.finalize_block(imported_block.hash, None, true) { + match e { + ClientError::UnknownBlock(_) => tracing::debug!( + target: LOG_TARGET, + block_hash = ?imported_block.hash, + "Could not finalize block because it is unknown.", + ), + _ => tracing::warn!( + target: LOG_TARGET, + error = ?e, + block_hash = ?imported_block.hash, + "Failed to finalize block", + ), + } + } + } + }, + None => { + tracing::debug!( + target: LOG_TARGET, + "Stopping following imported blocks.", + ); + return + } } } } @@ -171,7 +233,7 @@ async fn follow_new_best( R: RelayChainInterface + Clone, B: Backend, { - let new_best_relay_heads = match new_best_heads(relay_chain, para_id).await { + let new_best_heads = match new_best_heads(relay_chain, para_id).await { Ok(best_relay_heads_stream) => best_relay_heads_stream.fuse(), Err(err) => { tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream."); @@ -179,7 +241,7 @@ async fn follow_new_best( }, }; - pin_mut!(new_best_relay_heads); + pin_mut!(new_best_heads); let mut imported_blocks = parachain.import_notification_stream().fuse(); // The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain @@ -189,10 +251,10 @@ async fn follow_new_best( loop { select! { - relay_header = new_best_relay_heads.next() => { - match relay_header { - Some(relay_header) => handle_new_best_parachain_head( - relay_header, + h = new_best_heads.next() => { + match h { + Some(header) => handle_new_best_parachain_head( + header, &*parachain, &mut unset_best_header, recovery_chan_tx.as_mut(), diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index 69210653025..42a6b594dc0 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -385,7 +385,7 @@ where para_id, relay_chain_interface, import_queue: import_queue_service, - relay_chain_slot_duration: Duration::from_millis(600), + relay_chain_slot_duration: Duration::from_secs(6), }; start_full_node(params)?; diff --git a/zombienet/tests/0002-pov_recovery.feature b/zombienet/tests/0002-pov_recovery.feature index 80524164821..be451d563c1 100644 --- a/zombienet/tests/0002-pov_recovery.feature +++ b/zombienet/tests/0002-pov_recovery.feature @@ -13,8 +13,8 @@ charlie: is up within 60 seconds one: is up within 60 seconds two: is up within 60 seconds -# wait 30 blocks and register parachain -validator-3: reports block height is at least 30 within 250 seconds +# wait 20 blocks and register parachain +validator-3: reports block height is at least 20 within 250 seconds validator-0: js-script ./register-para.js with "2000" within 240 seconds validator-0: parachain 2000 is registered within 300 seconds @@ -22,5 +22,5 @@ validator-0: parachain 2000 is registered within 300 seconds bob: reports block height is at least 20 within 600 seconds alice: reports block height is at least 20 within 600 seconds charlie: reports block height is at least 20 within 600 seconds -one: reports block height is at least 20 within 600 seconds -two: reports block height is at least 20 within 600 seconds +one: reports block height is at least 20 within 800 seconds +two: reports block height is at least 20 within 800 seconds From 18e63a579824f44ad46f937827cdd2964628b555 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Fri, 3 Feb 2023 13:32:53 +0100 Subject: [PATCH 12/27] Wait for one relay chain slot before recovery --- client/pov-recovery/src/lib.rs | 33 +++++++++++++++++++++------------ client/service/src/lib.rs | 2 ++ 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index be8b928d5c1..ca857b21744 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -125,19 +125,22 @@ struct Candidate { } struct RecoveryQueue { - candidate_queue: VecDeque, + // Queue that keeps the hashes of blocks to be recovered + recovery_queue: VecDeque, + // Futures that resolve when a new recovery should be started. signaling_queue: FuturesUnordered> + Send>>>, } +/// Queue that is used to decide when to start PoV-recovery operations. impl RecoveryQueue { pub fn new() -> Self { - return Self { candidate_queue: VecDeque::new(), signaling_queue: FuturesUnordered::new() } + return Self { recovery_queue: VecDeque::new(), signaling_queue: FuturesUnordered::new() } } /// Add hash of a block that should be recovered after `delay` has passed. /// In contrast to [`push_ordered_candidate`] this will start recovering `hash`, /// ignoring the queue position. - pub fn push_unordered_candidate(&mut self, hash: Block::Hash, delay: Duration) { + pub fn push_unordered_recovery(&mut self, hash: Block::Hash, delay: Duration) { tracing::debug!( target: LOG_TARGET, block_hash = ?hash, @@ -155,14 +158,14 @@ impl RecoveryQueue { /// Add hash of a block that should go to the end of the recovery queue. /// A new recovery will be signaled after `delay` has passed. - pub fn push_ordered_candidate(&mut self, hash: Block::Hash, delay: Duration) { + pub fn push_ordered_recovery(&mut self, hash: Block::Hash, delay: Duration) { tracing::debug!( target: LOG_TARGET, block_hash = ?hash, "Adding block to queue and adding new recovery slot in {:?} sec", delay.as_secs(), ); - self.candidate_queue.push_back(hash); + self.recovery_queue.push_back(hash); self.signaling_queue.push( async move { Delay::new(delay).await; @@ -172,13 +175,15 @@ impl RecoveryQueue { ); } - pub async fn next_candidate(&mut self) -> Block::Hash { + /// Get the next hash for block recovery. + pub async fn next_recovery(&mut self) -> Block::Hash { loop { if let Some(result) = self.signaling_queue.next().await { + // If the return future resolves to `None`, we want to process the queue match result { Some(hash) => return hash, None => { - if let Some(hash) = self.candidate_queue.pop_front() { + if let Some(hash) = self.recovery_queue.pop_front() { return hash } else { tracing::warn!( @@ -220,6 +225,8 @@ pub struct PoVRecovery { recovery_chan_rx: Receiver>, /// Blocks that we are retrying currently blocks_in_retry: HashSet, + /// Waiting time before a retry + retry_delay: Duration, } impl PoVRecovery @@ -236,6 +243,7 @@ where relay_chain_interface: RCInterface, para_id: ParaId, recovery_chan_rx: Receiver>, + retry_delay: Duration, ) -> Self { Self { candidates: HashMap::new(), @@ -249,6 +257,7 @@ where para_id, blocks_in_retry: HashSet::new(), recovery_chan_rx, + retry_delay, } } @@ -353,15 +362,15 @@ where // Retry recovery after 6 seconds. After that time, the // block should be available. // TODO: Double check to use relay chain slot duration here. - let duration = Duration::from_secs(6); tracing::debug!( target: LOG_TARGET, ?block_hash, "Retrying block recovery in {:?} seconds", - duration + self.retry_delay ); self.blocks_in_retry.insert(block_hash); - self.candidate_recovery_queue.push_unordered_candidate(block_hash, duration); + self.candidate_recovery_queue + .push_unordered_recovery(block_hash, self.retry_delay); return } else { tracing::debug!( @@ -545,7 +554,7 @@ where if do_recover { for hash in to_recover.into_iter().rev() { - self.candidate_recovery_queue.push_ordered_candidate(hash, delay.duration()); + self.candidate_recovery_queue.push_ordered_recovery(hash, delay.duration()); } } } @@ -599,7 +608,7 @@ where return; } }, - next_to_recover = self.candidate_recovery_queue.next_candidate().fuse() => { + next_to_recover = self.candidate_recovery_queue.next_recovery().fuse() => { self.recover_candidate(next_to_recover).await; }, (block_hash, available_data) = diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index bd43f677e2c..d81fc178b03 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -128,6 +128,7 @@ where relay_chain_interface.clone(), para_id, recovery_chan_rx, + relay_chain_slot_duration, ); task_manager @@ -218,6 +219,7 @@ where relay_chain_interface, para_id, recovery_chan_rx, + relay_chain_slot_duration, ); task_manager From c3ae356e47c935b61f41abac953a6c2a10d9e8a9 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Fri, 3 Feb 2023 16:40:35 +0100 Subject: [PATCH 13/27] Make retries testable --- Cargo.lock | 8 +++- client/pov-recovery/Cargo.toml | 1 + .../src/active_candidate_recovery.rs | 13 ++--- client/pov-recovery/src/lib.rs | 27 +++++++---- client/service/src/lib.rs | 22 ++++----- parachain-template/node/src/service.rs | 6 +++ polkadot-parachain/src/service.rs | 16 +++++++ test/service/Cargo.toml | 3 ++ test/service/src/cli.rs | 3 ++ test/service/src/lib.rs | 48 +++++++++++++++++++ test/service/src/main.rs | 4 ++ zombienet/tests/0002-pov_recovery.feature | 2 + zombienet/tests/0002-pov_recovery.toml | 8 ++++ 13 files changed, 133 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0e76bb66c9b..ed993ed713e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -372,9 +372,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.63" +version = "0.1.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eff18d764974428cf3a9328e23fc5c986f5fbed46e6cd4cdf42544df5d297ec1" +checksum = "1cd7fce9ba8c3c042128ce72d8b2ddbf3a05747efb67ea0313c635e10bda47a2" dependencies = [ "proc-macro2", "quote", @@ -1850,6 +1850,7 @@ dependencies = [ name = "cumulus-client-pov-recovery" version = "0.1.0" dependencies = [ + "async-trait", "cumulus-primitives-core", "cumulus-relay-chain-interface", "cumulus-test-service", @@ -2340,6 +2341,7 @@ dependencies = [ "cumulus-client-consensus-common", "cumulus-client-consensus-relay-chain", "cumulus-client-network", + "cumulus-client-pov-recovery", "cumulus-client-service", "cumulus-primitives-core", "cumulus-primitives-parachain-inherent", @@ -2357,6 +2359,8 @@ dependencies = [ "parachains-common", "parity-scale-codec", "polkadot-cli", + "polkadot-node-subsystem", + "polkadot-overseer", "polkadot-primitives", "polkadot-service", "polkadot-test-service", diff --git a/client/pov-recovery/Cargo.toml b/client/pov-recovery/Cargo.toml index 1ec225c969d..2cce470f823 100644 --- a/client/pov-recovery/Cargo.toml +++ b/client/pov-recovery/Cargo.toml @@ -28,6 +28,7 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = # Cumulus cumulus-primitives-core = { path = "../../primitives/core" } cumulus-relay-chain-interface = {path = "../relay-chain-interface"} +async-trait = "0.1.64" [dev-dependencies] tokio = { version = "1.24.2", features = ["macros"] } diff --git a/client/pov-recovery/src/active_candidate_recovery.rs b/client/pov-recovery/src/active_candidate_recovery.rs index caae3615a85..cdbd3d85e99 100644 --- a/client/pov-recovery/src/active_candidate_recovery.rs +++ b/client/pov-recovery/src/active_candidate_recovery.rs @@ -18,12 +18,13 @@ use sp_runtime::traits::Block as BlockT; use polkadot_node_primitives::AvailableData; use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage; -use polkadot_overseer::Handle as OverseerHandle; use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt}; use std::{collections::HashSet, pin::Pin}; +use crate::RecoveryHandle; + /// The active candidate recovery. /// /// This handles the candidate recovery and tracks the activate recoveries. @@ -34,12 +35,12 @@ pub(crate) struct ActiveCandidateRecovery { >, /// The block hashes of the candidates currently being recovered. candidates: HashSet, - overseer_handle: OverseerHandle, + recovery_handle: Box, } impl ActiveCandidateRecovery { - pub fn new(overseer_handle: OverseerHandle) -> Self { - Self { recoveries: Default::default(), candidates: Default::default(), overseer_handle } + pub fn new(recovery_handle: Box) -> Self { + Self { recoveries: Default::default(), candidates: Default::default(), recovery_handle } } /// Recover the given `candidate`. @@ -50,8 +51,8 @@ impl ActiveCandidateRecovery { ) { let (tx, rx) = oneshot::channel(); - self.overseer_handle - .send_msg( + self.recovery_handle + .recover( AvailabilityRecoveryMessage::RecoverAvailableData( candidate.receipt.clone(), candidate.session_index, diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index ca857b21744..95ef4446b07 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -47,6 +47,7 @@ use sp_consensus::{BlockOrigin, BlockStatus}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use polkadot_node_primitives::{AvailableData, POV_BOMB_LIMIT}; +use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage; use polkadot_overseer::Handle as OverseerHandle; use polkadot_primitives::{ CandidateReceipt, CommittedCandidateReceipt, Id as ParaId, SessionIndex, @@ -74,6 +75,18 @@ use active_candidate_recovery::ActiveCandidateRecovery; const LOG_TARGET: &str = "cumulus-pov-recovery"; +#[async_trait::async_trait] +pub trait RecoveryHandle: Send { + async fn recover(&mut self, message: AvailabilityRecoveryMessage, origin: &'static str); +} + +#[async_trait::async_trait] +impl RecoveryHandle for OverseerHandle { + async fn recover(&mut self, message: AvailabilityRecoveryMessage, origin: &'static str) { + self.send_msg(message, origin).await; + } +} + /// Type of recovery to trigger. #[derive(Debug, PartialEq)] pub enum RecoveryKind { @@ -236,7 +249,7 @@ where { /// Create a new instance. pub fn new( - overseer_handle: OverseerHandle, + recovery_handle: Box, recovery_delay: RecoveryDelay, parachain_client: Arc, parachain_import_queue: Box>, @@ -248,7 +261,7 @@ where Self { candidates: HashMap::new(), candidate_recovery_queue: RecoveryQueue::new(), - active_candidate_recovery: ActiveCandidateRecovery::new(overseer_handle), + active_candidate_recovery: ActiveCandidateRecovery::new(recovery_handle), recovery_delay, waiting_for_parent: HashMap::new(), parachain_client, @@ -357,11 +370,8 @@ where self.blocks_in_retry.remove(&block_hash); data }, - None => { + None => if !self.blocks_in_retry.contains(&block_hash) { - // Retry recovery after 6 seconds. After that time, the - // block should be available. - // TODO: Double check to use relay chain slot duration here. tracing::debug!( target: LOG_TARGET, ?block_hash, @@ -376,14 +386,13 @@ where tracing::debug!( target: LOG_TARGET, ?block_hash, - "Unable to recover block after retry. Block should be available by now, this is likely a bug.", + "Unable to recover block after retry.", ); self.blocks_in_retry.remove(&block_hash); self.clear_waiting_for_parent(block_hash); self.clear_waiting_recovery(&block_hash); return - } - }, + }, }; let raw_block_data = match sp_maybe_compressed_blob::decompress( diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index d81fc178b03..c2ee75c1106 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -20,7 +20,7 @@ use cumulus_client_cli::CollatorOptions; use cumulus_client_consensus_common::ParachainConsensus; -use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelay}; +use cumulus_client_pov_recovery::{PoVRecovery, RecoveryHandle, RecoveryDelay}; use cumulus_primitives_core::{CollectCollationInfo, ParaId}; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; @@ -59,6 +59,7 @@ pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawn pub import_queue: Box>, pub collator_key: CollatorPair, pub relay_chain_slot_duration: Duration, + pub recovery_handle: Box, } /// Start a collator node for a parachain. @@ -79,6 +80,7 @@ pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner import_queue, collator_key, relay_chain_slot_duration, + recovery_handle, }: StartCollatorParams<'a, Block, BS, Client, RCInterface, Spawner>, ) -> sc_service::error::Result<()> where @@ -113,12 +115,8 @@ where .spawn_essential_handle() .spawn("cumulus-consensus", None, consensus); - let overseer_handle = relay_chain_interface - .overseer_handle() - .map_err(|e| sc_service::Error::Application(Box::new(e)))?; - let pov_recovery = PoVRecovery::new( - overseer_handle.clone(), + recovery_handle, // We want that collators wait at maximum the relay chain slot duration before starting // to recover blocks. Additionally, we wait at least half the slot time to give the // relay chain the chance to increase availability. @@ -134,6 +132,10 @@ where task_manager .spawn_essential_handle() .spawn("cumulus-pov-recovery", None, pov_recovery.run()); + + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; cumulus_client_collator::start_collator(cumulus_client_collator::StartCollatorParams { runtime_api: client, block_status, @@ -158,6 +160,7 @@ pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface> { pub announce_block: Arc>) + Send + Sync>, pub relay_chain_slot_duration: Duration, pub import_queue: Box>, + pub recovery_handle: Box, } /// Start a full node for a parachain. @@ -173,6 +176,7 @@ pub fn start_full_node( para_id, relay_chain_slot_duration, import_queue, + recovery_handle, }: StartFullNodeParams, ) -> sc_service::error::Result<()> where @@ -202,12 +206,8 @@ where .spawn_essential_handle() .spawn("cumulus-consensus", None, consensus); - let overseer_handle = relay_chain_interface - .overseer_handle() - .map_err(|e| sc_service::Error::Application(Box::new(e)))?; - let pov_recovery = PoVRecovery::new( - overseer_handle, + recovery_handle, // Full nodes should at least wait 2.5 minutes (assuming 6 seconds slot duration) and // in maximum 5 minutes before starting to recover blocks. Collators should already start // the recovery way before full nodes try to recover a certain block and then share the diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs index e5258e675e2..4c6f61838c3 100644 --- a/parachain-template/node/src/service.rs +++ b/parachain-template/node/src/service.rs @@ -256,6 +256,10 @@ async fn start_node_impl( let relay_chain_slot_duration = Duration::from_secs(6); + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; + if validator { let parachain_consensus = build_consensus( client.clone(), @@ -284,6 +288,7 @@ async fn start_node_impl( import_queue: import_queue_service, collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, + recovery_handle: Box::new(overseer_handle), }; start_collator(params).await?; @@ -296,6 +301,7 @@ async fn start_node_impl( relay_chain_interface, relay_chain_slot_duration, import_queue: import_queue_service, + recovery_handle: Box::new(overseer_handle), }; start_full_node(params)?; diff --git a/polkadot-parachain/src/service.rs b/polkadot-parachain/src/service.rs index 8f2789bf8ab..9f4a0ef825d 100644 --- a/polkadot-parachain/src/service.rs +++ b/polkadot-parachain/src/service.rs @@ -437,6 +437,10 @@ where let relay_chain_slot_duration = Duration::from_secs(6); + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; + if validator { let parachain_consensus = build_consensus( client.clone(), @@ -465,6 +469,7 @@ where import_queue: import_queue_service, collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, + recovery_handle: Box::new(overseer_handle), }; start_collator(params).await?; @@ -477,6 +482,7 @@ where relay_chain_interface, relay_chain_slot_duration, import_queue: import_queue_service, + recovery_handle: Box::new(overseer_handle), }; start_full_node(params)?; @@ -637,6 +643,9 @@ where let relay_chain_slot_duration = Duration::from_secs(6); + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; if validator { let parachain_consensus = build_consensus( client.clone(), @@ -665,6 +674,7 @@ where import_queue: import_queue_service, collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, + recovery_handle: Box::new(overseer_handle), }; start_collator(params).await?; @@ -677,6 +687,7 @@ where relay_chain_interface, relay_chain_slot_duration, import_queue: import_queue_service, + recovery_handle: Box::new(overseer_handle), }; start_full_node(params)?; @@ -1410,6 +1421,9 @@ where let relay_chain_slot_duration = Duration::from_secs(6); + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; if validator { let parachain_consensus = build_consensus( client.clone(), @@ -1438,6 +1452,7 @@ where import_queue: import_queue_service, collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, + recovery_handle: Box::new(overseer_handle), }; start_collator(params).await?; @@ -1450,6 +1465,7 @@ where relay_chain_interface, relay_chain_slot_duration, import_queue: import_queue_service, + recovery_handle: Box::new(overseer_handle), }; start_full_node(params)?; diff --git a/test/service/Cargo.toml b/test/service/Cargo.toml index 30668f90ab3..f01b97f4522 100644 --- a/test/service/Cargo.toml +++ b/test/service/Cargo.toml @@ -54,6 +54,8 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master" } polkadot-test-service = { git = "https://github.com/paritytech/polkadot", branch = "master" } polkadot-cli = { git = "https://github.com/paritytech/polkadot", branch = "master" } +polkadot-node-subsystem = { git = "https://github.com/paritytech/polkadot", branch = "master" } +polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "master" } # Cumulus cumulus-client-cli = { path = "../../client/cli" } @@ -70,6 +72,7 @@ cumulus-relay-chain-rpc-interface = { path = "../../client/relay-chain-rpc-inter cumulus-test-relay-validation-worker-provider = { path = "../relay-validation-worker-provider" } cumulus-test-runtime = { path = "../runtime" } cumulus-relay-chain-minimal-node = { path = "../../client/relay-chain-minimal-node" } +cumulus-client-pov-recovery = { path = "../../client/pov-recovery" } [dev-dependencies] futures = "0.3.25" diff --git a/test/service/src/cli.rs b/test/service/src/cli.rs index 31829a66d26..3cf99496576 100644 --- a/test/service/src/cli.rs +++ b/test/service/src/cli.rs @@ -49,6 +49,9 @@ pub struct TestCollatorCli { #[arg(long)] pub disable_block_announcements: bool, + + #[arg(long)] + pub fail_pov_recovery: bool, } #[derive(Debug, clap::Subcommand)] diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index 42a6b594dc0..e85d67c4425 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -34,6 +34,7 @@ use cumulus_client_consensus_common::{ ParachainBlockImport as TParachainBlockImport, ParachainCandidate, ParachainConsensus, }; use cumulus_client_network::BlockAnnounceValidator; +use cumulus_client_pov_recovery::RecoveryHandle; use cumulus_client_service::{ prepare_node_config, start_collator, start_full_node, StartCollatorParams, StartFullNodeParams, }; @@ -45,6 +46,8 @@ use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node; use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi}; use frame_system_rpc_runtime_api::AccountNonceApi; +use polkadot_node_subsystem::{errors::RecoveryError, messages::AvailabilityRecoveryMessage}; +use polkadot_overseer::Handle as OverseerHandle; use polkadot_primitives::{CollatorPair, Hash as PHash, PersistedValidationData}; use polkadot_service::ProvideRuntimeApi; use sc_client_api::execution_extensions::ExecutionStrategies; @@ -76,6 +79,8 @@ pub use cumulus_test_runtime as runtime; pub use genesis::*; pub use sp_keyring::Sr25519Keyring as Keyring; +const LOG_TARGET: &str = "cumulus-test-service"; + /// A consensus that will never produce any block. #[derive(Clone)] struct NullConsensus; @@ -126,6 +131,35 @@ pub type ParachainBlockImport = TParachainBlockImport, Backen /// Transaction pool type used by the test service pub type TransactionPool = Arc>; +/// Recovery handle that fails regularly to simulate unavailable povs. +pub struct FailingRecoveryHandle { + overseer_handle: OverseerHandle, + counter: u32, +} + +impl FailingRecoveryHandle { + /// Create a new FailingRecoveryHandle + pub fn new(overseer_handle: OverseerHandle) -> Self { + Self { overseer_handle, counter: 1 } + } +} + +#[async_trait::async_trait] +impl RecoveryHandle for FailingRecoveryHandle { + async fn recover(&mut self, message: AvailabilityRecoveryMessage, origin: &'static str) { + if self.counter % 5 == 0 { + let AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, back_sender) = message; + tracing::info!(target: LOG_TARGET, "Failing pov recovery."); + back_sender + .send(Err(RecoveryError::Unavailable)) + .expect("Return channel should work here."); + } else { + self.overseer_handle.send_msg(message, origin).await; + } + self.counter += 1; + } +} + /// Starts a `ServiceBuilder` for a full service. /// /// Use this macro if you don't actually need the full service, but just the builder in order to @@ -236,6 +270,7 @@ pub async fn start_node_impl( relay_chain_config: Configuration, para_id: ParaId, wrap_announce_block: Option AnnounceBlockFn>>, + fail_pov_recovery: bool, rpc_ext_builder: RB, consensus: Consensus, collator_options: CollatorOptions, @@ -320,6 +355,16 @@ where .unwrap_or_else(|| announce_block); let relay_chain_interface_for_closure = relay_chain_interface.clone(); + + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; + let recovery_handle: Box = if fail_pov_recovery { + Box::new(FailingRecoveryHandle::new(overseer_handle)) + } else { + Box::new(overseer_handle) + }; + if let Some(collator_key) = collator_key { let parachain_consensus: Box> = match consensus { Consensus::RelayChain => { @@ -374,6 +419,7 @@ where collator_key, import_queue: import_queue_service, relay_chain_slot_duration: Duration::from_secs(6), + recovery_handle: recovery_handle, }; start_collator(params).await?; @@ -386,6 +432,7 @@ where relay_chain_interface, import_queue: import_queue_service, relay_chain_slot_duration: Duration::from_secs(6), + recovery_handle: recovery_handle, }; start_full_node(params)?; @@ -597,6 +644,7 @@ impl TestNodeBuilder { relay_chain_config, self.para_id, self.wrap_announce_block, + false, |_| Ok(jsonrpsee::RpcModule::new(())), self.consensus, collator_options, diff --git a/test/service/src/main.rs b/test/service/src/main.rs index 40deccc27de..760caece1ae 100644 --- a/test/service/src/main.rs +++ b/test/service/src/main.rs @@ -123,6 +123,9 @@ fn main() -> Result<(), sc_cli::Error> { "Is collating: {}", if config.role.is_authority() { "yes" } else { "no" } ); + if cli.fail_pov_recovery { + tracing::info!("PoV recovery failure enabled"); + } let collator_key = config.role.is_authority().then(|| CollatorPair::generate().0); @@ -141,6 +144,7 @@ fn main() -> Result<(), sc_cli::Error> { polkadot_config, parachain_id, cli.disable_block_announcements.then(wrap_announce_block), + cli.fail_pov_recovery, |_| Ok(jsonrpsee::RpcModule::new(())), consensus, collator_options, diff --git a/zombienet/tests/0002-pov_recovery.feature b/zombienet/tests/0002-pov_recovery.feature index be451d563c1..108f437bd3c 100644 --- a/zombienet/tests/0002-pov_recovery.feature +++ b/zombienet/tests/0002-pov_recovery.feature @@ -12,6 +12,7 @@ bob: is up within 60 seconds charlie: is up within 60 seconds one: is up within 60 seconds two: is up within 60 seconds +eve: is up within 60 seconds # wait 20 blocks and register parachain validator-3: reports block height is at least 20 within 250 seconds @@ -24,3 +25,4 @@ alice: reports block height is at least 20 within 600 seconds charlie: reports block height is at least 20 within 600 seconds one: reports block height is at least 20 within 800 seconds two: reports block height is at least 20 within 800 seconds +eve: reports block height is at least 20 within 800 seconds diff --git a/zombienet/tests/0002-pov_recovery.toml b/zombienet/tests/0002-pov_recovery.toml index cf9a7332688..0df0293e348 100644 --- a/zombienet/tests/0002-pov_recovery.toml +++ b/zombienet/tests/0002-pov_recovery.toml @@ -50,6 +50,14 @@ add_to_genesis = false command = "test-parachain" args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}","--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] + # we fail recovery for eve from time to time to test retries + [[parachains.collators]] + name = "eve" + validator = true # collator + image = "{{COL_IMAGE}}" + command = "test-parachain" + args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--fail-pov-recovery", "--use-null-consensus", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"] + # run one as a RPC collator who does not produce blocks [[parachains.collators]] name = "one" From 508bee6a802441446e9222347137e222b4cf8110 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Fri, 3 Feb 2023 16:47:53 +0100 Subject: [PATCH 14/27] fmt --- client/service/src/lib.rs | 2 +- test/service/src/lib.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index c2ee75c1106..84c188436e9 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -20,7 +20,7 @@ use cumulus_client_cli::CollatorOptions; use cumulus_client_consensus_common::ParachainConsensus; -use cumulus_client_pov_recovery::{PoVRecovery, RecoveryHandle, RecoveryDelay}; +use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelay, RecoveryHandle}; use cumulus_primitives_core::{CollectCollationInfo, ParaId}; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index e85d67c4425..5996a03b266 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -419,7 +419,7 @@ where collator_key, import_queue: import_queue_service, relay_chain_slot_duration: Duration::from_secs(6), - recovery_handle: recovery_handle, + recovery_handle, }; start_collator(params).await?; @@ -432,7 +432,7 @@ where relay_chain_interface, import_queue: import_queue_service, relay_chain_slot_duration: Duration::from_secs(6), - recovery_handle: recovery_handle, + recovery_handle, }; start_full_node(params)?; From d40ff11fcbe0b367132d6dcd0c552072fe9e9a31 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Fri, 3 Feb 2023 17:00:41 +0100 Subject: [PATCH 15/27] Improve docs --- client/pov-recovery/src/lib.rs | 11 ++++++++--- client/service/src/lib.rs | 4 ++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index 95ef4446b07..c08096d57b7 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -29,14 +29,18 @@ //! //! 1. For every included relay chain block we note the backed candidate of our parachain. If the //! block belonging to the PoV is already known, we do nothing. Otherwise we start -//! a timer that waits a random time between 0..relay_chain_slot_length before starting to recover +//! a timer that waits for a randomized time inside a specified interval before starting to recover //! the PoV. //! //! 2. If between starting and firing the timer the block is imported, we skip the recovery of the //! PoV. //! -//! 3. If the timer fired we recover the PoV using the relay chain PoV recovery protocol. After it -//! is recovered, we restore the block and import it. +//! 3. If the timer fired we recover the PoV using the relay chain PoV recovery protocol. +//! +//! 4a. After it is recovered, we restore the block and import it. +//! +//! 4b. Since we are trying to recover pending candidates, availability is not guaranteed. If the block +//! PoV is not yet available, we retry. //! //! If we need to recover multiple PoV blocks (which should hopefully not happen in real life), we //! make sure that the blocks are imported in the correct order. @@ -75,6 +79,7 @@ use active_candidate_recovery::ActiveCandidateRecovery; const LOG_TARGET: &str = "cumulus-pov-recovery"; +/// Test-friendly wrapper trait for the overseer handle. #[async_trait::async_trait] pub trait RecoveryHandle: Send { async fn recover(&mut self, message: AvailabilityRecoveryMessage, origin: &'static str); diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 84c188436e9..7a1d0582d58 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -126,7 +126,7 @@ where relay_chain_interface.clone(), para_id, recovery_chan_rx, - relay_chain_slot_duration, + relay_chain_slot_duration / 2, ); task_manager @@ -219,7 +219,7 @@ where relay_chain_interface, para_id, recovery_chan_rx, - relay_chain_slot_duration, + relay_chain_slot_duration / 2, ); task_manager From be9d523b3c65bd9a97017d6f60848110ebb1cba4 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Mon, 6 Feb 2023 10:07:42 +0100 Subject: [PATCH 16/27] Improve docs --- .../consensus/common/src/parachain_consensus.rs | 3 ++- .../pov-recovery/src/active_candidate_recovery.rs | 2 +- client/pov-recovery/src/lib.rs | 15 ++++++++++++--- test/service/src/lib.rs | 10 ++++++++-- 4 files changed, 23 insertions(+), 7 deletions(-) diff --git a/client/consensus/common/src/parachain_consensus.rs b/client/consensus/common/src/parachain_consensus.rs index 6fd50ce8ccb..2f2de06ac4e 100644 --- a/client/consensus/common/src/parachain_consensus.rs +++ b/client/consensus/common/src/parachain_consensus.rs @@ -34,6 +34,7 @@ use futures::{channel::mpsc::Sender, pin_mut, select, FutureExt, Stream, StreamE use std::{sync::Arc, time::Duration}; const LOG_TARGET: &str = "cumulus-consensus"; +const FINALIZATION_CACHE_SIZE: u32 = 40; // Delay range to trigger explicit requests. // The chosen value doesn't have any special meaning, a random delay within the order of @@ -118,7 +119,7 @@ where // For example, a block that has been recovered via PoV-Recovery // on a full node can have several minutes delay. With this cache // we have some "memory" of recently finalized blocks. - let mut last_seen_finalized_hashes = LruMap::new(ByLength::new(40)); + let mut last_seen_finalized_hashes = LruMap::new(ByLength::new(FINALIZATION_CACHE_SIZE)); loop { select! { diff --git a/client/pov-recovery/src/active_candidate_recovery.rs b/client/pov-recovery/src/active_candidate_recovery.rs index cdbd3d85e99..16b33fdf5a6 100644 --- a/client/pov-recovery/src/active_candidate_recovery.rs +++ b/client/pov-recovery/src/active_candidate_recovery.rs @@ -52,7 +52,7 @@ impl ActiveCandidateRecovery { let (tx, rx) = oneshot::channel(); self.recovery_handle - .recover( + .send_recovery_msg( AvailabilityRecoveryMessage::RecoverAvailableData( candidate.receipt.clone(), candidate.session_index, diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index c08096d57b7..21ac76959d7 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -80,14 +80,23 @@ use active_candidate_recovery::ActiveCandidateRecovery; const LOG_TARGET: &str = "cumulus-pov-recovery"; /// Test-friendly wrapper trait for the overseer handle. +/// Can be used to simulate failing recovery requests. #[async_trait::async_trait] pub trait RecoveryHandle: Send { - async fn recover(&mut self, message: AvailabilityRecoveryMessage, origin: &'static str); + async fn send_recovery_msg( + &mut self, + message: AvailabilityRecoveryMessage, + origin: &'static str, + ); } #[async_trait::async_trait] impl RecoveryHandle for OverseerHandle { - async fn recover(&mut self, message: AvailabilityRecoveryMessage, origin: &'static str) { + async fn send_recovery_msg( + &mut self, + message: AvailabilityRecoveryMessage, + origin: &'static str, + ) { self.send_msg(message, origin).await; } } @@ -388,7 +397,7 @@ where .push_unordered_recovery(block_hash, self.retry_delay); return } else { - tracing::debug!( + tracing::warn!( target: LOG_TARGET, ?block_hash, "Unable to recover block after retry.", diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index 5996a03b266..dbc429ae8da 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -140,13 +140,19 @@ pub struct FailingRecoveryHandle { impl FailingRecoveryHandle { /// Create a new FailingRecoveryHandle pub fn new(overseer_handle: OverseerHandle) -> Self { - Self { overseer_handle, counter: 1 } + Self { overseer_handle, counter: 0 } } } #[async_trait::async_trait] impl RecoveryHandle for FailingRecoveryHandle { - async fn recover(&mut self, message: AvailabilityRecoveryMessage, origin: &'static str) { + async fn send_recovery_msg( + &mut self, + message: AvailabilityRecoveryMessage, + origin: &'static str, + ) { + // For every 5th block we immediately return [`RecoveryError::Unavailable`] to trigger + // a retry. if self.counter % 5 == 0 { let AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, back_sender) = message; tracing::info!(target: LOG_TARGET, "Failing pov recovery."); From 2bf082419e90dea651c01efad1f197623ebd9d7f Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Mon, 6 Feb 2023 10:58:59 +0100 Subject: [PATCH 17/27] Simplify RecoveryQueue --- client/pov-recovery/src/lib.rs | 59 +++++++++------------------------- 1 file changed, 15 insertions(+), 44 deletions(-) diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index 21ac76959d7..b43bde96ca2 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -152,10 +152,10 @@ struct Candidate { } struct RecoveryQueue { - // Queue that keeps the hashes of blocks to be recovered + // Queue that keeps the hashes of blocks to be recovered. recovery_queue: VecDeque, // Futures that resolve when a new recovery should be started. - signaling_queue: FuturesUnordered> + Send>>>, + signaling_queue: FuturesUnordered + Send>>>, } /// Queue that is used to decide when to start PoV-recovery operations. @@ -164,28 +164,9 @@ impl RecoveryQueue { return Self { recovery_queue: VecDeque::new(), signaling_queue: FuturesUnordered::new() } } - /// Add hash of a block that should be recovered after `delay` has passed. - /// In contrast to [`push_ordered_candidate`] this will start recovering `hash`, - /// ignoring the queue position. - pub fn push_unordered_recovery(&mut self, hash: Block::Hash, delay: Duration) { - tracing::debug!( - target: LOG_TARGET, - block_hash = ?hash, - "Adding block to queue and adding new recovery slot in {:?} sec", - delay.as_secs(), - ); - self.signaling_queue.push( - async move { - Delay::new(delay).await; - Some(hash) - } - .boxed(), - ); - } - /// Add hash of a block that should go to the end of the recovery queue. /// A new recovery will be signaled after `delay` has passed. - pub fn push_ordered_recovery(&mut self, hash: Block::Hash, delay: Duration) { + pub fn push_recovery(&mut self, hash: Block::Hash, delay: Duration) { tracing::debug!( target: LOG_TARGET, block_hash = ?hash, @@ -196,7 +177,6 @@ impl RecoveryQueue { self.signaling_queue.push( async move { Delay::new(delay).await; - None } .boxed(), ); @@ -205,25 +185,17 @@ impl RecoveryQueue { /// Get the next hash for block recovery. pub async fn next_recovery(&mut self) -> Block::Hash { loop { - if let Some(result) = self.signaling_queue.next().await { - // If the return future resolves to `None`, we want to process the queue - match result { - Some(hash) => return hash, - None => { - if let Some(hash) = self.recovery_queue.pop_front() { - return hash - } else { - tracing::warn!( - target: LOG_TARGET, - "Recovery was signaled, but no candidate hash available." - ); - futures::pending!() - }; - }, - } - } else { - futures::pending!() + if let Some(_) = self.signaling_queue.next().await { + if let Some(hash) = self.recovery_queue.pop_front() { + return hash + } else { + tracing::warn!( + target: LOG_TARGET, + "Recovery was signaled, but no candidate hash available. This is a bug." + ); + }; } + futures::pending!() } } } @@ -393,8 +365,7 @@ where self.retry_delay ); self.blocks_in_retry.insert(block_hash); - self.candidate_recovery_queue - .push_unordered_recovery(block_hash, self.retry_delay); + self.candidate_recovery_queue.push_recovery(block_hash, self.retry_delay); return } else { tracing::warn!( @@ -577,7 +548,7 @@ where if do_recover { for hash in to_recover.into_iter().rev() { - self.candidate_recovery_queue.push_ordered_recovery(hash, delay.duration()); + self.candidate_recovery_queue.push_recovery(hash, delay.duration()); } } } From f744bece49139728bd6de0f2906ebed433ce47e8 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Mon, 6 Feb 2023 11:39:34 +0100 Subject: [PATCH 18/27] Remove unwanted changes --- .../common/src/parachain_consensus.rs | 6 +++--- client/pov-recovery/src/lib.rs | 19 +++++++------------ test/service/src/lib.rs | 3 ++- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/client/consensus/common/src/parachain_consensus.rs b/client/consensus/common/src/parachain_consensus.rs index 2f2de06ac4e..8ae657f6670 100644 --- a/client/consensus/common/src/parachain_consensus.rs +++ b/client/consensus/common/src/parachain_consensus.rs @@ -235,7 +235,7 @@ async fn follow_new_best( B: Backend, { let new_best_heads = match new_best_heads(relay_chain, para_id).await { - Ok(best_relay_heads_stream) => best_relay_heads_stream.fuse(), + Ok(best_heads_stream) => best_heads_stream.fuse(), Err(err) => { tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream."); return @@ -254,8 +254,8 @@ async fn follow_new_best( select! { h = new_best_heads.next() => { match h { - Some(header) => handle_new_best_parachain_head( - header, + Some(h) => handle_new_best_parachain_head( + h, &*parachain, &mut unset_best_header, recovery_chan_tx.as_mut(), diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index b43bde96ca2..885c2d0d864 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -223,7 +223,7 @@ pub struct PoVRecovery { /// Explicit block recovery requests channel. recovery_chan_rx: Receiver>, /// Blocks that we are retrying currently - blocks_in_retry: HashSet, + candidates_in_retry: HashSet, /// Waiting time before a retry retry_delay: Duration, } @@ -254,7 +254,7 @@ where parachain_import_queue, relay_chain_interface, para_id, - blocks_in_retry: HashSet::new(), + candidates_in_retry: HashSet::new(), recovery_chan_rx, retry_delay, } @@ -353,18 +353,13 @@ where ) { let available_data = match available_data { Some(data) => { - self.blocks_in_retry.remove(&block_hash); + self.candidates_in_retry.remove(&block_hash); data }, None => - if !self.blocks_in_retry.contains(&block_hash) { - tracing::debug!( - target: LOG_TARGET, - ?block_hash, - "Retrying block recovery in {:?} seconds", - self.retry_delay - ); - self.blocks_in_retry.insert(block_hash); + if !self.candidates_in_retry.contains(&block_hash) { + tracing::debug!(target: LOG_TARGET, ?block_hash, "Recovery failed, retrying."); + self.candidates_in_retry.insert(block_hash); self.candidate_recovery_queue.push_recovery(block_hash, self.retry_delay); return } else { @@ -373,7 +368,7 @@ where ?block_hash, "Unable to recover block after retry.", ); - self.blocks_in_retry.remove(&block_hash); + self.candidates_in_retry.remove(&block_hash); self.clear_waiting_for_parent(block_hash); self.clear_waiting_recovery(&block_hash); return diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index dbc429ae8da..3a7e4fba32e 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -151,7 +151,7 @@ impl RecoveryHandle for FailingRecoveryHandle { message: AvailabilityRecoveryMessage, origin: &'static str, ) { - // For every 5th block we immediately return [`RecoveryError::Unavailable`] to trigger + // For every 5th block we immediately signal unavailability to trigger // a retry. if self.counter % 5 == 0 { let AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, back_sender) = message; @@ -365,6 +365,7 @@ where let overseer_handle = relay_chain_interface .overseer_handle() .map_err(|e| sc_service::Error::Application(Box::new(e)))?; + let recovery_handle: Box = if fail_pov_recovery { Box::new(FailingRecoveryHandle::new(overseer_handle)) } else { From 2512d09f92ae9bd28d69dcd8032d0b140d0ff1b4 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Mon, 6 Feb 2023 14:48:17 +0100 Subject: [PATCH 19/27] Adjust to comments --- client/consensus/common/src/parachain_consensus.rs | 6 +++--- client/pov-recovery/src/lib.rs | 14 +++++++------- client/service/src/lib.rs | 9 ++++++--- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/client/consensus/common/src/parachain_consensus.rs b/client/consensus/common/src/parachain_consensus.rs index 8ae657f6670..9fb273f3c76 100644 --- a/client/consensus/common/src/parachain_consensus.rs +++ b/client/consensus/common/src/parachain_consensus.rs @@ -23,7 +23,7 @@ use sp_blockchain::Error as ClientError; use sp_consensus::{BlockOrigin, BlockStatus}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; -use cumulus_client_pov_recovery::{RecoveryDelay, RecoveryKind, RecoveryRequest}; +use cumulus_client_pov_recovery::{RecoveryDelayRange, RecoveryKind, RecoveryRequest}; use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use polkadot_primitives::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption}; @@ -40,8 +40,8 @@ const FINALIZATION_CACHE_SIZE: u32 = 40; // The chosen value doesn't have any special meaning, a random delay within the order of // seconds in practice should be a good enough to allow a quick recovery without DOSing // the relay chain. -const RECOVERY_DELAY: RecoveryDelay = - RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) }; +const RECOVERY_DELAY: RecoveryDelayRange = + RecoveryDelayRange { min: Duration::ZERO, max: Duration::from_secs(30) }; fn handle_new_finalized_head( parachain: &Arc

, diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index 885c2d0d864..aaf35fac458 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -65,7 +65,7 @@ use futures::{ channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt, }; use futures_timer::Delay; -use rand::{thread_rng, Rng}; +use rand::{distributions::Uniform, prelude::Distribution, thread_rng}; use std::{ collections::{HashMap, HashSet, VecDeque}, @@ -118,24 +118,24 @@ pub struct RecoveryRequest { /// can be used to prevent self-DOSing if the recovery request is part of a /// distributed protocol and there is the possibility that multiple actors are /// requiring to perform the recovery action at approximately the same time. - pub delay: RecoveryDelay, + pub delay: RecoveryDelayRange, /// Recovery type. pub kind: RecoveryKind, } /// The delay between observing an unknown block and triggering the recovery of a block. #[derive(Clone, Copy)] -pub struct RecoveryDelay { +pub struct RecoveryDelayRange { /// Start recovering after `min` delay. pub min: Duration, /// Start recovering before `max` delay. pub max: Duration, } -impl RecoveryDelay { +impl RecoveryDelayRange { /// Produce a randomized duration between `min` and `max`. fn duration(&self) -> Duration { - self.min + self.max.saturating_sub(self.min).mul_f64(thread_rng().gen()) + Uniform::from(self.min..=self.max).sample(&mut thread_rng()) } } @@ -215,7 +215,7 @@ pub struct PoVRecovery { /// /// Uses parent -> blocks mapping. waiting_for_parent: HashMap>, - recovery_delay: RecoveryDelay, + recovery_delay: RecoveryDelayRange, parachain_client: Arc, parachain_import_queue: Box>, relay_chain_interface: RC, @@ -236,7 +236,7 @@ where /// Create a new instance. pub fn new( recovery_handle: Box, - recovery_delay: RecoveryDelay, + recovery_delay: RecoveryDelayRange, parachain_client: Arc, parachain_import_queue: Box>, relay_chain_interface: RCInterface, diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 7a1d0582d58..9a6ddbbc0a8 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -20,7 +20,7 @@ use cumulus_client_cli::CollatorOptions; use cumulus_client_consensus_common::ParachainConsensus; -use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelay, RecoveryHandle}; +use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelayRange, RecoveryHandle}; use cumulus_primitives_core::{CollectCollationInfo, ParaId}; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; @@ -120,7 +120,7 @@ where // We want that collators wait at maximum the relay chain slot duration before starting // to recover blocks. Additionally, we wait at least half the slot time to give the // relay chain the chance to increase availability. - RecoveryDelay { min: relay_chain_slot_duration / 2, max: relay_chain_slot_duration }, + RecoveryDelayRange { min: relay_chain_slot_duration / 2, max: relay_chain_slot_duration }, client.clone(), import_queue, relay_chain_interface.clone(), @@ -213,7 +213,10 @@ where // the recovery way before full nodes try to recover a certain block and then share the // block with the network using "the normal way". Full nodes are just the "last resort" // for block recovery. - RecoveryDelay { min: relay_chain_slot_duration * 25, max: relay_chain_slot_duration * 50 }, + RecoveryDelayRange { + min: relay_chain_slot_duration * 25, + max: relay_chain_slot_duration * 50, + }, client, import_queue, relay_chain_interface, From 2af35d9e1845ceb48769f9b213cd50ed3ca39020 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Tue, 7 Feb 2023 19:35:45 +0100 Subject: [PATCH 20/27] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/consensus/common/src/parachain_consensus.rs | 1 + client/pov-recovery/src/lib.rs | 9 ++++----- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/client/consensus/common/src/parachain_consensus.rs b/client/consensus/common/src/parachain_consensus.rs index 9fb273f3c76..f5e71eb41f1 100644 --- a/client/consensus/common/src/parachain_consensus.rs +++ b/client/consensus/common/src/parachain_consensus.rs @@ -140,6 +140,7 @@ where if last_seen_finalized_hashes.peek(&imported_block.hash).is_some() { tracing::debug!( target: LOG_TARGET, + block_hash = ?imported_block.hash, "Setting newly imported block as finalized.", ); diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index aaf35fac458..e6c82e4a907 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -151,6 +151,7 @@ struct Candidate { waiting_recovery: bool, } +/// Queue that is used to decide when to start PoV-recovery operations. struct RecoveryQueue { // Queue that keeps the hashes of blocks to be recovered. recovery_queue: VecDeque, @@ -158,10 +159,9 @@ struct RecoveryQueue { signaling_queue: FuturesUnordered + Send>>>, } -/// Queue that is used to decide when to start PoV-recovery operations. impl RecoveryQueue { pub fn new() -> Self { - return Self { recovery_queue: VecDeque::new(), signaling_queue: FuturesUnordered::new() } + Self { recovery_queue: Default::default(), signaling_queue: Default::default() } } /// Add hash of a block that should go to the end of the recovery queue. @@ -189,7 +189,7 @@ impl RecoveryQueue { if let Some(hash) = self.recovery_queue.pop_front() { return hash } else { - tracing::warn!( + tracing::error!( target: LOG_TARGET, "Recovery was signaled, but no candidate hash available. This is a bug." ); @@ -357,9 +357,8 @@ where data }, None => - if !self.candidates_in_retry.contains(&block_hash) { + if self.candidates_in_retry.insert(block_hash) { tracing::debug!(target: LOG_TARGET, ?block_hash, "Recovery failed, retrying."); - self.candidates_in_retry.insert(block_hash); self.candidate_recovery_queue.push_recovery(block_hash, self.retry_delay); return } else { From 70ab63b7f7266df37dad788f8ec9cd4b2162ed08 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Wed, 8 Feb 2023 08:46:33 +0100 Subject: [PATCH 21/27] Move recovery delay into the queue --- .../common/src/parachain_consensus.rs | 14 ++---- client/pov-recovery/src/lib.rs | 43 ++++++++----------- client/service/src/lib.rs | 2 - 3 files changed, 22 insertions(+), 37 deletions(-) diff --git a/client/consensus/common/src/parachain_consensus.rs b/client/consensus/common/src/parachain_consensus.rs index f5e71eb41f1..dbf11f4836c 100644 --- a/client/consensus/common/src/parachain_consensus.rs +++ b/client/consensus/common/src/parachain_consensus.rs @@ -23,7 +23,7 @@ use sp_blockchain::Error as ClientError; use sp_consensus::{BlockOrigin, BlockStatus}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; -use cumulus_client_pov_recovery::{RecoveryDelayRange, RecoveryKind, RecoveryRequest}; +use cumulus_client_pov_recovery::{RecoveryKind, RecoveryRequest}; use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use polkadot_primitives::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption}; @@ -31,18 +31,11 @@ use polkadot_primitives::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption}; use codec::Decode; use futures::{channel::mpsc::Sender, pin_mut, select, FutureExt, Stream, StreamExt}; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; const LOG_TARGET: &str = "cumulus-consensus"; const FINALIZATION_CACHE_SIZE: u32 = 40; -// Delay range to trigger explicit requests. -// The chosen value doesn't have any special meaning, a random delay within the order of -// seconds in practice should be a good enough to allow a quick recovery without DOSing -// the relay chain. -const RECOVERY_DELAY: RecoveryDelayRange = - RecoveryDelayRange { min: Duration::ZERO, max: Duration::from_secs(30) }; - fn handle_new_finalized_head( parachain: &Arc

, finalized_head: Vec, @@ -415,8 +408,7 @@ async fn handle_new_best_parachain_head( // Best effort channel to actively encourage block recovery. // An error here is not fatal; the relay chain continuously re-announces // the best block, thus we will have other opportunities to retry. - let req = - RecoveryRequest { hash, delay: RECOVERY_DELAY, kind: RecoveryKind::Full }; + let req = RecoveryRequest { hash, kind: RecoveryKind::Full }; if let Err(err) = recovery_chan_tx.try_send(req) { tracing::warn!( target: LOG_TARGET, diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index e6c82e4a907..36b40fd731d 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -114,16 +114,15 @@ pub enum RecoveryKind { pub struct RecoveryRequest { /// Hash of the last block to recover. pub hash: Block::Hash, - /// Recovery delay range. Randomizing the start of the recovery within this interval - /// can be used to prevent self-DOSing if the recovery request is part of a - /// distributed protocol and there is the possibility that multiple actors are - /// requiring to perform the recovery action at approximately the same time. - pub delay: RecoveryDelayRange, /// Recovery type. pub kind: RecoveryKind, } /// The delay between observing an unknown block and triggering the recovery of a block. +/// Randomizing the start of the recovery within this interval +/// can be used to prevent self-DOSing if the recovery request is part of a +/// distributed protocol and there is the possibility that multiple actors are +/// requiring to perform the recovery action at approximately the same time. #[derive(Clone, Copy)] pub struct RecoveryDelayRange { /// Start recovering after `min` delay. @@ -153,6 +152,7 @@ struct Candidate { /// Queue that is used to decide when to start PoV-recovery operations. struct RecoveryQueue { + recovery_delay_range: RecoveryDelayRange, // Queue that keeps the hashes of blocks to be recovered. recovery_queue: VecDeque, // Futures that resolve when a new recovery should be started. @@ -160,13 +160,18 @@ struct RecoveryQueue { } impl RecoveryQueue { - pub fn new() -> Self { - Self { recovery_queue: Default::default(), signaling_queue: Default::default() } + pub fn new(recovery_delay_range: RecoveryDelayRange) -> Self { + Self { + recovery_delay_range, + recovery_queue: Default::default(), + signaling_queue: Default::default(), + } } /// Add hash of a block that should go to the end of the recovery queue. /// A new recovery will be signaled after `delay` has passed. - pub fn push_recovery(&mut self, hash: Block::Hash, delay: Duration) { + pub fn push_recovery(&mut self, hash: Block::Hash) { + let delay = self.recovery_delay_range.duration(); tracing::debug!( target: LOG_TARGET, block_hash = ?hash, @@ -215,7 +220,6 @@ pub struct PoVRecovery { /// /// Uses parent -> blocks mapping. waiting_for_parent: HashMap>, - recovery_delay: RecoveryDelayRange, parachain_client: Arc, parachain_import_queue: Box>, relay_chain_interface: RC, @@ -224,8 +228,6 @@ pub struct PoVRecovery { recovery_chan_rx: Receiver>, /// Blocks that we are retrying currently candidates_in_retry: HashSet, - /// Waiting time before a retry - retry_delay: Duration, } impl PoVRecovery @@ -236,19 +238,17 @@ where /// Create a new instance. pub fn new( recovery_handle: Box, - recovery_delay: RecoveryDelayRange, + recovery_delay_range: RecoveryDelayRange, parachain_client: Arc, parachain_import_queue: Box>, relay_chain_interface: RCInterface, para_id: ParaId, recovery_chan_rx: Receiver>, - retry_delay: Duration, ) -> Self { Self { candidates: HashMap::new(), - candidate_recovery_queue: RecoveryQueue::new(), + candidate_recovery_queue: RecoveryQueue::new(recovery_delay_range), active_candidate_recovery: ActiveCandidateRecovery::new(recovery_handle), - recovery_delay, waiting_for_parent: HashMap::new(), parachain_client, parachain_import_queue, @@ -256,7 +256,6 @@ where para_id, candidates_in_retry: HashSet::new(), recovery_chan_rx, - retry_delay, } } @@ -302,11 +301,7 @@ where // If required, triggers a lazy recovery request that will eventually be blocked // if in the meantime the block is imported. - self.recover(RecoveryRequest { - hash, - delay: self.recovery_delay, - kind: RecoveryKind::Simple, - }); + self.recover(RecoveryRequest { hash, kind: RecoveryKind::Simple }); } /// Block is no longer waiting for recovery @@ -359,7 +354,7 @@ where None => if self.candidates_in_retry.insert(block_hash) { tracing::debug!(target: LOG_TARGET, ?block_hash, "Recovery failed, retrying."); - self.candidate_recovery_queue.push_recovery(block_hash, self.retry_delay); + self.candidate_recovery_queue.push_recovery(block_hash); return } else { tracing::warn!( @@ -500,7 +495,7 @@ where /// Attempts an explicit recovery of one or more blocks. pub fn recover(&mut self, req: RecoveryRequest) { - let RecoveryRequest { mut hash, delay, kind } = req; + let RecoveryRequest { mut hash, kind } = req; let mut to_recover = Vec::new(); let do_recover = loop { @@ -542,7 +537,7 @@ where if do_recover { for hash in to_recover.into_iter().rev() { - self.candidate_recovery_queue.push_recovery(hash, delay.duration()); + self.candidate_recovery_queue.push_recovery(hash); } } } diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 9a6ddbbc0a8..ab95352f21d 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -126,7 +126,6 @@ where relay_chain_interface.clone(), para_id, recovery_chan_rx, - relay_chain_slot_duration / 2, ); task_manager @@ -222,7 +221,6 @@ where relay_chain_interface, para_id, recovery_chan_rx, - relay_chain_slot_duration / 2, ); task_manager From 0da863872955711bbe0cabbb202fce8ce1d7925b Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Wed, 8 Feb 2023 09:18:49 +0100 Subject: [PATCH 22/27] Check for finalized number --- client/consensus/common/src/parachain_consensus.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/client/consensus/common/src/parachain_consensus.rs b/client/consensus/common/src/parachain_consensus.rs index dbf11f4836c..17e07fb3b91 100644 --- a/client/consensus/common/src/parachain_consensus.rs +++ b/client/consensus/common/src/parachain_consensus.rs @@ -60,8 +60,9 @@ fn handle_new_finalized_head( let hash = header.hash(); last_seen_finalized_hashes.insert(hash, ()); - // don't finalize the same block multiple times. - if parachain.usage_info().chain.finalized_hash != hash { + + // Only finalize if we are below the incoming finalized parachain head + if parachain.usage_info().chain.finalized_number < *header.number() { tracing::debug!( target: LOG_TARGET, block_hash = ?hash, From 6f0d8efccb8fca3929e1f93602fbae1b0fda7437 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Wed, 8 Feb 2023 13:05:10 +0100 Subject: [PATCH 23/27] Clean up --- .../src/active_candidate_recovery.rs | 2 +- client/pov-recovery/src/lib.rs | 34 ++++++++----------- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/client/pov-recovery/src/active_candidate_recovery.rs b/client/pov-recovery/src/active_candidate_recovery.rs index 16b33fdf5a6..f96646da673 100644 --- a/client/pov-recovery/src/active_candidate_recovery.rs +++ b/client/pov-recovery/src/active_candidate_recovery.rs @@ -92,7 +92,7 @@ impl ActiveCandidateRecovery { } /// Returns if the given `candidate` is being recovered. - pub fn is_being_recovered(&self, candidate: &Block::Hash) -> bool { + pub fn is_recovering(&self, candidate: &Block::Hash) -> bool { self.candidates.contains(candidate) } diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index 36b40fd731d..a944e7531f3 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -328,9 +328,9 @@ where } } - /// Clear `waiting_for_parent` from the given `hash` and do this recursively for all child - /// blocks. - fn clear_waiting_for_parent(&mut self, hash: Block::Hash) { + /// Clear `waiting_for_parent` and `waiting_recovery` for the candidate with `hash`. + /// Also clears children blocks waiting for this parent. + fn reset_candidate(&mut self, hash: Block::Hash) { let mut blocks_to_delete = vec![hash]; while let Some(delete) = blocks_to_delete.pop() { @@ -338,6 +338,7 @@ where blocks_to_delete.extend(childs.iter().map(BlockT::hash)); } } + self.clear_waiting_recovery(&hash); } /// Handle a recovered candidate. @@ -363,8 +364,7 @@ where "Unable to recover block after retry.", ); self.candidates_in_retry.remove(&block_hash); - self.clear_waiting_for_parent(block_hash); - self.clear_waiting_recovery(&block_hash); + self.reset_candidate(block_hash); return }, }; @@ -377,9 +377,7 @@ where Err(error) => { tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV"); - self.clear_waiting_for_parent(block_hash); - self.clear_waiting_recovery(&block_hash); - + self.reset_candidate(block_hash); return }, }; @@ -393,9 +391,7 @@ where "Failed to decode parachain block data from recovered PoV", ); - self.clear_waiting_recovery(&block_hash); - self.clear_waiting_for_parent(block_hash); - + self.reset_candidate(block_hash); return }, }; @@ -406,10 +402,11 @@ where match self.parachain_client.block_status(parent) { Ok(BlockStatus::Unknown) => { - // If the parent block is currently being recovered or is scheduled to be recovered, we want to wait for the parent. + // If the parent block is currently being recovered or is scheduled to be recovered, + // we want to wait for the parent. let parent_scheduled_for_recovery = self.candidates.get(&parent).map_or(false, |parent| parent.waiting_recovery); - if self.active_candidate_recovery.is_being_recovered(&parent) || + if self.active_candidate_recovery.is_recovering(&parent) || parent_scheduled_for_recovery { tracing::debug!( @@ -430,9 +427,7 @@ where "Parent not found while trying to import recovered block.", ); - self.clear_waiting_for_parent(block_hash); - self.clear_waiting_recovery(&block_hash); - + self.reset_candidate(block_hash); return } }, @@ -444,15 +439,16 @@ where "Error while checking block status", ); - self.clear_waiting_for_parent(block_hash); - self.clear_waiting_recovery(&block_hash); - + self.reset_candidate(block_hash); return }, // Any other status is fine to "ignore/accept" _ => (), } + // This is not necessary in the happy case, as the flag will be cleared on import + // If import fails for any reason however, we still want to have the flag cleared. + self.clear_waiting_recovery(&block_hash); self.import_block(block).await; } From 3809eed840d3a09d54212f99486782ff80cdc1c9 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Thu, 9 Feb 2023 09:11:25 +0100 Subject: [PATCH 24/27] Use timer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/pov-recovery/src/lib.rs | 41 +++++++++++++++------------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index a944e7531f3..733c979e5db 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -155,8 +155,7 @@ struct RecoveryQueue { recovery_delay_range: RecoveryDelayRange, // Queue that keeps the hashes of blocks to be recovered. recovery_queue: VecDeque, - // Futures that resolve when a new recovery should be started. - signaling_queue: FuturesUnordered + Send>>>, + timer: Option, } impl RecoveryQueue { @@ -164,7 +163,7 @@ impl RecoveryQueue { Self { recovery_delay_range, recovery_queue: Default::default(), - signaling_queue: Default::default(), + timer: Default::default(), } } @@ -172,35 +171,31 @@ impl RecoveryQueue { /// A new recovery will be signaled after `delay` has passed. pub fn push_recovery(&mut self, hash: Block::Hash) { let delay = self.recovery_delay_range.duration(); - tracing::debug!( - target: LOG_TARGET, - block_hash = ?hash, - "Adding block to queue and adding new recovery slot in {:?} sec", - delay.as_secs(), - ); self.recovery_queue.push_back(hash); - self.signaling_queue.push( - async move { - Delay::new(delay).await; - } - .boxed(), - ); + if self.timer.is_none() { + self.timer = Some(Delay::new(delay)); + } } /// Get the next hash for block recovery. pub async fn next_recovery(&mut self) -> Block::Hash { loop { - if let Some(_) = self.signaling_queue.next().await { + if let Some(timer) = &mut self.timer { + (&mut timer).await; + if let Some(hash) = self.recovery_queue.pop_front() { + if self.recovery_queue.is_empty() { + drop(timer); + self.timer.take(); + } else { + timer.reset(self.recovery_delay_range.duration()); + } + return hash - } else { - tracing::error!( - target: LOG_TARGET, - "Recovery was signaled, but no candidate hash available. This is a bug." - ); - }; + } + } else { + futures::pending!(); } - futures::pending!() } } } From 5bf500a169a9cbb7351f4baed6bfffae7c7f1ec1 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Wed, 8 Feb 2023 17:43:10 +0100 Subject: [PATCH 25/27] Simplify implementation --- client/pov-recovery/src/lib.rs | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index 733c979e5db..61fa2acdcd8 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -401,9 +401,7 @@ where // we want to wait for the parent. let parent_scheduled_for_recovery = self.candidates.get(&parent).map_or(false, |parent| parent.waiting_recovery); - if self.active_candidate_recovery.is_recovering(&parent) || - parent_scheduled_for_recovery - { + if parent_scheduled_for_recovery { tracing::debug!( target: LOG_TARGET, ?block_hash, @@ -441,9 +439,6 @@ where _ => (), } - // This is not necessary in the happy case, as the flag will be cleared on import - // If import fails for any reason however, we still want to have the flag cleared. - self.clear_waiting_recovery(&block_hash); self.import_block(block).await; } @@ -489,7 +484,7 @@ where let RecoveryRequest { mut hash, kind } = req; let mut to_recover = Vec::new(); - let do_recover = loop { + loop { let candidate = match self.candidates.get_mut(&hash) { Some(candidate) => candidate, None => { @@ -498,7 +493,7 @@ where block_hash = ?hash, "Cound not recover. Block was never announced as candidate" ); - break false + return }, }; @@ -507,29 +502,28 @@ where candidate.waiting_recovery = true; to_recover.push(hash); }, - Ok(_) => break true, + Ok(_) => break, Err(e) => { + candidate.waiting_recovery = false; tracing::error!( target: LOG_TARGET, error = ?e, block_hash = ?hash, "Failed to get block status", ); - break false + return }, } if kind == RecoveryKind::Simple { - break true + break } hash = candidate.parent_hash; - }; + } - if do_recover { - for hash in to_recover.into_iter().rev() { - self.candidate_recovery_queue.push_recovery(hash); - } + for hash in to_recover.into_iter().rev() { + self.candidate_recovery_queue.push_recovery(hash); } } From 49710ce04b3c9de20ad46aac66cde04f617447b8 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Thu, 9 Feb 2023 10:39:25 +0100 Subject: [PATCH 26/27] Revert "Use timer" This reverts commit 3809eed840d3a09d54212f99486782ff80cdc1c9. --- client/pov-recovery/src/lib.rs | 41 +++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index 61fa2acdcd8..8be1ce00a19 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -155,7 +155,8 @@ struct RecoveryQueue { recovery_delay_range: RecoveryDelayRange, // Queue that keeps the hashes of blocks to be recovered. recovery_queue: VecDeque, - timer: Option, + // Futures that resolve when a new recovery should be started. + signaling_queue: FuturesUnordered + Send>>>, } impl RecoveryQueue { @@ -163,7 +164,7 @@ impl RecoveryQueue { Self { recovery_delay_range, recovery_queue: Default::default(), - timer: Default::default(), + signaling_queue: Default::default(), } } @@ -171,31 +172,35 @@ impl RecoveryQueue { /// A new recovery will be signaled after `delay` has passed. pub fn push_recovery(&mut self, hash: Block::Hash) { let delay = self.recovery_delay_range.duration(); + tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Adding block to queue and adding new recovery slot in {:?} sec", + delay.as_secs(), + ); self.recovery_queue.push_back(hash); - if self.timer.is_none() { - self.timer = Some(Delay::new(delay)); - } + self.signaling_queue.push( + async move { + Delay::new(delay).await; + } + .boxed(), + ); } /// Get the next hash for block recovery. pub async fn next_recovery(&mut self) -> Block::Hash { loop { - if let Some(timer) = &mut self.timer { - (&mut timer).await; - + if let Some(_) = self.signaling_queue.next().await { if let Some(hash) = self.recovery_queue.pop_front() { - if self.recovery_queue.is_empty() { - drop(timer); - self.timer.take(); - } else { - timer.reset(self.recovery_delay_range.duration()); - } - return hash - } - } else { - futures::pending!(); + } else { + tracing::error!( + target: LOG_TARGET, + "Recovery was signaled, but no candidate hash available. This is a bug." + ); + }; } + futures::pending!() } } } From 78ccb2a211620903932b4c32126cbb30ac06b1f7 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Thu, 9 Feb 2023 10:57:00 +0100 Subject: [PATCH 27/27] Properly clear `to_recover` flag --- client/pov-recovery/src/active_candidate_recovery.rs | 5 ----- client/pov-recovery/src/lib.rs | 4 +++- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/client/pov-recovery/src/active_candidate_recovery.rs b/client/pov-recovery/src/active_candidate_recovery.rs index f96646da673..feb09d005ce 100644 --- a/client/pov-recovery/src/active_candidate_recovery.rs +++ b/client/pov-recovery/src/active_candidate_recovery.rs @@ -91,11 +91,6 @@ impl ActiveCandidateRecovery { ); } - /// Returns if the given `candidate` is being recovered. - pub fn is_recovering(&self, candidate: &Block::Hash) -> bool { - self.candidates.contains(candidate) - } - /// Waits for the next recovery. /// /// If the returned [`AvailableData`] is `None`, it means that the recovery failed. diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index 8be1ce00a19..60fbdab310c 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -509,13 +509,15 @@ where }, Ok(_) => break, Err(e) => { - candidate.waiting_recovery = false; tracing::error!( target: LOG_TARGET, error = ?e, block_hash = ?hash, "Failed to get block status", ); + for hash in to_recover { + self.clear_waiting_recovery(&hash); + } return }, }