Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixup
  • Loading branch information
drahnr committed Jun 20, 2022
commit 3fe17bc0b10ed1c48cbba38503be1493d93b80fb
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions node/network/bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ polkadot-node-network-protocol = { path = "../protocol" }
polkadot-node-subsystem-util = { path = "../../subsystem-util"}
parking_lot = "0.12.0"
bytes = "1"
fatality = "0.0.6"
thiserror = "1"

[dev-dependencies]
assert_matches = "1.4.0"
Expand Down
38 changes: 26 additions & 12 deletions node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl<N, AD> NetworkBridge<N, AD> {
impl<Net, AD, Context> NetworkBridge<Net, AD>
where
Net: Network + Sync,
AD: validator_discovery::AuthorityDiscovery + Clone,
AD: validator_discovery::AuthorityDiscovery + Clone + Sync,
{
fn start(mut self, ctx: Context) -> SpawnedSubsystem {
// The stream of networking events has to be created at initialization, otherwise the
Expand Down Expand Up @@ -204,7 +204,7 @@ where
let mut mode = Mode::Syncing(sync_oracle);

loop {
match ctx.recv().fuse()? {
match ctx.recv().fuse().await? {
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(active_leaves)) => {
let ActiveLeavesUpdate { activated, deactivated } = active_leaves;
gum::trace!(
Expand Down Expand Up @@ -262,15 +262,25 @@ where
return Ok(());
}
FromOrchestra::Communication { msg } => {
handle_incoming(&mut ctx, &mut network_service, msg).await;
(network_service, authority_discovery_service) = handle_incoming(&mut ctx, network_service, &mut validator_discovery, authority_discovery_service.clone(), msg, &metrics).await;
}
}
}
Ok(())
}

#[overseer::contextbounds(NetworkBridge, prefix = self::overseer)]
async fn handle_incoming<Context, N: Network>(ctx: &mut Context, network_service: &mut N, msg: NetworkBridgeMessage) {
async fn handle_incoming<Context, N, AD>(
ctx: &mut Context,
mut network_service: N,
validator_discovery: &mut validator_discovery::Service<N, AD>,
mut authority_discovery_service: AD,
msg: NetworkBridgeMessage,
metrics: &Metrics,
) -> (N, AD)
where
N: Network,
AD: validator_discovery::AuthorityDiscovery + Clone,
{
match msg {
NetworkBridgeMessage::ReportPeer(peer, rep) => {
if !rep.is_benefit() {
Expand Down Expand Up @@ -391,16 +401,15 @@ async fn handle_incoming<Context, N: Network>(ctx: &mut Context, network_service

metrics.note_desired_peer_count(peer_set, validator_ids.len());

let (ns, ads) = validator_discovery.on_request(
let (network_service, ads) = validator_discovery.on_request(
validator_ids,
peer_set,
failed,
network_service,
authority_discovery_service,
).await;

network_service = ns;
authority_discovery_service = ads;
return (network_service, ads);
}
NetworkBridgeMessage::ConnectToResolvedValidators {
validator_addrs,
Expand All @@ -417,11 +426,12 @@ async fn handle_incoming<Context, N: Network>(ctx: &mut Context, network_service
metrics.note_desired_peer_count(peer_set, validator_addrs.len());

let all_addrs = validator_addrs.into_iter().flatten().collect();
network_service = validator_discovery.on_resolved_request(
let network_service = validator_discovery.on_resolved_request(
all_addrs,
peer_set,
network_service,
).await;
return (network_service, authority_discovery_service);
}
NetworkBridgeMessage::NewGossipTopology {
session,
Expand Down Expand Up @@ -458,6 +468,7 @@ async fn handle_incoming<Context, N: Network>(ctx: &mut Context, network_service
);
}
}
(network_service, authority_discovery_service)
}

async fn update_gossip_peers_1d<AD, N>(
Expand All @@ -482,14 +493,17 @@ where
peers
}

async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
async fn handle_network_messages<AD>(
mut sender: impl overseer::NetworkBridgeSenderTrait,
mut network_service: impl Network,
network_stream: BoxStream<'static, NetworkEvent>,
mut authority_discovery_service: AD,
metrics: Metrics,
shared: Shared,
) -> Result<(), Error> {
) -> Result<(), Error>
where
AD: validator_discovery::AuthorityDiscovery + Send,
{
let mut network_stream = network_stream.fuse();
loop {
match network_stream.next().await {
Expand Down Expand Up @@ -844,7 +858,7 @@ async fn run_network<N, AD, Context>(
) -> SubsystemResult<()>
where
N: Network,
AD: validator_discovery::AuthorityDiscovery + Clone,
AD: validator_discovery::AuthorityDiscovery + Clone + Sync,
{
let shared = Shared::default();

Expand Down