diff --git a/Cargo.lock b/Cargo.lock index b5c4f91edaf6..41b498d789e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6497,6 +6497,7 @@ dependencies = [ "assert_matches", "futures 0.3.13", "indexmap", + "parity-scale-codec", "polkadot-node-network-protocol", "polkadot-node-primitives", "polkadot-node-subsystem", @@ -6504,6 +6505,7 @@ dependencies = [ "polkadot-node-subsystem-util", "polkadot-primitives", "sc-keystore", + "sc-network", "sp-application-crypto", "sp-core", "sp-keyring", diff --git a/node/core/proposer/src/lib.rs b/node/core/proposer/src/lib.rs index 1a8fd5c28a31..ce2b8386fa00 100644 --- a/node/core/proposer/src/lib.rs +++ b/node/core/proposer/src/lib.rs @@ -40,8 +40,12 @@ use sp_transaction_pool::TransactionPool; use prometheus_endpoint::Registry as PrometheusRegistry; use std::{fmt, pin::Pin, sync::Arc, time}; -/// How long proposal can take before we give up and err out -const PROPOSE_TIMEOUT: core::time::Duration = core::time::Duration::from_millis(2500); +/// How long proposing can take, before we give up and err out. We need a relatively large timeout +/// here as long as we have large payloads in statement distribution. Assuming we can reach most +/// nodes within two hops, we will take about 2 seconds for transferring statements (data transfer +/// only). If necessary, we should be able to reduce this to 3 seconds. Keep in mind: the lower the +/// timeout, the higher the risk that we will not be able to include a candidate. +const PROPOSE_TIMEOUT: core::time::Duration = core::time::Duration::from_millis(4000); /// Custom Proposer factory for Polkadot pub struct ProposerFactory { diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index efffc81234ff..dc921047f969 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -28,8 +28,9 @@ use sc_network::Event as NetworkEvent; use sp_consensus::SyncOracle; use polkadot_subsystem::{ - ActiveLeavesUpdate, ActivatedLeaf, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError, - SubsystemResult, SubsystemSender, OverseerSignal, FromOverseer, + ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, + Subsystem, SubsystemContext, SubsystemError, SubsystemResult, SubsystemSender, + messages::StatementDistributionMessage }; use polkadot_subsystem::messages::{ NetworkBridgeMessage, AllMessages, @@ -842,12 +843,16 @@ where let NetworkBridge { network_service, - request_multiplexer, + mut request_multiplexer, authority_discovery_service, metrics, sync_oracle, } = bridge; + let statement_receiver = request_multiplexer + .get_statement_fetching() + .expect("Gets initialized, must be `Some` on startup. qed."); + let (validation_worker_tx, validation_worker_rx) = mpsc::channel(1024); let (remote, network_event_handler) = handle_network_messages( @@ -861,6 +866,10 @@ where ctx.spawn("network-bridge-network-worker", Box::pin(remote)).await?; + ctx.send_message(AllMessages::StatementDistribution( + StatementDistributionMessage::StatementFetchingReceiver(statement_receiver) + )).await; + let subsystem_event_handler = handle_subsystem_messages( ctx, network_service, @@ -1777,6 +1786,13 @@ mod tests { let view = view![Hash::repeat_byte(1)]; + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::StatementFetchingReceiver(_) + ) + ); + // bridge will inform about all connected peers.
{ assert_sends_validation_event_to_all( @@ -1822,6 +1838,13 @@ mod tests { ObservedRole::Full, ).await; + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::StatementFetchingReceiver(_) + ) + ); + // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( @@ -1887,6 +1910,13 @@ mod tests { network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await; network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await; + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::StatementFetchingReceiver(_) + ) + ); + // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( @@ -1964,6 +1994,13 @@ mod tests { network_handle.connect_peer(peer_a.clone(), PeerSet::Validation, ObservedRole::Full).await; network_handle.connect_peer(peer_b.clone(), PeerSet::Collation, ObservedRole::Full).await; + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::StatementFetchingReceiver(_) + ) + ); + // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( @@ -2052,6 +2089,13 @@ mod tests { network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await; network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await; + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::StatementFetchingReceiver(_) + ) + ); + // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( @@ -2205,6 +2249,13 @@ mod tests { network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await; network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await; + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::StatementFetchingReceiver(_) + ) + ); + // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( @@ -2366,6 +2417,13 @@ mod tests { 0, ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::StatementFetchingReceiver(_) + ) + ); + assert_sends_validation_event_to_all( NetworkBridgeEvent::OurViewChange(our_view.clone()), &mut virtual_overseer, diff --git a/node/network/bridge/src/multiplexer.rs b/node/network/bridge/src/multiplexer.rs index c71ba33a871b..ad65309d3eea 100644 --- a/node/network/bridge/src/multiplexer.rs +++ b/node/network/bridge/src/multiplexer.rs @@ -37,8 +37,11 @@ use polkadot_subsystem::messages::AllMessages; /// type, useful for the network bridge to send them via the `Overseer` to other subsystems. /// /// The resulting stream will end once any of its inputs ends. +/// +/// TODO: Get rid of this: https://github.com/paritytech/polkadot/issues/2842 pub struct RequestMultiplexer { receivers: Vec<(Protocol, mpsc::Receiver<network::IncomingRequest>)>, + statement_fetching: Option<mpsc::Receiver<network::IncomingRequest>>, next_poll: usize, } @@ -58,21 +61,38 @@ impl RequestMultiplexer { /// `RequestMultiplexer` from it. The returned `RequestResponseConfig`s must be passed to the /// network implementation.
pub fn new() -> (Self, Vec<RequestResponseConfig>) { - let (receivers, cfgs): (Vec<_>, Vec<_>) = Protocol::iter() + let (mut receivers, cfgs): (Vec<_>, Vec<_>) = Protocol::iter() .map(|p| { let (rx, cfg) = p.get_config(); ((p, rx), cfg) }) .unzip(); + let index = receivers.iter().enumerate().find_map(|(i, (p, _))| + if let Protocol::StatementFetching = p { + Some(i) + } else { + None + } + ).expect("Statement fetching must be registered. qed."); + let statement_fetching = Some(receivers.remove(index).1); + ( Self { receivers, + statement_fetching, next_poll: 0, }, cfgs, ) } + + /// Get the receiver for handling statement fetching requests. + /// + /// This function will only return `Some` once. + pub fn get_statement_fetching(&mut self) -> Option<mpsc::Receiver<network::IncomingRequest>> { + std::mem::take(&mut self.statement_fetching) + } } impl Stream for RequestMultiplexer { @@ -151,6 +171,9 @@ fn multiplex_single( decode_with_peer::<v1::AvailableDataFetchingRequest>(peer, payload)?, pending_response, )), + Protocol::StatementFetching => { + panic!("Statement fetching requests are handled directly. qed."); + } }; Ok(r) } diff --git a/node/network/protocol/src/lib.rs b/node/network/protocol/src/lib.rs index 06304859e6cc..33cb2034adfd 100644 --- a/node/network/protocol/src/lib.rs +++ b/node/network/protocol/src/lib.rs @@ -291,10 +291,7 @@ pub mod v1 { use parity_scale_codec::{Encode, Decode}; use std::convert::TryFrom; - use polkadot_primitives::v1::{ - CandidateIndex, CollatorId, Hash, Id as ParaId, SignedAvailabilityBitfield, - CollatorSignature, - }; + use polkadot_primitives::v1::{CandidateHash, CandidateIndex, CollatorId, CollatorSignature, CompactStatement, Hash, Id as ParaId, SignedAvailabilityBitfield, ValidatorIndex, ValidatorSignature}; use polkadot_node_primitives::{ approval::{IndirectAssignmentCert, IndirectSignedApprovalVote}, SignedFullStatement, @@ -313,7 +310,68 @@ pub mod v1 { pub enum StatementDistributionMessage { /// A signed full statement under a given relay-parent. #[codec(index = 0)] - Statement(Hash, SignedFullStatement) + Statement(Hash, SignedFullStatement), + /// Seconded statement with large payload (e.g. containing a runtime upgrade). + /// + /// We only gossip the hash in that case, actual payloads can be fetched from the sending node + /// via req/response. + #[codec(index = 1)] + LargeStatement(StatementMetadata), + } + + /// Data that makes a statement unique. + #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq, Hash)] + pub struct StatementMetadata { + /// Relay parent this statement is relevant under. + pub relay_parent: Hash, + /// Hash of the candidate that got validated. + pub candidate_hash: CandidateHash, + /// Validator that attested the validity. + pub signed_by: ValidatorIndex, + /// Signature of seconding validator. + pub signature: ValidatorSignature, + } + + impl StatementDistributionMessage { + /// Get metadata of the given `StatementDistributionMessage`. + pub fn get_metadata(&self) -> StatementMetadata { + match self { + Self::Statement(relay_parent, statement) => StatementMetadata { + relay_parent: *relay_parent, + candidate_hash: statement.payload().candidate_hash(), + signed_by: statement.validator_index(), + signature: statement.signature().clone(), + }, + Self::LargeStatement(metadata) => metadata.clone(), + } + } + + /// Get fingerprint describing the contained statement uniquely.
+ pub fn get_fingerprint(&self) -> (CompactStatement, ValidatorIndex) { + match self { + Self::Statement(_, statement) => + (statement.payload().to_compact(), statement.validator_index()), + Self::LargeStatement(meta) => + (CompactStatement::Seconded(meta.candidate_hash), meta.signed_by), + } + } + + /// Get contained relay parent. + pub fn get_relay_parent(&self) -> Hash { + match self { + Self::Statement(r, _) => *r, + Self::LargeStatement(meta) => meta.relay_parent, + } + } + + /// Whether or not this message contains a large statement. + pub fn is_large_statement(&self) -> bool { + if let Self::LargeStatement(_) = self { + true + } else { + false + } + } } /// Network messages used by the approval distribution subsystem. diff --git a/node/network/protocol/src/request_response/mod.rs b/node/network/protocol/src/request_response/mod.rs index 99fd8b18da1c..8310d0f9c8c5 100644 --- a/node/network/protocol/src/request_response/mod.rs +++ b/node/network/protocol/src/request_response/mod.rs @@ -32,11 +32,12 @@ //! //! Versioned (v1 module): The actual requests and responses as sent over the network. -use std::borrow::Cow; +use std::{borrow::Cow, u64}; use std::time::Duration; use futures::channel::mpsc; use polkadot_node_primitives::MAX_POV_SIZE; +use polkadot_primitives::v1::MAX_CODE_SIZE; use strum::EnumIter; pub use sc_network::config as network; @@ -64,8 +65,15 @@ pub enum Protocol { PoVFetching, /// Protocol for fetching available data. AvailableDataFetching, + /// Fetching of statements that are too large for gossip. + StatementFetching, } + +/// Minimum bandwidth we expect for validators - 500Mbit/s is the recommendation, so approximately +/// 50 megabytes per second: +const MIN_BANDWIDTH_BYTES: u64 = 50 * 1024 * 1024; + /// Default request timeout in seconds. /// /// When decreasing this value, take into account that the very first request might need to open a @@ -78,14 +86,22 @@ const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(3); /// peer set as well). const DEFAULT_REQUEST_TIMEOUT_CONNECTED: Duration = Duration::from_secs(1); -/// Minimum bandwidth we expect for validators - 500Mbit/s is the recommendation, so approximately -/// 50Meg bytes per second: -const MIN_BANDWIDTH_BYTES: u64 = 50 * 1024 * 1024; /// Timeout for PoV like data, 2 times what it should take, assuming we can fully utilize the /// bandwidth. This amounts to two seconds right now. const POV_REQUEST_TIMEOUT_CONNECTED: Duration = Duration::from_millis(2 * 1000 * (MAX_POV_SIZE as u64) / MIN_BANDWIDTH_BYTES); +/// We want statement requests to time out fast, so we don't waste time on slow nodes. Responders +/// will try their best to either serve within that timeout or return an error immediately. (We +/// need to fit statement distribution within a block of 6 seconds.) +const STATEMENTS_TIMEOUT: Duration = Duration::from_secs(1); + +/// We don't want a slow peer to slow down all the others, at the same time we want to get out the +/// data quickly in full to at least some peers (as this will reduce load on us as they then can +/// start serving the data). So this value is a tradeoff; 3 seems to be sensible. We would need +/// to have 3 slow nodes connected to delay the transfer for others by `STATEMENTS_TIMEOUT`. +pub const MAX_PARALLEL_STATEMENT_REQUESTS: u32 = 3; + impl Protocol { /// Get a configuration for a given Request response protocol.
/// @@ -105,16 +121,16 @@ impl Protocol { let cfg = match self { Protocol::ChunkFetching => RequestResponseConfig { name: p_name, - max_request_size: 10_000, - max_response_size: 10_000_000, + max_request_size: 1_000, + max_response_size: MAX_POV_SIZE as u64 / 10, // We are connected to all validators: request_timeout: DEFAULT_REQUEST_TIMEOUT_CONNECTED, inbound_queue: Some(tx), }, Protocol::CollationFetching => RequestResponseConfig { name: p_name, - max_request_size: 10_000, - max_response_size: MAX_POV_SIZE as u64, + max_request_size: 1_000, + max_response_size: MAX_POV_SIZE as u64 + 1000, // Taken from initial implementation in collator protocol: request_timeout: POV_REQUEST_TIMEOUT_CONNECTED, inbound_queue: Some(tx), @@ -130,10 +146,28 @@ impl Protocol { name: p_name, max_request_size: 1_000, // Available data size is dominated by the PoV size. - max_response_size: MAX_POV_SIZE as u64, + max_response_size: MAX_POV_SIZE as u64 + 1000, request_timeout: POV_REQUEST_TIMEOUT_CONNECTED, inbound_queue: Some(tx), }, + Protocol::StatementFetching => RequestResponseConfig { + name: p_name, + max_request_size: 1_000, + // Response size is dominated by the code size. + // + 1000 to account for protocol overhead (should be way less). + max_response_size: MAX_CODE_SIZE as u64 + 1000, + // We need statement fetching to be fast and will try our best at the responding + // side to answer requests within that timeout, assuming a bandwidth of 500Mbit/s + // - which is the recommended minimum bandwidth for nodes on Kusama as of April + // 2021. + // Responders will reject requests if it is unlikely they can serve them within + // the timeout, so the requester can immediately try another node, instead of + // waiting for timeout on an overloaded node. Fetches from slow nodes will likely + // fail, but this is desired, so we can quickly move on to a faster one - we should + // also decrease its reputation. + request_timeout: Duration::from_secs(1), + inbound_queue: Some(tx), + }, }; (rx, cfg) } @@ -154,6 +188,26 @@ impl Protocol { // Validators are constantly self-selecting to request available data which may lead // to constant load and occasional burstiness. Protocol::AvailableDataFetching => 100, + // Our queue size approximation is how many blocks of the size of + // a runtime we can transfer within a statements timeout, minus the requests we handle + // in parallel. + Protocol::StatementFetching => { + // We assume we can utilize up to 70% of the available bandwidth for statements. + // This is just a guess/estimate, with the following considerations: If we are + // faster than that, queue size will stay low anyway, even if not - requesters will + // get an immediate error, but if we are slower, requesters will run into a timeout, + // wasting precious time. + let available_bandwidth = 7 * MIN_BANDWIDTH_BYTES / 10; + let size = u64::saturating_sub( + STATEMENTS_TIMEOUT.as_millis() as u64 * available_bandwidth / (1000 * MAX_CODE_SIZE as u64), + MAX_PARALLEL_STATEMENT_REQUESTS as u64 + ); + debug_assert!( + size > 0, + "We should have a channel size greater than zero, otherwise we won't accept any requests."
+ ); + size as usize + } } } @@ -169,6 +223,7 @@ impl Protocol { Protocol::CollationFetching => "/polkadot/req_collation/1", Protocol::PoVFetching => "/polkadot/req_pov/1", Protocol::AvailableDataFetching => "/polkadot/req_available_data/1", + Protocol::StatementFetching => "/polkadot/req_statement/1", } } } diff --git a/node/network/protocol/src/request_response/request.rs b/node/network/protocol/src/request_response/request.rs index 4e8bbca81b05..69b86ff7f642 100644 --- a/node/network/protocol/src/request_response/request.rs +++ b/node/network/protocol/src/request_response/request.rs @@ -25,6 +25,8 @@ use sc_network::PeerId; use polkadot_primitives::v1::AuthorityDiscoveryId; +use crate::UnifiedReputationChange; + use super::{v1, Protocol}; /// Common properties of any `Request`. @@ -47,6 +49,8 @@ pub enum Requests { PoVFetching(OutgoingRequest<v1::PoVFetchingRequest>), /// Request full available data from a node. AvailableDataFetching(OutgoingRequest<v1::AvailableDataFetchingRequest>), + /// Requests for fetching large statements as part of statement distribution. + StatementFetching(OutgoingRequest<v1::StatementFetchingRequest>), } impl Requests { @@ -57,6 +61,7 @@ impl Requests { Self::CollationFetching(_) => Protocol::CollationFetching, Self::PoVFetching(_) => Protocol::PoVFetching, Self::AvailableDataFetching(_) => Protocol::AvailableDataFetching, + Self::StatementFetching(_) => Protocol::StatementFetching, } } @@ -73,6 +78,7 @@ impl Requests { Self::CollationFetching(r) => r.encode_request(), Self::PoVFetching(r) => r.encode_request(), Self::AvailableDataFetching(r) => r.encode_request(), + Self::StatementFetching(r) => r.encode_request(), } } } @@ -199,6 +205,22 @@ pub struct IncomingRequest<Req> { pending_response: oneshot::Sender<netconfig::OutgoingResponse>, } +/// Typed variant of [`netconfig::OutgoingResponse`]. +/// +/// Responses to `IncomingRequest`s. +pub struct OutgoingResponse<Response> { + /// The payload of the response. + pub result: Result<Response, ()>, + + /// Reputation changes accrued while handling the request. To be applied to the reputation of + /// the peer sending the request. + pub reputation_changes: Vec<UnifiedReputationChange>, + + /// If provided, the `oneshot::Sender` will be notified when the request has been sent to the + /// peer. + pub sent_feedback: Option<oneshot::Sender<()>>, +} + impl<Req> IncomingRequest<Req> where Req: IsRequest, @@ -232,6 +254,31 @@ where }) .map_err(|_| resp) } + + /// Send response with additional options. + /// + /// This variant allows for waiting for the response to be sent out, allows for changing the + /// peer's reputation and allows for not sending a response at all (for only changing the + /// peer's reputation). + pub fn send_outgoing_response(self, resp: OutgoingResponse<<Req as IsRequest>::Response>) + -> Result<(), ()> { + let OutgoingResponse { + result, + reputation_changes, + sent_feedback, + } = resp; + + let response = netconfig::OutgoingResponse { + result: result.map(|v| v.encode()), + reputation_changes: reputation_changes + .into_iter() + .map(|c| c.into_base_rep()) + .collect(), + sent_feedback, + }; + + self.pending_response.send(response).map_err(|_| ()) + } } /// Future for actually receiving a typed response for an OutgoingRequest.
diff --git a/node/network/protocol/src/request_response/v1.rs b/node/network/protocol/src/request_response/v1.rs index bd362eb2c510..89c732773552 100644 --- a/node/network/protocol/src/request_response/v1.rs +++ b/node/network/protocol/src/request_response/v1.rs @@ -18,10 +18,7 @@ use parity_scale_codec::{Decode, Encode}; -use polkadot_primitives::v1::{ - CandidateHash, CandidateReceipt, ValidatorIndex, - Hash, -}; +use polkadot_primitives::v1::{CandidateHash, CandidateReceipt, CommittedCandidateReceipt, Hash, ValidatorIndex}; use polkadot_primitives::v1::Id as ParaId; use polkadot_node_primitives::{AvailableData, PoV, ErasureChunk}; @@ -169,3 +166,29 @@ impl IsRequest for AvailableDataFetchingRequest { type Response = AvailableDataFetchingResponse; const PROTOCOL: Protocol = Protocol::AvailableDataFetching; } + +/// Request for fetching a large statement via request/response. +#[derive(Debug, Clone, Encode, Decode)] +pub struct StatementFetchingRequest { + /// Data needed to locate and identify the needed statement. + pub relay_parent: Hash, + /// Hash of the candidate that was used to create the `CommittedCandidateReceipt`. + pub candidate_hash: CandidateHash, +} + +/// Respond with found full statement. +/// +/// In this protocol the requester will only request data it was previously notified about, +/// therefore not having the data is not really an option and would just result in a +/// `RequestFailure`. +#[derive(Debug, Clone, Encode, Decode)] +pub enum StatementFetchingResponse { + /// Data missing to reconstruct the full signed statement. + #[codec(index = 0)] + Statement(CommittedCandidateReceipt), +} + +impl IsRequest for StatementFetchingRequest { + type Response = StatementFetchingResponse; + const PROTOCOL: Protocol = Protocol::StatementFetching; +} diff --git a/node/network/statement-distribution/Cargo.toml b/node/network/statement-distribution/Cargo.toml index 5bcae793a348..ead19ecdb4b8 100644 --- a/node/network/statement-distribution/Cargo.toml +++ b/node/network/statement-distribution/Cargo.toml @@ -10,12 +10,14 @@ futures = "0.3.12" tracing = "0.1.25" polkadot-primitives = { path = "../../../primitives" } sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } +sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } polkadot-node-primitives = { path = "../../primitives" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } polkadot-node-network-protocol = { path = "../../network/protocol" } arrayvec = "0.5.2" indexmap = "1.6.1" +parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] } [dev-dependencies] polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index d1c58d3bec22..3d8c428e8ef3 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. +// Copyright 2021 Parity Technologies (UK) Ltd. // This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify @@ -22,35 +22,53 @@ #![deny(unused_crate_dependencies)] #![warn(missing_docs)] +use parity_scale_codec::Encode; + use polkadot_subsystem::{ - Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem, - ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan, - jaeger, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan, SpawnedSubsystem, Subsystem, + SubsystemContext, SubsystemError, SubsystemResult, jaeger, messages::{ - AllMessages, NetworkBridgeMessage, StatementDistributionMessage, CandidateBackingMessage, - RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent, + AllMessages, NetworkBridgeMessage, StatementDistributionMessage, + CandidateBackingMessage, RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent, }, }; use polkadot_node_subsystem_util::{ metrics::{self, prometheus}, self as util, MIN_GOSSIP_PEERS, }; -use polkadot_node_primitives::{SignedFullStatement}; +use polkadot_node_primitives::{SignedFullStatement, Statement}; use polkadot_primitives::v1::{ - Hash, CompactStatement, ValidatorIndex, ValidatorId, SigningContext, ValidatorSignature, CandidateHash, + CandidateHash, CommittedCandidateReceipt, CompactStatement, Hash, + SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature }; use polkadot_node_network_protocol::{ - v1 as protocol_v1, View, PeerId, UnifiedReputationChange as Rep, + IfDisconnected, PeerId, UnifiedReputationChange as Rep, View, + peer_set::{ + IsAuthority, PeerSet + }, + v1::{ + self as protocol_v1, StatementMetadata + } }; -use futures::prelude::*; +use futures::{channel::mpsc, future::RemoteHandle, prelude::*}; use futures::channel::oneshot; use indexmap::IndexSet; -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, hash_map::Entry}; + +/// Background task logic for requesting of large statements. +mod requester; +use requester::{RequesterMessage, fetch}; + +/// Background task logic for responding to large statement requests. +mod responder; +use responder::{ResponderMessage, respond}; const COST_UNEXPECTED_STATEMENT: Rep = Rep::CostMinor("Unexpected Statement"); +const COST_FETCH_FAIL: Rep = Rep::CostMinor("Requesting `CommittedCandidateReceipt` from peer failed"); const COST_INVALID_SIGNATURE: Rep = Rep::CostMajor("Invalid Statement Signature"); +const COST_WRONG_HASH: Rep = Rep::CostMajor("Received candidate had wrong hash"); const COST_DUPLICATE_STATEMENT: Rep = Rep::CostMajorRepeated("Statement sent more than once by peer"); const COST_APPARENT_FLOOD: Rep = Rep::Malicious("Peer appears to be flooding us with statements"); @@ -58,6 +76,7 @@ const BENEFIT_VALID_STATEMENT: Rep = Rep::BenefitMajor("Peer provided a valid st const BENEFIT_VALID_STATEMENT_FIRST: Rep = Rep::BenefitMajorFirst( "Peer was the first to provide a valid statement", ); +const BENEFIT_VALID_RESPONSE: Rep = Rep::BenefitMajor("Peer provided a valid large statement response"); /// The maximum amount of candidates each validator is allowed to second at any relay-parent. /// Short for "Validator Candidate Threshold". @@ -476,6 +495,61 @@ enum NotedStatement<'a> { UsefulButKnown } +/// Large statement fetching status. +enum LargeStatementStatus { + /// We are currently fetching the statement data from a remote peer. We keep a list of other nodes + /// claiming to have that data and will fall back on them. + Fetching(FetchingInfo), + /// Statement data has been fetched. + Fetched(CommittedCandidateReceipt), +} + +/// Info about a fetch in progress.
+struct FetchingInfo { + /// All peers that sent us a `LargeStatement` or a `Valid` statement for the given + /// `CandidateHash`, together with their originally sent messages. + available_peers: HashMap<PeerId, Vec<protocol_v1::StatementDistributionMessage>>, + /// Peers left to try in case the background task needs it. + peers_to_try: Vec<PeerId>, + /// Sender for sending fresh peers to the fetching task in case of failure. + peer_sender: Option<oneshot::Sender<Vec<PeerId>>>, + /// Task taking care of the request. + /// + /// Will be killed once dropped. + #[allow(dead_code)] + fetching_task: RemoteHandle<()>, +} + +/// Messages to be handled in this subsystem. +enum Message { + /// Messages from other subsystems. + Subsystem(SubsystemResult<FromOverseer<StatementDistributionMessage>>), + /// Messages from spawned requester background tasks. + Requester(Option<RequesterMessage>), + /// Messages from spawned responder background task. + Responder(Option<ResponderMessage>) } + +impl Message { + async fn receive( + ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>, + from_requester: &mut mpsc::Receiver<RequesterMessage>, + from_responder: &mut mpsc::Receiver<ResponderMessage>, + ) -> Message { + // We are only fusing here to make `select` happy, in reality we will quit if one of those + // streams ends: + let from_overseer = ctx.recv().fuse(); + let from_requester = from_requester.next().fuse(); + let from_responder = from_responder.next().fuse(); + futures::pin_mut!(from_overseer, from_requester, from_responder); + futures::select!( + msg = from_overseer => Message::Subsystem(msg), + msg = from_requester => Message::Requester(msg), + msg = from_responder => Message::Responder(msg), + ) + } +} + #[derive(Debug, PartialEq, Eq)] enum DeniedStatement { NotUseful, @@ -490,6 +564,8 @@ struct ActiveHeadData { /// These are iterable in insertion order, and `Seconded` statements are always /// accepted before dependent statements. statements: IndexSet<StoredStatement>, + /// Large statements we are waiting for with associated metadata. + waiting_large_statements: HashMap<CandidateHash, LargeStatementStatus>, /// The validators at this head. validators: Vec<ValidatorId>, /// The session index this head is at. @@ -509,6 +585,7 @@ impl ActiveHeadData { ActiveHeadData { candidates: Default::default(), statements: Default::default(), + waiting_large_statements: Default::default(), validators, session_index, seconded_counts: Default::default(), @@ -717,7 +794,7 @@ fn check_statement_signature( async fn circulate_statement_and_dependents( peers: &mut HashMap<PeerId, PeerData>, active_heads: &mut HashMap<Hash, ActiveHeadData>, - ctx: &mut impl SubsystemContext, + ctx: &mut impl SubsystemContext, relay_parent: Hash, statement: SignedFullStatement, metrics: &Metrics, @@ -771,10 +848,44 @@ async fn circulate_statement_and_dependents( fn statement_message(relay_parent: Hash, statement: SignedFullStatement) -> protocol_v1::ValidationProtocol -{ - protocol_v1::ValidationProtocol::StatementDistribution( +{ + let msg = if is_statement_large(&statement) { + protocol_v1::StatementDistributionMessage::LargeStatement( + StatementMetadata { + relay_parent, + candidate_hash: statement.payload().candidate_hash(), + signed_by: statement.validator_index(), + signature: statement.signature().clone(), + } + ) + } else { protocol_v1::StatementDistributionMessage::Statement(relay_parent, statement) - ) + }; + + protocol_v1::ValidationProtocol::StatementDistribution(msg) +} + +/// Check whether a statement should be treated as a large statement. +fn is_statement_large(statement: &SignedFullStatement) -> bool { + match &statement.payload() { + Statement::Seconded(committed) => { + // Runtime upgrades will always be large and even if not - no harm done.
+ if committed.commitments.new_validation_code.is_some() { + return true + } + // No runtime upgrade, now we need to be more nuanced: + let size = statement.encoded_size(); + + // Half the max size seems to be a good threshold to stop using notifications: + let threshold = + PeerSet::Validation.get_info(IsAuthority::Yes) + .max_notification_size as usize / 2; + + size >= threshold + } + Statement::Valid(_) => + false, + } } /// Circulates a statement to all peers who have not seen it yet, and returns #[tracing::instrument(level = "trace", skip(peers, ctx), fields(subsystem = LOG_TARGET))] async fn circulate_statement( peers: &mut HashMap<PeerId, PeerData>, - ctx: &mut impl SubsystemContext, + ctx: &mut impl SubsystemContext, relay_parent: Hash, stored: &StoredStatement, ) -> Vec<PeerId> { @@ -832,7 +943,7 @@ async fn circulate_statement( async fn send_statements_about( peer: PeerId, peer_data: &mut PeerData, - ctx: &mut impl SubsystemContext, + ctx: &mut impl SubsystemContext, relay_parent: Hash, candidate_hash: CandidateHash, active_head: &ActiveHeadData, @@ -870,7 +981,7 @@ async fn send_statements_about( async fn send_statements( peer: PeerId, peer_data: &mut PeerData, - ctx: &mut impl SubsystemContext, + ctx: &mut impl SubsystemContext, relay_parent: Hash, active_head: &ActiveHeadData, metrics: &Metrics, @@ -911,44 +1022,256 @@ async fn report_peer( )).await } -// Handle an incoming wire message. Returns a reference to a newly-stored statement +/// If the message contains a statement, then retrieve it; otherwise fork off a task to fetch it. +/// +/// This function will also return `None` if the message did not pass some basic checks; in that +/// case no statement will be requested. On the flip side, you get `ActiveHeadData` in addition to +/// your statement. +/// +/// If the message was large, but the result has already been fetched, that one is returned.
+async fn retrieve_statement_from_message<'a>( + peer: PeerId, + message: protocol_v1::StatementDistributionMessage, + active_head: &'a mut ActiveHeadData, + ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>, + req_sender: &mpsc::Sender<RequesterMessage>, + metrics: &Metrics, +) -> Option<SignedFullStatement> { + + let fingerprint = message.get_fingerprint(); + let candidate_hash = *fingerprint.0.candidate_hash(); + + // Immediately return any Seconded statement: + let message = + if let protocol_v1::StatementDistributionMessage::Statement(h, s) = message { + if let Statement::Seconded(_) = s.payload() { + return Some(s) + } + protocol_v1::StatementDistributionMessage::Statement(h, s) + } else { + message + }; + + match active_head.waiting_large_statements.entry(candidate_hash) { + Entry::Occupied(mut occupied) => { + match occupied.get_mut() { + LargeStatementStatus::Fetching(info) => { + + let is_new_peer = !info.available_peers.contains_key(&peer); + let is_large_statement = message.is_large_statement(); + + match info.available_peers.entry(peer) { + Entry::Occupied(mut occupied) => { + occupied.get_mut().push(message); + } + Entry::Vacant(vacant) => { + vacant.insert(vec![message]); + } + } + + if is_new_peer & is_large_statement { + info.peers_to_try.push(peer); + // Answer any pending request for more peers: + if let Some(sender) = std::mem::take(&mut info.peer_sender) { + let to_send = std::mem::take(&mut info.peers_to_try); + if let Err(peers) = sender.send(to_send) { + // Requester no longer interested for now, might want them + // later: + info.peers_to_try = peers; + } + } + } + } + LargeStatementStatus::Fetched(committed) => { + match message { + protocol_v1::StatementDistributionMessage::Statement(_, s) => { + // We can now immediately return any statements (should only be + // `Statement::Valid` ones, but we don't care at this point.) + return Some(s) + } + protocol_v1::StatementDistributionMessage::LargeStatement(metadata) => { + + let validator_id = active_head.validators.get(metadata.signed_by.0 as usize); + + if let Some(validator_id) = validator_id { + let signing_context = SigningContext { + session_index: active_head.session_index, + parent_hash: metadata.relay_parent, + }; + + let statement = SignedFullStatement::new( + Statement::Seconded(committed.clone()), + metadata.signed_by, + metadata.signature.clone(), + &signing_context, + validator_id, + ); + + if let Some(statement) = statement { + return Some(statement) + } else { + tracing::debug!( + target: LOG_TARGET, + validator_index = ?metadata.signed_by, + "Building statement failed - invalid signature!" + ); + report_peer(ctx, peer, COST_INVALID_SIGNATURE).await; + } + } else { + tracing::debug!( + target: LOG_TARGET, + validator_index = ?metadata.signed_by, + "Error loading statement, could not find key for validator." + ); + } + } + } + } + } + } + Entry::Vacant(vacant) => { + match message { + protocol_v1::StatementDistributionMessage::LargeStatement(metadata) => { + if let Some(new_status) = launch_request( + metadata, + peer, + req_sender.clone(), + ctx, + metrics + ).await { + vacant.insert(new_status); + } + } + protocol_v1::StatementDistributionMessage::Statement(_, s) => { + // No fetch in progress, safe to return any statement immediately (for now we + // don't bother about normal network jitter, which might cause `Valid` + // statements to arrive early). + return Some(s) + } + } + } + } + None +} + +/// Launch request for a large statement and get tracking status. +/// +/// Returns `None` if spawning the task failed.
+async fn launch_request( + meta: StatementMetadata, + peer: PeerId, + req_sender: mpsc::Sender<RequesterMessage>, + ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>, + metrics: &Metrics, +) -> Option<LargeStatementStatus> { + + let (task, handle) = fetch( + meta.relay_parent, + meta.candidate_hash, + vec![peer], + req_sender, + metrics.clone(), + ) + .remote_handle(); + + let result = ctx.spawn("large-statement-fetcher", task.boxed()) + .await; + if let Err(err) = result { + tracing::error!(target: LOG_TARGET, ?err, "Spawning task failed."); + return None + } + let available_peers = { + let mut m = HashMap::new(); + m.insert(peer, vec![protocol_v1::StatementDistributionMessage::LargeStatement(meta)]); + m + }; + Some(LargeStatementStatus::Fetching(FetchingInfo { + available_peers, + peers_to_try: Vec::new(), + peer_sender: None, + fetching_task: handle, + })) +} + +/// Handle an incoming message and circulate it to peers, if we did not know it already. +/// +async fn handle_incoming_message_and_circulate<'a>( + peer: PeerId, + peers: &mut HashMap<PeerId, PeerData>, + active_heads: &'a mut HashMap<Hash, ActiveHeadData>, + ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>, + message: protocol_v1::StatementDistributionMessage, + req_sender: &mpsc::Sender<RequesterMessage>, + metrics: &Metrics, +) { + let handled_incoming = match peers.get_mut(&peer) { + Some(data) => { + handle_incoming_message( + peer, + data, + active_heads, + ctx, + message, + req_sender, + metrics, + ).await + } + None => None, + }; + + // if we got a fresh message, we need to circulate it to all peers. + if let Some((relay_parent, statement)) = handled_incoming { + // we can ignore the set of peers who this function returns as now expecting + // dependent statements. + // + // we have the invariant in this subsystem that we never store a `Valid` or `Invalid` + // statement before a `Seconded` statement. `Seconded` statements are the only ones + // that require dependents. Thus, if this is a `Seconded` statement for a candidate we + // were not aware of before, we cannot have any dependent statements from the candidate. + let _ = circulate_statement( + peers, + ctx, + relay_parent, + statement, + ).await; + } +} + +// Handle a statement. Returns a reference to a newly-stored statement // if we were not already aware of it, along with the corresponding relay-parent. // // This function checks the signature and ensures the statement is compatible with our // view. It also notifies candidate backing if the statement was previously unknown.
-#[tracing::instrument(level = "trace", skip(peer_data, ctx, active_heads, metrics), fields(subsystem = LOG_TARGET))] async fn handle_incoming_message<'a>( peer: PeerId, peer_data: &mut PeerData, active_heads: &'a mut HashMap<Hash, ActiveHeadData>, - ctx: &mut impl SubsystemContext, + ctx: &mut impl SubsystemContext, message: protocol_v1::StatementDistributionMessage, + req_sender: &mpsc::Sender<RequesterMessage>, metrics: &Metrics, ) -> Option<(Hash, &'a StoredStatement)> { - let (relay_parent, statement) = match message { - protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s), - }; + let relay_parent = message.get_relay_parent(); let active_head = match active_heads.get_mut(&relay_parent) { Some(h) => h, None => { tracing::debug!( target: LOG_TARGET, - ?peer, - ?relay_parent, - "Unknown (or outdated) relay parent" + %relay_parent, "our view out-of-sync with active heads; head not found", ); report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await; - return None; + return None } }; - let candidate_hash = statement.payload().candidate_hash(); + let fingerprint = message.get_fingerprint(); + let candidate_hash = fingerprint.0.candidate_hash().clone(); let handle_incoming_span = active_head.span.child("handle-incoming") .with_candidate(candidate_hash) .with_peer_id(&peer); - let fingerprint = (statement.payload().to_compact(), statement.validator_index()); let max_message_count = active_head.validators.len() * 2; // perform only basic checks before verifying the signature @@ -957,7 +1280,7 @@ async fn handle_incoming_message<'a>( tracing::debug!( target: LOG_TARGET, ?peer, - ?statement, + ?message, ?rep, "Error inserting received statement" ); @@ -965,6 +1288,20 @@ async fn handle_incoming_message<'a>( return None; } + let statement = retrieve_statement_from_message( + peer, + message, + active_head, + ctx, + req_sender, + metrics, + ).await; + + let statement = match statement { + None => return None, + Some(statement) => statement, + }; + match active_head.check_useful_or_unknown(statement.clone()) { Ok(()) => {}, Err(DeniedStatement::NotUseful) => { @@ -1047,7 +1384,7 @@ async fn update_peer_view_and_send_unlocked( peer: PeerId, peer_data: &mut PeerData, - ctx: &mut impl SubsystemContext, + ctx: &mut impl SubsystemContext, active_heads: &HashMap<Hash, ActiveHeadData>, new_view: View, metrics: &Metrics, @@ -1078,11 +1415,11 @@ async fn update_peer_view_and_send_unlocked( } } -#[tracing::instrument(level = "trace", skip(peers, active_heads, ctx, metrics), fields(subsystem = LOG_TARGET))] async fn handle_network_update( peers: &mut HashMap<PeerId, PeerData>, active_heads: &mut HashMap<Hash, ActiveHeadData>, - ctx: &mut impl SubsystemContext, + ctx: &mut impl SubsystemContext, + req_sender: &mpsc::Sender<RequesterMessage>, update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>, metrics: &Metrics, ) { @@ -1108,36 +1445,15 @@ async fn handle_network_update( peers.remove(&peer); } NetworkBridgeEvent::PeerMessage(peer, message) => { - let handled_incoming = match peers.get_mut(&peer) { - Some(data) => { - handle_incoming_message( - peer, - data, - active_heads, - ctx, - message, - metrics, - ).await - } - None => None, - }; - - // if we got a fresh message, we need to circulate it to all peers. - if let Some((relay_parent, statement)) = handled_incoming { - // we can ignore the set of peers who this function returns as now expecting - // dependent statements. - // - // we have the invariant in this subsystem that we never store a `Valid` or `Invalid` - // statement before a `Seconded` statement. `Seconded` statements are the only ones - // that require dependents.
Thus, if this is a `Seconded` statement for a candidate we - // were not aware of before, we cannot have any dependent statements from the candidate. - let _ = circulate_statement( - peers, - ctx, - relay_parent, - statement, - ).await; - } + handle_incoming_message_and_circulate( + peer, + peers, + active_heads, + ctx, + message, + req_sender, + metrics, + ).await; } NetworkBridgeEvent::PeerViewChange(peer, view) => { tracing::trace!( @@ -1164,7 +1480,6 @@ async fn handle_network_update( // handled by `ActiveLeavesUpdate` } } - } impl StatementDistribution { @@ -1175,64 +1490,277 @@ impl StatementDistribution { ) -> SubsystemResult<()> { let mut peers: HashMap<PeerId, PeerData> = HashMap::new(); let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new(); - let metrics = self.metrics; + // Sender/Receiver for getting news from our statement fetching tasks. + let (req_sender, mut req_receiver) = mpsc::channel(1); + // Sender/Receiver for getting news from our responder task. + let (res_sender, mut res_receiver) = mpsc::channel(1); loop { - let message = ctx.recv().await?; - match message { - FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => { - let _timer = metrics.time_active_leaves_update(); + let message = Message::receive(&mut ctx, &mut req_receiver, &mut res_receiver).await; + let finished = match message { + Message::Subsystem(result) => + self.handle_subsystem_message( + &mut ctx, + &mut peers, + &mut active_heads, + &req_sender, + &res_sender, + result?, + ) + .await?, + Message::Requester(result) => + self.handle_requester_message( + &mut ctx, + &mut peers, + &mut active_heads, + &req_sender, + result.ok_or(SubsystemError::Context( + "Failed to read from requester receiver (stream finished)" + .to_string() + ))? + ) + .await?, + Message::Responder(result) => + self.handle_responder_message( + &mut active_heads, + result.ok_or(SubsystemError::Context( + "Failed to read from responder receiver (stream finished)" + .to_string() + ))? + ) + .await?, + }; + if finished { + break + } + } + Ok(()) + } + /// Handle messages from responder background task. + async fn handle_responder_message( + &self, + active_heads: &mut HashMap<Hash, ActiveHeadData>, + message: ResponderMessage, + ) -> SubsystemResult<bool> { + match message { + ResponderMessage::GetData { + relay_parent, + candidate_hash, + tx, + } => { + let active_head = match active_heads.get(&relay_parent) { + Some(head) => head, + None => return Ok(false), + }; + let committed = match active_head.waiting_large_statements.get(&candidate_hash) { + Some(LargeStatementStatus::Fetched(committed)) => committed.clone(), + _ => { + tracing::debug!( + target: LOG_TARGET, + ?candidate_hash, + "Requested data not found - this should not happen under normal circumstances."
); + return Ok(false) + } + }; + + if let Err(_) = tx.send(committed) { + tracing::debug!( + target: LOG_TARGET, + "Sending data to responder failed" + ); + return Ok(false) + } + } + } + Ok(false) + } + + async fn handle_requester_message( + &self, + ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>, + peers: &mut HashMap<PeerId, PeerData>, + active_heads: &mut HashMap<Hash, ActiveHeadData>, + req_sender: &mpsc::Sender<RequesterMessage>, + message: RequesterMessage, + ) -> SubsystemResult<bool> { + match message { + RequesterMessage::Finished { + relay_parent, + candidate_hash, + from_peer, + response, + bad_peers, + } => { + for bad in bad_peers { + report_peer(ctx, bad, COST_FETCH_FAIL).await; + } + report_peer(ctx, from_peer, BENEFIT_VALID_RESPONSE).await; + + let active_head = match active_heads.get_mut(&relay_parent) { + Some(head) => head, + None => return Ok(false), + }; + + let status = active_head + .waiting_large_statements + .remove(&candidate_hash); + + let info = match status { + Some(LargeStatementStatus::Fetching(info)) => info, + Some(LargeStatementStatus::Fetched(_)) => { + debug_assert!(false, "On status fetched, fetching task already succeeded. qed."); + return Ok(false) + } + None => { + tracing::warn!( + target: LOG_TARGET, + "Received finished task event for non-existent status - not supposed to happen." + ); + return Ok(false) + } + }; + + active_head.waiting_large_statements.insert( + candidate_hash, + LargeStatementStatus::Fetched(response), + ); + + // Cache is now populated, send all messages: + for (peer, messages) in info.available_peers { + for message in messages { + handle_incoming_message_and_circulate( + peer, + peers, + active_heads, + ctx, + message, + req_sender, + &self.metrics, + ) + .await; } + } + } + RequesterMessage::SendRequest(req) => { + ctx.send_message( + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendRequests( + vec![req], + IfDisconnected::ImmediateError, + ) + )) + .await; + } + RequesterMessage::GetMorePeers { + relay_parent, + candidate_hash, + tx, + } => { + let active_head = match active_heads.get_mut(&relay_parent) { + Some(head) => head, + None => return Ok(false), + }; + + let status = active_head + .waiting_large_statements + .get_mut(&candidate_hash); + + let info = match status { + Some(LargeStatementStatus::Fetching(info)) => info, + Some(LargeStatementStatus::Fetched(_)) => { + debug_assert!(false, "On status fetched, fetching task already succeeded.
qed."); + return Ok(false) + } + None => { + tracing::warn!( + target: LOG_TARGET, + "Received 'get more peers' event for non existent status - not supposed to happen." + ); + return Ok(false) + } + }; + + if info.peers_to_try.is_empty() { + info.peer_sender = Some(tx); + } else { + let peers_to_try = std::mem::take(&mut info.peers_to_try); + if let Err(peers) = tx.send(peers_to_try) { + // No longer interested for now - might want them later: + info.peers_to_try = peers; + } + } + } + RequesterMessage::ReportPeer(peer, rep) => + report_peer(ctx, peer, rep).await, + } + Ok(false) + } + + async fn handle_subsystem_message( + &self, + ctx: &mut impl SubsystemContext, + peers: &mut HashMap, + active_heads: &mut HashMap, + req_sender: &mpsc::Sender, + res_sender: &mpsc::Sender, + message: FromOverseer, + ) -> SubsystemResult { + let metrics = &self.metrics; + + match message { + FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => { + let _timer = metrics.time_active_leaves_update(); + + for activated in activated { + let relay_parent = activated.hash; + let span = PerLeafSpan::new(activated.span, "statement-distribution"); + tracing::trace!( + target: LOG_TARGET, + hash = ?relay_parent, + "New active leaf", + ); + + let (validators, session_index) = { + let (val_tx, val_rx) = oneshot::channel(); + let (session_tx, session_rx) = oneshot::channel(); + + let val_message = AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(val_tx), + ), + ); + let session_message = AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(session_tx), + ), + ); + ctx.send_messages( + std::iter::once(val_message).chain(std::iter::once(session_message)) + ).await; + + match (val_rx.await?, session_rx.await?) { + (Ok(v), Ok(s)) => (v, s), + (Err(e), _) | (_, Err(e)) => { + tracing::warn!( + target: LOG_TARGET, + err = ?e, + "Failed to fetch runtime API data for active leaf", + ); + + // Lacking this bookkeeping might make us behave funny, although + // not in any slashable way. But we shouldn't take down the node + // on what are likely spurious runtime API errors. 
+ return Ok(false) + } + } + }; + + active_heads.entry(relay_parent) + .or_insert(ActiveHeadData::new(validators, session_index, span)); active_heads.retain(|h, _| { let live = !deactivated.contains(h); @@ -1246,44 +1774,76 @@ impl StatementDistribution { live }); } - FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => { - // do nothing - } - FromOverseer::Signal(OverseerSignal::Conclude) => break, - FromOverseer::Communication { msg } => match msg { - StatementDistributionMessage::Share(relay_parent, statement) => { - let _timer = metrics.time_share(); - - circulate_statement_and_dependents( - &mut peers, - &mut active_heads, - &mut ctx, - relay_parent, - statement, - &metrics, - ).await; - } - StatementDistributionMessage::NetworkBridgeUpdateV1(event) => { - let _timer = metrics.time_network_bridge_update_v1(); - - handle_network_update( - &mut peers, - &mut active_heads, - &mut ctx, - event, - &metrics, - ).await; + } + FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => { + // do nothing + } + FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(true), + FromOverseer::Communication { msg } => match msg { + StatementDistributionMessage::Share(relay_parent, statement) => { + let _timer = metrics.time_share(); + + // Make sure we have data in the cache: + if is_statement_large(&statement) { + if let Statement::Seconded(committed) = &statement.payload() { + let active_head = match active_heads.get_mut(&relay_parent) { + Some(h) => h, + None => { + // This should never be out-of-sync with our view if the view updates + // correspond to actual `StartWork` messages. So we just log and ignore. + tracing::warn!( + target: LOG_TARGET, + %relay_parent, + "our view out-of-sync with active heads; head not found", + ); + return Ok(false) + } + }; + active_head.waiting_large_statements.insert( + statement.payload().candidate_hash(), + LargeStatementStatus::Fetched(committed.clone()) + ); + } } + + circulate_statement_and_dependents( + peers, + active_heads, + ctx, + relay_parent, + statement, + metrics, + ).await; + } + StatementDistributionMessage::NetworkBridgeUpdateV1(event) => { + let _timer = metrics.time_network_bridge_update_v1(); + + handle_network_update( + peers, + active_heads, + ctx, + req_sender, + event, + metrics, + ).await; + } + StatementDistributionMessage::StatementFetchingReceiver(receiver) => { + ctx.spawn( + "large-statement-responder", + respond(receiver, res_sender.clone()).boxed() + ).await?; + } } } - Ok(()) + Ok(false) } } #[derive(Clone)] struct MetricsInner { statements_distributed: prometheus::Counter<prometheus::U64>, + sent_requests: prometheus::Counter<prometheus::U64>, + received_responses: prometheus::CounterVec<prometheus::U64>, active_leaves_update: prometheus::Histogram, share: prometheus::Histogram, network_bridge_update_v1: prometheus::Histogram, @@ -1300,6 +1860,19 @@ impl Metrics { } } + fn on_sent_request(&self) { + if let Some(metrics) = &self.0 { + metrics.sent_requests.inc(); + } + } + + fn on_received_response(&self, success: bool) { + if let Some(metrics) = &self.0 { + let label = if success { "succeeded" } else { "failed" }; + metrics.received_responses.with_label_values(&[label]).inc(); + } + } + /// Provide a timer for `active_leaves_update` which observes on drop.
fn time_active_leaves_update(&self) -> Option<prometheus::prometheus::HistogramTimer> { self.0.as_ref().map(|metrics| metrics.active_leaves_update.start_timer()) @@ -1326,6 +1899,23 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + sent_requests: prometheus::register( + prometheus::Counter::new( + "parachain_statement_distribution_sent_requests_total", + "Number of large statement fetching requests sent." + )?, + registry, + )?, + received_responses: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "parachain_statement_distribution_received_responses_total", + "Number of received responses for large statement data." + ), + &["success"], + )?, + registry, + )?, active_leaves_update: prometheus::register( prometheus::Histogram::with_opts( prometheus::HistogramOpts::new( @@ -1360,18 +1950,26 @@ impl metrics::Metrics for Metrics { #[cfg(test)] mod tests { + use parity_scale_codec::{Decode, Encode}; use super::*; use std::sync::Arc; use sp_keyring::Sr25519Keyring; use sp_application_crypto::AppKey; use polkadot_node_primitives::Statement; - use polkadot_primitives::v1::CommittedCandidateReceipt; + use polkadot_primitives::v1::{CommittedCandidateReceipt, ValidationCode}; use assert_matches::assert_matches; use futures::executor::{self, block_on}; use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore}; use sc_keystore::LocalKeystore; use polkadot_node_network_protocol::{view, ObservedRole}; use polkadot_subsystem::{jaeger, ActivatedLeaf}; + use polkadot_node_network_protocol::request_response::{ + Requests, + v1::{ + StatementFetchingRequest, + StatementFetchingResponse, + }, + }; #[test] fn active_head_accepts_only_2_seconded_per_validator() { @@ -1767,7 +2365,10 @@ mod tests { }; let pool = sp_core::testing::TaskExecutor::new(); - let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); + let (mut ctx, mut handle) = + polkadot_node_subsystem_test_helpers + ::make_subsystem_context + ::<StatementDistributionMessage, _>(pool); let peer = PeerId::random(); executor::block_on(async move { @@ -1857,7 +2458,10 @@ mod tests { ].into_iter().collect(); let pool = sp_core::testing::TaskExecutor::new(); - let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); + let (mut ctx, mut handle) = + polkadot_node_subsystem_test_helpers + ::make_subsystem_context + ::<StatementDistributionMessage, _>(pool); executor::block_on(async move { let statement = { @@ -2092,4 +2696,240 @@ mod tests { executor::block_on(future::join(test_fut, bg)); } + + #[test] + fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing() { + let hash_a = Hash::repeat_byte(1); + let hash_b = Hash::repeat_byte(2); + + let candidate = { + let mut c = CommittedCandidateReceipt::default(); + c.descriptor.relay_parent = hash_a; + c.descriptor.para_id = 1.into(); + c.commitments.new_validation_code = Some(ValidationCode(vec![1,2,3])); + c + }; + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + + let validators = vec![ + Sr25519Keyring::Alice.public().into(), + Sr25519Keyring::Bob.public().into(), + Sr25519Keyring::Charlie.public().into(), + ]; + + let session_index = 1; + + let pool = sp_core::testing::TaskExecutor::new(); + let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); + + let bg = async move { + let s = StatementDistribution { metrics: Default::default() }; + s.run(ctx).await.unwrap(); + }; + + let (mut tx_reqs, rx_reqs) = mpsc::channel(1); + + let test_fut = async move { + handle.send(FromOverseer::Communication { + msg:
StatementDistributionMessage::StatementFetchingReceiver(rx_reqs) + }).await; + + // register our active heads. + handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: vec![ActivatedLeaf { + hash: hash_a, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }].into(), + deactivated: vec![].into(), + }))).await; + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(r, RuntimeApiRequest::Validators(tx)) + ) + if r == hash_a + => { + let _ = tx.send(Ok(validators)); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx)) + ) + if r == hash_a + => { + let _ = tx.send(Ok(session_index)); + } + ); + + // notify of peers and view + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full) + ) + }).await; + + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full) + ) + }).await; + + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a]) + ) + }).await; + + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a]) + ) + }).await; + + // receive a seconded statement from peer A. it should be propagated onwards to peer B and to + // candidate backing. + let statement = { + let signing_context = SigningContext { + parent_hash: hash_a, + session_index, + }; + + let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory()); + let alice_public = CryptoStore::sr25519_generate_new( + &*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed()) + ).await.unwrap(); + + SignedFullStatement::sign( + &keystore, + Statement::Seconded(candidate.clone()), + &signing_context, + ValidatorIndex(0), + &alice_public.into(), + ).await.ok().flatten().expect("should be signed") + }; + + let metadata = + protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone()).get_metadata(); + + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()), + ) + ) + }).await; + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendRequests( + mut reqs, IfDisconnected::ImmediateError + ) + ) => { + let reqs = reqs.pop().unwrap(); + let outgoing = match reqs { + Requests::StatementFetching(outgoing) => outgoing, + _ => panic!("Unexpected request"), + }; + let req = outgoing.payload; + assert_eq!(req.relay_parent, metadata.relay_parent); + assert_eq!(req.candidate_hash, metadata.candidate_hash); + let response = StatementFetchingResponse::Statement(candidate.clone()); + outgoing.pending_response.send(Ok(response.encode())).unwrap(); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(p, r) + ) if p == peer_a && r == BENEFIT_VALID_RESPONSE => {} + ); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(p, r) + ) if p == peer_a 
&& r == BENEFIT_VALID_STATEMENT_FIRST => {} + ); + + assert_matches!( + handle.recv().await, + AllMessages::CandidateBacking( + CandidateBackingMessage::Statement(r, s) + ) if r == hash_a && s == statement => {} + ); + + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendValidationMessage( + recipients, + protocol_v1::ValidationProtocol::StatementDistribution( + protocol_v1::StatementDistributionMessage::LargeStatement(meta) + ), + ) + ) => { + assert_eq!(recipients, vec![peer_b.clone()]); + assert_eq!(meta.relay_parent, hash_a); + assert_eq!(meta.candidate_hash, statement.payload().candidate_hash()); + assert_eq!(meta.signed_by, statement.validator_index()); + assert_eq!(&meta.signature, statement.signature()); + } + ); + + // Now that it has the candidate, it should answer requests accordingly (even after a + // failed request): + + // Failing request first: + let (pending_response, response_rx) = oneshot::channel(); + let inner_req = StatementFetchingRequest { + relay_parent: hash_b, + candidate_hash: metadata.candidate_hash, + }; + let req = sc_network::config::IncomingRequest { + peer: peer_a, + payload: inner_req.encode(), + pending_response, + }; + tx_reqs.send(req).await.unwrap(); + assert_matches!( + response_rx.await.unwrap().result, + Err(()) => {} + ); + + // Now the working one: + let (pending_response, response_rx) = oneshot::channel(); + let inner_req = StatementFetchingRequest { + relay_parent: metadata.relay_parent, + candidate_hash: metadata.candidate_hash, + }; + let req = sc_network::config::IncomingRequest { + peer: peer_a, + payload: inner_req.encode(), + pending_response, + }; + tx_reqs.send(req).await.unwrap(); + + let StatementFetchingResponse::Statement(committed) = + Decode::decode(&mut response_rx.await.unwrap().result.unwrap().as_ref()).unwrap(); + assert_eq!(committed, candidate); + + handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + }; + + futures::pin_mut!(test_fut); + futures::pin_mut!(bg); + + executor::block_on(future::join(test_fut, bg)); + } } diff --git a/node/network/statement-distribution/src/requester.rs b/node/network/statement-distribution/src/requester.rs new file mode 100644 index 000000000000..5d786d52acfe --- /dev/null +++ b/node/network/statement-distribution/src/requester.rs @@ -0,0 +1,230 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! Large statement requesting background task logic.
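+//! +//! The `fetch` task below tries the known peers one at a time: it asks the subsystem to send a +//! `StatementFetchingRequest` on its behalf, punishes peers answering with a wrong candidate hash +//! via `COST_WRONG_HASH`, and reports a successful fetch with `RequesterMessage::Finished`. Once +//! all known peers have failed, it asks for more via `RequesterMessage::GetMorePeers` and retries +//! the failed ones after `RETRY_TIMEOUT`.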
+ +use std::time::Duration; + +use futures::{SinkExt, channel::{mpsc, oneshot}}; + +use polkadot_node_network_protocol::{ + PeerId, UnifiedReputationChange, + request_response::{ + OutgoingRequest, Recipient, Requests, + v1::{ + StatementFetchingRequest, StatementFetchingResponse + } + }}; +use polkadot_node_subsystem_util::TimeoutExt; +use polkadot_primitives::v1::{CandidateHash, CommittedCandidateReceipt, Hash}; +use polkadot_subsystem::{Span, Stage}; + +use crate::{LOG_TARGET, Metrics, COST_WRONG_HASH}; + +// In case we failed fetching from our known peers, how long we should wait before attempting a +// retry, even though we have not yet discovered any new peers. In other words: how long to +// wait before retrying peers that already failed. +const RETRY_TIMEOUT: Duration = Duration::from_millis(500); + +/// Messages coming from a background task. +pub enum RequesterMessage { + /// Get an update of available peers to try for fetching a given statement. + GetMorePeers { + relay_parent: Hash, + candidate_hash: CandidateHash, + tx: oneshot::Sender<Vec<PeerId>> + }, + /// Fetching finished, ask for verification. If verification fails, the task will continue + /// asking peers for data. + Finished { + /// Relay parent this candidate is in the context of. + relay_parent: Hash, + /// The candidate we fetched data for. + candidate_hash: CandidateHash, + /// Data was fetched from this peer. + from_peer: PeerId, + /// Response we received from above peer. + response: CommittedCandidateReceipt, + /// Peers which failed providing the data. + bad_peers: Vec<PeerId>, + }, + /// Report a peer which behaved worse than just not providing data. + ReportPeer(PeerId, UnifiedReputationChange), + /// Ask subsystem to send a request for us. + SendRequest(Requests), +} + + +/// A fetching task, taking care of fetching large statements via request/response. +/// +/// A fetch task does not know about a particular `Statement`; instead it just tries fetching a +/// `CommittedCandidateReceipt` from peers. Whether or not this can be used to re-assemble one or +/// many `SignedFullStatement`s needs to be verified by the caller. +pub async fn fetch( + relay_parent: Hash, + candidate_hash: CandidateHash, + peers: Vec<PeerId>, + mut sender: mpsc::Sender<RequesterMessage>, + metrics: Metrics, +) { + let span = Span::new(candidate_hash, "fetch-large-statement") + .with_relay_parent(relay_parent) + .with_stage(Stage::StatementDistribution); + + // Peers we already tried (and failed). + let mut tried_peers = Vec::new(); + // Peers left for trying out. + let mut new_peers = peers; + + let req = StatementFetchingRequest { + relay_parent, + candidate_hash, + }; + + // We retry endlessly (with sleep periods), and rely on the subsystem to kill us eventually. + loop { + + let span = span.child("try-available-peers"); + + while let Some(peer) = new_peers.pop() { + + let _span = span.child("try-peer") + .with_peer_id(&peer); + + let (outgoing, pending_response) = OutgoingRequest::new( + Recipient::Peer(peer), + req.clone(), + ); + if let Err(err) = sender.feed( + RequesterMessage::SendRequest(Requests::StatementFetching(outgoing)) + ).await { + tracing::info!( + target: LOG_TARGET, + ?err, + "Sending request failed, node might be shutting down - exiting."
+ ); + return + } + + metrics.on_sent_request(); + + match pending_response.await { + Ok(StatementFetchingResponse::Statement(statement)) => { + + if statement.hash() != candidate_hash { + metrics.on_received_response(false); + + if let Err(err) = sender.feed( + RequesterMessage::ReportPeer(peer, COST_WRONG_HASH) + ).await { + tracing::warn!( + target: LOG_TARGET, + ?err, + "Sending reputation change failed: This should not happen." + ); + } + // We want to get rid of this peer: + continue + } + + if let Err(err) = sender.feed( + RequesterMessage::Finished { + relay_parent, + candidate_hash, + from_peer: peer, + response: statement, + bad_peers: tried_peers, + } + ).await { + tracing::warn!( + target: LOG_TARGET, + ?err, + "Sending task response failed: This should not happen." + ); + } + + metrics.on_received_response(true); + + // We are done now. + return + }, + Err(err) => { + tracing::debug!( + target: LOG_TARGET, + ?err, + "Receiving response failed with error - trying next peer." + ); + + metrics.on_received_response(false); + } + } + + tried_peers.push(peer); + } + + new_peers = std::mem::take(&mut tried_peers); + + // All our peers failed us - try getting new ones before trying again: + match try_get_new_peers(relay_parent, candidate_hash, &mut sender, &span).await { + Ok(Some(mut peers)) => { + // New arrivals will be tried first: + new_peers.append(&mut peers); + } + // No new peers, try the old ones again (if we have any): + Ok(None) => { + // Note: In case we don't have any more peers, we will just keep asking for new + // peers, which is exactly what we want. + }, + Err(()) => return, + } + } +} + +/// Try getting new peers from the subsystem. + +/// If there are none, we will return after a timeout with `None`. +async fn try_get_new_peers( + relay_parent: Hash, + candidate_hash: CandidateHash, + sender: &mut mpsc::Sender<RequesterMessage>, + span: &Span, +) -> Result<Option<Vec<PeerId>>, ()> { + + let _span = span.child("wait-for-peers"); + + let (tx, rx) = oneshot::channel(); + + if let Err(err) = sender.send( + RequesterMessage::GetMorePeers { relay_parent, candidate_hash, tx } + ).await { + tracing::debug!( + target: LOG_TARGET, + ?err, + "Failed sending background task message, subsystem probably moved on." + ); + return Err(()) + } + + match rx.timeout(RETRY_TIMEOUT).await.transpose() { + Err(_) => { + tracing::debug!( + target: LOG_TARGET, + "Failed fetching more peers." + ); + Err(()) + } + Ok(val) => Ok(val) + } +} + diff --git a/node/network/statement-distribution/src/responder.rs b/node/network/statement-distribution/src/responder.rs new file mode 100644 index 000000000000..a5a0a15460ad --- /dev/null +++ b/node/network/statement-distribution/src/responder.rs @@ -0,0 +1,169 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! Large statement responding background task logic.
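+//! +//! The `respond` task below decodes each incoming `StatementFetchingRequest`, punishes peers +//! sending unparsable requests via `COST_INVALID_REQUEST`, obtains the matching +//! `CommittedCandidateReceipt` from the subsystem via `ResponderMessage::GetData` and limits the +//! number of parallel responses to `MAX_PARALLEL_STATEMENT_REQUESTS`, using the `sent_feedback` +//! channel to learn when a response has actually been sent.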
+ +use futures::{SinkExt, StreamExt, channel::{mpsc, oneshot}, stream::FuturesUnordered}; + +use parity_scale_codec::Decode; + +use polkadot_node_network_protocol::{ + UnifiedReputationChange as Rep, + request_response::{ + IncomingRequest, MAX_PARALLEL_STATEMENT_REQUESTS, request::OutgoingResponse, + v1::{ + StatementFetchingRequest, StatementFetchingResponse + } + } +}; +use polkadot_primitives::v1::{CandidateHash, CommittedCandidateReceipt, Hash}; + +use crate::LOG_TARGET; + +const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request"); + +/// Messages coming from a background task. +pub enum ResponderMessage { + /// Get the data needed for responding to the given fetching request. + GetData { + relay_parent: Hash, + candidate_hash: CandidateHash, + tx: oneshot::Sender<CommittedCandidateReceipt> + }, +} + + +/// A responding task, answering incoming requests for large statements. +/// +/// For each decoded `StatementFetchingRequest` it asks the subsystem for the matching +/// `CommittedCandidateReceipt` via `ResponderMessage::GetData` and sends it back, limiting the +/// number of responses sent in parallel. +pub async fn respond( + mut receiver: mpsc::Receiver<sc_network::config::IncomingRequest>, + mut sender: mpsc::Sender<ResponderMessage>, +) { + let mut pending_out = FuturesUnordered::new(); + loop { + // Ensure we are not handling too many requests in parallel. + // We do this for three reasons: + // + // 1. We want some requesters to have full data fast, rather than lots of them having it + // late, as each requester having the data will help distributing it. + // 2. If we take too long, the requests timing out will not have had any data sent yet, + // thus we wasted no bandwidth. + // 3. If the queue is full, requests will get an immediate error instead of running into a + // timeout, thus requesters can immediately try another peer and be faster. + // + // From this perspective we would not want parallel response sending at all, but we don't + // want a single slow requester slowing everyone down, so we want some parallelism for that + // reason. + if pending_out.len() >= MAX_PARALLEL_STATEMENT_REQUESTS as usize { + // Wait for one to finish: + pending_out.next().await; + } + + let raw = match receiver.next().await { + None => { + tracing::debug!( + target: LOG_TARGET, + "Shutting down request responder" + ); + return + } + Some(v) => v, + }; + + let sc_network::config::IncomingRequest { + payload, + peer, + pending_response, + } = raw; + + let payload = match StatementFetchingRequest::decode(&mut payload.as_ref()) { + Err(err) => { + tracing::debug!( + target: LOG_TARGET, + ?err, + "Decoding request failed" + ); + report_peer(pending_response, COST_INVALID_REQUEST); + continue + } + Ok(payload) => payload, + }; + + let req = IncomingRequest::new( + peer, + payload, + pending_response + ); + + let (tx, rx) = oneshot::channel(); + if let Err(err) = sender.feed( + ResponderMessage::GetData { + relay_parent: req.payload.relay_parent, + candidate_hash: req.payload.candidate_hash, + tx, + } + ).await { + tracing::debug!( + target: LOG_TARGET, + ?err, + "Shutting down responder" + ); + return + } + let response = match rx.await { + Err(err) => { + tracing::debug!( + target: LOG_TARGET, + ?err, + "Requested data not found."
+ ); + Err(()) + } + Ok(v) => Ok(StatementFetchingResponse::Statement(v)), + }; + let (pending_sent_tx, pending_sent_rx) = oneshot::channel(); + let response = OutgoingResponse { + result: response, + reputation_changes: Vec::new(), + sent_feedback: Some(pending_sent_tx), + }; + pending_out.push(pending_sent_rx); + if let Err(_) = req.send_outgoing_response(response) { + tracing::debug!( + target: LOG_TARGET, + "Sending response failed" + ); + } + } +} + +/// Report a peer which sent us an invalid request. +fn report_peer( + tx: oneshot::Sender<sc_network::config::OutgoingResponse>, + rep: Rep, +) { + if let Err(_) = tx.send(sc_network::config::OutgoingResponse { + result: Err(()), + reputation_changes: vec![rep.into_base_rep()], + sent_feedback: None, + }) { + tracing::debug!( + target: LOG_TARGET, + "Reporting peer failed." + ); + } +} diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 57af4fb6768e..f16576f6531c 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -1333,7 +1333,7 @@ impl OverseerSubsystemContext { } } - /// Create a new `OverseserSubsystemContext` with no metering. + /// Create a new `OverseerSubsystemContext` with no metering. /// /// Intended for tests. #[allow(unused)] diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index 7ed9db54522c..8c45e843556d 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -42,7 +42,7 @@ pub mod approval; pub const VALIDATION_CODE_BOMB_LIMIT: usize = 16 * 1024 * 1024; /// Maximum PoV size we support right now. -pub const MAX_POV_SIZE: u32 = 50 * 1024 * 1024; +pub const MAX_POV_SIZE: u32 = 20 * 1024 * 1024; /// The bomb limit for decompressing PoV blobs. pub const POV_BOMB_LIMIT: usize = MAX_POV_SIZE as usize; diff --git a/node/service/src/chain_spec.rs b/node/service/src/chain_spec.rs index 71b146716e1e..e3c77cd0d590 100644 --- a/node/service/src/chain_spec.rs +++ b/node/service/src/chain_spec.rs @@ -16,6 +16,7 @@ //! Polkadot chain configurations. +use rococo::constants::size::MAX_CODE_SIZE; use sp_authority_discovery::AuthorityId as AuthorityDiscoveryId; use babe_primitives::AuthorityId as BabeId; use beefy_primitives::ecdsa::AuthorityId as BeefyId; @@ -894,7 +895,7 @@ fn rococo_staging_testnet_config_genesis(wasm_binary: &[u8]) -> rococo_runtime:: validation_upgrade_frequency: 1u32, validation_upgrade_delay: 1, code_retention_period: 1200, - max_code_size: 5 * 1024 * 1024, + max_code_size: MAX_CODE_SIZE, max_pov_size: MAX_POV_SIZE, max_head_data_size: 32 * 1024, group_rotation_frequency: 20, diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs index 4910d32e9f3c..1c8fa9ab91c7 100644 --- a/node/subsystem/src/messages.rs +++ b/node/subsystem/src/messages.rs @@ -28,8 +28,11 @@ use thiserror::Error; pub use sc_network::IfDisconnected; use polkadot_node_network_protocol::{ - peer_set::PeerSet, v1 as protocol_v1, UnifiedReputationChange, PeerId, - request_response::{Requests, request::IncomingRequest, v1 as req_res_v1}, + PeerId, UnifiedReputationChange, peer_set::PeerSet, + request_response::{ + Requests, request::IncomingRequest, v1 as req_res_v1 + }, + v1 as protocol_v1, }; use polkadot_node_primitives::{ CollationGenerationConfig, SignedFullStatement, ValidationResult, @@ -526,16 +529,8 @@ pub enum StatementDistributionMessage { /// Event from the network bridge. #[from] NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>), -} - -impl StatementDistributionMessage { - /// If the current variant contains the relay parent hash, return it.
- pub fn relay_parent(&self) -> Option<Hash> { - match self { - Self::Share(hash, _) => Some(*hash), - Self::NetworkBridgeUpdateV1(_) => None, - } - } + /// Receiver for incoming network requests for statement fetching. + StatementFetchingReceiver(mpsc::Receiver<sc_network::config::IncomingRequest>), } /// This data becomes intrinsics or extrinsics which should be included in a future relay chain block. @@ -747,7 +742,6 @@ impl From<IncomingRequest<req_res_v1::CollationFetchingRequest>> for CollatorPro } } - impl From<IncomingRequest<req_res_v1::CollationFetchingRequest>> for AllMessages { fn from(req: IncomingRequest<req_res_v1::CollationFetchingRequest>) -> Self { From::<CollatorProtocolMessage>::from(From::from(req)) } } diff --git a/primitives/src/v1.rs b/primitives/src/v1.rs index a9ff43fa64eb..4fdab1f3e1ad 100644 --- a/primitives/src/v1.rs +++ b/primitives/src/v1.rs @@ -175,6 +175,11 @@ pub const PARACHAINS_INHERENT_IDENTIFIER: InherentIdentifier = *b"parachn0"; /// The key type ID for parachain assignment key. pub const ASSIGNMENT_KEY_TYPE_ID: KeyTypeId = KeyTypeId(*b"asgn"); +/// Maximum compressed code size we support right now. +/// At the moment we have the runtime upgrade on chain, which severely restricts scalability. If we +/// want to have bigger values, we should fix that first. +pub const MAX_CODE_SIZE: u32 = 3 * 1024 * 1024; + // The public key of a keypair used by a validator for determining assignments /// to approve included parachain candidates. mod assignment_app { diff --git a/runtime/rococo/src/constants.rs b/runtime/rococo/src/constants.rs index 0bbde45ef00c..0df97b457e6d 100644 --- a/runtime/rococo/src/constants.rs +++ b/runtime/rococo/src/constants.rs @@ -46,6 +46,11 @@ pub mod time { pub const PRIMARY_PROBABILITY: (u64, u64) = (1, 4); } +/// Size restrictions. +pub mod size { + pub use primitives::v1::MAX_CODE_SIZE; +} + /// Fee-related. pub mod fee { pub use sp_runtime::Perbill; diff --git a/runtime/rococo/src/lib.rs b/runtime/rococo/src/lib.rs index 08e0fd2692bf..87f1cd57434a 100644 --- a/runtime/rococo/src/lib.rs +++ b/runtime/rococo/src/lib.rs @@ -88,8 +88,13 @@ use polkadot_parachain::primitives::Id as ParaId; use xcm::v0::{MultiLocation, NetworkId, BodyId}; use xcm_executor::XcmExecutor; -use xcm_builder::{AccountId32Aliases, ChildParachainConvertsVia, SovereignSignedViaLocation, CurrencyAdapter as XcmCurrencyAdapter, ChildParachainAsNative, SignedAccountId32AsNative, ChildSystemParachainAsSuperuser, LocationInverter, IsConcrete, FixedWeightBounds, FixedRateOfConcreteFungible, BackingToPlurality, SignedToAccountId32}; -use constants::{time::*, currency::*, fee::*}; +use xcm_builder::{ + AccountId32Aliases, ChildParachainConvertsVia, SovereignSignedViaLocation, + CurrencyAdapter as XcmCurrencyAdapter, ChildParachainAsNative, SignedAccountId32AsNative, + ChildSystemParachainAsSuperuser, LocationInverter, IsConcrete, FixedWeightBounds, + FixedRateOfConcreteFungible, BackingToPlurality, SignedToAccountId32 +}; +use constants::{time::*, currency::*, fee::*, size::*}; use frame_support::traits::InstanceFilter; /// Constant values used within the runtime. @@ -700,7 +705,7 @@ impl paras_sudo_wrapper::Config for Runtime {} parameter_types! { pub const ParaDeposit: Balance = 5 * DOLLARS; pub const DataDepositPerByte: Balance = deposit(0, 1); - pub const MaxCodeSize: u32 = 10 * 1024 * 1024; // 10 MB + pub const MaxCodeSize: u32 = MAX_CODE_SIZE; pub const MaxHeadSize: u32 = 20 * 1024; // 20 KB
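Reviewer aside, not part of the diff: the new fetching protocol is plain SCALE on the wire, so the round-trip exercised by the test above can be reproduced in isolation. A minimal sketch, assuming the workspace crates `parity-scale-codec`, `polkadot-node-network-protocol` and `polkadot-primitives` as dependencies:

use parity_scale_codec::{Decode, Encode};
use polkadot_node_network_protocol::request_response::v1::{
	StatementFetchingRequest, StatementFetchingResponse,
};
use polkadot_primitives::v1::{CommittedCandidateReceipt, Hash};

fn statement_fetching_roundtrip() {
	let receipt = CommittedCandidateReceipt::default();

	// What the requester task puts on the wire:
	let req = StatementFetchingRequest {
		relay_parent: Hash::repeat_byte(1),
		candidate_hash: receipt.hash(),
	};
	let wire_req = req.encode();

	// The responder decodes it the same way `respond` does:
	let decoded = StatementFetchingRequest::decode(&mut wire_req.as_ref()).unwrap();
	assert_eq!(decoded.candidate_hash, req.candidate_hash);

	// A successful answer carries the full committed candidate receipt:
	let wire_resp = StatementFetchingResponse::Statement(receipt.clone()).encode();
	let StatementFetchingResponse::Statement(got) =
		Decode::decode(&mut wire_resp.as_ref()).unwrap();
	assert_eq!(got, receipt);
}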