This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
66 commits
6f2e0e9
WIP
eskimor Feb 4, 2021
da850f7
availability distribution, still very wip.
eskimor Feb 9, 2021
fbf0ec1
Some docs on what I intend to do.
eskimor Feb 9, 2021
ac543c1
Checkpoint of session cache implementation
eskimor Feb 10, 2021
07f6bc3
More work, mostly on cache
eskimor Feb 11, 2021
ef84ea5
Only derive MallocSizeOf and Debug for std.
eskimor Feb 12, 2021
1e35804
availability-distribution: Cache feature complete.
eskimor Feb 12, 2021
d8fda81
Sketch out logic in `FetchTask` for actual fetching.
eskimor Feb 13, 2021
47036c9
Format cleanup.
eskimor Feb 13, 2021
4ad902f
More format fixes.
eskimor Feb 13, 2021
fee9476
Almost feature complete `fetch_task`.
eskimor Feb 15, 2021
b9aa906
Finish FetchTask so far.
eskimor Feb 15, 2021
a65562f
Directly use AuthorityDiscoveryId in protocol and cache.
eskimor Feb 16, 2021
4a43561
Resolve `AuthorityDiscoveryId` on sending requests.
eskimor Feb 16, 2021
6543b30
Rework fetch_task
eskimor Feb 17, 2021
256e559
From<u32> implementation for `ValidatorIndex`.
eskimor Feb 17, 2021
f8d5fef
Fixes and more integration work.
eskimor Feb 17, 2021
5e77fb4
Make session cache proper lru cache.
eskimor Feb 18, 2021
72704ee
Use proper lru cache.
eskimor Feb 18, 2021
60a2faf
Requester finished.
eskimor Feb 18, 2021
452b55f
ProtocolState -> Requester
eskimor Feb 18, 2021
2b9b983
Cleanup + fixes.
eskimor Feb 18, 2021
d683f10
Remove unused functions
eskimor Feb 18, 2021
d7a8a31
availability-distribution responding side.
eskimor Feb 19, 2021
3fed607
Cleanup + Fixes.
eskimor Feb 19, 2021
39d6bc2
More fixes.
eskimor Feb 19, 2021
49b1764
More fixes.
eskimor Feb 19, 2021
a141330
Some docs.
eskimor Feb 19, 2021
fad4586
Docs.
eskimor Feb 19, 2021
e617e91
Fix reporting of bad guys.
eskimor Feb 19, 2021
789a3e9
Merge branch 'master' into rk-availability-distribution-2306
eskimor Feb 19, 2021
a4eef9b
Fix tests
eskimor Feb 19, 2021
ea5f6a4
Make all tests compile.
eskimor Feb 19, 2021
00e2f69
Fix test.
eskimor Feb 19, 2021
c837d98
Cleanup + get rid of some warnings.
eskimor Feb 19, 2021
09fa9fe
Merge branch 'master' into rk-availability-distribution-2306
eskimor Feb 22, 2021
8945fbb
state -> requester
eskimor Feb 22, 2021
c9984fb
Mostly doc fixes.
eskimor Feb 22, 2021
7707759
Fix test suite.
eskimor Feb 22, 2021
e7623d4
Get rid of now redundant message types.
eskimor Feb 22, 2021
e8d7e44
WIP
eskimor Feb 22, 2021
5fb8418
Rob's review remarks.
eskimor Feb 22, 2021
41f60e4
Merge branch 'rk-drop-redundant-messages-2306' into rk-availability-d…
eskimor Feb 22, 2021
9780f3a
Fix test suite.
eskimor Feb 22, 2021
5bbcea4
core.relay_parent -> leaf for session request.
eskimor Feb 22, 2021
b792a89
Style fix.
eskimor Feb 22, 2021
75e6af8
Decrease request timeout.
eskimor Feb 22, 2021
ca7c182
Merge branch 'rk-metrics-2306' into rk-availability-distribution-2306
eskimor Feb 23, 2021
53fdeb3
Cleanup obsolete errors.
eskimor Feb 23, 2021
ce21a10
Metrics + don't fail on non fatal errors.
eskimor Feb 23, 2021
64d7246
requester.rs -> requester/mod.rs
eskimor Feb 23, 2021
2a9650f
Panic on invalid BadValidator report.
eskimor Feb 23, 2021
4d05d00
Fix indentation.
eskimor Feb 23, 2021
aadc80f
Use typed default timeout constant.
eskimor Feb 23, 2021
e45f61c
Make channel size 0, as each sender gets one slot anyways.
eskimor Feb 23, 2021
43dfd1c
Fix incorrect metrics initialization.
eskimor Feb 23, 2021
e262782
Merge branch 'master' into rk-availability-distribution-2306
eskimor Feb 23, 2021
5353157
Fix build after merge.
eskimor Feb 23, 2021
ff94444
More fixes.
eskimor Feb 23, 2021
6b71e54
Hopefully valid metrics names.
eskimor Feb 24, 2021
190adaa
Better metrics names.
eskimor Feb 25, 2021
8901344
Some tests that already work.
eskimor Feb 25, 2021
1d29b5c
Slightly better docs.
eskimor Feb 25, 2021
83ff666
Some more tests.
eskimor Feb 26, 2021
e9210e5
Merge branch 'master' into rk-availability-distribution-2306
eskimor Feb 26, 2021
a0e01ec
Fix network bridge test.
eskimor Feb 26, 2021
Sketch out logic in FetchTask for actual fetching.
- Compile fixes.
- Cleanup.
eskimor committed Feb 13, 2021
commit d8fda81ec6726172da3a72b6b0fb1b33539f9b22
32 changes: 28 additions & 4 deletions node/network/availability-distribution/src/error.rs
@@ -20,7 +20,10 @@ use thiserror::Error;
use futures::channel::oneshot;

use polkadot_node_subsystem_util::Error as UtilError;
use polkadot_subsystem::{SubsystemError};
use polkadot_subsystem::{
errors::{ChainApiError, RuntimeApiError},
SubsystemError,
};

#[derive(Debug, Error)]
pub enum Error {
@@ -36,9 +39,17 @@ pub enum Error {
#[error("Receive channel closed")]
IncomingMessageChannel(#[source] SubsystemError),

/// Some request to the runtime in the session cache failed.
#[error("Session cache runtime request failed")]
SessionCacheRuntimRequest(#[source] UtilError),
/// Some request to utility functions failed.
#[error("Runtime request failed")]
UtilRequest(#[source] UtilError),

/// Some request to the runtime failed.
#[error("Runtime request failed")]
RuntimeRequestCanceled(#[source] oneshot::Canceled),

/// Some request to the runtime failed.
#[error("Runtime request failed")]
RuntimeRequest(#[source] RuntimeApiError),
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -48,3 +59,16 @@ impl From<SubsystemError> for Error {
Self::IncomingMessageChannel(err)
}
}

/// Receive a response from a runtime request and convert errors.
pub(crate) async fn recv_runtime<V>(
r: std::result::Result<
oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
UtilError,
>,
) -> Result<V> {
r.map_err(Error::UtilRequest)?
.await
.map_err(Error::RuntimeRequestCanceled)?
.map_err(Error::RuntimeRequest)
}
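
The new `recv_runtime` helper collapses the three failure layers of a runtime request — failing to issue the request at all (`UtilError`), the response channel being canceled, and the runtime API itself returning an error — into this module's `Error` type. A minimal usage sketch follows; the `request_session_index_for_child_ctx` helper name and signature are assumed for illustration and are not taken from this diff:

```rust
use polkadot_node_subsystem_util::request_session_index_for_child_ctx;
use polkadot_primitives::v1::{Hash, SessionIndex};
use polkadot_subsystem::SubsystemContext;

// Hypothetical caller: ask the runtime for the session index of a relay parent,
// mapping every failure layer into this module's `Error` via `recv_runtime`.
async fn session_index_for<Context>(ctx: &mut Context, relay_parent: Hash) -> Result<SessionIndex>
where
	Context: SubsystemContext,
{
	// The helper is assumed to return
	// `Result<oneshot::Receiver<Result<SessionIndex, RuntimeApiError>>, UtilError>`,
	// which is exactly the shape `recv_runtime` accepts.
	recv_runtime(request_session_index_for_child_ctx(relay_parent, ctx).await).await
}
```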
216 changes: 190 additions & 26 deletions node/network/availability-distribution/src/fetch_task.rs
@@ -14,25 +14,27 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::rc::Rc;
use std::collections::HashSet;
use std::rc::Rc;

use futures::channel::oneshot;
use v1::AvailabilityFetchingResponse;

use super::{session_cache::SessionInfo, LOG_TARGET};
use polkadot_node_network_protocol::request_response::v1;
use polkadot_primitives::v1::{
BlakeTwo256, CoreState, ErasureChunk, Hash, HashT,
SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID, CandidateHash,
CandidateDescriptor, OccupiedCore,
};
use polkadot_subsystem::{
jaeger, errors::{ChainApiError, RuntimeApiError}, PerLeafSpan,
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError,
BlakeTwo256, CandidateDescriptor, CandidateHash, CoreState, ErasureChunk, Hash, HashT,
OccupiedCore, SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID,
};
use polkadot_subsystem::messages::{
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent
NetworkBridgeEvent, NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
};
use polkadot_subsystem::{
errors::{ChainApiError, RuntimeApiError},
jaeger, ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan, SpawnedSubsystem,
Subsystem, SubsystemContext, SubsystemError,
};
use super::{session_cache::SessionInfo, LOG_TARGET};

pub struct FetchTask {
/// For what relay parents this task is relevant.
@@ -42,37 +44,109 @@ pub struct FetchTask {
/// stop keeping track of that candidate/chunk.
live_in: HashSet<Hash>,

/// The relay parent providing the context for the candidate.
relay_parent: Hash,

/// Some details about the candidate to be fetched.
descriptor: CandidateDescriptor,

/// We keep the task around in state `Fetched` until `live_in` becomes empty, to make
/// sure we won't re-fetch an already fetched candidate.
state: FetchedState,

session: Rc<SessionInfo>
/// Session information.
session: Rc<SessionInfo>,
}

/// State of a particular candidate chunk fetching process.
enum FetchedState {
/// Chunk is currently being fetched.
Fetching,
///
/// Once the contained `Sender` is dropped, any still running task will be canceled.
Fetching(oneshot::Sender<()>),
/// Chunk has already been fetched successfully.
Fetched,
/// All relevant `live_in` have been removed before we were able to get our chunk.
Canceled,
}

/// Messages sent from `FetchTask`s to be handled/forwarded.
pub enum FromFetchTask {
/// Message to other subsystem.
Message(AllMessages),

/// Concluded with result.
///
/// In case of `None` everything was fine; in case of `Some`, some validators in the group
/// did not serve us our chunk as expected.
Concluded(Option<BadValidators>),
}

/// Report of bad validators.
pub struct BadValidators {
/// The session index that was used.
pub session_index: SessionIndex,
/// The group the not properly responding validators belong to.
pub group_index: GroupIndex,
/// The indices of the bad validators.
pub bad_validators: Vec<ValidatorIndex>,
}

/// Information a running task needs.
struct RunningTask {
/// For what session we have been spawned.
session_index: SessionIndex,

/// Index of validator group.
group_index: GroupIndex,

/// Validators to request the chunk from.
group: Vec<ValidatorIndex>,

/// The request to send.
request: v1::AvailabilityFetchingRequest,

/// Root hash, for verifying the chunk's validity.
erasure_root: Hash,

/// Relay parent of the candidate to fetch.
relay_parent: Hash,

/// Sender for communicating with other subsystems and reporting results.
sender: mpsc::Sender<FromFetchTask>,

/// Receive `Canceled` errors here.
receiver: oneshot::Receiver<()>,
}

impl FetchTask {
// /// Start fetching a chunk.
// pub async fn start<Context>(ctx: &mut Context, leaf: Hash, core: OccupiedCore) -> Self
// where
// Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
// {
// panic
// }
/// Start fetching a chunk.
pub async fn start<Context>(
ctx: &mut Context,
leaf: Hash,
core: OccupiedCore,
session_info: Rc<SessionInfo>,
sender: mpsc::Sender<FromFetchTask>,
) -> Self
where
Context: SubsystemContext,
{
let (handle, receiver) = oneshot::channel();
let running = RunningTask {
session_index: session_info.session_index,
group_index: core.group_responsible,
group: session_info.validator_groups.get(core.group_responsible).expect("The responsible group of a candidate should be available in the corresponding session. qed.").clone(),
request: v1::AvailabilityFetchingRequest {
candidate_hash: core.candidate_hash,
index: session_info.our_index,
},
erasure_root: core.candidate_descriptor.erasure_root,
relay_parent: core.candidate_descriptor.relay_parent,
sender,
receiver,
};
ctx.spawn("chunk-fetcher", Pin::new(Box::new(running.run())))
.await?;
FetchTask {
live_in: HashSet::from(leaf),
state: FetchedState::Fetching(handle),
session: session_info,
}
}

/// Add the given leaf to the relay parents which are making this task relevant.
pub fn add_leaf(&mut self, leaf: Hash) {
@@ -101,6 +175,96 @@ impl FetchTask {

/// Retrieve the relay parent providing the context for this candidate.
pub fn get_relay_parent(&self) -> Hash {
self.relay_parent
self.descriptor.relay_parent
}
}

/// Things that can go wrong in task execution.
#[derive(Debug)]
enum TaskError {
/// The peer failed to deliver a correct chunk for some reason (has been reported as
/// appropriate).
PeerError,
/// This very node is seemingly shutting down (sending of message failed).
ShuttingDown,
}

type Result<T> = std::result::Result<T, TaskError>;

impl RunningTask {
async fn run(self) {
let mut bad_validators = Vec::new();
// Try validators in order:
for index in self.group {

// Send request:
let resp = match self.do_request(index).await {
Ok(resp) => resp,
Err(TaskError::ShuttingDown) => {
tracing::info!("Node seems to be shutting down, canceling fetch task");
return;
}
Err(TaskError::PeerError) => {
bad_validators.push(index);
continue
}
};

// Data valid?
if !self.validate_response(&resp) {
bad_validators.push(index);
continue
}

// Ok, let's store it and be happy.
store_response(resp);
break;
}
self.conclude(bad_validators).await;
}

/// Do request and return response, if successful.
///
/// Will also report peer if not successful.
async fn do_request(&self, validator: ValidatorIndex) -> std::result::Result<v1::AvailabilityFetchingResponse, TaskError> {
let peer = self.get_peer_id(validator)?;
let (full_request, response_recv) =
Requests::AvailabilityFetching(OutgoingRequest::new(peer, self.request));

self.sender.send(FromFetchTask::Message(
AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(vec![full_request])),
)).await.map_err(|_| TaskError::ShuttingDown)?;

match response_recv.await {
Ok(resp) => return Ok(resp),
Err(RequestError::InvalidResponse(err)) => {
},
Err(RequestError::NetworkError(err)) => {
}
Err(RequestError::Canceled(err)) => {
}
}
Err(TaskError::PeerError)
}

fn get_peer_id(index: ValidatorIndex) -> Result<PeerId> {
panic!("TO BE IMPLEMENTED");
}

/// Tell subsystem we are done.
async fn conclude(&self, bad_validators: Vec<ValidatorIndex>) {
let payload = if bad_validators.is_empty() {
None
}
else {
Some(BadValidators {
session_index: self.session_index,
group_index: self.group_index,
bad_validators,
})
};
if let Err(err) = self.sender.send(FromFetchTask::Concluded(payload)).await {
tracing::warn!(target: LOG_TARGET, err = ?err, "Sending concluded message for task failed");
}
}
}
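
For orientation (not part of this diff): the `FromFetchTask` channel implies a consumer on the requester side that forwards subsystem messages to the overseer and handles task conclusions. A rough sketch under that assumption — `report_bad_validators` is a hypothetical hook, and the exact return type of `SubsystemContext::send_message` is not relied upon:

```rust
use futures::{channel::mpsc, StreamExt};

// Sketch only: drain messages produced by running fetch tasks.
async fn drive_fetch_tasks<Context>(
	ctx: &mut Context,
	mut from_tasks: mpsc::Receiver<FromFetchTask>,
) where
	Context: SubsystemContext,
{
	while let Some(msg) = from_tasks.next().await {
		match msg {
			// Forward messages (e.g. network requests, availability-store writes) to the overseer.
			FromFetchTask::Message(msg) => {
				let _ = ctx.send_message(msg).await;
			}
			// Task finished and every queried validator behaved.
			FromFetchTask::Concluded(None) => {}
			// Task finished, but some validators failed to serve our chunk;
			// `report_bad_validators` is a hypothetical reputation hook.
			FromFetchTask::Concluded(Some(bad)) => report_bad_validators(ctx, bad).await,
		}
	}
}
```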
7 changes: 2 additions & 5 deletions node/network/availability-distribution/src/session_cache.rs
@@ -66,18 +66,15 @@ pub struct SessionCache {

/// Localized session information, tailored for the needs of availability distribution.
pub struct SessionInfo {
/// The index of this session.
pub session_index: SessionIndex,
/// Validator groups of the current session.
///
/// Each group's order is randomized. This way we achieve load balancing when requesting
/// chunks, as the validators in a group will be tried in that randomized order. Each node
/// should arrive at a different order, therefore we distribute the load.
pub validator_groups: Vec<Vec<ValidatorIndex>>,

/// All validators of that session.
///
/// Needed for authority discovery and finding ourselves.
pub validators: Vec<ValidatorId>,

/// Information about ourself:
pub our_index: ValidatorIndex,
}
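
The randomized group order described above is what gives the requester its load balancing: since a fetch task walks its group front to back, each node starting from a different permutation spreads requests across the group's validators. A minimal sketch of that shuffling, assuming the `rand` crate; the real cache may randomize differently:

```rust
use polkadot_primitives::v1::ValidatorIndex;
use rand::seq::SliceRandom;

// Shuffle each validator group independently with this node's RNG, so different
// nodes try validators in different orders when fetching their chunks.
fn randomize_groups(mut groups: Vec<Vec<ValidatorIndex>>) -> Vec<Vec<ValidatorIndex>> {
	let mut rng = rand::thread_rng();
	for group in groups.iter_mut() {
		group.shuffle(&mut rng);
	}
	groups
}
```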