This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
66 commits
6f2e0e9
WIP
eskimor Feb 4, 2021
da850f7
availability distribution, still very wip.
eskimor Feb 9, 2021
fbf0ec1
Some docs on what I intend to do.
eskimor Feb 9, 2021
ac543c1
Checkpoint of session cache implementation
eskimor Feb 10, 2021
07f6bc3
More work, mostly on cache
eskimor Feb 11, 2021
ef84ea5
Only derive MallocSizeOf and Debug for std.
eskimor Feb 12, 2021
1e35804
availability-distribution: Cache feature complete.
eskimor Feb 12, 2021
d8fda81
Sketch out logic in `FetchTask` for actual fetching.
eskimor Feb 13, 2021
47036c9
Format cleanup.
eskimor Feb 13, 2021
4ad902f
More format fixes.
eskimor Feb 13, 2021
fee9476
Almost feature complete `fetch_task`.
eskimor Feb 15, 2021
b9aa906
Finish FetchTask so far.
eskimor Feb 15, 2021
a65562f
Directly use AuthorityDiscoveryId in protocol and cache.
eskimor Feb 16, 2021
4a43561
Resolve `AuthorityDiscoveryId` on sending requests.
eskimor Feb 16, 2021
6543b30
Rework fetch_task
eskimor Feb 17, 2021
256e559
From<u32> implementation for `ValidatorIndex`.
eskimor Feb 17, 2021
f8d5fef
Fixes and more integration work.
eskimor Feb 17, 2021
5e77fb4
Make session cache proper lru cache.
eskimor Feb 18, 2021
72704ee
Use proper lru cache.
eskimor Feb 18, 2021
60a2faf
Requester finished.
eskimor Feb 18, 2021
452b55f
ProtocolState -> Requester
eskimor Feb 18, 2021
2b9b983
Cleanup + fixes.
eskimor Feb 18, 2021
d683f10
Remove unused functions
eskimor Feb 18, 2021
d7a8a31
availability-distribution responding side.
eskimor Feb 19, 2021
3fed607
Cleanup + Fixes.
eskimor Feb 19, 2021
39d6bc2
More fixes.
eskimor Feb 19, 2021
49b1764
More fixes.
eskimor Feb 19, 2021
a141330
Some docs.
eskimor Feb 19, 2021
fad4586
Docs.
eskimor Feb 19, 2021
e617e91
Fix reporting of bad guys.
eskimor Feb 19, 2021
789a3e9
Merge branch 'master' into rk-availability-distribution-2306
eskimor Feb 19, 2021
a4eef9b
Fix tests
eskimor Feb 19, 2021
ea5f6a4
Make all tests compile.
eskimor Feb 19, 2021
00e2f69
Fix test.
eskimor Feb 19, 2021
c837d98
Cleanup + get rid of some warnings.
eskimor Feb 19, 2021
09fa9fe
Merge branch 'master' into rk-availability-distribution-2306
eskimor Feb 22, 2021
8945fbb
state -> requester
eskimor Feb 22, 2021
c9984fb
Mostly doc fixes.
eskimor Feb 22, 2021
7707759
Fix test suite.
eskimor Feb 22, 2021
e7623d4
Get rid of now redundant message types.
eskimor Feb 22, 2021
e8d7e44
WIP
eskimor Feb 22, 2021
5fb8418
Rob's review remarks.
eskimor Feb 22, 2021
41f60e4
Merge branch 'rk-drop-redundant-messages-2306' into rk-availability-d…
eskimor Feb 22, 2021
9780f3a
Fix test suite.
eskimor Feb 22, 2021
5bbcea4
core.relay_parent -> leaf for session request.
eskimor Feb 22, 2021
b792a89
Style fix.
eskimor Feb 22, 2021
75e6af8
Decrease request timeout.
eskimor Feb 22, 2021
ca7c182
Merge branch 'rk-metrics-2306' into rk-availability-distribution-2306
eskimor Feb 23, 2021
53fdeb3
Cleanup obsolete errors.
eskimor Feb 23, 2021
ce21a10
Metrics + don't fail on non fatal errors.
eskimor Feb 23, 2021
64d7246
requester.rs -> requester/mod.rs
eskimor Feb 23, 2021
2a9650f
Panic on invalid BadValidator report.
eskimor Feb 23, 2021
4d05d00
Fix indentation.
eskimor Feb 23, 2021
aadc80f
Use typed default timeout constant.
eskimor Feb 23, 2021
e45f61c
Make channel size 0, as each sender gets one slot anyways.
eskimor Feb 23, 2021
43dfd1c
Fix incorrect metrics initialization.
eskimor Feb 23, 2021
e262782
Merge branch 'master' into rk-availability-distribution-2306
eskimor Feb 23, 2021
5353157
Fix build after merge.
eskimor Feb 23, 2021
ff94444
More fixes.
eskimor Feb 23, 2021
6b71e54
Hopefully valid metrics names.
eskimor Feb 24, 2021
190adaa
Better metrics names.
eskimor Feb 25, 2021
8901344
Some tests that already work.
eskimor Feb 25, 2021
1d29b5c
Slightly better docs.
eskimor Feb 25, 2021
83ff666
Some more tests.
eskimor Feb 26, 2021
e9210e5
Merge branch 'master' into rk-availability-distribution-2306
eskimor Feb 26, 2021
a0e01ec
Fix network bridge test.
eskimor Feb 26, 2021
Almost feature complete fetch_task.
Missing:

- Check for cancel
- Actual querying of peer ids.
eskimor committed Feb 15, 2021
commit fee9476e5665887b89e02672aae5da40a4eede6b
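The two missing items noted above are the interesting bits of this commit. Purely as a sketch of the first one (nothing below is part of fee9476, and the helper name is made up), checking for cancellation amounts to noticing that the owning `FetchTask` has dropped the `oneshot::Sender` it keeps in `FetchedState::Started`:

```rust
use futures::channel::oneshot;

/// Hypothetical helper, not in this commit: true once the controlling `FetchTask`
/// has dropped its sender, i.e. the chunk is no longer wanted by anyone.
fn fetch_canceled(kill: &mut oneshot::Receiver<()>) -> bool {
    // `try_recv` reports `Err(Canceled)` as soon as the sender side is gone.
    matches!(kill.try_recv(), Err(oneshot::Canceled))
}
```

`RunningTask::run` could call such a check at the top of every retry iteration and return early once it yields true.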
1 change: 1 addition & 0 deletions node/network/availability-distribution/Cargo.toml
@@ -28,5 +28,6 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "master",
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
assert_matches = "1.4.0"
maplit = "1.0"
146 changes: 109 additions & 37 deletions node/network/availability-distribution/src/fetch_task.rs
@@ -15,16 +15,22 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashSet;
use std::pin::Pin;
use std::rc::Rc;

use futures::channel::mpsc;
use futures::channel::oneshot;
use v1::AvailabilityFetchingResponse;

use super::{session_cache::SessionInfo, LOG_TARGET};
use polkadot_node_network_protocol::request_response::v1;
use sc_network::PeerId;

use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::request_response::{
request::{OutgoingRequest, RequestError, Requests},
v1::{AvailabilityFetchingRequest, AvailabilityFetchingResponse},
};
use polkadot_primitives::v1::{
BlakeTwo256, CandidateDescriptor, CandidateHash, CoreState, ErasureChunk, Hash, HashT,
OccupiedCore, SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID,
BlakeTwo256, CandidateDescriptor, CandidateHash, CoreState, ErasureChunk, GroupIndex, Hash,
HashT, OccupiedCore, SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID,
};
use polkadot_subsystem::messages::{
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
@@ -36,6 +42,8 @@ use polkadot_subsystem::{
Subsystem, SubsystemContext, SubsystemError,
};

use super::{session_cache::SessionInfo, LOG_TARGET};

pub struct FetchTask {
/// For what relay parents this task is relevant.
///
@@ -54,12 +62,10 @@ pub struct FetchTask {

/// State of a particular candidate chunk fetching process.
enum FetchedState {
/// Chunk is currently being fetched.
/// Chunk fetch has started.
///
/// Once the contained `Sender` is dropped, any still running task will be canceled.
Fetching(oneshot::Sender<()>),
/// Chunk has already been fetched successfully.
Fetched,
Started(oneshot::Sender<()>),
/// All relevant live_in have been removed, before we were able to get our chunk.
Canceled,
}
@@ -98,14 +104,17 @@ struct RunningTask {
group: Vec<ValidatorIndex>,

/// The request to send.
request: v1::AvailabilityFetchingRequest,
request: AvailabilityFetchingRequest,

/// Root hash, for verifying the chunk's validity.
erasure_root: Hash,

/// Relay parent of the candidate to fetch.
relay_parent: Hash,

/// Hash of the candidate we are fetching our chunk for.
candidate_hash: CandidateHash,

/// Sender for communicating with other subsystems and reporting results.
sender: mpsc::Sender<FromFetchTask>,

@@ -130,20 +139,21 @@ impl FetchTask {
session_index: session_info.session_index,
group_index: core.group_responsible,
group: session_info.validator_groups.get(core.group_responsible).expect("The responsible group of a candidate should be available in the corresponding session. qed.").clone(),
request: v1::AvailabilityFetchingRequest {
request: AvailabilityFetchingRequest {
candidate_hash: core.candidate_hash,
index: session_info.our_index,
},
erasure_root: core.candidate_descriptor.erasure_root,
relay_parent: core.candidate_descriptor.relay_parent,
candidate_hash: core.candidate_hash,
sender,
receiver,
};
ctx.spawn("chunk-fetcher", Pin::new(Box::new(running.run())))
.await?;
FetchTask {
live_in: HashSet::from(leaf),
state: FetchedState::Fetching(handle),
state: FetchedState::Started(handle),
session: session_info,
}
}
@@ -165,11 +175,11 @@ impl FetchTask {

/// Whether or not this task can be considered finished.
///
/// That is, it is either canceled or succeeded fetching the chunk.
/// That is, it is either canceled, succeeded or failed.
pub fn is_finished(&self) -> bool {
match self.state {
FetchedState::Fetched | FetchedState::Canceled => true,
FetchedState::Fetching => false,
FetchedState::Canceled => true,
FetchedState::Started(sender) => sender.is_canceled(),
}
}
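A hedged usage sketch (the requester side is not in this hunk, and the container name is an assumption): the natural consumer of `is_finished` is a pruning pass over the tasks kept per candidate:

```rust
use std::collections::HashMap;

/// Hypothetical pruning pass in the requester: forget tasks that have either
/// delivered their chunk, failed, or been canceled.
fn prune_finished(fetches: &mut HashMap<CandidateHash, FetchTask>) {
    fetches.retain(|_, task| !task.is_finished());
}
```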

@@ -192,46 +202,53 @@ enum TaskError {
type Result<T> = std::result::Result<T, TaskError>;

impl RunningTask {
/// Fetch and store chunk.
///
/// Try validators in backing group in order.
async fn run(self) {
let mut bad_validators = Vec::new();
// Try validators in order:
for index in self.group {
// Send request:
let resp = match do_request(index).await {
let peer_id = self.get_peer_id(index)?;
let resp = match self.do_request(peer_id).await {
Ok(resp) => resp,
Err(TaskError::ShuttingDown) => {
tracking::info("Node seems to be shutting down, canceling fetch task");
tracing::info!(
target: LOG_TARGET,
"Node seems to be shutting down, canceling fetch task"
);
return;
}
Err(TaskError::PeerError) => {
bad_validators.push(index);
continue;
}
};
let chunk = match resp {
AvailabilityFetchingResponse::Chunk(chunk) => chunk,
};

// Data valid?
if !self.validate_response(&resp) {
// Data genuine?
if !self.validate_chunk(peer_id, &chunk) {
bad_validators.push(index);
continue;
}

// Ok, let's store it and be happy.
store_response(resp);
// Ok, let's store it and be happy:
self.store_chunk(chunk).await;
break;
}
conclude(bad_validators);
self.conclude(bad_validators).await;
}

/// Do request and return response, if successful.
///
/// Will also report peer if not successful.
async fn do_request(
&self,
validator: ValidatorIndex,
) -> std::result::Result<v1::AvailabilityFetchingResponse, TaskError> {
let peer = self.get_peer_id(index)?;
let (full_request, response_recv) =
Requests::AvailabilityFetching(OutgoingRequest::new(peer, self.request));
peer: PeerId,
) -> std::result::Result<AvailabilityFetchingResponse, TaskError> {
let (full_request, response_recv) = OutgoingRequest::new(peer, self.request);
let requests = Requests::AvailabilityFetching(Vec::from(full_request));

self.sender
.send(FromFetchTask::Message(
@@ -240,20 +257,75 @@ impl RunningTask {
.await
.map_err(|_| TaskError::ShuttingDown)?;

// TODO: Also handle receiver cancel.
match response_recv.await {
Ok(resp) => Some(resp),
Err(RequestError::InvalidResponse(err)) => {}
Err(RequestError::NetworkError(err)) => {}
Err(RequestError::Canceled(err)) => {}
Ok(resp) => Ok(resp),
Err(RequestError::InvalidResponse(err)) => {
tracing::warn!(
target: LOG_TARGET,
"Peer sent us invalid erasure chunk data"
);
Err(TaskError::PeerError)
}
Err(RequestError::NetworkError(err)) => {
tracing::warn!(
target: LOG_TARGET,
"Some network error occurred when fetching erasure chunk"
);
Err(TaskError::PeerError)
}
Err(RequestError::Canceled(err)) => {
tracing::warn!(target: LOG_TARGET, "Erasure chunk request got canceled");
Err(TaskError::PeerError)
}
}
}

fn validate_chunk(&self, peer_id: &PeerId, chunk: &ErasureChunk) -> bool {
let anticipated_hash =
match branch_hash(&self.erasure_root, &chunk.proof, chunk.index as usize) {
Ok(hash) => hash,
Err(e) => {
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?self.candidate_hash,
origin = ?peer_id,
error = ?e,
"Failed to calculate chunk merkle proof",
);
return false;
}
};
let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);
if anticipated_hash != erasure_chunk_hash {
tracing::warn!(target: LOG_TARGET, origin = ?peer_id, "Received chunk does not match merkle tree");
return false;
}
Err(PeerError)
true
}

fn get_peer_id(&self, index: ValidatorIndex) -> Result<PeerId> {
panic!("TO BE IMPLEMENTED");
}
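This panic is the second missing item from the commit message. Later commits in this PR ("Directly use AuthorityDiscoveryId in protocol and cache", "Resolve `AuthorityDiscoveryId` on sending requests") suggest the lookup eventually goes through authority discovery keys rather than a direct index-to-peer table. A minimal sketch under that assumption, with both inputs hypothetical and populated elsewhere:

```rust
use std::collections::HashMap;

use polkadot_primitives::v1::AuthorityDiscoveryId;
use sc_network::PeerId;

/// Hypothetical lookup, not part of this commit: `discovery_keys` is assumed to be
/// ordered by validator index (as in the runtime session info), `resolved` to be a
/// cache of peers whose discovery keys we already know.
fn peer_id_for(
    discovery_keys: &[AuthorityDiscoveryId],
    resolved: &HashMap<AuthorityDiscoveryId, PeerId>,
    validator_index: usize,
) -> Option<PeerId> {
    let key = discovery_keys.get(validator_index)?;
    resolved.get(key).cloned()
}
```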

/// Store given chunk and log any error.
async fn store_chunk(&self, chunk: ErasureChunk) {
let (tx, rx) = oneshot::channel();
self.sender
.send(FromFetchTask::Message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreChunk {
candidate_hash: self.candidate_hash,
relay_parent: self.relay_parent,
chunk,
tx,
},
)))
.await;

if let Err(oneshot::Canceled) = rx.await {
tracing::error!(target: LOG_TARGET, "Storing erasure chunk failed");
}
}

/// Tell subsystem we are done.
async fn conclude(&self, bad_validators: Vec<ValidatorIndex>) {
let payload = if bad_validators.is_empty() {
@@ -267,8 +339,8 @@ impl RunningTask {
};
if let Err(err) = self.sender.send(FromFetchTask::Concluded(payload)).await {
tracing::warn!(
LOG_TARGET,
err: ?err,
target: LOG_TARGET,
err= ?err,
"Sending concluded message for task failed"
);
}