From 731762450a59b0af94200156bf48458c9e4921b8 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Mon, 17 Apr 2023 21:09:49 +0200 Subject: [PATCH 1/2] Allow to skip availability-store --- node/network/availability-recovery/src/lib.rs | 42 ++++++++++++++++--- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index 9d6092ef84e5..6a6c925ff5fa 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -101,6 +101,12 @@ const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100); /// The Availability Recovery Subsystem. pub struct AvailabilityRecoverySubsystem { + /// Do not request data from the availability store. + /// This is the useful for nodes where the + /// availability-store subsystem is not expected to run, + /// such as collators. + bypass_availability_store: bool, + fast_path: bool, /// Receiver for available data requests. req_receiver: IncomingRequestReceiver, @@ -147,6 +153,9 @@ struct RecoveryParams { /// Metrics to report metrics: Metrics, + + /// Do not request data from availability-store + bypass_availability_store: bool, } /// Source the availability data either by means @@ -384,6 +393,7 @@ impl RequestChunksFromValidators { metrics.on_chunk_request_succeeded(); gum::trace!( target: LOG_TARGET, + candidate_hash = ?params.candidate_hash, validator_index = ?chunk.index, "Received valid chunk", @@ -467,7 +477,7 @@ impl RequestChunksFromValidators { let metrics = ¶ms.metrics; // First query the store for any chunks we've got. - { + if !params.bypass_availability_store { let (tx, rx) = oneshot::channel(); sender .send_message(AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx)) @@ -668,7 +678,7 @@ where { async fn run(mut self) -> Result { // First just see if we have the data available locally. - { + if !self.params.bypass_availability_store { let (tx, rx) = oneshot::channel(); self.sender .send_message(AvailabilityStoreMessage::QueryAvailableData( @@ -856,6 +866,7 @@ async fn launch_recovery_task( receipt: CandidateReceipt, backing_group: Option, response_sender: oneshot::Sender>, + bypass_availability_store: bool, metrics: &Metrics, ) -> error::Result<()> { let candidate_hash = receipt.hash(); @@ -867,6 +878,7 @@ async fn launch_recovery_task( candidate_hash, erasure_root: receipt.descriptor.erasure_root, metrics: metrics.clone(), + bypass_availability_store, }; let phase = backing_group @@ -906,6 +918,7 @@ async fn handle_recover( session_index: SessionIndex, backing_group: Option, response_sender: oneshot::Sender>, + bypass_availability_store: bool, metrics: &Metrics, ) -> error::Result<()> { let candidate_hash = receipt.hash(); @@ -949,6 +962,7 @@ async fn handle_recover( receipt, backing_group, response_sender, + bypass_availability_store, metrics, ) .await, @@ -977,13 +991,22 @@ async fn query_full_data( #[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)] impl AvailabilityRecoverySubsystem { + /// Create a new instance of `AvailabilityRecoverySubsystem` which never requests the + /// `AvailabilityStoreSubsystem` subsystem. + pub fn with_availability_store_skip( + req_receiver: IncomingRequestReceiver, + metrics: Metrics, + ) -> Self { + Self { fast_path: false, bypass_availability_store: true, req_receiver, metrics } + } + /// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to /// request data from backers. pub fn with_fast_path( req_receiver: IncomingRequestReceiver, metrics: Metrics, ) -> Self { - Self { fast_path: true, req_receiver, metrics } + Self { fast_path: true, bypass_availability_store: false, req_receiver, metrics } } /// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks @@ -991,12 +1014,12 @@ impl AvailabilityRecoverySubsystem { req_receiver: IncomingRequestReceiver, metrics: Metrics, ) -> Self { - Self { fast_path: false, req_receiver, metrics } + Self { fast_path: false, bypass_availability_store: false, req_receiver, metrics } } async fn run(self, mut ctx: Context) -> SubsystemResult<()> { let mut state = State::default(); - let Self { fast_path, mut req_receiver, metrics } = self; + let Self { fast_path, mut req_receiver, metrics, bypass_availability_store } = self; loop { let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse(); @@ -1025,6 +1048,7 @@ impl AvailabilityRecoverySubsystem { session_index, maybe_backing_group.filter(|_| fast_path), response_sender, + bypass_availability_store, &metrics, ).await { gum::warn!( @@ -1041,6 +1065,14 @@ impl AvailabilityRecoverySubsystem { in_req = recv_req => { match in_req.into_nested().map_err(|fatal| SubsystemError::with_origin("availability-recovery", fatal))? { Ok(req) => { + if bypass_availability_store { + gum::debug!( + target: LOG_TARGET, + "Skipping request to availability-store.", + ); + let _ = req.send_response(None.into()); + continue + } match query_full_data(&mut ctx, req.payload.candidate_hash).await { Ok(res) => { let _ = req.send_response(res.into()); From 70f480b4cc6c865fa580c98ab495cdddbef332e6 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Tue, 25 Apr 2023 14:48:50 +0200 Subject: [PATCH 2/2] Update node/network/availability-recovery/src/lib.rs Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> --- node/network/availability-recovery/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index 6a6c925ff5fa..f028080d10e5 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -393,7 +393,6 @@ impl RequestChunksFromValidators { metrics.on_chunk_request_succeeded(); gum::trace!( target: LOG_TARGET, - candidate_hash = ?params.candidate_hash, validator_index = ?chunk.index, "Received valid chunk",