Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
8c83835
Dispute protocol.
eskimor Jun 14, 2021
125dd32
Dispute distribution protocol.
eskimor Jun 15, 2021
2c395e5
Get network requests routed.
eskimor Jun 15, 2021
7c53c69
WIP: Basic dispute sender logic.
eskimor Jun 15, 2021
0bd8713
Basic validator determination logic.
eskimor Jun 15, 2021
608f84c
WIP: Getting things to typecheck.
eskimor Jun 15, 2021
a78fbc9
Slightly larger timeout.
eskimor Jun 15, 2021
53f0c15
More typechecking stuff.
eskimor Jun 16, 2021
6abf1f3
Cleanup.
eskimor Jun 16, 2021
b541062
Finished most of the sending logic.
eskimor Jun 17, 2021
5883c08
Handle active leaves updates
eskimor Jun 17, 2021
4c4e3a7
Pass sessions in already.
eskimor Jun 17, 2021
bd8bb95
Startup dispute sending.
eskimor Jun 17, 2021
4ebe127
Provide incoming decoding facilities
eskimor Jun 18, 2021
95351bd
Relaxed runtime util requirements.
eskimor Jun 18, 2021
863b1d9
Better usability of incoming requests.
eskimor Jun 18, 2021
35d2cad
Add basic receiver functionality.
eskimor Jun 18, 2021
da8abac
Cleanup + fixes for sender.
eskimor Jun 18, 2021
c997aed
One more sender fix.
eskimor Jun 18, 2021
06770db
Start receiver.
eskimor Jun 18, 2021
2378c7e
Make sure to send responses back.
eskimor Jun 19, 2021
3910cf4
WIP: Exposed authority discovery
eskimor Jun 21, 2021
840a046
Merge branch 'master' into rk-dispute-distribution-impl
eskimor Jun 21, 2021
5b10c78
Make tests pass.
eskimor Jun 21, 2021
d2aa4ff
Fully featured receiver.
eskimor Jun 21, 2021
da4955d
Decrease cost of `NotAValidator`.
eskimor Jun 22, 2021
1fc9740
Make `RuntimeInfo` LRU cache size configurable.
eskimor Jun 22, 2021
9397e35
Cache more sessions.
eskimor Jun 22, 2021
ccbab3f
Fix collator protocol.
eskimor Jun 22, 2021
5db60d3
Disable metrics for now.
eskimor Jun 22, 2021
da20774
Make dispute-distribution a proper subsystem.
eskimor Jun 22, 2021
f9da3ae
Fix naming.
eskimor Jun 22, 2021
2231dc3
Code style fixes.
eskimor Jun 23, 2021
41c2801
Factored out 4x copied mock function.
eskimor Jun 23, 2021
3e91427
WIP: Tests.
eskimor Jun 23, 2021
8a3da18
Whitespace cleanup.
eskimor Jun 24, 2021
0b188cd
Accessor functions.
eskimor Jun 24, 2021
2637c1b
More testing.
eskimor Jun 24, 2021
3d09d48
More Debug instances.
eskimor Jun 25, 2021
def8772
Fix busy loop.
eskimor Jun 25, 2021
c7cdca9
Working tests.
eskimor Jun 29, 2021
b9f20c2
More tests.
eskimor Jun 30, 2021
6f7da40
Merge branch 'master' into rk-dispute-distribution-impl
eskimor Jun 30, 2021
b3c7427
Cleanup.
eskimor Jun 30, 2021
c01d8d1
Fix build.
eskimor Jun 30, 2021
4f616d6
Basic receiving test.
eskimor Jun 30, 2021
a3ff6ae
Non validator message gets dropped.
eskimor Jun 30, 2021
217bd7e
More receiving tests.
eskimor Jun 30, 2021
26b8b00
Test nested and subsequent imports.
eskimor Jun 30, 2021
5dd7c84
Fix spaces.
eskimor Jun 30, 2021
488fb47
Better formatted imports.
eskimor Jun 30, 2021
92ff4be
Import cleanup.
eskimor Jul 1, 2021
7d5f416
Metrics.
eskimor Jul 2, 2021
04f91e9
Message -> MuxedMessage
eskimor Jul 2, 2021
d43fb5f
Message -> MuxedMessage
eskimor Jul 2, 2021
dc6d0f1
Merge branch 'master' into rk-dispute-distribution-impl
eskimor Jul 2, 2021
fc6a612
More review remarks.
eskimor Jul 2, 2021
2907a53
Add missing metrics.rs.
eskimor Jul 2, 2021
1f7ce88
Fix flaky test.
eskimor Jul 2, 2021
cd66550
Dispute coordinator - deliver confirmations.
eskimor Jul 6, 2021
fd920a1
Send out `DisputeMessage` on issue local statement.
eskimor Jul 6, 2021
90b46f4
Merge branch 'master' into rk-dispute-distribution-impl
eskimor Jul 8, 2021
210b853
Unwire dispute distribution.
eskimor Jul 8, 2021
58b1bee
Review remarks.
eskimor Jul 8, 2021
59c76c8
Review remarks.
eskimor Jul 8, 2021
3cd91cb
Better docs.
eskimor Jul 8, 2021
dedbaa8
Merge branch 'master' into rk-dispute-distribution-impl
eskimor Jul 8, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ members = [
"node/network/availability-recovery",
"node/network/collator-protocol",
"node/network/gossip-support",
"node/network/dispute-distribution",
"node/overseer",
"node/malus",
"node/primitives",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
use futures::{FutureExt, channel::oneshot, future::BoxFuture};

use polkadot_subsystem::jaeger;
use polkadot_node_network_protocol::{
request_response::{OutgoingRequest, Recipient, request::{RequestError, Requests},
v1::{PoVFetchingRequest, PoVFetchingResponse}}
};
use polkadot_node_network_protocol::request_response::{OutgoingRequest, Recipient, request::{RequestError, Requests},
v1::{PoVFetchingRequest, PoVFetchingResponse}};
use polkadot_primitives::v1::{
CandidateHash, Hash, ValidatorIndex,
};
Expand All @@ -49,7 +47,7 @@ pub async fn fetch_pov<Context>(
where
Context: SubsystemContext,
{
let info = &runtime.get_session_info(ctx, parent).await?.session_info;
let info = &runtime.get_session_info(ctx.sender(), parent).await?.session_info;
let authority_id = info.discovery_keys.get(from_validator.0 as usize)
.ok_or(NonFatal::InvalidValidatorIndex)?
.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl SessionCache {
Context: SubsystemContext,
F: FnOnce(&SessionInfo) -> R,
{
let session_index = runtime.get_session_index(ctx, parent).await?;
let session_index = runtime.get_session_index(ctx.sender(), parent).await?;

if let Some(o_info) = self.session_info_cache.get(&session_index) {
tracing::trace!(target: LOG_TARGET, session_index, "Got session from lru");
Expand Down Expand Up @@ -183,7 +183,7 @@ impl SessionCache {
where
Context: SubsystemContext,
{
let info = runtime.get_session_info_by_index(ctx, parent, session_index).await?;
let info = runtime.get_session_info_by_index(ctx.sender(), parent, session_index).await?;

let discovery_keys = info.session_info.discovery_keys.clone();
let mut validator_groups = info.session_info.validator_groups.clone();
Expand Down
1 change: 0 additions & 1 deletion node/network/bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ futures = "0.3.15"
tracing = "0.1.26"
polkadot-primitives = { path = "../../../primitives" }
parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] }
sc-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
Expand Down
35 changes: 29 additions & 6 deletions node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ use parity_scale_codec::{Encode, Decode};
use parking_lot::Mutex;
use futures::prelude::*;
use futures::stream::BoxStream;
use polkadot_subsystem::ActiveLeavesUpdate;
use polkadot_subsystem::FromOverseer;
use polkadot_subsystem::OverseerSignal;
use polkadot_subsystem::messages::DisputeDistributionMessage;
use sc_network::Event as NetworkEvent;
use sp_consensus::SyncOracle;

use polkadot_subsystem::{
ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem,
ActivatedLeaf, SpawnedSubsystem,
Subsystem, SubsystemContext, SubsystemError, SubsystemResult, SubsystemSender,
messages::StatementDistributionMessage
};
Expand All @@ -48,7 +52,8 @@ use polkadot_node_subsystem_util::metrics::{self, prometheus};
/// To be added to [`NetworkConfiguration::extra_sets`].
pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority};

use std::collections::{HashMap, hash_map, HashSet};
use std::collections::HashSet;
use std::collections::{HashMap, hash_map};
use std::sync::Arc;

mod validator_discovery;
Expand All @@ -57,12 +62,14 @@ mod validator_discovery;
///
/// Defines the `Network` trait with an implementation for an `Arc<NetworkService>`.
mod network;
use network::{Network, send_message, get_peer_id_by_authority_id};
use network::{Network, send_message};

/// Request multiplexer for combining the multiple request sources into a single `Stream` of `AllMessages`.
mod multiplexer;
pub use multiplexer::RequestMultiplexer;

use crate::network::get_peer_id_by_authority_id;

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -295,7 +302,7 @@ impl<N, AD> NetworkBridge<N, AD> {
impl<Net, AD, Context> Subsystem<Context> for NetworkBridge<Net, AD>
where
Net: Network + Sync,
AD: validator_discovery::AuthorityDiscovery,
AD: validator_discovery::AuthorityDiscovery + Clone,
Context: SubsystemContext<Message=NetworkBridgeMessage>,
{
fn start(mut self, ctx: Context) -> SpawnedSubsystem {
Expand Down Expand Up @@ -364,7 +371,7 @@ async fn handle_subsystem_messages<Context, N, AD>(
where
Context: SubsystemContext<Message = NetworkBridgeMessage>,
N: Network,
AD: validator_discovery::AuthorityDiscovery,
AD: validator_discovery::AuthorityDiscovery + Clone,
{
// This is kept sorted, descending, by block number.
let mut live_heads: Vec<ActivatedLeaf> = Vec::with_capacity(MAX_VIEW_HEADS);
Expand Down Expand Up @@ -556,6 +563,15 @@ where
network_service = ns;
authority_discovery_service = ads;
}
NetworkBridgeMessage::GetAuthorityDiscoveryService(tx) => {
if let Err(err) = tx.send(Box::new(authority_discovery_service.clone())) {
tracing::debug!(
target: LOG_TARGET,
?err,
"Answering `GetAuthorityDiscoveryService` request failed."
);
}
}
NetworkBridgeMessage::NewGossipTopology {
our_neighbors,
} => {
Expand Down Expand Up @@ -861,7 +877,7 @@ async fn run_network<N, AD>(
) -> SubsystemResult<()>
where
N: Network,
AD: validator_discovery::AuthorityDiscovery,
AD: validator_discovery::AuthorityDiscovery + Clone,
{
let shared = Shared::default();

Expand All @@ -877,6 +893,10 @@ where
.get_statement_fetching()
.expect("Gets initialized, must be `Some` on startup. qed.");

let dispute_receiver = request_multiplexer
.get_dispute_sending()
.expect("Gets initialized, must be `Some` on startup. qed.");

let (remote, network_event_handler) = handle_network_messages(
ctx.sender().clone(),
network_service.clone(),
Expand All @@ -889,6 +909,9 @@ where

ctx.spawn("network-bridge-network-worker", Box::pin(remote)).await?;

ctx.send_message(AllMessages::DisputeDistribution(
DisputeDistributionMessage::DisputeSendingReceiver(dispute_receiver)
)).await;
ctx.send_message(AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(statement_receiver)
)).await;
Expand Down
25 changes: 24 additions & 1 deletion node/network/bridge/src/multiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use polkadot_subsystem::messages::AllMessages;
pub struct RequestMultiplexer {
receivers: Vec<(Protocol, mpsc::Receiver<network::IncomingRequest>)>,
statement_fetching: Option<mpsc::Receiver<network::IncomingRequest>>,
dispute_sending: Option<mpsc::Receiver<network::IncomingRequest>>,
next_poll: usize,
}

Expand All @@ -68,19 +69,31 @@ impl RequestMultiplexer {
})
.unzip();

// Ok this code is ugly as hell, it is also a hack, see #2842. But it works and is executed
// on startup so, if anything is wrong here it will be noticed immediately.
let index = receivers.iter().enumerate().find_map(|(i, (p, _))|
if let Protocol::StatementFetching = p {
Some(i)
} else {
None
}
).expect("Statement fetching must be registered. qed.");
).expect("Statement fetching must be registered.");
let statement_fetching = Some(receivers.remove(index).1);

let index = receivers.iter().enumerate().find_map(|(i, (p, _))|
if let Protocol::DisputeSending = p {
Some(i)
} else {
None
}
).expect("Dispute sending must be registered.");
let dispute_sending = Some(receivers.remove(index).1);

(
Self {
receivers,
statement_fetching,
dispute_sending,
next_poll: 0,
},
cfgs,
Expand All @@ -93,6 +106,13 @@ impl RequestMultiplexer {
pub fn get_statement_fetching(&mut self) -> Option<mpsc::Receiver<network::IncomingRequest>> {
std::mem::take(&mut self.statement_fetching)
}

/// Get the receiver for handling dispute sending requests.
///
/// This function will only return `Some` once.
pub fn get_dispute_sending(&mut self) -> Option<mpsc::Receiver<network::IncomingRequest>> {
std::mem::take(&mut self.dispute_sending)
}
}

impl Stream for RequestMultiplexer {
Expand Down Expand Up @@ -174,6 +194,9 @@ fn multiplex_single(
Protocol::StatementFetching => {
panic!("Statement fetching requests are handled directly. qed.");
}
Protocol::DisputeSending => {
panic!("Dispute sending request are handled directly. qed.");
}
};
Ok(r)
}
Expand Down
47 changes: 45 additions & 2 deletions node/network/bridge/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ struct TestNetwork {
_req_configs: Vec<RequestResponseConfig>,
}

#[derive(Clone)]
#[derive(Clone, Debug)]
struct TestAuthorityDiscovery;

// The test's view of the network. This receives updates from the subsystem in the form
Expand Down Expand Up @@ -688,6 +688,12 @@ fn peer_view_updates_sent_via_overseer() {

let view = view![Hash::repeat_byte(1)];

assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::DisputeSendingReceiver(_)
)
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
Expand Down Expand Up @@ -740,6 +746,12 @@ fn peer_messages_sent_via_overseer() {
ObservedRole::Full,
).await;

assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::DisputeSendingReceiver(_)
)
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
Expand Down Expand Up @@ -812,6 +824,12 @@ fn peer_disconnect_from_just_one_peerset() {
network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await;
network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await;

assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::DisputeSendingReceiver(_)
)
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
Expand Down Expand Up @@ -894,6 +912,12 @@ fn relays_collation_protocol_messages() {
let peer_a = PeerId::random();
let peer_b = PeerId::random();

assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::DisputeSendingReceiver(_)
)
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
Expand Down Expand Up @@ -992,6 +1016,12 @@ fn different_views_on_different_peer_sets() {
network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await;
network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await;

assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::DisputeSendingReceiver(_)
)
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
Expand Down Expand Up @@ -1153,6 +1183,12 @@ fn send_messages_to_peers() {
network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await;
network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await;

assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::DisputeSendingReceiver(_)
)
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
Expand Down Expand Up @@ -1278,7 +1314,8 @@ fn spread_event_to_subsystems_is_up_to_date() {
AllMessages::ApprovalDistribution(_) => { cnt += 1; }
AllMessages::GossipSupport(_) => unreachable!("Not interested in network events"),
AllMessages::DisputeCoordinator(_) => unreachable!("Not interested in network events"),
AllMessages::DisputeParticipation(_) => unreachable!("Not interetsed in network events"),
AllMessages::DisputeParticipation(_) => unreachable!("Not interested in network events"),
AllMessages::DisputeDistribution(_) => unreachable!("Not interested in network events"),
// Add variants here as needed, `{ cnt += 1; }` for those that need to be
// notified, `unreachable!()` for those that should not.
}
Expand Down Expand Up @@ -1323,6 +1360,12 @@ fn our_view_updates_decreasing_order_and_limited_to_max() {
0,
);

assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::DisputeSendingReceiver(_)
)
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
Expand Down
30 changes: 5 additions & 25 deletions node/network/bridge/src/validator_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,16 @@ use crate::Network;
use core::marker::PhantomData;
use std::collections::HashSet;

use async_trait::async_trait;
use futures::channel::oneshot;

use sc_network::multiaddr::Multiaddr;
use sc_authority_discovery::Service as AuthorityDiscoveryService;
use polkadot_node_network_protocol::PeerId;

use polkadot_primitives::v1::AuthorityDiscoveryId;
use polkadot_node_network_protocol::peer_set::{PeerSet, PerPeerSet};
pub use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery;

const LOG_TARGET: &str = "parachain::validator-discovery";

/// An abstraction over the authority discovery service.
#[async_trait]
pub trait AuthorityDiscovery: Send + Clone + 'static {
/// Get the addresses for the given [`AuthorityId`] from the local address cache.
async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>>;
/// Get the [`AuthorityId`] for the given [`PeerId`] from the local address cache.
async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option<AuthorityDiscoveryId>;
}

#[async_trait]
impl AuthorityDiscovery for AuthorityDiscoveryService {
async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>> {
AuthorityDiscoveryService::get_addresses_by_authority_id(self, authority).await
}

async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option<AuthorityDiscoveryId> {
AuthorityDiscoveryService::get_authority_id_by_peer_id(self, peer_id).await
}
}

pub(super) struct Service<N, AD> {
state: PerPeerSet<StatePerPeerSet>,
// PhantomData used to make the struct generic instead of having generic methods
Expand Down Expand Up @@ -147,9 +126,10 @@ mod tests {

use std::{borrow::Cow, collections::HashMap};
use futures::stream::BoxStream;
use async_trait::async_trait;
use sc_network::{Event as NetworkEvent, IfDisconnected};
use sp_keyring::Sr25519Keyring;
use polkadot_node_network_protocol::request_response::request::Requests;
use polkadot_node_network_protocol::{PeerId, request_response::request::Requests};

fn new_service() -> Service<TestNetwork, TestAuthorityDiscovery> {
Service::new()
Expand All @@ -164,7 +144,7 @@ mod tests {
peers_set: HashSet<Multiaddr>,
}

#[derive(Default, Clone)]
#[derive(Default, Clone, Debug)]
struct TestAuthorityDiscovery {
by_authority_id: HashMap<AuthorityDiscoveryId, Multiaddr>,
by_peer_id: HashMap<PeerId, AuthorityDiscoveryId>,
Expand Down
Loading