This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
66 commits
6f2e0e9
WIP
eskimor Feb 4, 2021
da850f7
availability distribution, still very wip.
eskimor Feb 9, 2021
fbf0ec1
Some docs on what I intend to do.
eskimor Feb 9, 2021
ac543c1
Checkpoint of session cache implementation
eskimor Feb 10, 2021
07f6bc3
More work, mostly on cache
eskimor Feb 11, 2021
ef84ea5
Only derive MallocSizeOf and Debug for std.
eskimor Feb 12, 2021
1e35804
availability-distribution: Cache feature complete.
eskimor Feb 12, 2021
d8fda81
Sketch out logic in `FetchTask` for actual fetching.
eskimor Feb 13, 2021
47036c9
Format cleanup.
eskimor Feb 13, 2021
4ad902f
More format fixes.
eskimor Feb 13, 2021
fee9476
Almost feature complete `fetch_task`.
eskimor Feb 15, 2021
b9aa906
Finish FetchTask so far.
eskimor Feb 15, 2021
a65562f
Directly use AuthorityDiscoveryId in protocol and cache.
eskimor Feb 16, 2021
4a43561
Resolve `AuthorityDiscoveryId` on sending requests.
eskimor Feb 16, 2021
6543b30
Rework fetch_task
eskimor Feb 17, 2021
256e559
From<u32> implementation for `ValidatorIndex`.
eskimor Feb 17, 2021
f8d5fef
Fixes and more integration work.
eskimor Feb 17, 2021
5e77fb4
Make session cache proper lru cache.
eskimor Feb 18, 2021
72704ee
Use proper lru cache.
eskimor Feb 18, 2021
60a2faf
Requester finished.
eskimor Feb 18, 2021
452b55f
ProtocolState -> Requester
eskimor Feb 18, 2021
2b9b983
Cleanup + fixes.
eskimor Feb 18, 2021
d683f10
Remove unused functions
eskimor Feb 18, 2021
d7a8a31
availability-distribution responding side.
eskimor Feb 19, 2021
3fed607
Cleanup + Fixes.
eskimor Feb 19, 2021
39d6bc2
More fixes.
eskimor Feb 19, 2021
49b1764
More fixes.
eskimor Feb 19, 2021
a141330
Some docs.
eskimor Feb 19, 2021
fad4586
Docs.
eskimor Feb 19, 2021
e617e91
Fix reporting of bad guys.
eskimor Feb 19, 2021
789a3e9
Merge branch 'master' into rk-availability-distribution-2306
eskimor Feb 19, 2021
a4eef9b
Fix tests
eskimor Feb 19, 2021
ea5f6a4
Make all tests compile.
eskimor Feb 19, 2021
00e2f69
Fix test.
eskimor Feb 19, 2021
c837d98
Cleanup + get rid of some warnings.
eskimor Feb 19, 2021
09fa9fe
Merge branch 'master' into rk-availability-distribution-2306
eskimor Feb 22, 2021
8945fbb
state -> requester
eskimor Feb 22, 2021
c9984fb
Mostly doc fixes.
eskimor Feb 22, 2021
7707759
Fix test suite.
eskimor Feb 22, 2021
e7623d4
Get rid of now redundant message types.
eskimor Feb 22, 2021
e8d7e44
WIP
eskimor Feb 22, 2021
5fb8418
Rob's review remarks.
eskimor Feb 22, 2021
41f60e4
Merge branch 'rk-drop-redundant-messages-2306' into rk-availability-d…
eskimor Feb 22, 2021
9780f3a
Fix test suite.
eskimor Feb 22, 2021
5bbcea4
core.relay_parent -> leaf for session request.
eskimor Feb 22, 2021
b792a89
Style fix.
eskimor Feb 22, 2021
75e6af8
Decrease request timeout.
eskimor Feb 22, 2021
ca7c182
Merge branch 'rk-metrics-2306' into rk-availability-distribution-2306
eskimor Feb 23, 2021
53fdeb3
Cleanup obsolete errors.
eskimor Feb 23, 2021
ce21a10
Metrics + don't fail on non fatal errors.
eskimor Feb 23, 2021
64d7246
requester.rs -> requester/mod.rs
eskimor Feb 23, 2021
2a9650f
Panic on invalid BadValidator report.
eskimor Feb 23, 2021
4d05d00
Fix indentation.
eskimor Feb 23, 2021
aadc80f
Use typed default timeout constant.
eskimor Feb 23, 2021
e45f61c
Make channel size 0, as each sender gets one slot anyways.
eskimor Feb 23, 2021
43dfd1c
Fix incorrect metrics initialization.
eskimor Feb 23, 2021
e262782
Merge branch 'master' into rk-availability-distribution-2306
eskimor Feb 23, 2021
5353157
Fix build after merge.
eskimor Feb 23, 2021
ff94444
More fixes.
eskimor Feb 23, 2021
6b71e54
Hopefully valid metrics names.
eskimor Feb 24, 2021
190adaa
Better metrics names.
eskimor Feb 25, 2021
8901344
Some tests that already work.
eskimor Feb 25, 2021
1d29b5c
Slightly better docs.
eskimor Feb 25, 2021
83ff666
Some more tests.
eskimor Feb 26, 2021
e9210e5
Merge branch 'master' into rk-availability-distribution-2306
eskimor Feb 26, 2021
a0e01ec
Fix network bridge test.
eskimor Feb 26, 2021
ProtocolState -> Requester
Also make sure to not fetch our own chunk.
eskimor committed Feb 18, 2021
commit 452b55f2cd7582f10ebaa25bf4dbd7818dbc0dae
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion node/network/availability-distribution/Cargo.toml
@@ -19,7 +19,6 @@ sp-application-crypto = { git = "https://github.com/paritytech/substrate", branc
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
thiserror = "1.0.23"
itertools = "0.10.0"
rand = "0.8.3"
lru = "0.6.5"

12 changes: 4 additions & 8 deletions node/network/availability-distribution/src/lib.rs
@@ -32,13 +32,9 @@ mod error;
pub use error::Error;
use error::Result;

/// The actual implementation of running availability distribution.
mod state;
/// State of a running availability-distribution subsystem.
use state::ProtocolState;

/// A task fetching a particular chunk.
mod fetch_task;
/// `Requester` taking care of requesting chunks for candidates pending availability.
mod requester;
use requester::Requester;

/// Cache for session information.
mod session_cache;
@@ -85,7 +81,7 @@ impl AvailabilityDistributionSubsystem {
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage> + Sync + Send,
{
let mut state = ProtocolState::new(self.keystore.clone()).fuse();
let mut state = Requester::new(self.keystore.clone()).fuse();
loop {
let action = {
let mut subsystem_next = ctx.recv().fuse();
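The lib.rs change above swaps `ProtocolState` for `Requester` but keeps the run-loop pattern: the requester is driven as a fused stream alongside incoming subsystem messages. A minimal, self-contained sketch of that pattern using plain `futures` primitives (the stream contents, `from_overseer`, and the printed strings are illustrative stand-ins, not the subsystem's real types):

    use futures::{channel::mpsc, executor::block_on, select, SinkExt, StreamExt};

    fn main() {
        block_on(async {
            // Stand-in for `Requester`: a fused stream of actions to forward.
            let mut requester =
                futures::stream::iter(vec!["fetch chunk A", "fetch chunk B"]).fuse();

            // Stand-in for `ctx.recv()`: messages coming from the overseer.
            let (mut tx, rx) = mpsc::channel::<&str>(1);
            let mut from_overseer = rx.fuse();
            tx.send("ActiveLeavesUpdate").await.unwrap();
            drop(tx); // Closing the channel ends the loop below.

            loop {
                select! {
                    msg = from_overseer.next() => match msg {
                        Some(m) => println!("overseer message: {}", m),
                        None => break, // Overseer gone, shut down.
                    },
                    action = requester.next() => {
                        if let Some(a) = action {
                            println!("requester asks to: {}", a);
                        }
                    },
                }
            }
        });
    }

Both sources are fused so that `select!` can safely skip whichever side has already terminated.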
@@ -14,44 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! `ProtocolState` representing a running availability distribution subsystem.
//!
//! We keep track of [`FetchTask`]s, which get created on [`ActiveLeavesUpdate`]s for each occupied
//! core in the leaves, if we have not created one before. We keep track of the relay parents
//! for which a `FetchTask` is considered live (the corresponding slot is occupied with the
//! candidate being fetched). Once there is no relay parent left for which that task is considered live,
//! it gets removed.
//!
//! We keep that task around as long as its corresponding candidate is considered pending
//! availability, even if we fetched our chunk already. This is so we won't fetch our piece again,
//! just because the candidate is still pending availability in the next block.
//!
//! We are also dependent on session information. We need to know which validators are in a
//! particular validator group, backing our candidate, so we can request our erasure chunk from
//! them.
//!
//! We want to randomize the list of validators in each group, so we get a
//! random order of validators to try to get the chunk from. This is to ensure load balancing: each
//! requesting validator should have a different order, thus trying different validators.
//!
//! But we would like to keep that randomized order around for an entire session, so our particular
//! validator will always request from the same validators, thus making sure it will find an open
//! network connection on each request.
//!
//! (TODO: What to do on session boundaries? Initial delay acceptable? Connect with some fake
//! request to future validators? Use a peer set after all and connect that to the future session?)
//!
//! So we need to keep some customized session info around, which seems to be a good idea for
//! performance reasons anyway. That's where `SessionCache` comes into play. It is used to keep
//! session information around as long as we need it. But how long do we need it? How do we manage
//! that cache? We can't rely on `ActiveLeavesUpdate` heads alone, as we might get occupied slots
//! for heads we never got an `ActiveLeavesUpdate` for. Therefore we don't populate the session
//! cache with sessions our leaves correspond to, but directly with the sessions of the relay
//! parents of our `CandidateDescriptor`s. So, it's clear how to populate the cache, but when can we
//! get rid of cached session information? It is for sure safe to do so when there is no
//! candidate/FetchTask around anymore which references it. Thus the cache simply consists of
//! `Weak` pointers to the actual session infos and the `FetchTask`s keep `Rc`s, therefore we know
//! exactly when we can get rid of a cache entry by means of the Weak pointer evaluating to `None`.
//! Requester takes care of requesting erasure chunks for candidates that are pending
//! availability.

use std::collections::{
hash_map::{Entry, HashMap},
@@ -83,21 +47,26 @@ use polkadot_subsystem::{
SubsystemContext, SubsystemError,
};

use super::{
error::recv_runtime,
fetch_task::{FetchTask, FetchTaskConfig, FromFetchTask},
session_cache::SessionCache,
Result, LOG_TARGET,
};
use super::{error::recv_runtime, session_cache::SessionCache, Result, LOG_TARGET};

/// A task fetching a particular chunk.
mod fetch_task;
use fetch_task::{FetchTask, FetchTaskConfig, FromFetchTask};

/// A running instance of this subsystem.
pub struct ProtocolState {
/// Requester takes care of requesting erasure chunks from backing groups and stores them in the
/// av store.
///
/// It implements a stream that needs to be advanced in order to make progress.
pub struct Requester {
/// Candidates we need to fetch our chunk for.
///
/// We keep those around as long as a candidate is pending availability on some leaf, so we
/// won't fetch chunks multiple times.
fetches: HashMap<CandidateHash, FetchTask>,

/// Localized information about sessions we are currently interested in.
///
/// This is usually the current one and at session boundaries also the last one.
/// This is the current one and the last one.
session_cache: SessionCache,

/// Sender to be cloned for `FetchTask`s.
@@ -107,11 +76,15 @@ pub struct ProtocolState {
rx: mpsc::Receiver<FromFetchTask>,
}

impl ProtocolState {
impl Requester {
/// Create a new `Requester`.
///
/// You must feed it with `ActiveLeavesUpdate`s via `update_fetching_heads` and make it progress
/// by advancing the stream.
pub(crate) fn new(keystore: SyncCryptoStorePtr) -> Self {
// All we do is forward messages, so no need to make this big.
let (tx, rx) = mpsc::channel(1);
ProtocolState {
Requester {
fetches: HashMap::new(),
session_cache: SessionCache::new(keystore),
tx,
@@ -120,7 +93,7 @@ impl ProtocolState {
}
/// Update heads that need availability distribution.
///
/// For all active heads we will be fetching our chunk for availability distribution.
/// For all active heads we will be fetching our chunks for availability distribution.
pub(crate) async fn update_fetching_heads<Context>(
&mut self,
ctx: &mut Context,
@@ -159,8 +132,6 @@ impl ProtocolState {

/// Stop requesting chunks for obsolete heads.
///
/// Returns relay_parents which became irrelevant for availability fetching (are not
/// referenced by any candidate anymore).
fn stop_requesting_chunks(&mut self, obsolete_leaves: impl Iterator<Item = Hash>) {
let obsolete_leaves: HashSet<_> = obsolete_leaves.into_iter().collect();
self.fetches.retain(|&c_hash, task| {
@@ -213,7 +184,7 @@ impl ProtocolState {
}
}

impl Stream for ProtocolState {
impl Stream for Requester {
type Item = Result<AllMessages>;

fn poll_next(
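The `stop_requesting_chunks` hunk above relies on a simple retention rule: a `FetchTask` survives as long as at least one non-obsolete leaf still references it. A standalone sketch of that rule with simplified stand-ins (`Task`, string candidate keys, integer leaf ids) in place of the real `FetchTask`/`CandidateHash`/`Hash` types:

    use std::collections::{HashMap, HashSet};

    struct Task {
        // Leaves for which this fetch is still relevant.
        live_in: HashSet<u32>,
    }

    // Drop every task that is no longer live in any non-obsolete leaf.
    fn stop_requesting(fetches: &mut HashMap<&'static str, Task>, obsolete: &HashSet<u32>) {
        fetches.retain(|_, task| {
            // Forget the obsolete leaves ...
            task.live_in.retain(|leaf| !obsolete.contains(leaf));
            // ... and keep the task only while some live leaf still references it.
            !task.live_in.is_empty()
        });
    }

    fn main() {
        let mut fetches = HashMap::new();
        fetches.insert("candidate-a", Task { live_in: vec![1u32, 2].into_iter().collect() });
        fetches.insert("candidate-b", Task { live_in: vec![2u32].into_iter().collect() });

        let obsolete: HashSet<u32> = vec![2u32].into_iter().collect();
        stop_requesting(&mut fetches, &obsolete);

        // candidate-a survives (still live in leaf 1), candidate-b is dropped.
        assert!(fetches.contains_key("candidate-a"));
        assert!(!fetches.contains_key("candidate-b"));
    }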
@@ -43,7 +43,7 @@ use polkadot_subsystem::{
Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
};

use super::{
use crate::{
error::{Error, Result},
session_cache::{BadValidators, SessionInfo},
LOG_TARGET,
@@ -54,7 +54,7 @@ use super::{
/// This exists to separate preparation of a `FetchTask` from actually starting it, which is
/// beneficial as it allows for taking session info by reference.
pub struct FetchTaskConfig {
prepared_running: RunningTask,
prepared_running: Option<RunningTask>,
live_in: HashSet<Hash>,
}

@@ -129,7 +129,17 @@ impl FetchTaskConfig {
sender: mpsc::Sender<FromFetchTask>,
session_info: &SessionInfo,
) -> Self {
let prepared_running = RunningTask {
let live_in = vec![leaf].into_iter().collect();

// Don't run tasks for our backing group:
if session_info.our_group == core.group_responsible {
return FetchTaskConfig {
live_in,
prepared_running: None,
};
}

let prepared_running = RunningTask {
session_index: session_info.session_index,
group_index: core.group_responsible,
group: session_info.validator_groups.get(core.group_responsible.0 as usize).expect("The responsible group of a candidate should be available in the corresponding session. qed.").clone(),
@@ -142,8 +152,8 @@ impl FetchTaskConfig {
sender,
};
FetchTaskConfig {
live_in: vec![leaf].into_iter().collect(),
prepared_running,
live_in,
prepared_running: Some(prepared_running),
}
}
}
@@ -158,14 +168,24 @@ impl FetchTask {
prepared_running,
live_in,
} = config;
let (handle, kill) = oneshot::channel();
ctx.spawn("chunk-fetcher", prepared_running.run(kill).boxed())
.await
.map_err(|e| Error::SpawnTask(e))?;
Ok(FetchTask {
live_in,
state: FetchedState::Started(handle),
})

if let Some(running) = prepared_running {
let (handle, kill) = oneshot::channel();

ctx.spawn("chunk-fetcher", running.run(kill).boxed())
.await
.map_err(|e| Error::SpawnTask(e))?;

Ok(FetchTask {
live_in,
state: FetchedState::Started(handle),
})
} else {
Ok(FetchTask {
live_in,
state: FetchedState::Canceled,
})
}
}

/// Add the given leaf to the relay parents which are making this task relevant.
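The switch to `prepared_running: Option<RunningTask>` encodes the commit's "make sure to not fetch our own chunk" rule: when our backing group is the one responsible for the candidate, no running task is prepared and `FetchTask::start` ends up in the canceled state. A simplified sketch of that decision, with hypothetical stand-in types rather than the real `FetchTaskConfig`:

    #[derive(PartialEq, Eq, Clone, Copy, Debug)]
    struct GroupIndex(u32);

    // Stand-in for the prepared background task.
    #[derive(Debug)]
    struct RunningTask {
        group_responsible: GroupIndex,
    }

    // Stand-in for `FetchTaskConfig`: `prepared_running` is `None` when our own
    // backing group is responsible, since we already have the data via PoV distribution.
    struct FetchConfig {
        prepared_running: Option<RunningTask>,
    }

    fn make_config(our_group: GroupIndex, group_responsible: GroupIndex) -> FetchConfig {
        if our_group == group_responsible {
            return FetchConfig { prepared_running: None };
        }
        FetchConfig {
            prepared_running: Some(RunningTask { group_responsible }),
        }
    }

    fn main() {
        // Our own group is responsible: nothing to fetch.
        assert!(make_config(GroupIndex(0), GroupIndex(0)).prepared_running.is_none());

        // Another group is responsible: a task is prepared against that group.
        let cfg = make_config(GroupIndex(0), GroupIndex(3));
        assert_eq!(cfg.prepared_running.unwrap().group_responsible, GroupIndex(3));
    }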
32 changes: 24 additions & 8 deletions node/network/availability-distribution/src/session_cache.rs
@@ -80,10 +80,9 @@ pub struct SessionInfo {
/// Information about ourself:
pub our_index: ValidatorIndex,

//// Remember to which group we blong, so we won't start fetching chunks for candidates we
//// backed our selves.
// TODO: Implement this:
// pub our_group: GroupIndex,
/// Remember to which group we belong, so we won't start fetching chunks for those
/// candidates (we should already have them via PoV distribution).
pub our_group: GroupIndex,
}

/// Report of bad validators.
@@ -133,10 +132,10 @@ impl SessionCache {
ctx: &mut Context,
parent: Hash,
with_info: F,
) -> Result<Option<R>>
where
) -> Result<Option<R>>
where
Context: SubsystemContext,
F: FnOnce(&SessionInfo) -> R
F: FnOnce(&SessionInfo) -> R,
{
let session_index = match self.session_index_cache.get(&parent) {
Some(index) => *index,
@@ -150,7 +149,7 @@
};

if let Some(info) = self.session_info_cache.get(&session_index) {
return Ok(Some(with_info(info)))
return Ok(Some(with_info(info)));
}

if let Some(info) = self
@@ -205,6 +204,22 @@
.ok_or(Error::NoSuchSession(session_index))?;

if let Some(our_index) = self.get_our_index(validators).await {
// Get our group index:
let our_group = validator_groups
.iter()
.enumerate()
.find_map(|(i, g)| {
g.iter().find_map(|v| {
if *v == our_index {
Some(GroupIndex(i as u32))
} else {
None
}
})
})
// TODO: Make sure this is correct and should be enforced:
.expect("Every validator should be in a validator group. qed.");

// Shuffle validators in groups:
let mut rng = thread_rng();
for g in validator_groups.iter_mut() {
@@ -228,6 +243,7 @@
validator_groups,
our_index,
session_index,
our_group,
};
return Ok(Some(info));
}
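The `our_group` lookup added to `session_cache.rs` is a search over the enumerated validator groups for our own validator index. A standalone sketch with newtype stand-ins for `ValidatorIndex`/`GroupIndex`, using `contains` in place of the nested `find_map` (behaviorally equivalent):

    #[derive(PartialEq, Eq, Debug, Clone, Copy)]
    struct ValidatorIndex(u32);
    #[derive(PartialEq, Eq, Debug, Clone, Copy)]
    struct GroupIndex(u32);

    // Find the index of the validator group that contains our validator index.
    fn our_group(groups: &[Vec<ValidatorIndex>], our_index: ValidatorIndex) -> Option<GroupIndex> {
        groups.iter().enumerate().find_map(|(i, group)| {
            if group.contains(&our_index) {
                Some(GroupIndex(i as u32))
            } else {
                None
            }
        })
    }

    fn main() {
        let groups = vec![
            vec![ValidatorIndex(0), ValidatorIndex(1)],
            vec![ValidatorIndex(2), ValidatorIndex(3)],
        ];

        // Validator 3 sits in the second group.
        assert_eq!(our_group(&groups, ValidatorIndex(3)), Some(GroupIndex(1)));

        // A validator outside every group yields `None`; the PR currently `expect`s
        // on this case, with a TODO on whether that invariant should be enforced.
        assert_eq!(our_group(&groups, ValidatorIndex(9)), None);
    }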