Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
3e9abcf
stupid, but it compiles
Sep 5, 2020
e1fe858
redo
Sep 8, 2020
d09622b
cleanup
Sep 8, 2020
9e8cfe2
add ValidatorDiscovery to msgs
Sep 8, 2020
0bae9e4
sketch network bridge code
Sep 8, 2020
4fa0236
ConnectToAuthorities instead of validators
Sep 9, 2020
9f81f90
more stuff
Sep 10, 2020
d8d1302
cleanup
Sep 10, 2020
6c18572
more stuff
Sep 10, 2020
251427d
complete ConnectToAuthoritiesState
Sep 10, 2020
31d7329
Update node/network/bridge/src/lib.rs
Sep 10, 2020
f715c33
Collator protocol subsystem (#1659)
montekki Sep 10, 2020
b80e050
handle multiple in-flight connection requests
Sep 10, 2020
0e0525d
handle cancelled requests
Sep 10, 2020
64dcdb1
Merge branch 'master' into ao-validator-discovery-api
Sep 10, 2020
2cf1610
Update node/core/runtime-api/src/lib.rs
Sep 11, 2020
1bee32c
redo it again
Sep 11, 2020
c0d3a5a
more stuff
Sep 12, 2020
de19f1d
redo it again
Sep 14, 2020
eb1afd7
Merge branch 'ao-validator-discovery-api' of github.com:paritytech/po…
Sep 14, 2020
dbbfe23
Merge branch 'master' into ao-validator-discovery-api
Sep 14, 2020
e6a0a85
update comments
Sep 14, 2020
36cf3f4
workaround Future is not Send
Sep 14, 2020
9f20552
fix trailing spaces
Sep 14, 2020
ef4c6da
clarify comments
Sep 14, 2020
6305c41
bridge: fix compilation in tests
Sep 14, 2020
14fe353
update more comments
Sep 14, 2020
7ea8588
small fixes
Sep 14, 2020
f6a4068
port collator protocol to new validator discovery api
Sep 14, 2020
ac02180
collator tests compile
Sep 15, 2020
3dea047
collator tests pass
Sep 15, 2020
62e46a1
do not revoke a request when the stream receiver is closed
Sep 15, 2020
bbaf435
make revoking opt-in
Sep 15, 2020
8cfab6f
fix is_fulfilled
Sep 15, 2020
68fc8bb
handle request revokation in collator
Sep 16, 2020
a23edc0
tests
Sep 16, 2020
0a9c064
Merge branch 'master' into ao-validator-discovery-api
Sep 16, 2020
98d8346
wait for validator connections asyncronously
Sep 17, 2020
c03f766
Merge branch 'master' into ao-validator-discovery-api
Sep 17, 2020
e077b51
fix compilation
Sep 17, 2020
2132114
relabel my todos
Sep 17, 2020
eb3bacb
apply Fedor's patch
Sep 17, 2020
1112368
resolve reconnection TODO
Sep 18, 2020
6fbca68
Merge branch 'master' into ao-validator-discovery-api
Sep 18, 2020
3d2def9
resolve revoking TODO
Sep 18, 2020
dee8b27
resolve channel capacity TODO
Sep 18, 2020
c59d5a7
resolve peer cloning TODO
Sep 18, 2020
0580441
resolve peer disconnected TODO
Sep 18, 2020
db37a2f
resolve PeerSet TODO
Sep 18, 2020
a237119
wip tests
Sep 18, 2020
a8e3105
more tests
Sep 22, 2020
95f989e
resolve Arc TODO
Sep 22, 2020
86a64fa
rename pending to non_revoked
Sep 22, 2020
7585d6e
one more test
Sep 22, 2020
9534c2b
Merge branch 'master' into ao-validator-discovery-api
Sep 22, 2020
f83534c
extract utility function into util crate
Sep 22, 2020
cb97211
fix compilation in tests
Sep 22, 2020
c1da4c6
Apply suggestions from code review
Sep 23, 2020
c6aa649
Merge branch 'master' into ao-validator-discovery-api
Sep 23, 2020
ae7f529
revert pin_project removal
Sep 23, 2020
fbf8901
fix while let loop
Sep 23, 2020
78216fc
Revert "revert pin_project removal"
Sep 23, 2020
4cb79ad
fix compilation
Sep 23, 2020
0295436
Update node/subsystem/src/messages.rs
Sep 23, 2020
66a6eed
Merge branch 'master' into ao-validator-discovery-api
Sep 28, 2020
7e67489
docs on pub items
Sep 28, 2020
30da479
guide updates
Sep 28, 2020
9c5b654
Merge branch 'master' into ao-validator-discovery-api
Sep 28, 2020
0e9771c
remove a TODO
Sep 28, 2020
4cc5f3e
small guide update
Sep 28, 2020
23bde8d
fix a typo
Sep 28, 2020
88e83ed
link to the issue
Sep 28, 2020
3893324
Merge branch 'master' into ao-validator-discovery-api
Oct 6, 2020
a7b5ddc
validator discovery: on_request docs
Oct 6, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
handle multiple in-flight connection requests
  • Loading branch information
Andronik Ordian committed Sep 10, 2020
commit b80e050678adc48237b3c51b35264ebe3f29f3e6
113 changes: 51 additions & 62 deletions node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ pub trait Network: Send + 'static {

/// Ask the network to connect to these nodes and not disconnect from them until removed from the priority group.
fn set_priority_group(&self, group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
// TODO: we might want to add `add_to_priority_group` and `remove_from_priority_group`
// TODO: we might want to add `add_to_priority_group` and `remove_from_priority_group`
}

impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
Expand Down Expand Up @@ -569,10 +569,8 @@ async fn run_network<N: Network>(
let mut validation_peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut collation_peers: HashMap<PeerId, PeerData> = HashMap::new();

let mut pending_discovery_requests: Vec<ConnectToAuthoritiesState> = Vec::new();
// we assume one PeerId per AuthorityId is enough
// the requests are emptied when a new ConnectToAuthorities arrives
let mut collation_pending_discovery_requests = ConnectToAuthoritiesState::default();
let mut validation_pending_discovery_requests = ConnectToAuthoritiesState::default();
let mut connected_authority_peers: HashMap<AuthorityDiscoveryId, PeerId> = HashMap::new();

loop {
Expand Down Expand Up @@ -606,16 +604,11 @@ async fn run_network<N: Network>(
).await?,

Action::ConnectToAuthorities(peer_set, authorities, res) => {
let (priority_group, requests) = match peer_set {
PeerSet::Validation => ("parachain_validation".to_owned(), &mut validation_pending_discovery_requests),
PeerSet::Collation => ("parachain_collation".to_owned(), &mut collation_pending_discovery_requests),
let priority_group = match peer_set {
PeerSet::Validation => "parachain_validation".to_owned(),
PeerSet::Collation => "parachain_collation".to_owned(),
};

// drop the previous requests
// TODO: do we want to handle more than one request at a time?
// what would be a cleaning strategy in that case?
let _ = requests.clear();

// collect multiaddress of authorities
let mut multiaddresses = HashSet::new();
for authority in authorities.iter().cloned() {
Expand All @@ -636,11 +629,11 @@ async fn run_network<N: Network>(
log::warn!("NetworkBridge: AuthorityDiscoveryService returned an invalid multiaddress: {}", e);
}

*requests = ConnectToAuthoritiesState::new(
pending_discovery_requests.push(ConnectToAuthoritiesState::new(
authorities.into_iter().collect(),
res,
&connected_authority_peers,
);
));
},

Action::ReportPeer(peer, rep) => net.network_service.report_peer(peer, rep).await?,
Expand All @@ -660,15 +653,26 @@ async fn run_network<N: Network>(
}

Action::PeerConnected(peer_set, peer, role) => {
let (peer_map, pending_requests) = match peer_set {
PeerSet::Validation => (&mut validation_peers, &mut validation_pending_discovery_requests),
PeerSet::Collation => (&mut collation_peers, &mut collation_pending_discovery_requests),
let peer_map = match peer_set {
PeerSet::Validation => &mut validation_peers,
PeerSet::Collation => &mut collation_peers,
};

// check if it's an authority we've been waiting for
let maybe_authority = net.authority_discovery_service.get_authority_id_by_peer_id(peer.clone()).await;
if let Some(authority) = maybe_authority {
pending_requests.on_authority_connected(&authority, &peer);
let mut ready = Vec::new();
for (i, pending) in pending_discovery_requests.iter_mut().enumerate() {
pending.on_authority_connected(&authority, &peer);
if pending.is_ready() {
ready.push(i);
}
}
// fulfill all ready requests and remove them from the pending list
for to_send in ready.into_iter().rev() {
pending_discovery_requests.swap_remove(to_send).send();
}

connected_authority_peers.insert(authority, peer.clone());
}

Expand Down Expand Up @@ -710,14 +714,25 @@ async fn run_network<N: Network>(
}
}
Action::PeerDisconnected(peer_set, peer) => {
let (peer_map, pending_requests) = match peer_set {
PeerSet::Validation => (&mut validation_peers, &mut validation_pending_discovery_requests),
PeerSet::Collation => (&mut collation_peers, &mut collation_pending_discovery_requests),
let peer_map = match peer_set {
PeerSet::Validation => &mut validation_peers,
PeerSet::Collation => &mut collation_peers,
};

let maybe_authority = net.authority_discovery_service.get_authority_id_by_peer_id(peer.clone()).await;
if let Some(authority) = maybe_authority {
pending_requests.on_authority_disconnected(&authority);
let mut ready = Vec::new();
for (i, pending) in pending_discovery_requests.iter_mut().enumerate() {
pending.on_authority_disconnected(&authority);
if pending.is_ready() {
ready.push(i);
}
}
// fulfill all ready requests and remove them from the pending list
for to_send in ready.into_iter().rev() {
pending_discovery_requests.swap_remove(to_send).send();
}

connected_authority_peers.remove(&authority);
}

Expand Down Expand Up @@ -788,10 +803,7 @@ async fn run_network<N: Network>(
}

/// This struct tracks the state for one `ConnectToAuthorities` request.
#[derive(Default)]
struct ConnectToAuthoritiesState(Option<ConnectToAuthoritiesStateInner>);

struct ConnectToAuthoritiesStateInner {
struct ConnectToAuthoritiesState {
pending: HashSet<AuthorityDiscoveryId>,
connected: HashMap<AuthorityDiscoveryId, PeerId>,
sender: oneshot::Sender<Vec<(AuthorityDiscoveryId, PeerId)>>,
Expand All @@ -800,7 +812,7 @@ struct ConnectToAuthoritiesStateInner {
impl ConnectToAuthoritiesState {
/// Create a new instance of `ConnectToAuthoritiesState`.
pub fn new(
mut pending: HashSet<AuthorityDiscoveryId>,
mut pending: HashSet<AuthorityDiscoveryId>,
sender: oneshot::Sender<Vec<(AuthorityDiscoveryId, PeerId)>>,
already_connected: &HashMap<AuthorityDiscoveryId, PeerId>,
) -> Self {
Expand All @@ -814,61 +826,38 @@ impl ConnectToAuthoritiesState {
}
});

Self(Some(ConnectToAuthoritiesStateInner {
Self {
pending,
connected,
sender,
}))
}

/// Drop the pending requests.
pub fn clear(&mut self) {
let _ = self.0.take();
}
}

pub fn on_authority_connected(&mut self, authority: &AuthorityDiscoveryId, peer_id: &PeerId) {
let inner = match self.0.as_mut() {
Some(inner) => inner,
None => return,
};

if inner.pending.remove(authority) {
inner.connected.insert(authority.clone(), peer_id.clone());
}

if self.is_ready() {
self.send();
if self.pending.remove(authority) {
self.connected.insert(authority.clone(), peer_id.clone());
}
}

pub fn on_authority_disconnected(&mut self, authority: &AuthorityDiscoveryId) {
let inner = match self.0.as_mut() {
Some(inner) => inner,
None => return,
};

let _ = inner.pending.remove(authority);
let _ = inner.connected.remove(authority);
let _ = self.pending.remove(authority);
let _ = self.connected.remove(authority);
}

/// Returns `true` if ready to send.
fn is_ready(&self) -> bool {
self.0.as_ref().map(|inner| inner.pending.is_empty()).unwrap_or(false)
pub fn is_ready(&self) -> bool {
self.pending.is_empty()
}

/// Fulfill the request.
fn send(&mut self) {
pub fn send(self) {
// TODO: maybe we actually want this if there is a timeout
debug_assert!(self.is_ready(), "calling `send` when not ready");

let inner = match self.0.take() {
Some(inner) => inner,
None => return,
};

let reply = inner.connected.into_iter().collect();
let Self { mut connected, sender, .. } = self;
let reply: Vec<_> = connected.drain().collect();
// if the receiver is dropped, it doesn't care about these authorities anymore
let _ = inner.sender.send(reply);
let _ = sender.send(reply);
}
}

Expand Down