Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
3e9abcf
stupid, but it compiles
ordian Sep 5, 2020
e1fe858
redo
ordian Sep 8, 2020
d09622b
cleanup
ordian Sep 8, 2020
9e8cfe2
add ValidatorDiscovery to msgs
ordian Sep 8, 2020
0bae9e4
sketch network bridge code
ordian Sep 8, 2020
4fa0236
ConnectToAuthorities instead of validators
ordian Sep 9, 2020
9f81f90
more stuff
ordian Sep 10, 2020
d8d1302
cleanup
ordian Sep 10, 2020
6c18572
more stuff
ordian Sep 10, 2020
251427d
complete ConnectToAuthoritiesState
ordian Sep 10, 2020
31d7329
Update node/network/bridge/src/lib.rs
ordian Sep 10, 2020
f715c33
Collator protocol subsystem (#1659)
montekki Sep 10, 2020
b80e050
handle multiple in-flight connection requests
ordian Sep 10, 2020
0e0525d
handle cancelled requests
ordian Sep 10, 2020
64dcdb1
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 10, 2020
2cf1610
Update node/core/runtime-api/src/lib.rs
ordian Sep 11, 2020
1bee32c
redo it again
ordian Sep 11, 2020
c0d3a5a
more stuff
ordian Sep 12, 2020
de19f1d
redo it again
ordian Sep 14, 2020
eb1afd7
Merge branch 'ao-validator-discovery-api' of github.com:paritytech/po…
ordian Sep 14, 2020
dbbfe23
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 14, 2020
e6a0a85
update comments
ordian Sep 14, 2020
36cf3f4
workaround Future is not Send
ordian Sep 14, 2020
9f20552
fix trailing spaces
ordian Sep 14, 2020
ef4c6da
clarify comments
ordian Sep 14, 2020
6305c41
bridge: fix compilation in tests
ordian Sep 14, 2020
14fe353
update more comments
ordian Sep 14, 2020
7ea8588
small fixes
ordian Sep 14, 2020
f6a4068
port collator protocol to new validator discovery api
ordian Sep 14, 2020
ac02180
collator tests compile
ordian Sep 15, 2020
3dea047
collator tests pass
ordian Sep 15, 2020
62e46a1
do not revoke a request when the stream receiver is closed
ordian Sep 15, 2020
bbaf435
make revoking opt-in
ordian Sep 15, 2020
8cfab6f
fix is_fulfilled
ordian Sep 15, 2020
68fc8bb
handle request revokation in collator
ordian Sep 16, 2020
a23edc0
tests
ordian Sep 16, 2020
0a9c064
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 16, 2020
98d8346
wait for validator connections asyncronously
ordian Sep 17, 2020
c03f766
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 17, 2020
e077b51
fix compilation
ordian Sep 17, 2020
2132114
relabel my todos
ordian Sep 17, 2020
eb3bacb
apply Fedor's patch
ordian Sep 17, 2020
1112368
resolve reconnection TODO
ordian Sep 18, 2020
6fbca68
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 18, 2020
3d2def9
resolve revoking TODO
ordian Sep 18, 2020
dee8b27
resolve channel capacity TODO
ordian Sep 18, 2020
c59d5a7
resolve peer cloning TODO
ordian Sep 18, 2020
0580441
resolve peer disconnected TODO
ordian Sep 18, 2020
db37a2f
resolve PeerSet TODO
ordian Sep 18, 2020
a237119
wip tests
ordian Sep 18, 2020
a8e3105
more tests
ordian Sep 22, 2020
95f989e
resolve Arc TODO
ordian Sep 22, 2020
86a64fa
rename pending to non_revoked
ordian Sep 22, 2020
7585d6e
one more test
ordian Sep 22, 2020
9534c2b
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 22, 2020
f83534c
extract utility function into util crate
ordian Sep 22, 2020
cb97211
fix compilation in tests
ordian Sep 22, 2020
c1da4c6
Apply suggestions from code review
ordian Sep 23, 2020
c6aa649
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 23, 2020
ae7f529
revert pin_project removal
ordian Sep 23, 2020
fbf8901
fix while let loop
ordian Sep 23, 2020
78216fc
Revert "revert pin_project removal"
ordian Sep 23, 2020
4cb79ad
fix compilation
ordian Sep 23, 2020
0295436
Update node/subsystem/src/messages.rs
ordian Sep 23, 2020
66a6eed
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 28, 2020
7e67489
docs on pub items
ordian Sep 28, 2020
30da479
guide updates
ordian Sep 28, 2020
9c5b654
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 28, 2020
0e9771c
remove a TODO
ordian Sep 28, 2020
4cc5f3e
small guide update
ordian Sep 28, 2020
23bde8d
fix a typo
ordian Sep 28, 2020
88e83ed
link to the issue
ordian Sep 28, 2020
3893324
Merge branch 'master' into ao-validator-discovery-api
ordian Oct 6, 2020
a7b5ddc
validator discovery: on_request docs
ordian Oct 6, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
extract utility function into util crate
  • Loading branch information
ordian committed Sep 22, 2020
commit f83534c901910284d6c9b08a73a669daf0b4892a
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion node/network/collator-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ edition = "2018"
futures = "0.3.5"
log = "0.4.11"
derive_more = "0.99.9"
pin-project = "0.4.22"

codec = { package="parity-scale-codec", version = "1.3.4", features = ["std"] }

Expand Down
124 changes: 12 additions & 112 deletions node/network/collator-protocol/src/collator_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,17 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::pin::Pin;

use super::{TARGET, Result};

use futures::channel::{mpsc, oneshot};
use futures::stream::{self, StreamExt as _};
use futures::task::{Poll, self};
use futures::channel::oneshot;
use futures::stream::StreamExt as _;
use futures::task::Poll;
use log::warn;
use pin_project::pin_project;

use polkadot_primitives::v1::{
CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt,
PoV, ValidatorId, AuthorityDiscoveryId,
PoV, ValidatorId,
};
use polkadot_subsystem::{
FromOverseer, OverseerSignal, SubsystemContext,
Expand All @@ -40,6 +38,7 @@ use polkadot_node_network_protocol::{
v1 as protocol_v1, View, PeerId, NetworkBridgeEvent, RequestId,
};
use polkadot_node_subsystem_util::{
validator_discovery,
request_validators_ctx,
request_validator_groups_ctx,
};
Expand Down Expand Up @@ -77,7 +76,7 @@ struct State {
known_validators: HashMap<PeerId, ValidatorId>,

/// Use to await for the next validator connection and revoke the request.
last_connection_request: Option<ConnectionRequest>,
last_connection_request: Option<validator_discovery::ConnectionRequest>,
}

/// Distribute a collation.
Expand Down Expand Up @@ -261,116 +260,17 @@ where
request.revoke();
}

let request = connect_to_validators_impl(ctx, relay_parent, validators).await?;
let request = validator_discovery::connect_to_validators(
ctx,
relay_parent,
validators,
).await?;

state.last_connection_request = Some(request);

Ok(())
}

// TODO (ordian): make this code reusable by other subsystems
// put in subsystem-util?
async fn connect_to_validators_impl<Context: SubsystemContext>(
ctx: &mut Context,
relay_parent: Hash,
validators: Vec<ValidatorId>,
) -> Result<ConnectionRequest> {
// ValidatorId -> AuthorityDiscoveryId
let (tx, rx) = oneshot::channel();

ctx.send_message(AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorDiscovery(validators.clone(), tx),
)
)).await?;

let maybe_authorities = rx.await??;
let authorities: Vec<_> = maybe_authorities.iter()
.cloned()
.filter_map(|id| id)
.collect();

let validator_map = validators.into_iter()
.zip(maybe_authorities.into_iter())
.filter_map(|(k, v)| v.map(|v| (v, k)))
.collect::<HashMap<AuthorityDiscoveryId, ValidatorId>>();

let (connections, revoke) = connect_to_authorities(ctx, authorities).await?;

Ok(ConnectionRequest {
validator_map,
connections,
revoke,
})
}

async fn connect_to_authorities<Context: SubsystemContext>(
ctx: &mut Context,
validator_ids: Vec<AuthorityDiscoveryId>,
) -> Result<(mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>, oneshot::Sender<()>)> {
// do we need a bounded channel at all?
const PEERS_CAPACITY: usize = 8;

let (revoke_tx, revoke) = oneshot::channel();
let (connected, connected_rx) = mpsc::channel(PEERS_CAPACITY);

ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ConnectToValidators {
validator_ids,
connected,
revoke,
}
)).await?;

Ok((connected_rx, revoke_tx))
}

#[pin_project]
struct ConnectionRequest {
#[pin]
validator_map: HashMap<AuthorityDiscoveryId, ValidatorId>,
#[pin]
#[must_use = "streams do nothing unless polled"]
connections: mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>,
#[must_use = "a request should be revoked at some point"]
revoke: oneshot::Sender<()>,
}

impl stream::Stream for ConnectionRequest {
type Item = (ValidatorId, PeerId);

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
let mut this = self.project();
if this.validator_map.is_empty() {
return Poll::Ready(None);
}
match this.connections.poll_next(cx) {
Poll::Ready(Some((id, peer_id))) => {
if let Some(validator_id) = this.validator_map.remove(&id) {
return Poll::Ready(Some((validator_id, peer_id)));
} else {
// unknown authority_id
// should be unreachable
}
}
_ => {},
}
Poll::Pending
}
}

impl ConnectionRequest {
pub fn revoke(self) {
if let Err(_) = self.revoke.send(()) {
warn!(
target: TARGET,
"Failed to revoke a validator connection request",
);
}
}
}

/// Advertise collation to a set of relay chain validators.
async fn advertise_collation<Context>(
ctx: &mut Context,
Expand Down Expand Up @@ -690,7 +590,7 @@ where
Communication { msg } => process_msg(&mut ctx, &mut state, msg).await?,
Signal(ActiveLeaves(_update)) => {}
Signal(BlockFinalized(_)) => {}
Signal(Conclude) => break,
Signal(Conclude) => break,
}
}

Expand Down
10 changes: 10 additions & 0 deletions node/network/collator-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ enum Error {
UtilError(util::Error),
}

impl From<util::validator_discovery::Error> for Error {
fn from(me: util::validator_discovery::Error) -> Self {
match me {
util::validator_discovery::Error::Subsystem(s) => Error::Subsystem(s),
util::validator_discovery::Error::RuntimeApi(ra) => Error::RuntimeApi(ra),
util::validator_discovery::Error::Oneshot(c) => Error::Oneshot(c),
}
}
}

type Result<T> = std::result::Result<T, Error>;

enum ProtocolSide {
Expand Down
2 changes: 2 additions & 0 deletions node/subsystem-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ use std::{
};
use streamunordered::{StreamUnordered, StreamYield};

pub mod validator_discovery;

/// These reexports are required so that external crates can use the `delegated_subsystem` macro properly.
pub mod reexports {
pub use sp_core::traits::SpawnNamed;
Expand Down
155 changes: 155 additions & 0 deletions node/subsystem-util/src/validator_discovery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Utility function to make it easier to connect to validators.

use std::collections::HashMap;
use std::pin::Pin;

use futures::{
channel::{mpsc, oneshot},
task::{Poll, self},
stream,
};
use pin_project::pin_project;

use polkadot_node_subsystem::{
errors::RuntimeApiError, SubsystemError,
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeMessage},
SubsystemContext,
};
use polkadot_primitives::v1::{Hash, ValidatorId, AuthorityDiscoveryId};
use sc_network::PeerId;

/// Error when making a request to connect to validators.
#[derive(Debug, derive_more::From)]
pub enum Error {
/// Attempted to send or receive on a oneshot channel which had been canceled
#[from]
Oneshot(oneshot::Canceled),
/// A subsystem error.
#[from]
Subsystem(SubsystemError),
/// An error in the Runtime API.
#[from]
RuntimeApi(RuntimeApiError),
}

/// Utility function to make it easier to connect to validators.
pub async fn connect_to_validators<Context: SubsystemContext>(
ctx: &mut Context,
relay_parent: Hash,
validators: Vec<ValidatorId>,
) -> Result<ConnectionRequest, Error> {
// ValidatorId -> AuthorityDiscoveryId
let (tx, rx) = oneshot::channel();

ctx.send_message(AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorDiscovery(validators.clone(), tx),
)
)).await?;

let maybe_authorities = rx.await??;
let authorities: Vec<_> = maybe_authorities.iter()
.cloned()
.filter_map(|id| id)
.collect();

let validator_map = validators.into_iter()
.zip(maybe_authorities.into_iter())
.filter_map(|(k, v)| v.map(|v| (v, k)))
.collect::<HashMap<AuthorityDiscoveryId, ValidatorId>>();

let (connections, revoke) = connect_to_authorities(ctx, authorities).await?;

Ok(ConnectionRequest {
validator_map,
connections,
revoke,
})
}

async fn connect_to_authorities<Context: SubsystemContext>(
ctx: &mut Context,
validator_ids: Vec<AuthorityDiscoveryId>,
) -> Result<(mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>, oneshot::Sender<()>), Error> {
const PEERS_CAPACITY: usize = 8;

let (revoke_tx, revoke) = oneshot::channel();
let (connected, connected_rx) = mpsc::channel(PEERS_CAPACITY);

ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ConnectToValidators {
validator_ids,
connected,
revoke,
}
)).await?;

Ok((connected_rx, revoke_tx))
}

/// A pending connection request to validators.
/// This struct implements `Stream` to allow for asynchronous
/// discovery of validator addresses.
///
/// NOTE: you should call `revoke` on this struct
/// when you're no longer interested in the requested validators.
#[pin_project]
pub struct ConnectionRequest {
#[pin]
validator_map: HashMap<AuthorityDiscoveryId, ValidatorId>,
#[pin]
#[must_use = "streams do nothing unless polled"]
connections: mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>,
#[must_use = "a request should be revoked at some point"]
revoke: oneshot::Sender<()>,
}

impl stream::Stream for ConnectionRequest {
type Item = (ValidatorId, PeerId);

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
let mut this = self.project();
if this.validator_map.is_empty() {
return Poll::Ready(None);
}
match this.connections.poll_next(cx) {
Poll::Ready(Some((id, peer_id))) => {
if let Some(validator_id) = this.validator_map.remove(&id) {
return Poll::Ready(Some((validator_id, peer_id)));
} else {
// unknown authority_id
// should be unreachable
}
}
_ => {},
}
Poll::Pending
}
}

impl ConnectionRequest {
pub fn revoke(self) {
if let Err(_) = self.revoke.send(()) {
log::warn!(
"Failed to revoke a validator connection request",
);
}
}
}