Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
3e9abcf
stupid, but it compiles
ordian Sep 5, 2020
e1fe858
redo
ordian Sep 8, 2020
d09622b
cleanup
ordian Sep 8, 2020
9e8cfe2
add ValidatorDiscovery to msgs
ordian Sep 8, 2020
0bae9e4
sketch network bridge code
ordian Sep 8, 2020
4fa0236
ConnectToAuthorities instead of validators
ordian Sep 9, 2020
9f81f90
more stuff
ordian Sep 10, 2020
d8d1302
cleanup
ordian Sep 10, 2020
6c18572
more stuff
ordian Sep 10, 2020
251427d
complete ConnectToAuthoritiesState
ordian Sep 10, 2020
31d7329
Update node/network/bridge/src/lib.rs
ordian Sep 10, 2020
f715c33
Collator protocol subsystem (#1659)
montekki Sep 10, 2020
b80e050
handle multiple in-flight connection requests
ordian Sep 10, 2020
0e0525d
handle cancelled requests
ordian Sep 10, 2020
64dcdb1
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 10, 2020
2cf1610
Update node/core/runtime-api/src/lib.rs
ordian Sep 11, 2020
1bee32c
redo it again
ordian Sep 11, 2020
c0d3a5a
more stuff
ordian Sep 12, 2020
de19f1d
redo it again
ordian Sep 14, 2020
eb1afd7
Merge branch 'ao-validator-discovery-api' of github.com:paritytech/po…
ordian Sep 14, 2020
dbbfe23
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 14, 2020
e6a0a85
update comments
ordian Sep 14, 2020
36cf3f4
workaround Future is not Send
ordian Sep 14, 2020
9f20552
fix trailing spaces
ordian Sep 14, 2020
ef4c6da
clarify comments
ordian Sep 14, 2020
6305c41
bridge: fix compilation in tests
ordian Sep 14, 2020
14fe353
update more comments
ordian Sep 14, 2020
7ea8588
small fixes
ordian Sep 14, 2020
f6a4068
port collator protocol to new validator discovery api
ordian Sep 14, 2020
ac02180
collator tests compile
ordian Sep 15, 2020
3dea047
collator tests pass
ordian Sep 15, 2020
62e46a1
do not revoke a request when the stream receiver is closed
ordian Sep 15, 2020
bbaf435
make revoking opt-in
ordian Sep 15, 2020
8cfab6f
fix is_fulfilled
ordian Sep 15, 2020
68fc8bb
handle request revokation in collator
ordian Sep 16, 2020
a23edc0
tests
ordian Sep 16, 2020
0a9c064
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 16, 2020
98d8346
wait for validator connections asyncronously
ordian Sep 17, 2020
c03f766
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 17, 2020
e077b51
fix compilation
ordian Sep 17, 2020
2132114
relabel my todos
ordian Sep 17, 2020
eb3bacb
apply Fedor's patch
ordian Sep 17, 2020
1112368
resolve reconnection TODO
ordian Sep 18, 2020
6fbca68
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 18, 2020
3d2def9
resolve revoking TODO
ordian Sep 18, 2020
dee8b27
resolve channel capacity TODO
ordian Sep 18, 2020
c59d5a7
resolve peer cloning TODO
ordian Sep 18, 2020
0580441
resolve peer disconnected TODO
ordian Sep 18, 2020
db37a2f
resolve PeerSet TODO
ordian Sep 18, 2020
a237119
wip tests
ordian Sep 18, 2020
a8e3105
more tests
ordian Sep 22, 2020
95f989e
resolve Arc TODO
ordian Sep 22, 2020
86a64fa
rename pending to non_revoked
ordian Sep 22, 2020
7585d6e
one more test
ordian Sep 22, 2020
9534c2b
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 22, 2020
f83534c
extract utility function into util crate
ordian Sep 22, 2020
cb97211
fix compilation in tests
ordian Sep 22, 2020
c1da4c6
Apply suggestions from code review
ordian Sep 23, 2020
c6aa649
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 23, 2020
ae7f529
revert pin_project removal
ordian Sep 23, 2020
fbf8901
fix while let loop
ordian Sep 23, 2020
78216fc
Revert "revert pin_project removal"
ordian Sep 23, 2020
4cb79ad
fix compilation
ordian Sep 23, 2020
0295436
Update node/subsystem/src/messages.rs
ordian Sep 23, 2020
66a6eed
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 28, 2020
7e67489
docs on pub items
ordian Sep 28, 2020
30da479
guide updates
ordian Sep 28, 2020
9c5b654
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 28, 2020
0e9771c
remove a TODO
ordian Sep 28, 2020
4cc5f3e
small guide update
ordian Sep 28, 2020
23bde8d
fix a typo
ordian Sep 28, 2020
88e83ed
link to the issue
ordian Sep 28, 2020
3893324
Merge branch 'master' into ao-validator-discovery-api
ordian Oct 6, 2020
a7b5ddc
validator discovery: on_request docs
ordian Oct 6, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion node/core/runtime-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ fn make_runtime_api_request<Client>(
Request::CandidatePendingAvailability(para, sender) =>
query!(candidate_pending_availability(para), sender),
Request::CandidateEvents(sender) => query!(candidate_events(), sender),
Request::ValidatorDiscovery(ids, sender) => query!(validator_discovery(ids), sender),
}
}

Expand Down Expand Up @@ -169,7 +170,7 @@ mod tests {
use polkadot_primitives::v1::{
ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, PersistedValidationData,
Id as ParaId, OccupiedCoreAssumption, ValidationData, SessionIndex, ValidationCode,
CommittedCandidateReceipt, CandidateEvent,
CommittedCandidateReceipt, CandidateEvent, AuthorityDiscoveryId,
};
use polkadot_node_subsystem_test_helpers as test_helpers;
use sp_core::testing::TaskExecutor;
Expand Down Expand Up @@ -258,6 +259,10 @@ mod tests {
fn candidate_events(&self) -> Vec<CandidateEvent> {
self.candidate_events.clone()
}

fn validator_discovery(ids: Vec<ValidatorId>) -> Vec<Option<AuthorityDiscoveryId>> {
vec![None; idx.len()]
}
}
}

Expand Down
1 change: 1 addition & 0 deletions node/network/bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ futures-timer = "3.0.2"
streamunordered = "0.5.1"
polkadot-primitives = { path = "../../../primitives" }
parity-scale-codec = "1.3.4"
sc-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
Expand Down
194 changes: 170 additions & 24 deletions node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::channel::oneshot;

use sc_network::Event as NetworkEvent;
use sc_network::{Event as NetworkEvent, Multiaddr};
use sc_authority_discovery::Service as AuthorityDiscoveryService;
use sp_runtime::ConsensusEngineId;

use polkadot_subsystem::{
Expand All @@ -34,12 +35,12 @@ use polkadot_subsystem::messages::{
BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage,
CollatorProtocolMessage,
};
use polkadot_primitives::v1::{Block, Hash, ValidatorId};
use polkadot_primitives::v1::{Block, Hash, AuthorityDiscoveryId};
use polkadot_node_network_protocol::{
ObservedRole, ReputationChange, PeerId, PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1
};

use std::collections::hash_map::{HashMap, Entry as HEntry};
use std::collections::{HashSet, HashMap, hash_map};
use std::iter::ExactSizeIterator;
use std::pin::Pin;
use std::sync::Arc;
Expand Down Expand Up @@ -127,6 +128,10 @@ pub trait Network: Send + 'static {
self.action_sink().send(NetworkAction::WriteNotification(who, peer_set, message)).await
}.boxed()
}

/// Ask the network to connect to these nodes and not disconnect from them until removed from the priority group.
fn set_priority_group(&self, group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
// TODO: we might want to add `add_to_priority_group` and `remove_from_priority_group`
}

impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
Expand Down Expand Up @@ -185,18 +190,29 @@ impl Network for Arc<sc_network::NetworkService<Block, Hash>> {

Box::pin(ActionSink(&**self))
}

fn set_priority_group(&self, group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
sc_network::NetworkService::set_priority_group(&**self, group_id, multiaddresses)
}
}

/// The network bridge subsystem.
pub struct NetworkBridge<N>(N);
// TODO: does it need to be generic over AuthorityDiscoveryService?
pub struct NetworkBridge<N> {
network_service: N,
authority_discovery_service: AuthorityDiscoveryService,
}

impl<N> NetworkBridge<N> {
/// Create a new network bridge subsystem with underlying network service.
/// Create a new network bridge subsystem with underlying network service and authority discovery service.
///
/// This assumes that the network service has had the notifications protocol for the network
/// bridge already registered. See [`notifications_protocol_info`](notifications_protocol_info).
pub fn new(net_service: N) -> Self {
NetworkBridge(net_service)
pub fn new(network_service: N, authority_discovery_service: AuthorityDiscoveryService) -> Self {
NetworkBridge {
network_service,
authority_discovery_service,
}
}
}

Expand All @@ -212,7 +228,7 @@ impl<Net, Context> Subsystem<Context> for NetworkBridge<Net>
// within `run_network`.
SpawnedSubsystem {
name: "network-bridge-subsystem",
future: run_network(self.0, ctx).map(|_| ()).boxed(),
future: run_network(self, ctx).map(|_| ()).boxed(),
}
}
}
Expand All @@ -226,7 +242,7 @@ struct PeerData {
enum Action {
SendValidationMessage(Vec<PeerId>, protocol_v1::ValidationProtocol),
SendCollationMessage(Vec<PeerId>, protocol_v1::CollationProtocol),
ConnectToValidators(PeerSet, Vec<ValidatorId>, oneshot::Sender<Vec<(ValidatorId, PeerId)>>),
ConnectToAuthorities(PeerSet, Vec<AuthorityDiscoveryId>, oneshot::Sender<Vec<(AuthorityDiscoveryId, PeerId)>>),
ReportPeer(PeerId, ReputationChange),

ActiveLeaves(ActiveLeavesUpdate),
Expand Down Expand Up @@ -256,8 +272,8 @@ fn action_from_overseer_message(
=> Action::SendValidationMessage(peers, msg),
NetworkBridgeMessage::SendCollationMessage(peers, msg)
=> Action::SendCollationMessage(peers, msg),
NetworkBridgeMessage::ConnectToValidators(peer_set, validators, res)
=> Action::ConnectToValidators(peer_set, validators, res),
NetworkBridgeMessage::ConnectToAuthorities(peer_set, authorities, res)
=> Action::ConnectToAuthorities(peer_set, authorities, res),
},
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_)))
=> Action::Nop,
Expand Down Expand Up @@ -541,10 +557,10 @@ async fn dispatch_collation_events_to_all<I>(
}

async fn run_network<N: Network>(
mut net: N,
mut net: NetworkBridge<N>,
mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>,
) -> SubsystemResult<()> {
let mut event_stream = net.event_stream().fuse();
let mut event_stream = net.network_service.event_stream().fuse();

// Most recent heads are at the back.
let mut live_heads: Vec<Hash> = Vec::with_capacity(MAX_VIEW_HEADS);
Expand All @@ -553,6 +569,10 @@ async fn run_network<N: Network>(
let mut validation_peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut collation_peers: HashMap<PeerId, PeerData> = HashMap::new();

let mut pending_discovery_requests: Vec<ConnectToAuthoritiesState> = Vec::new();
// we assume one PeerId per AuthorityId is enough
let mut connected_authority_peers: HashMap<AuthorityDiscoveryId, PeerId> = HashMap::new();

loop {
let action = {
let subsystem_next = ctx.recv().fuse();
Expand All @@ -570,31 +590,60 @@ async fn run_network<N: Network>(
Action::Abort => return Ok(()),

Action::SendValidationMessage(peers, msg) => send_message(
&mut net,
&mut net.network_service,
peers,
PeerSet::Validation,
WireMessage::ProtocolMessage(msg),
).await?,

Action::SendCollationMessage(peers, msg) => send_message(
&mut net,
&mut net.network_service,
peers,
PeerSet::Collation,
WireMessage::ProtocolMessage(msg),
).await?,

Action::ConnectToValidators(_peer_set, _validators, _res) => {
// TODO: https://github.com/paritytech/polkadot/issues/1461
}
Action::ConnectToAuthorities(peer_set, authorities, res) => {
let priority_group = match peer_set {
PeerSet::Validation => "parachain_validation".to_owned(),
PeerSet::Collation => "parachain_collation".to_owned(),
};

// collect multiaddress of authorities
let mut multiaddresses = HashSet::new();
for authority in authorities.iter().cloned() {
let result = net.authority_discovery_service.get_addresses_by_authority_id(authority).await;
if let Some(addresses) = result {
// we might have several `PeerId`s per `AuthorityId` depending on the number of sentry nodes,
// so in theory we might want to limit max number of sentries per node to connect to,
// in practice the number of sentries per node is low and they are going to be removed soon
// https://github.com/paritytech/substrate/issues/6845
for addr in addresses {
multiaddresses.insert(addr);
}
}
}

Action::ReportPeer(peer, rep) => net.report_peer(peer, rep).await?,
// ask the network to connect to these nodes and not disconnect from them until removed from the priority group
if let Err(e) = net.network_service.set_priority_group(priority_group, multiaddresses) {
log::warn!("NetworkBridge: AuthorityDiscoveryService returned an invalid multiaddress: {}", e);
}

pending_discovery_requests.push(ConnectToAuthoritiesState::new(
authorities.into_iter().collect(),
res,
&connected_authority_peers,
));
},

Action::ReportPeer(peer, rep) => net.network_service.report_peer(peer, rep).await?,

Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
live_heads.extend(activated);
live_heads.retain(|h| !deactivated.contains(h));

update_view(
&mut net,
&mut net.network_service,
&mut ctx,
&live_heads,
&mut local_view,
Expand All @@ -609,9 +658,27 @@ async fn run_network<N: Network>(
PeerSet::Collation => &mut collation_peers,
};

// check if it's an authority we've been waiting for
let maybe_authority = net.authority_discovery_service.get_authority_id_by_peer_id(peer.clone()).await;
if let Some(authority) = maybe_authority {
let mut ready = Vec::new();
for (i, pending) in pending_discovery_requests.iter_mut().enumerate() {
pending.on_authority_connected(&authority, &peer);
if pending.is_ready() {
ready.push(i);
}
}
// fulfill all ready requests and remove them from the pending list
for to_send in ready.into_iter().rev() {
pending_discovery_requests.swap_remove(to_send).send();
}

connected_authority_peers.insert(authority, peer.clone());
}

match peer_map.entry(peer.clone()) {
HEntry::Occupied(_) => continue,
HEntry::Vacant(vacant) => {
hash_map::Entry::Occupied(_) => continue,
hash_map::Entry::Vacant(vacant) => {
vacant.insert(PeerData {
view: View(Vec::new()),
});
Expand Down Expand Up @@ -652,6 +719,23 @@ async fn run_network<N: Network>(
PeerSet::Collation => &mut collation_peers,
};

let maybe_authority = net.authority_discovery_service.get_authority_id_by_peer_id(peer.clone()).await;
if let Some(authority) = maybe_authority {
let mut ready = Vec::new();
for (i, pending) in pending_discovery_requests.iter_mut().enumerate() {
pending.on_authority_disconnected(&authority);
if pending.is_ready() {
ready.push(i);
}
}
// fulfill all ready requests and remove them from the pending list
for to_send in ready.into_iter().rev() {
pending_discovery_requests.swap_remove(to_send).send();
}

connected_authority_peers.remove(&authority);
}

if peer_map.remove(&peer).is_some() {
let res = match peer_set {
PeerSet::Validation => dispatch_validation_event_to_all(
Expand Down Expand Up @@ -679,7 +763,7 @@ async fn run_network<N: Network>(
peer.clone(),
&mut validation_peers,
v_messages,
&mut net,
&mut net.network_service,
).await?;

if let Err(e) = dispatch_validation_events_to_all(
Expand All @@ -699,7 +783,7 @@ async fn run_network<N: Network>(
peer.clone(),
&mut collation_peers,
c_messages,
&mut net,
&mut net.network_service,
).await?;

if let Err(e) = dispatch_collation_events_to_all(
Expand All @@ -718,6 +802,68 @@ async fn run_network<N: Network>(
}
}

/// This struct tracks the state for one `ConnectToAuthorities` request.
struct ConnectToAuthoritiesState {
pending: HashSet<AuthorityDiscoveryId>,
connected: HashMap<AuthorityDiscoveryId, PeerId>,
sender: oneshot::Sender<Vec<(AuthorityDiscoveryId, PeerId)>>,
}

impl ConnectToAuthoritiesState {
/// Create a new instance of `ConnectToAuthoritiesState`.
pub fn new(
mut pending: HashSet<AuthorityDiscoveryId>,
sender: oneshot::Sender<Vec<(AuthorityDiscoveryId, PeerId)>>,
already_connected: &HashMap<AuthorityDiscoveryId, PeerId>,
) -> Self {
let mut connected = HashMap::new();
pending.retain(|authority| {
if let Some(peer_id) = already_connected.get(authority) {
connected.insert(authority.clone(), peer_id.clone());
false
} else {
true
}
});

Self {
pending,
connected,
sender,
}
}

pub fn on_authority_connected(&mut self, authority: &AuthorityDiscoveryId, peer_id: &PeerId) {
if self.pending.remove(authority) {
self.connected.insert(authority.clone(), peer_id.clone());
}
}

pub fn on_authority_disconnected(&mut self, authority: &AuthorityDiscoveryId) {
let _ = self.pending.remove(authority);
let _ = self.connected.remove(authority);
}

/// Returns `true` if ready to send.
pub fn is_ready(&self) -> bool {
self.pending.is_empty() || self.sender.is_canceled()
}

/// Fulfill the request.
pub fn send(self) {
if self.sender.is_canceled() {
return;
}
// TODO: maybe we actually want this if there is a timeout
debug_assert!(self.is_ready(), "calling `send` when not ready");

let Self { mut connected, sender, .. } = self;
let reply: Vec<_> = connected.drain().collect();
// if the receiver is dropped, it doesn't care about these authorities anymore
let _ = sender.send(reply);
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading