Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions node/network/availability-recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

#![warn(missing_docs)]

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::pin::Pin;

use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};
Expand Down Expand Up @@ -78,7 +78,7 @@ struct RequestFromBackersPhase {
struct RequestChunksPhase {
// a random shuffling of the validators which indicates the order in which we connect to the validators and
// request the chunk from them.
shuffling: Vec<ValidatorIndex>,
shuffling: VecDeque<ValidatorIndex>,
received_chunks: HashMap<ValidatorIndex, ErasureChunk>,
requesting_chunks: FuturesUnordered<BoxFuture<
'static,
Expand Down Expand Up @@ -198,20 +198,33 @@ impl RequestChunksPhase {
shuffling.shuffle(&mut rand::thread_rng());

RequestChunksPhase {
shuffling,
shuffling: shuffling.into(),
received_chunks: HashMap::new(),
requesting_chunks: FuturesUnordered::new(),
}
}

fn is_unavailable(&self, params: &InteractionParams) -> bool {
is_unavailable(
self.received_chunks.len(),
self.requesting_chunks.len(),
self.shuffling.len(),
params.threshold,
)
}

fn can_conclude(&self, params: &InteractionParams) -> bool {
self.received_chunks.len() >= params.threshold || self.is_unavailable(params)
}

async fn launch_parallel_requests(
&mut self,
params: &InteractionParams,
sender: &mut impl SubsystemSender,
) {
let max_requests = std::cmp::min(N_PARALLEL, params.threshold);
while self.requesting_chunks.len() < max_requests {
if let Some(validator_index) = self.shuffling.pop() {
if let Some(validator_index) = self.shuffling.pop_back() {
let validator = params.validator_authority_keys[validator_index.0 as usize].clone();
tracing::trace!(
target: LOG_TARGET,
Expand Down Expand Up @@ -255,10 +268,8 @@ impl RequestChunksPhase {
&mut self,
params: &InteractionParams,
) {
// Poll for new updates from requesting_chunks.
while let Poll::Ready(Some(request_result))
= futures::poll!(self.requesting_chunks.next())
{
// Wait for all current requests to conclude or time-out, or until we reach enough chunks.
while let Some(request_result) = self.requesting_chunks.next().await {
match request_result {
Ok(Some(chunk)) => {
// Check merkle proofs of any received chunks.
Expand Down Expand Up @@ -306,11 +317,15 @@ impl RequestChunksPhase {
match e {
RequestError::InvalidResponse(_) => {}
RequestError::NetworkError(_) | RequestError::Canceled(_) => {
self.shuffling.push(validator_index);
self.shuffling.push_front(validator_index);
}
}
}
}

// Stop waiting for requests when we either can already recover the data
// or have gotten firm 'No' responses from enough validators.
if self.can_conclude(params) { break }
}
}

Expand All @@ -320,12 +335,7 @@ impl RequestChunksPhase {
sender: &mut impl SubsystemSender,
) -> Result<AvailableData, RecoveryError> {
loop {
if is_unavailable(
self.received_chunks.len(),
self.requesting_chunks.len(),
self.shuffling.len(),
params.threshold,
) {
if self.is_unavailable(&params) {
tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Loop:

* If the phase is `InteractionPhase::RequestChunks`:
* If `received_chunks + requesting_chunks + shuffling` lengths are less than the threshold, break and return `Err(Unavailable)`.
* Poll for new updates from `requesting_chunks`. Check merkle proofs of any received chunks. If the request simply fails due to network issues, push onto the back of `shuffling` to be retried.
* Poll for new updates from `requesting_chunks`. Check merkle proofs of any received chunks. If the request simply fails due to network issues, insert into the front of `shuffling` to be retried.
* If `received_chunks` has more than `threshold` entries, attempt to recover the data. If that fails, or a re-encoding produces an incorrect erasure-root, break and issue a `Err(RecoveryError::Invalid)`. If correct, break and issue `Ok(available_data)`.
* While there are fewer than `N_PARALLEL` entries in `requesting_chunks`,
* Pop the next item from `shuffling`. If it's empty and `requesting_chunks` is empty, return `Err(RecoveryError::Unavailable)`.
Expand Down