This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 60 commits

Commits (63)
857883b
Wip
eskimor Mar 29, 2021
4f11772
Merge branch 'master' into rk-req-res-runtime
eskimor Mar 30, 2021
7dc3c40
Increase proposer timeout.
eskimor Apr 1, 2021
7f428aa
WIP.
eskimor Apr 1, 2021
c045773
Better timeout values now that we are going to be connected to all no…
eskimor Mar 31, 2021
fe1f0a1
Better and more consistent sizes.
eskimor Apr 1, 2021
8a0b22f
Introduce statement fetching request.
eskimor Apr 1, 2021
86b147c
WIP
eskimor Apr 1, 2021
d68d646
Statement cache retrieval logic.
eskimor Apr 2, 2021
707c96b
Review remarks by @rphmeier
eskimor Apr 3, 2021
ded3f78
Fixes.
eskimor Apr 5, 2021
a64bfef
Better requester logic.
eskimor Apr 6, 2021
5aa7dba
WIP: Handle requester messages.
eskimor Apr 6, 2021
afc7f08
Missing dep.
eskimor Apr 6, 2021
10d7b8f
Fix request launching logic.
eskimor Apr 6, 2021
351a0d1
Finish fetching logic.
eskimor Apr 7, 2021
d9b80cc
Sending logic.
eskimor Apr 7, 2021
03e11ca
Redo code size calculations.
eskimor Apr 7, 2021
0a57c1f
Update Cargo.lock (new dep)
eskimor Apr 7, 2021
6ff978c
Merge branch 'master' into rk-req-res-runtime
eskimor Apr 7, 2021
f4a1748
Get request receiver to statement distribution.
eskimor Apr 7, 2021
ad907dc
Expose new functionality for responding to requests.
eskimor Apr 7, 2021
186ab0e
Cleanup.
eskimor Apr 7, 2021
77c6d8c
Responder logic.
eskimor Apr 7, 2021
41243fa
Fixes + Cleanup.
eskimor Apr 7, 2021
6d76bbf
Cargo.lock
eskimor Apr 7, 2021
52d1a69
Whitespace.
eskimor Apr 7, 2021
095f78e
Add lost copyright.
eskimor Apr 7, 2021
129b0fa
Launch responder task.
eskimor Apr 7, 2021
ae3c492
Typo.
eskimor Apr 7, 2021
49b4eae
info -> warn
eskimor Apr 7, 2021
2e37f70
Typo.
eskimor Apr 7, 2021
c3e911a
Fix.
eskimor Apr 7, 2021
98d366e
Fix.
eskimor Apr 7, 2021
8118d0c
Update comment.
eskimor Apr 7, 2021
3687401
Doc fix.
eskimor Apr 7, 2021
e3610d2
Better large statement heuristics.
eskimor Apr 8, 2021
f901615
Fix tests.
eskimor Apr 8, 2021
8b99d30
Merge branch 'master' into rk-req-res-runtime
eskimor Apr 8, 2021
e66dc8b
Fix network bridge tests.
eskimor Apr 8, 2021
ce8e5b5
Add test for size estimate.
eskimor Apr 8, 2021
12f78a1
Very simple tests that checks we get LargeStatement.
eskimor Apr 8, 2021
b76af99
Basic check, that fetching of large candidates is performed.
eskimor Apr 8, 2021
7f2936b
More tests.
eskimor Apr 8, 2021
f2883ff
Merge branch 'master' into rk-req-res-runtime
eskimor Apr 8, 2021
2a56490
Basic metrics for responder.
eskimor Apr 8, 2021
653ea6a
More metrics.
eskimor Apr 8, 2021
861e8d8
Use Encode::encoded_size().
eskimor Apr 8, 2021
ba8b7fb
Some useful spans.
eskimor Apr 8, 2021
42945ce
Get rid of redundant metrics.
eskimor Apr 8, 2021
2fcde8f
Don't add peer on duplicate.
eskimor Apr 8, 2021
6acc8ab
Merge branch 'master' into rk-req-res-runtime
eskimor Apr 8, 2021
0b51e89
Properly check hash
eskimor Apr 8, 2021
606ff6b
Merge branch 'master' into rk-req-res-runtime
eskimor Apr 8, 2021
ca11986
Preserve ordering + better flood protection.
eskimor Apr 9, 2021
3180253
Get rid of redundant clone.
eskimor Apr 9, 2021
5af1636
Don't shutdown responder on failed query.
eskimor Apr 9, 2021
5c4e316
Smaller fixes.
eskimor Apr 9, 2021
1454d6a
Quotes.
eskimor Apr 9, 2021
48cac47
Better queue size calculation.
eskimor Apr 9, 2021
0f3a21c
Merge branch 'master' into rk-req-res-runtime
eskimor Apr 9, 2021
e506d64
A bit saner response sizes.
eskimor Apr 9, 2021
25df93f
Fixes.
eskimor Apr 9, 2021
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 6 additions & 2 deletions node/core/proposer/src/lib.rs
@@ -40,8 +40,12 @@ use sp_transaction_pool::TransactionPool;
use prometheus_endpoint::Registry as PrometheusRegistry;
use std::{fmt, pin::Pin, sync::Arc, time};

/// How long proposal can take before we give up and err out
const PROPOSE_TIMEOUT: core::time::Duration = core::time::Duration::from_millis(2500);
/// How long proposing can take before we give up and err out. We need a relatively large timeout
/// here as long as we have large payloads in statement distribution. Assuming we can reach most
/// nodes within two hops, transferring statements will take about 2 seconds (data transfer
/// only). If necessary, we should be able to reduce this to 3 seconds. Keep in mind: the lower
/// the timeout, the higher the risk that we will not be able to include a candidate.
const PROPOSE_TIMEOUT: core::time::Duration = core::time::Duration::from_millis(4000);
Member Author:

What needs to happen after that 4 seconds, and can it fit within the remaining two seconds?

Contributor:

Invoking the on_initialize, adding transactions, and then on_finalize of the block. 2 seconds is a bit of a squeeze, honestly.

When this timeout is hit, it means that no parachain stuff will be included, which probably makes the 2 seconds more acceptable.
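
As an aside, a minimal sketch of how such a timeout could bound the proposal future; `propose_with_timeout` and its use of `futures_timer::Delay` are illustrative assumptions, not the proposer's actual code:

    use futures::future::{self, Either};
    use futures_timer::Delay;

    // Hypothetical helper: race the proposal future against PROPOSE_TIMEOUT
    // and err out if authoring takes too long.
    async fn propose_with_timeout<F, T>(propose: F) -> Result<T, &'static str>
    where
        F: std::future::Future<Output = T> + Unpin,
    {
        match future::select(propose, Delay::new(PROPOSE_TIMEOUT)).await {
            // The proposal finished in time: return the built block.
            Either::Left((block, _delay)) => Ok(block),
            // The timeout fired first: give up, as the doc comment describes.
            Either::Right(((), _propose)) => Err("proposal timed out"),
        }
    }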


/// Custom Proposer factory for Polkadot
pub struct ProposerFactory<TxPool, Backend, Client> {
64 changes: 59 additions & 5 deletions node/network/bridge/src/lib.rs
@@ -27,10 +27,7 @@ use futures::channel::mpsc;
use sc_network::Event as NetworkEvent;
use sp_consensus::SyncOracle;

use polkadot_subsystem::{
ActiveLeavesUpdate, ActivatedLeaf, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
SubsystemResult, SubsystemSender, OverseerSignal, FromOverseer,
};
use polkadot_subsystem::{ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult, SubsystemSender, messages::StatementDistributionMessage};
use polkadot_subsystem::messages::{
NetworkBridgeMessage, AllMessages,
CollatorProtocolMessage, NetworkBridgeEvent,
@@ -842,12 +839,16 @@

let NetworkBridge {
network_service,
request_multiplexer,
mut request_multiplexer,
authority_discovery_service,
metrics,
sync_oracle,
} = bridge;

let statement_receiver = request_multiplexer
.get_statement_fetching()
.expect("Gets initialized, must be `Some` on startup. qed.");

let (validation_worker_tx, validation_worker_rx) = mpsc::channel(1024);

let (remote, network_event_handler) = handle_network_messages(
@@ -861,6 +862,10 @@

ctx.spawn("network-bridge-network-worker", Box::pin(remote)).await?;

ctx.send_message(AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(statement_receiver)
)).await;

let subsystem_event_handler = handle_subsystem_messages(
ctx,
network_service,
@@ -1777,6 +1782,13 @@ mod tests {

let view = view![Hash::repeat_byte(1)];

assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(_)
)
);

// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
@@ -1822,6 +1834,13 @@
ObservedRole::Full,
).await;

assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(_)
)
);

// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
@@ -1887,6 +1906,13 @@
network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await;
network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await;

assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(_)
)
);

// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
@@ -1964,6 +1990,13 @@
network_handle.connect_peer(peer_a.clone(), PeerSet::Validation, ObservedRole::Full).await;
network_handle.connect_peer(peer_b.clone(), PeerSet::Collation, ObservedRole::Full).await;

assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(_)
)
);

// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
@@ -2052,6 +2085,13 @@
network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await;
network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await;

assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(_)
)
);

// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
@@ -2205,6 +2245,13 @@
network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await;
network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await;

assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(_)
)
);

// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
@@ -2366,6 +2413,13 @@
0,
);

assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(_)
)
);

assert_sends_validation_event_to_all(
NetworkBridgeEvent::OurViewChange(our_view.clone()),
&mut virtual_overseer,
25 changes: 24 additions & 1 deletion node/network/bridge/src/multiplexer.rs
@@ -37,8 +37,11 @@ use polkadot_subsystem::messages::AllMessages;
/// type, useful for the network bridge to send them via the `Overseer` to other subsystems.
///
/// The resulting stream will end once any of its inputs ends.
///
/// TODO: Get rid of this: https://github.com/paritytech/polkadot/issues/2842
pub struct RequestMultiplexer {
receivers: Vec<(Protocol, mpsc::Receiver<network::IncomingRequest>)>,
statement_fetching: Option<mpsc::Receiver<network::IncomingRequest>>,
next_poll: usize,
}
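
For orientation, a hypothetical consumer loop (not part of this diff) showing how the multiplexed stream is drained; it assumes the stream yields `Result` items, as `multiplex_single` below suggests:

    use futures::StreamExt;

    // Illustrative only: the real bridge forwards each request to the
    // `Overseer`; message dispatch and error handling are elided here.
    async fn drain_requests(mut mux: RequestMultiplexer) {
        // `next()` yields `None` once any of the underlying receivers ends.
        while let Some(request) = mux.next().await {
            match request {
                Ok(_msg) => { /* forward to the overseer */ }
                Err(_err) => { /* report the decoding error */ }
            }
        }
    }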

@@ -58,21 +61,38 @@ impl RequestMultiplexer {
/// `RequestMultiplexer` from it. The returned `RequestResponseConfig`s must be passed to the
/// network implementation.
pub fn new() -> (Self, Vec<RequestResponseConfig>) {
let (receivers, cfgs): (Vec<_>, Vec<_>) = Protocol::iter()
let (mut receivers, cfgs): (Vec<_>, Vec<_>) = Protocol::iter()
.map(|p| {
let (rx, cfg) = p.get_config();
((p, rx), cfg)
})
.unzip();

let index = receivers.iter().enumerate().find_map(|(i, (p, _))|
if let Protocol::StatementFetching = p {
Some(i)
} else {
None
}
).expect("Statement fetching must be registered. qed.");
let statement_fetching = Some(receivers.remove(index).1);

(
Self {
receivers,
statement_fetching,
next_poll: 0,
},
cfgs,
)
}

/// Get the receiver for handling statement fetching requests.
///
/// This function will only return `Some` once.
pub fn get_statement_fetching(&mut self) -> Option<mpsc::Receiver<network::IncomingRequest>> {
std::mem::take(&mut self.statement_fetching)
}
}
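
A usage sketch (hypothetical, test-style) of the take-once semantics documented above:

    // Illustrative only: the receiver can be taken exactly once, which is
    // what the network bridge relies on at startup.
    fn statement_fetching_take_once() {
        let (mut multiplexer, _cfgs) = RequestMultiplexer::new();
        let _receiver = multiplexer
            .get_statement_fetching()
            .expect("first call returns the receiver");
        // `std::mem::take` left `None` behind, so further calls yield `None`.
        assert!(multiplexer.get_statement_fetching().is_none());
    }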

impl Stream for RequestMultiplexer {
@@ -151,6 +171,9 @@ fn multiplex_single(
decode_with_peer::<v1::AvailableDataFetchingRequest>(peer, payload)?,
pending_response,
)),
Protocol::StatementFetching => {
panic!("Statement fetching requests are handled directly. qed.");
}
};
Ok(r)
}
68 changes: 63 additions & 5 deletions node/network/protocol/src/lib.rs
@@ -291,10 +291,7 @@ pub mod v1 {
use parity_scale_codec::{Encode, Decode};
use std::convert::TryFrom;

use polkadot_primitives::v1::{
CandidateIndex, CollatorId, Hash, Id as ParaId, SignedAvailabilityBitfield,
CollatorSignature,
};
use polkadot_primitives::v1::{CandidateHash, CandidateIndex, CollatorId, CollatorSignature, CompactStatement, Hash, Id as ParaId, SignedAvailabilityBitfield, ValidatorIndex, ValidatorSignature};
use polkadot_node_primitives::{
approval::{IndirectAssignmentCert, IndirectSignedApprovalVote},
SignedFullStatement,
@@ -313,7 +310,68 @@ pub mod v1 {
pub enum StatementDistributionMessage {
/// A signed full statement under a given relay-parent.
#[codec(index = 0)]
Statement(Hash, SignedFullStatement)
Statement(Hash, SignedFullStatement),
/// Seconded statement with large payload (e.g. containing a runtime upgrade).
///
/// We only gossip the hash in that case; the actual payload can be fetched from the sending
/// node via req/response.
#[codec(index = 1)]
LargeStatement(StatementMetadata),
}

/// Data that makes a statement unique.
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq, Hash)]
pub struct StatementMetadata {
/// Relay parent this statement is relevant under.
pub relay_parent: Hash,
/// Hash of the candidate that got validated.
pub candidate_hash: CandidateHash,
/// Validator that attested the validity.
pub signed_by: ValidatorIndex,
/// Signature of seconding validator.
pub signature: ValidatorSignature,
}

impl StatementDistributionMessage {
/// Get the metadata of the given `StatementDistributionMessage`.
pub fn get_metadata(&self) -> StatementMetadata {
match self {
Self::Statement(relay_parent, statement) => StatementMetadata {
relay_parent: *relay_parent,
candidate_hash: statement.payload().candidate_hash(),
signed_by: statement.validator_index(),
signature: statement.signature().clone(),
},
Self::LargeStatement(metadata) => metadata.clone(),
}
}

/// Get a fingerprint that uniquely describes the contained statement.
pub fn get_fingerprint(&self) -> (CompactStatement, ValidatorIndex) {
match self {
Self::Statement(_, statement) =>
(statement.payload().to_compact(), statement.validator_index()),
Self::LargeStatement(meta) =>
(CompactStatement::Seconded(meta.candidate_hash), meta.signed_by),
}
}

/// Get contained relay parent.
pub fn get_relay_parent(&self) -> Hash {
match self {
Self::Statement(r, _) => *r,
Self::LargeStatement(meta) => meta.relay_parent,
}
}

/// Whether or not this message contains a large statement.
pub fn is_large_statement(&self) -> bool {
if let Self::LargeStatement(_) = self {
true
} else {
false
}
}
}
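
To illustrate the intended split between the two variants, here is a hedged sender-side sketch; `LARGE_STATEMENT_THRESHOLD` and `to_wire_message` are hypothetical, though the commit history does mention size heuristics built on `Encode::encoded_size()`:

    use parity_scale_codec::Encode;

    // Hypothetical cutoff; the actual threshold is not part of this diff.
    const LARGE_STATEMENT_THRESHOLD: usize = 32 * 1024;

    // Illustrative helper: gossip small statements in full, but advertise
    // large ones (e.g. runtime upgrades) by metadata only, so peers fetch
    // the payload via the new req/response protocol instead.
    fn to_wire_message(
        relay_parent: Hash,
        statement: SignedFullStatement,
    ) -> StatementDistributionMessage {
        let full = StatementDistributionMessage::Statement(relay_parent, statement);
        if full.encoded_size() >= LARGE_STATEMENT_THRESHOLD {
            StatementDistributionMessage::LargeStatement(full.get_metadata())
        } else {
            full
        }
    }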

/// Network messages used by the approval distribution subsystem.