This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
36 commits
df828ac
Indentation fix.
eskimor Mar 18, 2021
06d4d90
Prepare request-response for PoV fetching.
eskimor Mar 18, 2021
6a940eb
Drop old PoV distribution.
eskimor Mar 18, 2021
a7fc368
WIP: Fetch PoV directly from backing.
eskimor Mar 18, 2021
9847b81
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 19, 2021
e03ff75
Backing compiles.
eskimor Mar 19, 2021
a49b4d4
Runtime access and connection management for PoV distribution.
eskimor Mar 23, 2021
545e950
Get rid of seemingly dead code.
eskimor Mar 23, 2021
47d9f5f
Implement PoV fetching.
eskimor Mar 23, 2021
0a283ab
Don't send `ConnectToValidators` for empty list.
eskimor Mar 24, 2021
afd795f
Even better - no need to check over and over again.
eskimor Mar 24, 2021
1c3eec8
PoV fetching implemented.
eskimor Mar 24, 2021
fb24855
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 24, 2021
cceddce
Check PoV hash upon reception.
eskimor Mar 24, 2021
89f0bf9
Implement retry of PoV fetching in backing.
eskimor Mar 25, 2021
ab75fea
Avoid pointless validation spawning.
eskimor Mar 25, 2021
10da891
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 25, 2021
3915a57
Add jaeger span to pov requesting.
eskimor Mar 25, 2021
fa6409e
Add back tracing.
eskimor Mar 25, 2021
8b9c2d4
Review remarks.
eskimor Mar 25, 2021
2d27be5
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 25, 2021
4af7d2e
Whitespace.
eskimor Mar 25, 2021
5c09829
Whitespace again.
eskimor Mar 26, 2021
ea9bde4
Cleanup + fix tests.
eskimor Mar 27, 2021
4207eaf
Log to log target in overseer.
eskimor Mar 27, 2021
3691061
Fix more tests.
eskimor Mar 27, 2021
b1a201a
Don't fail if group cannot be found.
eskimor Mar 27, 2021
298fe9d
Simple test for PoV fetcher.
eskimor Mar 27, 2021
af9f12c
Handle missing group membership better.
eskimor Mar 27, 2021
0c30792
Add test for retry functionality.
eskimor Mar 27, 2021
eb47465
Fix flaky test.
eskimor Mar 27, 2021
071bcca
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 27, 2021
3fa5791
Spaces again.
eskimor Mar 28, 2021
82d4a11
Guide updates.
eskimor Mar 28, 2021
b58a2ab
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 28, 2021
a0609e7
Spaces.
eskimor Mar 28, 2021
Runtime access and connection management for PoV distribution.
eskimor committed Mar 23, 2021
commit a49b4d431b2a58b525df2b67199f734f23017d42
35 changes: 26 additions & 9 deletions node/network/availability-distribution/src/error.rs
@@ -22,9 +22,11 @@ use thiserror::Error;
use futures::channel::oneshot;

use polkadot_node_subsystem_util::Error as UtilError;
use polkadot_primitives::v1::SessionIndex;
use polkadot_primitives::v1::{CompressedPoVError, SessionIndex};
use polkadot_subsystem::{errors::RuntimeApiError, SubsystemError};

use crate::LOG_TARGET;

/// Errors of this subsystem.
#[derive(Debug, Error)]
pub enum Error {
@@ -56,24 +58,28 @@ pub enum Error {
/// Sending response failed.
#[error("Sending a request's response failed.")]
SendResponse,
}

/// Error that we should handle gracefully by logging it.
#[derive(Debug)]
pub enum NonFatalError {
/// Some request to utility functions failed.
/// This can be either `RuntimeRequestCanceled` or `RuntimeApiError`.
#[error("Utility request failed")]
UtilRequest(UtilError),

/// Runtime API subsystem is down, which means we're shutting down.
#[error("Runtime request canceled")]
RuntimeRequestCanceled(oneshot::Canceled),

/// Some request to the runtime failed.
/// For example if we prune a block we're requesting info about.
#[error("Runtime API error")]
RuntimeRequest(RuntimeApiError),

/// We tried fetching a session info which was not available.
#[error("There was no session with the given index")]
NoSuchSession(SessionIndex),

/// Decompressing PoV failed.
#[error("PoV could not be decompressed")]
PoVDecompression(CompressedPoVError),
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -90,9 +96,20 @@ pub(crate) async fn recv_runtime<V>(
oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
UtilError,
>,
) -> std::result::Result<V, NonFatalError> {
r.map_err(NonFatalError::UtilRequest)?
) -> std::result::Result<V, Error> {
r.map_err(Error::UtilRequest)?
.await
.map_err(NonFatalError::RuntimeRequestCanceled)?
.map_err(NonFatalError::RuntimeRequest)
.map_err(Error::RuntimeRequestCanceled)?
.map_err(Error::RuntimeRequest)
}


/// Utility for consuming top-level errors and logging them.
///
/// We basically always want to try to continue on error. This utility function is meant to
/// consume top-level errors by simply logging them.
pub fn log_error(result: Result<()>, ctx: &'static str) {
if let Err(error) = result {
tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
}
}
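
For context, a minimal sketch of how a helper like `log_error` is typically used, so that a single failing operation does not abort the subsystem's main loop. The error variant, `update_heads` and the `LOG_TARGET` value below are hypothetical stand-ins, not part of this diff:

const LOG_TARGET: &str = "example::availability-distribution";

/// Hypothetical error type, standing in for the subsystem's `Error`.
#[derive(Debug)]
enum Error {
    RuntimeRequest,
}

type Result<T> = std::result::Result<T, Error>;

/// Consume a top-level error by logging it, so the caller's loop keeps running.
fn log_error(result: Result<()>, ctx: &'static str) {
    if let Err(error) = result {
        tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
    }
}

/// Hypothetical fallible operation, standing in for e.g. `update_fetching_heads`.
fn update_heads() -> Result<()> {
    Err(Error::RuntimeRequest)
}

fn main() {
    tracing_subscriber::fmt::init();
    // The error is logged and swallowed; a real caller would simply continue its loop.
    log_error(update_heads(), "update_heads");
}

The design choice the commit moves towards is to treat most failures as recoverable at the top of the loop, rather than threading optional error values through every return type.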
35 changes: 24 additions & 11 deletions node/network/availability-distribution/src/lib.rs
@@ -26,12 +26,20 @@ use polkadot_subsystem::{
/// Error and [`Result`] type for this subsystem.
mod error;
pub use error::Error;
use error::Result;
use error::{Result, log_error};

/// Runtime requests.
mod runtime;
use runtime::Runtime;

/// `Requester` taking care of requesting chunks for candidates pending availability.
mod requester;
use requester::Requester;

/// Handling requests for PoVs during backing.
mod pov_requester;
use pov_requester::PoVRequester;

/// Responding to erasure chunk requests:
mod responder;
use responder::{answer_chunk_request_log, answer_pov_request_log};
@@ -52,6 +60,8 @@ const LOG_TARGET: &'static str = "parachain::availability-distribution";
pub struct AvailabilityDistributionSubsystem {
/// Pointer to a keystore, which is required for determining this node's validator index.
keystore: SyncCryptoStorePtr,
/// Easy and efficient runtime access for this subsystem.
runtime: Runtime,
/// Prometheus metrics.
metrics: Metrics,
}
@@ -74,17 +84,20 @@
}

impl AvailabilityDistributionSubsystem {

/// Create a new instance of the availability distribution.
pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
Self { keystore, metrics }
let runtime = Runtime::new(keystore.clone());
Self { keystore, runtime, metrics }
}

/// Start processing work as passed on from the Overseer.
async fn run<Context>(self, mut ctx: Context) -> Result<()>
async fn run<Context>(mut self, mut ctx: Context) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage> + Sync + Send,
{
let mut requester = Requester::new(self.keystore.clone(), self.metrics.clone()).fuse();
let mut pov_requester = PoVRequester::new();
loop {
let action = {
let mut subsystem_next = ctx.recv().fuse();
@@ -107,14 +120,14 @@
};
match message {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
// Update the relay chain heads we are fetching our pieces for:
if let Some(e) = requester
.get_mut()
.update_fetching_heads(&mut ctx, update)
.await?
{
tracing::debug!(target: LOG_TARGET, "Error processing ActiveLeavesUpdate: {:?}", e);
}
log_error(
pov_requester.update_connected_validators(&mut ctx, &mut self.runtime, &update).await,
"PoVRequester::update_connected_validators"
);
log_error(
requester.get_mut().update_fetching_heads(&mut ctx, update).await,
"Error in Requester::update_fetching_heads"
);
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}
FromOverseer::Signal(OverseerSignal::Conclude) => {
128 changes: 128 additions & 0 deletions node/network/availability-distribution/src/pov_requester/mod.rs
@@ -0,0 +1,128 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! PoV requester takes care of requesting PoVs from validators of a backing group.

use futures::channel::mpsc;
use lru::LruCache;

use polkadot_node_network_protocol::{PeerId, peer_set::PeerSet};
use polkadot_primitives::v1::{AuthorityDiscoveryId, Hash, SessionIndex};
use polkadot_subsystem::{ActiveLeavesUpdate, SubsystemContext, messages::{AllMessages, NetworkBridgeMessage}};

use crate::runtime::Runtime;

/// Number of sessions we want to keep in the LRU.
const NUM_SESSIONS: usize = 2;

pub struct PoVRequester {

/// We only ever care about being connected to validators of at most two sessions.
///
/// So we keep an LRU of size 2 for managing connection requests.
connected_validators: LruCache<SessionIndex, mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>,
}

impl PoVRequester {
/// Create a new requester for PoVs.
pub fn new() -> Self {
Self {
connected_validators: LruCache::new(NUM_SESSIONS),
}
}

/// Make sure we are connected to the right set of validators.
///
/// On every `ActiveLeavesUpdate`, we check whether we are connected properly to our current
/// validator group.
pub async fn update_connected_validators<Context>(
&mut self,
ctx: &mut Context,
runtime: &mut Runtime,
update: &ActiveLeavesUpdate,
) -> super::Result<()>
where
Context: SubsystemContext,
{
let activated = update.activated.iter().map(|(h, _)| h);
let activated_sessions =
get_activated_sessions(ctx, runtime, activated).await?;

for (parent, session_index) in activated_sessions {
if self.connected_validators.contains(&session_index) {
continue
}
self.connected_validators.put(
session_index,
connect_to_relevant_validators(ctx, runtime, parent, session_index).await?
);
}
Ok(())
}

}

async fn get_activated_sessions<Context>(ctx: &mut Context, runtime: &mut Runtime, new_heads: impl Iterator<Item = &Hash>)
-> super::Result<impl Iterator<Item = (Hash, SessionIndex)>>
where
Context: SubsystemContext,
{
let mut sessions = Vec::new();
for parent in new_heads {
sessions.push((*parent, runtime.get_session_index(ctx, *parent).await?));
}
Ok(sessions.into_iter())
}

async fn connect_to_relevant_validators<Context>(
ctx: &mut Context,
runtime: &mut Runtime,
parent: Hash,
session: SessionIndex
)
-> super::Result<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>
where
Context: SubsystemContext,
{
let validator_ids = determine_relevant_validators(ctx, runtime, parent, session).await?;
// We don't actually care about the `PeerId`s, we just keep the receiver so we stay connected:
let (tx, rx) = mpsc::channel(0);
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators {
validator_ids, peer_set: PeerSet::Validation, connected: tx
})).await;
Ok(rx)
}

async fn determine_relevant_validators<Context>(
ctx: &mut Context,
runtime: &mut Runtime,
parent: Hash,
session: SessionIndex,
)
-> super::Result<Vec<AuthorityDiscoveryId>>
where
Context: SubsystemContext,
{
let info = runtime.get_session_info_by_index(ctx, parent, session).await?;
if let Some(validator_info) = &info.validator_info {
let indices = info.session_info.validator_groups.get(validator_info.our_group.0 as usize)
.expect("Our group got retrieved from that session info, it must exist. qed.")
.clone();
Ok(indices.into_iter().map(|i| info.session_info.discovery_keys[i.0 as usize].clone()).collect())
} else {
Ok(Vec::new())
}
}
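
As the doc comment on `connected_validators` notes, staying connected only requires keeping the receiver of the `ConnectToValidators` request alive; once an entry falls out of the size-2 LRU, its receiver is dropped and the connection request is released with it. A standalone sketch of that eviction behaviour, assuming the `lru` crate API used above (where `LruCache::new` takes a plain `usize`) and using `u32` in place of `SessionIndex`:

use futures::channel::mpsc;
use lru::LruCache;

fn main() {
    // Capacity 2 mirrors NUM_SESSIONS above.
    let mut connected: LruCache<u32, mpsc::Receiver<()>> = LruCache::new(2);

    let (old_tx, old_rx) = mpsc::channel(0);
    connected.put(1, old_rx);
    connected.put(2, mpsc::channel(0).1);

    // As long as the receiver sits in the cache, the sender side sees an open channel.
    assert!(!old_tx.is_closed());

    // Caching a third session evicts session 1; its receiver is dropped and the channel
    // closes, signalling that this connection request is no longer needed.
    connected.put(3, mpsc::channel(0).1);
    assert!(old_tx.is_closed());
    assert!(!connected.contains(&1));
}

In the subsystem itself the sender end is handed to the network bridge, so, per the comment in `connect_to_relevant_validators`, keeping or dropping the receiver is what determines which sessions we stay connected for.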
35 changes: 13 additions & 22 deletions node/network/availability-distribution/src/requester/mod.rs
@@ -40,7 +40,7 @@ use polkadot_subsystem::{
};

use super::{error::recv_runtime, session_cache::SessionCache, LOG_TARGET, Metrics};
use crate::error::NonFatalError;
use crate::error::Error;

/// A task fetching a particular chunk.
mod fetch_task;
@@ -97,7 +97,7 @@ impl Requester {
&mut self,
ctx: &mut Context,
update: ActiveLeavesUpdate,
) -> super::Result<Option<NonFatalError>>
) -> super::Result<()>
where
Context: SubsystemContext,
{
@@ -107,30 +107,25 @@
} = update;
// Order important! We need to handle activated, prior to deactivated, otherwise we might
// cancel still needed jobs.
let err = self.start_requesting_chunks(ctx, activated.into_iter()).await?;
self.start_requesting_chunks(ctx, activated.into_iter()).await?;
self.stop_requesting_chunks(deactivated.into_iter());
Ok(err)
Ok(())
}

/// Start requesting chunks for newly imported heads.
async fn start_requesting_chunks<Context>(
&mut self,
ctx: &mut Context,
new_heads: impl Iterator<Item = (Hash, Arc<jaeger::Span>)>,
) -> super::Result<Option<NonFatalError>>
) -> super::Result<()>
where
Context: SubsystemContext,
{
for (leaf, _) in new_heads {
let cores = match query_occupied_cores(ctx, leaf).await {
Err(err) => return Ok(Some(err)),
Ok(cores) => cores,
};
if let Some(err) = self.add_cores(ctx, leaf, cores).await? {
return Ok(Some(err));
}
let cores = query_occupied_cores(ctx, leaf).await?;
self.add_cores(ctx, leaf, cores).await?;
}
Ok(None)
Ok(())
}

/// Stop requesting chunks for obsolete heads.
@@ -155,7 +150,7 @@
ctx: &mut Context,
leaf: Hash,
cores: impl IntoIterator<Item = OccupiedCore>,
) -> super::Result<Option<NonFatalError>>
) -> super::Result<()>
where
Context: SubsystemContext,
{
@@ -170,7 +165,7 @@
let tx = self.tx.clone();
let metrics = self.metrics.clone();

let task_cfg = match self
let task_cfg = self
.session_cache
.with_session_info(
ctx,
Expand All @@ -180,11 +175,7 @@ impl Requester {
leaf,
|info| FetchTaskConfig::new(leaf, &core, tx, metrics, info),
)
.await
{
Err(err) => return Ok(Some(err)),
Ok(task_cfg) => task_cfg,
};
.await?;

if let Some(task_cfg) = task_cfg {
e.insert(FetchTask::start(task_cfg, ctx).await?);
Expand All @@ -193,7 +184,7 @@ impl Requester {
}
}
}
Ok(None)
Ok(())
}
}

@@ -228,7 +219,7 @@ impl Stream for Requester {
async fn query_occupied_cores<Context>(
ctx: &mut Context,
relay_parent: Hash,
) -> Result<Vec<OccupiedCore>, NonFatalError>
) -> Result<Vec<OccupiedCore>, Error>
where
Context: SubsystemContext,
{
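
Taken together with the `error.rs` changes, the pattern in this file is that the fallible helpers now return the single subsystem error type, so the earlier match-and-`return Ok(Some(err))` plumbing collapses into `?`, and the decision to log or abort moves up to the caller (e.g. via `log_error`). A schematic sketch with hypothetical names, using one error type for brevity where the old code distinguished `NonFatalError`:

#[derive(Debug)]
enum Error {
    RuntimeRequest,
}

type Result<T> = std::result::Result<T, Error>;

/// Hypothetical stand-in for `query_occupied_cores`.
fn query_cores(fail: bool) -> Result<Vec<u32>> {
    if fail { Err(Error::RuntimeRequest) } else { Ok(vec![1, 2, 3]) }
}

/// Before: non-fatal errors were threaded back out-of-band as `Ok(Some(err))`.
fn start_requesting_old(fail: bool) -> Result<Option<Error>> {
    let cores = match query_cores(fail) {
        Err(err) => return Ok(Some(err)),
        Ok(cores) => cores,
    };
    println!("old path got {} cores", cores.len());
    Ok(None)
}

/// After: a single error type lets `?` propagate, and the caller decides what is fatal.
fn start_requesting_new(fail: bool) -> Result<()> {
    let cores = query_cores(fail)?;
    println!("new path got {} cores", cores.len());
    Ok(())
}

fn main() {
    assert!(matches!(start_requesting_old(true), Ok(Some(Error::RuntimeRequest))));
    assert!(start_requesting_new(true).is_err());
}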