This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Commits (63)
857883b
Wip
eskimor Mar 29, 2021
4f11772
Merge branch 'master' into rk-req-res-runtime
eskimor Mar 30, 2021
7dc3c40
Increase proposer timeout.
eskimor Apr 1, 2021
7f428aa
WIP.
eskimor Apr 1, 2021
c045773
Better timeout values now that we are going to be connected to all no…
eskimor Mar 31, 2021
fe1f0a1
Better and more consistent sizes.
eskimor Apr 1, 2021
8a0b22f
Introduce statement fetching request.
eskimor Apr 1, 2021
86b147c
WIP
eskimor Apr 1, 2021
d68d646
Statement cache retrieval logic.
eskimor Apr 2, 2021
707c96b
Review remarks by @rphmeier
eskimor Apr 3, 2021
ded3f78
Fixes.
eskimor Apr 5, 2021
a64bfef
Better requester logic.
eskimor Apr 6, 2021
5aa7dba
WIP: Handle requester messages.
eskimor Apr 6, 2021
afc7f08
Missing dep.
eskimor Apr 6, 2021
10d7b8f
Fix request launching logic.
eskimor Apr 6, 2021
351a0d1
Finish fetching logic.
eskimor Apr 7, 2021
d9b80cc
Sending logic.
eskimor Apr 7, 2021
03e11ca
Redo code size calculations.
eskimor Apr 7, 2021
0a57c1f
Update Cargo.lock (new dep)
eskimor Apr 7, 2021
6ff978c
Merge branch 'master' into rk-req-res-runtime
eskimor Apr 7, 2021
f4a1748
Get request receiver to statement distribution.
eskimor Apr 7, 2021
ad907dc
Expose new functionality for responding to requests.
eskimor Apr 7, 2021
186ab0e
Cleanup.
eskimor Apr 7, 2021
77c6d8c
Responder logic.
eskimor Apr 7, 2021
41243fa
Fixes + Cleanup.
eskimor Apr 7, 2021
6d76bbf
Cargo.lock
eskimor Apr 7, 2021
52d1a69
Whitespace.
eskimor Apr 7, 2021
095f78e
Add lost copyright.
eskimor Apr 7, 2021
129b0fa
Launch responder task.
eskimor Apr 7, 2021
ae3c492
Typo.
eskimor Apr 7, 2021
49b4eae
info -> warn
eskimor Apr 7, 2021
2e37f70
Typo.
eskimor Apr 7, 2021
c3e911a
Fix.
eskimor Apr 7, 2021
98d366e
Fix.
eskimor Apr 7, 2021
8118d0c
Update comment.
eskimor Apr 7, 2021
3687401
Doc fix.
eskimor Apr 7, 2021
e3610d2
Better large statement heuristics.
eskimor Apr 8, 2021
f901615
Fix tests.
eskimor Apr 8, 2021
8b99d30
Merge branch 'master' into rk-req-res-runtime
eskimor Apr 8, 2021
e66dc8b
Fix network bridge tests.
eskimor Apr 8, 2021
ce8e5b5
Add test for size estimate.
eskimor Apr 8, 2021
12f78a1
Very simple tests that checks we get LargeStatement.
eskimor Apr 8, 2021
b76af99
Basic check, that fetching of large candidates is performed.
eskimor Apr 8, 2021
7f2936b
More tests.
eskimor Apr 8, 2021
f2883ff
Merge branch 'master' into rk-req-res-runtime
eskimor Apr 8, 2021
2a56490
Basic metrics for responder.
eskimor Apr 8, 2021
653ea6a
More metrics.
eskimor Apr 8, 2021
861e8d8
Use Encode::encoded_size().
eskimor Apr 8, 2021
ba8b7fb
Some useful spans.
eskimor Apr 8, 2021
42945ce
Get rid of redundant metrics.
eskimor Apr 8, 2021
2fcde8f
Don't add peer on duplicate.
eskimor Apr 8, 2021
6acc8ab
Merge branch 'master' into rk-req-res-runtime
eskimor Apr 8, 2021
0b51e89
Properly check hash
eskimor Apr 8, 2021
606ff6b
Merge branch 'master' into rk-req-res-runtime
eskimor Apr 8, 2021
ca11986
Preserve ordering + better flood protection.
eskimor Apr 9, 2021
3180253
Get rid of redundant clone.
eskimor Apr 9, 2021
5af1636
Don't shutdown responder on failed query.
eskimor Apr 9, 2021
5c4e316
Smaller fixes.
eskimor Apr 9, 2021
1454d6a
Quotes.
eskimor Apr 9, 2021
48cac47
Better queue size calculation.
eskimor Apr 9, 2021
0f3a21c
Merge branch 'master' into rk-req-res-runtime
eskimor Apr 9, 2021
e506d64
A bit saner response sizes.
eskimor Apr 9, 2021
25df93f
Fixes.
eskimor Apr 9, 2021
7 changes: 5 additions & 2 deletions node/core/proposer/src/lib.rs
@@ -40,8 +40,11 @@ use sp_transaction_pool::TransactionPool;
use prometheus_endpoint::Registry as PrometheusRegistry;
use std::{fmt, pin::Pin, sync::Arc, time};

/// How long proposal can take before we give up and err out
const PROPOSE_TIMEOUT: core::time::Duration = core::time::Duration::from_millis(2500);
/// How long proposing can take before we give up and err out. We need a relatively large timeout
/// here as long as we have large payloads in statement distribution. Assuming we can reach most
/// nodes within two hops, transferring statements of about 5 MB takes roughly 4 seconds, given a
/// validator bandwidth of 500 Mbit/s.
const PROPOSE_TIMEOUT: core::time::Duration = core::time::Duration::from_millis(4000);
Member Author:
What needs to happen after that 4 seconds and can it fit within the remaining two seconds?

Contributor:
Invoking the on_initialize, adding transactions, and then on_finalize of the block. 2 seconds is a bit of a squeeze, honestly.

When this timeout is hit, it means that no parachain stuff will be included which probably makes the 2 seconds more acceptable.
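For illustration, one way to reconstruct the 4 second estimate in the comment above (the per-hop fan-out of 25 peers is an assumption made for this sketch, not something stated in the change):

// Illustrative reconstruction only; the 25-peer fan-out per hop is an assumption.
const STATEMENT_SIZE_BITS: u64 = 5 * 8 * 1_000_000; // ~5 MB statement payload
const BANDWIDTH_BITS_PER_SEC: u64 = 500 * 1_000_000; // 500 Mbit/s validator uplink
const ASSUMED_FAN_OUT_PER_HOP: u64 = 25; // peers each node forwards to (assumption)
const HOPS: u64 = 2; // "reach most nodes within two hops"
// (40 Mbit / 500 Mbit/s) * 25 peers * 2 hops = 4 seconds
const ESTIMATED_TRANSFER_SECS: u64 =
    STATEMENT_SIZE_BITS * ASSUMED_FAN_OUT_PER_HOP * HOPS / BANDWIDTH_BITS_PER_SEC;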


/// Custom Proposer factory for Polkadot
pub struct ProposerFactory<TxPool, Backend, Client> {
15 changes: 10 additions & 5 deletions node/network/bridge/src/lib.rs
@@ -27,10 +27,7 @@ use futures::channel::mpsc;
use sc_network::Event as NetworkEvent;
use sp_consensus::SyncOracle;

use polkadot_subsystem::{
ActiveLeavesUpdate, ActivatedLeaf, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
SubsystemResult, SubsystemSender, OverseerSignal, FromOverseer,
};
use polkadot_subsystem::{ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult, SubsystemSender, messages::StatementDistributionMessage};
use polkadot_subsystem::messages::{
NetworkBridgeMessage, AllMessages,
CollatorProtocolMessage, NetworkBridgeEvent,
@@ -842,12 +839,16 @@

let NetworkBridge {
network_service,
request_multiplexer,
mut request_multiplexer,
authority_discovery_service,
metrics,
sync_oracle,
} = bridge;

let statement_receiver = request_multiplexer
.get_statement_fetching()
.expect("Gets initialized, must be `Some` on startup. qed.");

let (validation_worker_tx, validation_worker_rx) = mpsc::channel(1024);

let (remote, network_event_handler) = handle_network_messages(
@@ -861,6 +862,10 @@

ctx.spawn("network-bridge-network-worker", Box::pin(remote)).await?;

ctx.send_message(AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(statement_receiver)
)).await;

let subsystem_event_handler = handle_subsystem_messages(
ctx,
network_service,
25 changes: 24 additions & 1 deletion node/network/bridge/src/multiplexer.rs
@@ -37,8 +37,11 @@ use polkadot_subsystem::messages::AllMessages;
/// type, useful for the network bridge to send them via the `Overseer` to other subsystems.
///
/// The resulting stream will end once any of its inputs ends.
///
/// TODO: Get rid of this: https://github.com/paritytech/polkadot/issues/2842
pub struct RequestMultiplexer {
receivers: Vec<(Protocol, mpsc::Receiver<network::IncomingRequest>)>,
statement_fetching: Option<mpsc::Receiver<network::IncomingRequest>>,
next_poll: usize,
}

@@ -58,21 +61,38 @@ impl RequestMultiplexer {
/// `RequestMultiplexer` from it. The returned `RequestResponseConfig`s must be passed to the
/// network implementation.
pub fn new() -> (Self, Vec<RequestResponseConfig>) {
let (receivers, cfgs): (Vec<_>, Vec<_>) = Protocol::iter()
let (mut receivers, cfgs): (Vec<_>, Vec<_>) = Protocol::iter()
.map(|p| {
let (rx, cfg) = p.get_config();
((p, rx), cfg)
})
.unzip();

let index = receivers.iter().enumerate().find_map(|(i, (p, _))|
if let Protocol::StatementFetching = p {
Some(i)
} else {
None
}
).expect("Statement fetching must be registered. qed.");
let statement_fetching = Some(receivers.remove(index).1);

(
Self {
receivers,
statement_fetching,
next_poll: 0,
},
cfgs,
)
}

/// Get the receiver for handling statement fetching requests.
///
/// This function will only return `Some` once.
pub fn get_statement_fetching(&mut self) -> Option<mpsc::Receiver<network::IncomingRequest>> {
std::mem::take(&mut self.statement_fetching)
}
}
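A minimal usage sketch of the take-once semantics introduced here (the assertion is illustrative, not part of this change):

fn statement_receiver_is_taken_once() {
    // `_cfgs` would be handed to the network implementation, as documented above.
    let (mut multiplexer, _cfgs) = RequestMultiplexer::new();
    let _receiver = multiplexer
        .get_statement_fetching()
        .expect("first call returns the receiver");
    // `std::mem::take` leaves `None` behind, so any further call yields `None`:
    assert!(multiplexer.get_statement_fetching().is_none());
}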

impl Stream for RequestMultiplexer {
@@ -151,6 +171,9 @@ fn multiplex_single(
decode_with_peer::<v1::AvailableDataFetchingRequest>(peer, payload)?,
pending_response,
)),
Protocol::StatementFetching => {
panic!("Statement fetching requests are handled directly. qed.");
}
};
Ok(r)
}
41 changes: 36 additions & 5 deletions node/network/protocol/src/lib.rs
@@ -291,10 +291,7 @@ pub mod v1 {
use parity_scale_codec::{Encode, Decode};
use std::convert::TryFrom;

use polkadot_primitives::v1::{
CandidateIndex, CollatorId, Hash, Id as ParaId, SignedAvailabilityBitfield,
CollatorSignature,
};
use polkadot_primitives::v1::{CandidateHash, CandidateIndex, CollatorId, CollatorSignature, Hash, Id as ParaId, SignedAvailabilityBitfield, ValidatorIndex, ValidatorSignature};
use polkadot_node_primitives::{
approval::{IndirectAssignmentCert, IndirectSignedApprovalVote},
SignedFullStatement,
@@ -313,7 +310,41 @@
pub enum StatementDistributionMessage {
/// A signed full statement under a given relay-parent.
#[codec(index = 0)]
Statement(Hash, SignedFullStatement)
Statement(Hash, SignedFullStatement),
/// Seconded statement with large payload (e.g. containing a runtime upgrade).
///
/// We only gossip the hash in that case; the actual payload can be fetched from the sending node
/// via req/response.
#[codec(index = 1)]
LargeStatement(StatementMetadata),
}

/// Data that makes a statement unique.
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq, Hash)]
pub struct StatementMetadata {
/// Relay parent this statement is relevant under.
pub relay_parent: Hash,
/// Hash of the candidate that got validated.
pub candidate_hash: CandidateHash,
/// Validator that attested the validity.
pub signed_by: ValidatorIndex,
/// Signature of seconding validator.
pub signature: ValidatorSignature,
}

impl StatementDistributionMessage {
/// Get the metadata of the given `StatementDistributionMessage`.
pub fn get_metadata(&self) -> StatementMetadata {
match self {
Self::Statement(relay_parent, statement) => StatementMetadata {
relay_parent: *relay_parent,
candidate_hash: statement.payload().candidate_hash(),
signed_by: statement.validator_index(),
signature: statement.signature().clone(),
},
Self::LargeStatement(metadata) => metadata.clone(),
}
}
}
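As a hedged sketch, a receiver of a `LargeStatement` could turn the metadata into a fetch request roughly as follows (the shape of `StatementFetchingRequest` and its module path are assumptions here, not defined in this hunk):

// Sketch only: `StatementFetchingRequest` and its fields are assumed.
use crate::request_response::v1::StatementFetchingRequest;

fn fetch_request_for(metadata: &StatementMetadata) -> StatementFetchingRequest {
    StatementFetchingRequest {
        relay_parent: metadata.relay_parent,
        candidate_hash: metadata.candidate_hash,
    }
    // The requester would send this to the advertising peer and verify the returned
    // `SignedFullStatement` against `metadata.signed_by` and `metadata.signature`.
}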

/// Network messages used by the approval distribution subsystem.
62 changes: 58 additions & 4 deletions node/network/protocol/src/request_response/mod.rs
@@ -32,11 +32,12 @@
//!
//! Versioned (v1 module): The actual requests and responses as sent over the network.

use std::borrow::Cow;
use std::{borrow::Cow, u64};
use std::time::Duration;

use futures::channel::mpsc;
use polkadot_node_primitives::MAX_COMPRESSED_POV_SIZE;
use polkadot_primitives::v1::MAX_CODE_SIZE;
use strum::EnumIter;

pub use sc_network::config as network;
@@ -64,8 +65,15 @@ pub enum Protocol {
PoVFetching,
/// Protocol for fetching available data.
AvailableDataFetching,
/// Fetching of statements that are too large for gossip.
StatementFetching,
}


/// Minimum bandwidth we expect for validators - 500 Mbit/s is the recommendation, so approximately
/// 50 megabytes per second:
const MIN_BANDWIDTH_BYTES: u64 = 50 * 1024 * 1024;

/// Default request timeout in seconds.
///
/// When decreasing this value, take into account that the very first request might need to open a
@@ -78,14 +86,22 @@ const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(3);
/// peer set as well).
const DEFAULT_REQUEST_TIMEOUT_CONNECTED: Duration = Duration::from_secs(1);

/// Timeout for PoV-like data: twice what it should take, assuming we can fully utilize the
/// bandwidth. This amounts to two seconds right now.
const POV_REQUEST_TIMEOUT_CONNECTED: Duration =
Duration::from_millis(2 * 1000 * (MAX_COMPRESSED_POV_SIZE as u64) / MIN_BANDWIDTH_BYTES);

/// We want statement requests to time out fast, so we don't waste time on slow nodes. Responders
/// will try their best to either serve within that timeout or return an error immediately. (We
/// need to fit statement distribution within a block of 6 seconds.)
const STATEMENTS_TIMEOUT: Duration = Duration::from_secs(1);

/// We don't want a slow peer to slow down all the others; at the same time we want to get the
/// data out in full to at least some peers quickly (as this reduces load on us, since they can
/// then start serving the data). So this value is a tradeoff; 3 seems sensible: we would need 3
/// slow nodes connected to delay the transfer for others by `STATEMENTS_TIMEOUT`.
const MAX_PARALLEL_STATEMENT_REQUESTS: u32 = 3;

impl Protocol {
/// Get a configuration for a given Request response protocol.
///
@@ -134,6 +150,23 @@
request_timeout: POV_REQUEST_TIMEOUT_CONNECTED,
inbound_queue: Some(tx),
},
Protocol::StatementFetching => RequestResponseConfig {
name: p_name,
max_request_size: 1_000,
// Response size is dominated by the new validation code a seconded candidate may carry.
max_response_size: MAX_CODE_SIZE as u64,
// We need statement fetching to be fast and will try our best at the responding
// side to answer requests within that timeout, assuming a bandwidth of 500 Mbit/s
// - which is the recommended minimum bandwidth for nodes on Kusama as of April
// 2021.
// Responders will reject requests if it is unlikely they can serve them within
// the timeout, so the requester can immediately try another node instead of
// waiting for a timeout on an overloaded node. Fetches from slow nodes will likely
// fail, but this is desired, so we can quickly move on to a faster one - we should
// also decrease their reputation.
request_timeout: Duration::from_secs(1),
inbound_queue: Some(tx),
},
};
(rx, cfg)
}
@@ -154,6 +187,26 @@
// Validators are constantly self-selecting to request available data which may lead
// to constant load and occasional burstiness.
Protocol::AvailableDataFetching => 100,
// Our queue size approximation is how many payloads of the size of
// a runtime we can transfer within the statements timeout, minus the requests we handle
// in parallel.
Protocol::StatementFetching => {
// We assume we can utilize up to 70% of the available bandwidth for statements.
// This is just a guess/estimate, with the following considerations: if we are
// faster than that, the queue size will stay low anyway; even if not, requesters
// will get an immediate error. But if we are slower, requesters will run into a
// timeout - wasting precious time.
let available_bandwidth = 7 * MIN_BANDWIDTH_BYTES / 10;
let size = u64::saturating_sub(
STATEMENTS_TIMEOUT.as_secs() * available_bandwidth / MAX_CODE_SIZE as u64,
MAX_PARALLEL_STATEMENT_REQUESTS as u64
);
debug_assert!(
size > 0,
"We should have a channel size greater than zero, otherwise we won't accept any requests."
);
size as usize
}
}
}
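To make the arithmetic above concrete, a worked example with an assumed `MAX_CODE_SIZE` of 3 MiB (the real constant comes from `polkadot_primitives` and may differ):

fn assumed_statement_queue_size() -> u64 {
    // Illustrative stand-in for `MAX_CODE_SIZE`; the real value may differ.
    const ASSUMED_MAX_CODE_SIZE: u64 = 3 * 1024 * 1024; // 3 MiB
    let available_bandwidth = 7 * MIN_BANDWIDTH_BYTES / 10; // ~36.7 MiB/s
    // 1 s * ~36.7 MiB/s / 3 MiB = 11 full-size statements per timeout window ...
    let per_timeout = STATEMENTS_TIMEOUT.as_secs() * available_bandwidth / ASSUMED_MAX_CODE_SIZE;
    // ... minus the 3 requests served in parallel leaves a queue size of 8.
    per_timeout - MAX_PARALLEL_STATEMENT_REQUESTS as u64
}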

@@ -169,6 +222,7 @@ impl Protocol {
Protocol::CollationFetching => "/polkadot/req_collation/1",
Protocol::PoVFetching => "/polkadot/req_pov/1",
Protocol::AvailableDataFetching => "/polkadot/req_available_data/1",
Protocol::StatementFetching => "/polkadot/req_statement/1",
}
}
}
47 changes: 47 additions & 0 deletions node/network/protocol/src/request_response/request.rs
@@ -25,6 +25,8 @@ use sc_network::PeerId;

use polkadot_primitives::v1::AuthorityDiscoveryId;

use crate::UnifiedReputationChange;

use super::{v1, Protocol};

/// Common properties of any `Request`.
@@ -47,6 +49,8 @@ pub enum Requests {
PoVFetching(OutgoingRequest<v1::PoVFetchingRequest>),
/// Request full available data from a node.
AvailableDataFetching(OutgoingRequest<v1::AvailableDataFetchingRequest>),
/// Requests for fetching large statements as part of statement distribution.
StatementFetching(OutgoingRequest<v1::StatementFetchingRequest>),
}

impl Requests {
@@ -57,6 +61,7 @@ impl Requests {
Self::CollationFetching(_) => Protocol::CollationFetching,
Self::PoVFetching(_) => Protocol::PoVFetching,
Self::AvailableDataFetching(_) => Protocol::AvailableDataFetching,
Self::StatementFetching(_) => Protocol::StatementFetching,
}
}

@@ -73,6 +78,7 @@ impl Requests {
Self::CollationFetching(r) => r.encode_request(),
Self::PoVFetching(r) => r.encode_request(),
Self::AvailableDataFetching(r) => r.encode_request(),
Self::StatementFetching(r) => r.encode_request(),
}
}
}
@@ -199,6 +205,22 @@ pub struct IncomingRequest<Req> {
pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
}

/// Typed variant of [`netconfig::OutgoingResponse`].
///
/// Responses to `IncomingRequest`s.
pub struct OutgoingResponse<Response> {
/// The payload of the response.
pub result: Result<Response, ()>,

/// Reputation changes accrued while handling the request. To be applied to the reputation of
/// the peer sending the request.
pub reputation_changes: Vec<UnifiedReputationChange>,

/// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
/// peer.
pub sent_feedback: Option<oneshot::Sender<()>>,
}

impl<Req> IncomingRequest<Req>
where
Req: IsRequest,
@@ -232,6 +254,31 @@ where
})
.map_err(|_| resp)
}

/// Send response with additional options.
///
/// This variant allows for waiting for the response to be sent out, for changing the peer's
/// reputation, and for not sending a response at all (only changing the peer's
/// reputation).
pub fn send_outgoing_response(self, resp: OutgoingResponse<<Req as IsRequest>::Response>)
-> Result<(), ()> {
let OutgoingResponse {
result,
reputation_changes,
sent_feedback,
} = resp;

let response = netconfig::OutgoingResponse {
result: result.map(|v| v.encode()),
reputation_changes: reputation_changes
.into_iter()
.map(|c| c.into_base_rep())
.collect(),
sent_feedback,
};

self.pending_response.send(response).map_err(|_| ())
}
}
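A hedged usage sketch of this method: an overloaded responder rejects a request so the requester can immediately retry elsewhere (the extra `Encode` bound mirrors an assumption about the surrounding impl, and the empty reputation change list is illustrative):

// Sketch only; bounds and the empty reputation change list are illustrative.
fn reject_when_overloaded<Req>(req: IncomingRequest<Req>) -> Result<(), ()>
where
    Req: IsRequest,
    Req::Response: parity_scale_codec::Encode, // assumed to be required by `send_outgoing_response`
{
    let response = OutgoingResponse {
        // No payload: signal an error so the requester tries another peer right away.
        result: Err(()),
        // A cost could be pushed here to penalize misbehaving peers; left empty in this sketch.
        reputation_changes: Vec::new(),
        // We don't care when (or whether) the rejection reaches the peer.
        sent_feedback: None,
    };
    req.send_outgoing_response(response)
}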

/// Future for actually receiving a typed response for an OutgoingRequest.