This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
66 commits
6f2e0e9  WIP (eskimor, Feb 4, 2021)
da850f7  availability distribution, still very wip. (eskimor, Feb 9, 2021)
fbf0ec1  Some docs on what I intend to do. (eskimor, Feb 9, 2021)
ac543c1  Checkpoint of session cache implementation (eskimor, Feb 10, 2021)
07f6bc3  More work, mostly on cache (eskimor, Feb 11, 2021)
ef84ea5  Only derive MallocSizeOf and Debug for std. (eskimor, Feb 12, 2021)
1e35804  availability-distribution: Cache feature complete. (eskimor, Feb 12, 2021)
d8fda81  Sketch out logic in `FetchTask` for actual fetching. (eskimor, Feb 13, 2021)
47036c9  Format cleanup. (eskimor, Feb 13, 2021)
4ad902f  More format fixes. (eskimor, Feb 13, 2021)
fee9476  Almost feature complete `fetch_task`. (eskimor, Feb 15, 2021)
b9aa906  Finish FetchTask so far. (eskimor, Feb 15, 2021)
a65562f  Directly use AuthorityDiscoveryId in protocol and cache. (eskimor, Feb 16, 2021)
4a43561  Resolve `AuthorityDiscoveryId` on sending requests. (eskimor, Feb 16, 2021)
6543b30  Rework fetch_task (eskimor, Feb 17, 2021)
256e559  From<u32> implementation for `ValidatorIndex`. (eskimor, Feb 17, 2021)
f8d5fef  Fixes and more integration work. (eskimor, Feb 17, 2021)
5e77fb4  Make session cache proper lru cache. (eskimor, Feb 18, 2021)
72704ee  Use proper lru cache. (eskimor, Feb 18, 2021)
60a2faf  Requester finished. (eskimor, Feb 18, 2021)
452b55f  ProtocolState -> Requester (eskimor, Feb 18, 2021)
2b9b983  Cleanup + fixes. (eskimor, Feb 18, 2021)
d683f10  Remove unused functions (eskimor, Feb 18, 2021)
d7a8a31  availability-distribution responding side. (eskimor, Feb 19, 2021)
3fed607  Cleanup + Fixes. (eskimor, Feb 19, 2021)
39d6bc2  More fixes. (eskimor, Feb 19, 2021)
49b1764  More fixes. (eskimor, Feb 19, 2021)
a141330  Some docs. (eskimor, Feb 19, 2021)
fad4586  Docs. (eskimor, Feb 19, 2021)
e617e91  Fix reporting of bad guys. (eskimor, Feb 19, 2021)
789a3e9  Merge branch 'master' into rk-availability-distribution-2306 (eskimor, Feb 19, 2021)
a4eef9b  Fix tests (eskimor, Feb 19, 2021)
ea5f6a4  Make all tests compile. (eskimor, Feb 19, 2021)
00e2f69  Fix test. (eskimor, Feb 19, 2021)
c837d98  Cleanup + get rid of some warnings. (eskimor, Feb 19, 2021)
09fa9fe  Merge branch 'master' into rk-availability-distribution-2306 (eskimor, Feb 22, 2021)
8945fbb  state -> requester (eskimor, Feb 22, 2021)
c9984fb  Mostly doc fixes. (eskimor, Feb 22, 2021)
7707759  Fix test suite. (eskimor, Feb 22, 2021)
e7623d4  Get rid of now redundant message types. (eskimor, Feb 22, 2021)
e8d7e44  WIP (eskimor, Feb 22, 2021)
5fb8418  Rob's review remarks. (eskimor, Feb 22, 2021)
41f60e4  Merge branch 'rk-drop-redundant-messages-2306' into rk-availability-d… (eskimor, Feb 22, 2021)
9780f3a  Fix test suite. (eskimor, Feb 22, 2021)
5bbcea4  core.relay_parent -> leaf for session request. (eskimor, Feb 22, 2021)
b792a89  Style fix. (eskimor, Feb 22, 2021)
75e6af8  Decrease request timeout. (eskimor, Feb 22, 2021)
ca7c182  Merge branch 'rk-metrics-2306' into rk-availability-distribution-2306 (eskimor, Feb 23, 2021)
53fdeb3  Cleanup obsolete errors. (eskimor, Feb 23, 2021)
ce21a10  Metrics + don't fail on non fatal errors. (eskimor, Feb 23, 2021)
64d7246  requester.rs -> requester/mod.rs (eskimor, Feb 23, 2021)
2a9650f  Panic on invalid BadValidator report. (eskimor, Feb 23, 2021)
4d05d00  Fix indentation. (eskimor, Feb 23, 2021)
aadc80f  Use typed default timeout constant. (eskimor, Feb 23, 2021)
e45f61c  Make channel size 0, as each sender gets one slot anyways. (eskimor, Feb 23, 2021)
43dfd1c  Fix incorrect metrics initialization. (eskimor, Feb 23, 2021)
e262782  Merge branch 'master' into rk-availability-distribution-2306 (eskimor, Feb 23, 2021)
5353157  Fix build after merge. (eskimor, Feb 23, 2021)
ff94444  More fixes. (eskimor, Feb 23, 2021)
6b71e54  Hopefully valid metrics names. (eskimor, Feb 24, 2021)
190adaa  Better metrics names. (eskimor, Feb 25, 2021)
8901344  Some tests that already work. (eskimor, Feb 25, 2021)
1d29b5c  Slightly better docs. (eskimor, Feb 25, 2021)
83ff666  Some more tests. (eskimor, Feb 26, 2021)
e9210e5  Merge branch 'master' into rk-availability-distribution-2306 (eskimor, Feb 26, 2021)
a0e01ec  Fix network bridge test. (eskimor, Feb 26, 2021)
Mostly doc fixes.
eskimor committed Feb 22, 2021
commit c9984fbd6638cc5cfd87077a5db7072e3baee40a
8 changes: 5 additions & 3 deletions node/network/availability-distribution/src/requester.rs
@@ -123,7 +123,7 @@ impl Requester {
 	/// Stop requesting chunks for obsolete heads.
 	///
 	fn stop_requesting_chunks(&mut self, obsolete_leaves: impl Iterator<Item = Hash>) {
-		let obsolete_leaves: HashSet<_> = obsolete_leaves.into_iter().collect();
+		let obsolete_leaves: HashSet<_> = obsolete_leaves.collect();
 		self.fetches.retain(|_, task| {
 			task.remove_leaves(&obsolete_leaves);
 			task.is_live()
@@ -155,6 +155,7 @@ impl Requester {
 			}
 			Entry::Vacant(e) => {
 				let tx = self.tx.clone();
+
 				let task_cfg = self
 					.session_cache
 					.with_session_info(
@@ -163,6 +164,7 @@
 						|info| FetchTaskConfig::new(leaf, &core, tx, info),
 					)
 					.await?;
+
 				if let Some(task_cfg) = task_cfg {
 					e.insert(FetchTask::start(task_cfg, ctx).await?);
 				}
@@ -200,8 +202,8 @@ impl Stream for Requester {
 	}
 }
 
-///// Query all hashes and descriptors of candidates pending availability at a particular block.
-// #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
+/// Query all hashes and descriptors of candidates pending availability at a particular block.
+#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
 async fn query_occupied_cores<Context>(
 	ctx: &mut Context,
 	relay_parent: Hash,
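The `stop_requesting_chunks` change above is a small iterator cleanup: the argument is already an `Iterator`, so `.into_iter()` before `.collect()` was redundant. The surrounding retain logic is worth seeing in one piece; the following is a minimal, self-contained sketch with simplified stand-in types (`Hash`, `CandidateHash`, and the trimmed-down `FetchTask`/`Requester` are illustrative, not the subsystem's real definitions):

```rust
use std::collections::{HashMap, HashSet};

// Simplified stand-ins for the subsystem's types (hypothetical, for illustration).
type Hash = u64;
type CandidateHash = u64;

struct FetchTask {
    // Relay-parent heads that still require this chunk.
    live_in: HashSet<Hash>,
}

impl FetchTask {
    fn remove_leaves(&mut self, obsolete: &HashSet<Hash>) {
        self.live_in.retain(|leaf| !obsolete.contains(leaf));
    }
    fn is_live(&self) -> bool {
        !self.live_in.is_empty()
    }
}

struct Requester {
    fetches: HashMap<CandidateHash, FetchTask>,
}

impl Requester {
    // Mirrors the diff: the argument already implements `Iterator`, so it can
    // be collected directly without an `.into_iter()` call.
    fn stop_requesting_chunks(&mut self, obsolete_leaves: impl Iterator<Item = Hash>) {
        let obsolete_leaves: HashSet<_> = obsolete_leaves.collect();
        self.fetches.retain(|_, task| {
            task.remove_leaves(&obsolete_leaves);
            task.is_live()
        });
    }
}

fn main() {
    let mut requester = Requester { fetches: HashMap::new() };
    requester.fetches.insert(1, FetchTask { live_in: [10, 20].into_iter().collect() });
    requester.fetches.insert(2, FetchTask { live_in: [10].into_iter().collect() });

    // Head 10 went obsolete: the second task has no live head left and is dropped.
    requester.stop_requesting_chunks([10].into_iter());
    assert!(requester.fetches.contains_key(&1));
    assert!(!requester.fetches.contains_key(&2));
}
```

The `retain` closure does double duty: it prunes obsolete leaves from every task and simultaneously drops tasks whose `live_in` set becomes empty.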
@@ -50,6 +50,7 @@ pub struct FetchTaskConfig {
 	live_in: HashSet<Hash>,
 }
 
+/// Information about a task fetching an erasure chunk.
 pub struct FetchTask {
 	/// For what relay parents this task is relevant.
 	///
@@ -90,7 +91,9 @@ struct RunningTask {
 	/// For what session we have been spawned.
 	session_index: SessionIndex,
 
-	/// Index of validator group.
+	/// Index of validator group to fetch the chunk from.
+	///
+	/// Needed for reporting bad validators.
 	group_index: GroupIndex,
 
 	/// Validators to request the chunk from.
@@ -134,7 +137,9 @@ impl FetchTaskConfig {
 		let prepared_running = RunningTask {
 			session_index: session_info.session_index,
 			group_index: core.group_responsible,
-			group: session_info.validator_groups.get(core.group_responsible.0 as usize).expect("The responsible group of a candidate should be available in the corresponding session. qed.").clone(),
+			group: session_info.validator_groups.get(core.group_responsible.0 as usize)
+				.expect("The responsible group of a candidate should be available in the corresponding session. qed.")
+				.clone(),
 			request: AvailabilityFetchingRequest {
 				candidate_hash: core.candidate_hash,
 				index: session_info.our_index,
@@ -184,7 +189,7 @@ impl FetchTask {
 
 	/// Add the given leaf to the relay parents which are making this task relevant.
 	///
-	/// This is for book keeping, so we know we are already fetching a chunk.
+	/// This is for book keeping, so we know we are already fetching a given chunk.
 	pub fn add_leaf(&mut self, leaf: Hash) {
 		self.live_in.insert(leaf);
 	}
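The middle hunk above reformats the lookup of the backing group responsible for a candidate: the group's validators are pulled out of the session's `validator_groups` by `GroupIndex`, and the index itself is kept on the task so misbehaving validators can later be reported against their group. A minimal sketch of that lookup, with stand-in types (`GroupIndex` as a plain newtype and `String` in place of the real `AuthorityDiscoveryId` are assumptions for illustration):

```rust
// Stand-in for the runtime's group index newtype.
#[derive(Clone, Copy, Debug)]
struct GroupIndex(u32);

// Stand-in for the real authority discovery key type.
type AuthorityDiscoveryId = String;

struct SessionInfo {
    // One entry per backing group; each inner vec lists the group's validators.
    validator_groups: Vec<Vec<AuthorityDiscoveryId>>,
}

// Mirrors the diff's `.get(..).expect(..).clone()` chain: the group must exist
// for any candidate occupying a core in this session, hence the `expect`.
fn responsible_group(info: &SessionInfo, group: GroupIndex) -> Vec<AuthorityDiscoveryId> {
    info.validator_groups
        .get(group.0 as usize)
        .expect("The responsible group of a candidate should be available in the corresponding session. qed.")
        .clone()
}

fn main() {
    let info = SessionInfo {
        validator_groups: vec![
            vec!["alice".into(), "bob".into()],
            vec!["charlie".into()],
        ],
    };
    let group = responsible_group(&info, GroupIndex(1));
    assert_eq!(group, vec!["charlie".to_string()]);
}
```

The clone is deliberate: the task runs detached from the session cache, so it takes an owned copy of its group rather than borrowing from the cache.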
1 change: 1 addition & 0 deletions node/network/availability-distribution/src/responder.rs
@@ -46,6 +46,7 @@ where
 	req.send_response(response).map_err(|_| Error::SendResponse)
 }
 
+/// Query chunk from the availability store.
 #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
 async fn query_chunk<Context>(
 	ctx: &mut Context,
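The hunk above only adds a doc comment, but it names the responder's core operation: look a chunk up in the availability store and answer the request, returning nothing when the chunk is absent so the requesting peer moves on to the next validator. A toy sketch of that lookup under stated assumptions (the store, `ErasureChunk`, and `query_chunk` shapes here are simplified stand-ins, not the real av-store API):

```rust
use std::collections::HashMap;

// Simplified stand-ins for illustration only.
type CandidateHash = u64;

#[derive(Clone, Debug, PartialEq)]
struct ErasureChunk {
    index: u32,
    data: Vec<u8>,
}

struct AvailabilityStore {
    // Chunks keyed by (candidate, validator index).
    chunks: HashMap<(CandidateHash, u32), ErasureChunk>,
}

impl AvailabilityStore {
    // Mirrors the spirit of `query_chunk`: `None` means "we don't have it",
    // which the requesting side treats as a miss, not an error.
    fn query_chunk(&self, candidate: CandidateHash, index: u32) -> Option<ErasureChunk> {
        self.chunks.get(&(candidate, index)).cloned()
    }
}

fn main() {
    let mut store = AvailabilityStore { chunks: HashMap::new() };
    store.chunks.insert((1, 0), ErasureChunk { index: 0, data: vec![1, 2, 3] });
    assert!(store.query_chunk(1, 0).is_some());
    assert!(store.query_chunk(1, 1).is_none());
}
```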
29 changes: 17 additions & 12 deletions node/network/availability-distribution/src/session_cache.rs
@@ -40,7 +40,6 @@ use super::{
 /// Caching of session info as needed by availability distribution.
 ///
 /// It should be ensured that a cached session stays live in the cache as long as we might need it.
-/// A warning will be logged, if an already dead entry gets fetched.
 pub struct SessionCache {
 	/// Get the session index for a given relay parent.
 	///
@@ -52,7 +51,8 @@ pub struct SessionCache {
 	///
 	/// Note: Performance of fetching is really secondary here, but we need to ensure we are going
 	/// to get any existing cache entry, before fetching new information, as we should not mess up
-	/// the order of validators. (We want live TCP connections wherever possible.)
+	/// the order of validators in `SessionInfo::validator_groups`. (We want live TCP connections
+	/// wherever possible.)
 	session_info_cache: LruCache<SessionIndex, SessionInfo>,
 
 	/// Key store for determining whether we are a validator and what `ValidatorIndex` we have.
@@ -64,32 +64,38 @@ pub struct SessionCache {
 pub struct SessionInfo {
 	/// The index of this session.
 	pub session_index: SessionIndex,
+
 	/// Validator groups of the current session.
 	///
 	/// Each group's order is randomized. This way we achieve load balancing when requesting
 	/// chunks, as the validators in a group will be tried in that randomized order. Each node
-	/// should arrive at a different order, therefore we distribute the load.
+	/// should arrive at a different order, therefore we distribute the load on individual
+	/// validators.
 	pub validator_groups: Vec<Vec<AuthorityDiscoveryId>>,
 
 	/// Information about ourself:
 	pub our_index: ValidatorIndex,
 
-	/// Remember to which group we belong, so we won't start fetching chunks for candidates those
-	/// candidates (We should have them via PoV distribution).
+	/// Remember to which group we belong, so we won't start fetching chunks for candidates with
+	/// our group being responsible. (We should have that chunk already.)
 	pub our_group: GroupIndex,
 }
 
 /// Report of bad validators.
+///
+/// Fetching tasks will report back validators that did not respond as expected, so we can re-order
+/// them.
 pub struct BadValidators {
 	/// The session index that was used.
 	pub session_index: SessionIndex,
-	/// The group the not properly responding validators are.
+	/// The group, the not properly responding validators belong to.
 	pub group_index: GroupIndex,
-	/// The indeces of the bad validators.
+	/// The list of bad validators.
 	pub bad_validators: Vec<AuthorityDiscoveryId>,
 }
 
 impl SessionCache {
+	/// Create a new `SessionCache`.
 	pub fn new(keystore: SyncCryptoStorePtr) -> Self {
 		SessionCache {
 			// 5 relatively conservative, 1 to 2 should suffice:
@@ -104,7 +110,7 @@ impl SessionCache {
 	///
 	/// If this node is not a validator, the function will return `None`.
 	///
-	/// Use this function over `fetch_session_info` if all you need is a reference to
+	/// Use this function over any `fetch_session_info` if all you need is a reference to
 	/// `SessionInfo`, as it avoids an expensive clone.
 	pub async fn with_session_info<Context, F, R>(
 		&mut self,
@@ -170,7 +176,7 @@ impl SessionCache {
 	/// Query needed information from runtime.
 	///
 	/// We need to pass in the relay parent for our call to `request_session_info_ctx`. We should
-	/// actually don't need that, I suppose it is used for internal caching based on relay parents,
+	/// actually don't need that: I suppose it is used for internal caching based on relay parents,
 	/// which we don't use here. It should not do any harm though.
 	async fn query_info_from_runtime<Context>(
 		&self,
@@ -204,7 +210,6 @@ impl SessionCache {
 				}
 			})
 		})
-		// TODO: Make sure this is correct and should be enforced:
 		.expect("Every validator should be in a validator group. qed.");
 
 		// Shuffle validators in groups:
@@ -237,9 +242,9 @@ impl SessionCache {
 		return Ok(None);
 	}
 
-	/// Get our validator id and the validators in the current session.
+	/// Get our `ValidatorIndex`.
 	///
-	/// Returns: Ok(None) if we are not a validator.
+	/// Returns: None if we are not a validator.
 	async fn get_our_index(&self, validators: Vec<ValidatorId>) -> Option<ValidatorIndex> {
 		for (i, v) in validators.iter().enumerate() {
 			if CryptoStore::has_keys(&*self.keystore, &[(v.to_raw_vec(), ValidatorId::ID)])
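The `SessionInfo` doc comment above carries the key design idea: each node shuffles the validators inside every group, requests are tried in that order, and since each node arrives at a different order the load spreads across individual validators. A minimal sketch of that idea, using a toy seeded Fisher-Yates shuffle so the example needs no external crates (the seeding, function names, and `String` standing in for `AuthorityDiscoveryId` are all illustrative assumptions, not the subsystem's actual code):

```rust
// Stand-in for the real authority discovery key type.
type AuthorityDiscoveryId = String;

/// Fisher-Yates shuffle driven by a tiny LCG, so the sketch stays dependency-free.
/// (The real code would use a proper RNG.)
fn shuffle<T>(items: &mut [T], mut seed: u64) {
    for i in (1..items.len()).rev() {
        // LCG step; the constants are the common Knuth/MMIX multipliers.
        seed = seed
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        let j = (seed % (i as u64 + 1)) as usize;
        items.swap(i, j);
    }
}

/// Shuffle every group independently. Different nodes (modeled here by
/// different seeds) end up with different orders, so when each node walks its
/// group front-to-back when requesting chunks, load spreads across validators.
fn randomize_groups(
    mut groups: Vec<Vec<AuthorityDiscoveryId>>,
    seed: u64,
) -> Vec<Vec<AuthorityDiscoveryId>> {
    for group in groups.iter_mut() {
        shuffle(group, seed);
    }
    groups
}

fn main() {
    let groups = vec![vec!["a".to_string(), "b".to_string(), "c".to_string()]];
    let order_one = randomize_groups(groups.clone(), 1);
    let order_two = randomize_groups(groups.clone(), 2);
    // Same members either way; only the order may differ between "nodes".
    assert_eq!(order_one[0].len(), 3);
    assert_eq!(order_two[0].len(), 3);
    println!("{:?} vs {:?}", order_one, order_two);
}
```

This also explains why the cache must return an existing entry before re-fetching: re-shuffling an already cached session would destroy the order under which TCP connections to the front validators are already established.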