Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
6f2e0e9
WIP
eskimor Feb 4, 2021
da850f7
availability distribution, still very wip.
eskimor Feb 9, 2021
fbf0ec1
Some docs on what I intend to do.
eskimor Feb 9, 2021
ac543c1
Checkpoint of session cache implementation
eskimor Feb 10, 2021
07f6bc3
More work, mostly on cache
eskimor Feb 11, 2021
ef84ea5
Only derive MallocSizeOf and Debug for std.
eskimor Feb 12, 2021
1e35804
availability-distribution: Cache feature complete.
eskimor Feb 12, 2021
d8fda81
Sketch out logic in `FetchTask` for actual fetching.
eskimor Feb 13, 2021
47036c9
Format cleanup.
eskimor Feb 13, 2021
4ad902f
More format fixes.
eskimor Feb 13, 2021
fee9476
Almost feature complete `fetch_task`.
eskimor Feb 15, 2021
b9aa906
Finish FetchTask so far.
eskimor Feb 15, 2021
a65562f
Directly use AuthorityDiscoveryId in protocol and cache.
eskimor Feb 16, 2021
4a43561
Resolve `AuthorityDiscoveryId` on sending requests.
eskimor Feb 16, 2021
6543b30
Rework fetch_task
eskimor Feb 17, 2021
256e559
From<u32> implementation for `ValidatorIndex`.
eskimor Feb 17, 2021
f8d5fef
Fixes and more integration work.
eskimor Feb 17, 2021
5e77fb4
Make session cache proper lru cache.
eskimor Feb 18, 2021
72704ee
Use proper lru cache.
eskimor Feb 18, 2021
60a2faf
Requester finished.
eskimor Feb 18, 2021
452b55f
ProtocolState -> Requester
eskimor Feb 18, 2021
2b9b983
Cleanup + fixes.
eskimor Feb 18, 2021
d683f10
Remove unused functions
eskimor Feb 18, 2021
d7a8a31
availability-distribution responding side.
eskimor Feb 19, 2021
3fed607
Cleanup + Fixes.
eskimor Feb 19, 2021
39d6bc2
More fixes.
eskimor Feb 19, 2021
49b1764
More fixes.
eskimor Feb 19, 2021
a141330
Some docs.
eskimor Feb 19, 2021
fad4586
Docs.
eskimor Feb 19, 2021
e617e91
Fix reporting of bad guys.
eskimor Feb 19, 2021
789a3e9
Merge branch 'master' into rk-availability-distribution-2306
eskimor Feb 19, 2021
a4eef9b
Fix tests
eskimor Feb 19, 2021
ea5f6a4
Make all tests compile.
eskimor Feb 19, 2021
00e2f69
Fix test.
eskimor Feb 19, 2021
c837d98
Cleanup + get rid of some warnings.
eskimor Feb 19, 2021
09fa9fe
Merge branch 'master' into rk-availability-distribution-2306
eskimor Feb 22, 2021
8945fbb
state -> requester
eskimor Feb 22, 2021
c9984fb
Mostly doc fixes.
eskimor Feb 22, 2021
7707759
Fix test suite.
eskimor Feb 22, 2021
e7623d4
Get rid of now redundant message types.
eskimor Feb 22, 2021
e8d7e44
WIP
eskimor Feb 22, 2021
5fb8418
Rob's review remarks.
eskimor Feb 22, 2021
41f60e4
Merge branch 'rk-drop-redundant-messages-2306' into rk-availability-d…
eskimor Feb 22, 2021
9780f3a
Fix test suite.
eskimor Feb 22, 2021
5bbcea4
core.relay_parent -> leaf for session request.
eskimor Feb 22, 2021
b792a89
Style fix.
eskimor Feb 22, 2021
75e6af8
Decrease request timeout.
eskimor Feb 22, 2021
ca7c182
Merge branch 'rk-metrics-2306' into rk-availability-distribution-2306
eskimor Feb 23, 2021
53fdeb3
Cleanup obsolete errors.
eskimor Feb 23, 2021
ce21a10
Metrics + don't fail on non fatal errors.
eskimor Feb 23, 2021
64d7246
requester.rs -> requester/mod.rs
eskimor Feb 23, 2021
2a9650f
Panic on invalid BadValidator report.
eskimor Feb 23, 2021
4d05d00
Fix indentation.
eskimor Feb 23, 2021
aadc80f
Use typed default timeout constant.
eskimor Feb 23, 2021
e45f61c
Make channel size 0, as each sender gets one slot anyways.
eskimor Feb 23, 2021
43dfd1c
Fix incorrect metrics initialization.
eskimor Feb 23, 2021
e262782
Merge branch 'master' into rk-availability-distribution-2306
eskimor Feb 23, 2021
5353157
Fix build after merge.
eskimor Feb 23, 2021
ff94444
More fixes.
eskimor Feb 23, 2021
6b71e54
Hopefully valid metrics names.
eskimor Feb 24, 2021
190adaa
Better metrics names.
eskimor Feb 25, 2021
8901344
Some tests that already work.
eskimor Feb 25, 2021
1d29b5c
Slightly better docs.
eskimor Feb 25, 2021
83ff666
Some more tests.
eskimor Feb 26, 2021
e9210e5
Merge branch 'master' into rk-availability-distribution-2306
eskimor Feb 26, 2021
a0e01ec
Fix network bridge test.
eskimor Feb 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions node/network/availability-distribution/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ struct MetricsInner {
}

impl Metrics {
/// Create new dummy metrics, not reporting anything.
///
/// All reporting methods become no-ops (the inner `Option` is `None`),
/// which is what the test suite uses when no Prometheus registry exists.
pub fn new_dummy() -> Self {
	Metrics(None)
}

/// Increment counter on fetched labels.
pub fn on_fetch(&self, label: &'static str) {
if let Some(metrics) = &self.0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ use crate::{
metrics::{Metrics, SUCCEEDED, FAILED},
};

#[cfg(test)]
mod tests;

/// Configuration for a `FetchTask`
///
/// This exists to separate preparation of a `FetchTask` from actual starting it, which is
Expand Down Expand Up @@ -355,7 +358,7 @@ impl RunningTask {
match branch_hash(&self.erasure_root, &chunk.proof, chunk.index.0 as usize) {
Ok(hash) => hash,
Err(e) => {
tracing::trace!(
tracing::warn!(
target: LOG_TARGET,
candidate_hash = ?self.request.candidate_hash,
origin = ?validator,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,315 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::sync::Arc;

use parity_scale_codec::Encode;

use futures::channel::{mpsc, oneshot};
use futures::{executor, Future, FutureExt, StreamExt, select};
use futures::task::{Poll, Context, noop_waker};

use polkadot_erasure_coding::{obtain_chunks_v1 as obtain_chunks, branches};
use sc_network as network;
use sp_keyring::Sr25519Keyring;

use polkadot_primitives::v1::{AvailableData, BlockData, CandidateHash, HeadData, PersistedValidationData, PoV, ValidatorIndex};
use polkadot_node_network_protocol::request_response::v1;
use polkadot_subsystem::messages::AllMessages;

use crate::metrics::Metrics;
use super::*;

/// A task whose kill handle is already gone must finish on the very first poll.
#[test]
fn task_can_be_canceled() {
	let (task, _rx) = get_test_running_task();
	let (cancel_tx, cancel_rx) = oneshot::channel();
	// Dropping the sender side signals cancellation before the task even starts.
	std::mem::drop(cancel_tx);
	let fut = task.run(cancel_rx);
	futures::pin_mut!(fut);
	let waker = noop_waker();
	let mut cx = Context::from_waker(&waker);
	assert!(fut.poll(&mut cx) == Poll::Ready(()), "Task is immediately finished");
}

/// Make sure the task won't accept a chunk that is invalid.
#[test]
fn task_does_not_accept_invalid_chunk() {
	let (mut task, rx) = get_test_running_task();
	task.group = vec![Sr25519Keyring::Alice.public().into()];
	// Alice answers with garbage - no chunk may ever be stored.
	let chunk_responses = vec![(
		Sr25519Keyring::Alice.public().into(),
		AvailabilityFetchingResponse::Chunk(v1::ChunkResponse {
			chunk: vec![1, 2, 3],
			proof: vec![vec![9, 8, 2], vec![2, 3, 4]],
		}),
	)]
	.into_iter()
	.collect();
	let test = TestRun {
		chunk_responses,
		valid_chunks: HashSet::new(),
	};
	test.run(task, rx);
}

/// A single validator with a valid chunk: the task must store it.
#[test]
fn task_stores_valid_chunk() {
	let (mut task, rx) = get_test_running_task();
	let (root_hash, chunk) = get_valid_chunk_data();
	task.erasure_root = root_hash;
	task.request.index = chunk.index;
	task.group = vec![Sr25519Keyring::Alice.public().into()];

	let chunk_responses = vec![(
		Sr25519Keyring::Alice.public().into(),
		AvailabilityFetchingResponse::Chunk(v1::ChunkResponse {
			chunk: chunk.chunk.clone(),
			proof: chunk.proof,
		}),
	)]
	.into_iter()
	.collect();
	let valid_chunks = vec![chunk.chunk].into_iter().collect();

	TestRun { chunk_responses, valid_chunks }.run(task, rx);
}

/// A chunk that is valid for some index, but not the requested one, is rejected.
#[test]
fn task_does_not_accept_wrongly_indexed_chunk() {
	let (mut task, rx) = get_test_running_task();
	let (root_hash, chunk) = get_valid_chunk_data();
	task.erasure_root = root_hash;
	// Request a different index than the one the delivered chunk belongs to.
	task.request.index = ValidatorIndex(chunk.index.0 + 1);
	task.group = vec![Sr25519Keyring::Alice.public().into()];

	let chunk_responses = vec![(
		Sr25519Keyring::Alice.public().into(),
		AvailabilityFetchingResponse::Chunk(v1::ChunkResponse {
			chunk: chunk.chunk.clone(),
			proof: chunk.proof,
		}),
	)]
	.into_iter()
	.collect();

	TestRun { chunk_responses, valid_chunks: HashSet::new() }.run(task, rx);
}

/// Task stores the chunk if there is at least one validator having a valid one.
#[test]
fn task_stores_valid_chunk_if_there_is_one() {
	let (mut task, rx) = get_test_running_task();
	let (root_hash, chunk) = get_valid_chunk_data();
	task.erasure_root = root_hash;
	task.request.index = chunk.index;

	// Only Alice has the valid chunk - the fetch should succeed nonetheless,
	// even though she is tried last.
	task.group = [
		Sr25519Keyring::Alice,
		Sr25519Keyring::Bob,
		Sr25519Keyring::Charlie,
		Sr25519Keyring::Dave,
		Sr25519Keyring::Eve,
	]
	.iter()
	.map(|v| v.public().into())
	.collect();

	let mut chunk_responses = HashMap::new();
	// Alice: the one valid chunk.
	chunk_responses.insert(
		Sr25519Keyring::Alice.public().into(),
		AvailabilityFetchingResponse::Chunk(v1::ChunkResponse {
			chunk: chunk.chunk.clone(),
			proof: chunk.proof,
		}),
	);
	// Bob: claims not to have the chunk at all.
	chunk_responses.insert(
		Sr25519Keyring::Bob.public().into(),
		AvailabilityFetchingResponse::NoSuchChunk,
	);
	// Charlie: delivers garbage.
	chunk_responses.insert(
		Sr25519Keyring::Charlie.public().into(),
		AvailabilityFetchingResponse::Chunk(v1::ChunkResponse {
			chunk: vec![1, 2, 3],
			proof: vec![vec![9, 8, 2], vec![2, 3, 4]],
		}),
	);

	let valid_chunks = vec![chunk.chunk].into_iter().collect();
	TestRun { chunk_responses, valid_chunks }.run(task, rx);
}

/// Fixture describing how the mocked network should answer a `RunningTask`,
/// and which chunks count as valid when the task tries to store one.
struct TestRun {
	/// Response to deliver for a given validator (keyed by its `AuthorityDiscoveryId`).
	/// A validator missing from the map is answered with `RequestFailure::Refused`.
	chunk_responses: HashMap<AuthorityDiscoveryId, AvailabilityFetchingResponse>,
	/// Set of chunks that should be considered valid:
	valid_chunks: HashSet<Vec<u8>>,
}


impl TestRun {
	/// Drive the given task to completion.
	///
	/// Messages coming from the task are answered via `handle_message` until either
	/// the task concludes, its message stream ends, or the task future itself
	/// finishes. Panics if the run ends while the last handled message did not put
	/// us into an "ok to end" state (i.e. a valid chunk was expected but never stored).
	fn run(self, task: RunningTask, rx: mpsc::Receiver<FromFetchTask>) {
		sp_tracing::try_init_simple();
		let mut rx = rx.fuse();
		let task = task.run_inner().fuse();
		futures::pin_mut!(task);
		executor::block_on(async {
			// Tracks whether the most recently handled message makes it acceptable
			// for the task to stop now.
			let mut end_ok = false;
			loop {
				let msg = select!(
					from_task = rx.next() => {
						match from_task {
							Some(msg) => msg,
							// Stream ended - the task dropped its sender.
							None => break,
						}
					},
					() = task =>
						break,
				);
				match msg {
					FromFetchTask::Concluded(_) => break,
					FromFetchTask::Message(msg) =>
						end_ok = self.handle_message(msg).await,
				}
			}
			if !end_ok {
				panic!("Task ended prematurely (failed to store valid chunk)!");
			}
		});
	}

	/// Returns true, if after processing of the given message it would be ok for the stream to
	/// end.
	async fn handle_message(&self, msg: AllMessages) -> bool {
		match msg {
			AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs)) => {
				let mut valid_responses = 0;
				for req in reqs {
					let req = match req {
						Requests::AvailabilityFetching(req) => req,
					};
					// Peers without a configured response get a network-level refusal.
					let response = self.chunk_responses.get(&req.peer)
						.ok_or(network::RequestFailure::Refused);

					if let Ok(AvailabilityFetchingResponse::Chunk(resp)) = &response {
						if self.valid_chunks.contains(&resp.chunk) {
							valid_responses += 1;
						}
					}
					req.pending_response.send(response.map(Encode::encode))
						.expect("Sending response should succeed");
				}
				// Ending here is only fine if we handed out no valid chunk AND none
				// was expected in the first place.
				return (valid_responses == 0) && self.valid_chunks.is_empty()
			}
			AllMessages::AvailabilityStore(
				AvailabilityStoreMessage::StoreChunk { chunk, tx, .. }
			) => {
				// The task must only ever try to store chunks we declared valid.
				assert!(self.valid_chunks.contains(&chunk.chunk));
				tx.send(Ok(())).expect("Answering fetching task should work");
				return true
			}
			_ => {
				tracing::debug!(target: LOG_TARGET, "Unexpected message");
				return false
			}
		}
	}
}

/// Get a `RunningTask` filled with dummy values, together with the receiver end
/// of its feedback channel.
fn get_test_running_task() -> (RunningTask, mpsc::Receiver<FromFetchTask>) {
	// Zero-capacity channel: each sender still gets one guaranteed slot.
	let (sender, receiver) = mpsc::channel(0);
	let task = RunningTask {
		session_index: 0,
		group_index: GroupIndex(0),
		group: Vec::new(),
		request: AvailabilityFetchingRequest {
			candidate_hash: CandidateHash([43u8; 32].into()),
			index: ValidatorIndex(0),
		},
		erasure_root: Hash::repeat_byte(99),
		relay_parent: Hash::repeat_byte(71),
		sender,
		metrics: Metrics::new_dummy(),
	};
	(task, receiver)
}

/// Erasure-code some dummy `AvailableData` and return the resulting merkle root
/// together with the first chunk (including its proof).
fn get_valid_chunk_data() -> (Hash, ErasureChunk) {
	let fake_validator_count = 10;
	let available_data = AvailableData {
		validation_data: PersistedValidationData {
			parent_head: HeadData(vec![7, 8, 9]),
			relay_parent_number: Default::default(),
			max_pov_size: 1024,
			relay_parent_storage_root: Default::default(),
		},
		pov: Arc::new(PoV { block_data: BlockData(vec![45, 46, 47]) }),
	};
	let chunks = obtain_chunks(fake_validator_count, &available_data).unwrap();
	let branches = branches(chunks.as_ref());
	let root = branches.root();
	// Take the very first chunk; any index would do for the tests.
	let (index, (proof, chunk)) = branches
		.enumerate()
		.next()
		.expect("There really should be 10 chunks.");
	let chunk = ErasureChunk {
		chunk: chunk.to_vec(),
		index: ValidatorIndex(index as _),
		proof,
	};
	(root, chunk)
}
Loading