From 940a1cfc1c8f94612630675f1e5fea38a0d2b4c3 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Wed, 25 May 2022 17:53:41 +0200 Subject: [PATCH 1/6] foo --- node/network/bridge/src/lib.rs | 493 +++++++++++++++++---------------- 1 file changed, 248 insertions(+), 245 deletions(-) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index f10f7fa5b67a..8df83243640b 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -150,23 +150,21 @@ struct PeerData { version: ProtocolVersion, } -#[derive(Debug)] -enum UnexpectedAbort { +#[fatality::fatality(splitable)] +enum Error { /// Received error from overseer: - SubsystemError(SubsystemError), + #[fatal] + #[error(transparent)] + SubsystemError(#[from] SubsystemError), /// The stream of incoming events concluded. + #[fatal] + #[error("Event stream closed unexpectedly")] EventStreamConcluded, } -impl From for UnexpectedAbort { - fn from(e: SubsystemError) -> Self { - UnexpectedAbort::SubsystemError(e) - } -} - -impl From for UnexpectedAbort { +impl From for Error { fn from(e: OverseerError) -> Self { - UnexpectedAbort::SubsystemError(SubsystemError::from(e)) + Error::SubsystemError(SubsystemError::from(e)) } } @@ -193,7 +191,7 @@ async fn handle_subsystem_messages( shared: Shared, sync_oracle: Box, metrics: Metrics, -) -> Result<(), UnexpectedAbort> +) -> Result<(), Error> where N: Network, AD: validator_discovery::AuthorityDiscovery + Clone, @@ -206,253 +204,258 @@ where let mut mode = Mode::Syncing(sync_oracle); loop { - futures::select! 
{ - msg = ctx.recv().fuse() => match msg { - Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(active_leaves))) => { - let ActiveLeavesUpdate { activated, deactivated } = active_leaves; - gum::trace!( - target: LOG_TARGET, - action = "ActiveLeaves", - has_activated = activated.is_some(), - num_deactivated = %deactivated.len(), - ); - - for activated in activated { - let pos = live_heads - .binary_search_by(|probe| probe.number.cmp(&activated.number).reverse()) - .unwrap_or_else(|i| i); - - live_heads.insert(pos, activated); - } - live_heads.retain(|h| !deactivated.contains(&h.hash)); - - // if we're done syncing, set the mode to `Mode::Active`. - // Otherwise, we don't need to send view updates. - { - let is_done_syncing = match mode { - Mode::Active => true, - Mode::Syncing(ref mut sync_oracle) => !sync_oracle.is_major_syncing(), - }; + match ctx.recv().fuse()? { + FromOrchestra::Signal(OverseerSignal::ActiveLeaves(active_leaves)) => { + let ActiveLeavesUpdate { activated, deactivated } = active_leaves; + gum::trace!( + target: LOG_TARGET, + action = "ActiveLeaves", + has_activated = activated.is_some(), + num_deactivated = %deactivated.len(), + ); - if is_done_syncing { - mode = Mode::Active; + for activated in activated { + let pos = live_heads + .binary_search_by(|probe| probe.number.cmp(&activated.number).reverse()) + .unwrap_or_else(|i| i); - update_our_view( - &mut network_service, - &mut ctx, - &live_heads, - &shared, - finalized_number, - &metrics, - ); - } - } - } - Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number))) => { - gum::trace!( - target: LOG_TARGET, - action = "BlockFinalized" - ); - - debug_assert!(finalized_number < number); - - // we don't send the view updates here, but delay them until the next `ActiveLeaves` - // otherwise it might break assumptions of some of the subsystems - // that we never send the same `ActiveLeavesUpdate` - finalized_number = number; + live_heads.insert(pos, activated); } - 
Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => { - return Ok(()); - } - Ok(FromOrchestra::Communication { msg }) => match msg { - NetworkBridgeMessage::ReportPeer(peer, rep) => { - if !rep.is_benefit() { - gum::debug!( - target: LOG_TARGET, - ?peer, - ?rep, - action = "ReportPeer" - ); - } + live_heads.retain(|h| !deactivated.contains(&h.hash)); - metrics.on_report_event(); - network_service.report_peer(peer, rep); - } - NetworkBridgeMessage::DisconnectPeer(peer, peer_set) => { - gum::trace!( - target: LOG_TARGET, - action = "DisconnectPeer", - ?peer, - peer_set = ?peer_set, - ); + // if we're done syncing, set the mode to `Mode::Active`. + // Otherwise, we don't need to send view updates. + { + let is_done_syncing = match mode { + Mode::Active => true, + Mode::Syncing(ref mut sync_oracle) => !sync_oracle.is_major_syncing(), + }; - network_service.disconnect_peer(peer, peer_set); - } - NetworkBridgeMessage::SendValidationMessage(peers, msg) => { - gum::trace!( - target: LOG_TARGET, - action = "SendValidationMessages", - num_messages = 1usize, - ); + if is_done_syncing { + mode = Mode::Active; - match msg { - Versioned::V1(msg) => send_validation_message_v1( - &mut network_service, - peers, - WireMessage::ProtocolMessage(msg), - &metrics, - ), - } - } - NetworkBridgeMessage::SendValidationMessages(msgs) => { - gum::trace!( - target: LOG_TARGET, - action = "SendValidationMessages", - num_messages = %msgs.len(), + update_our_view( + &mut network_service, + &mut ctx, + &live_heads, + &shared, + finalized_number, + &metrics, ); - - for (peers, msg) in msgs { - match msg { - Versioned::V1(msg) => send_validation_message_v1( - &mut network_service, - peers, - WireMessage::ProtocolMessage(msg), - &metrics, - ), - } - } } - NetworkBridgeMessage::SendCollationMessage(peers, msg) => { - gum::trace!( - target: LOG_TARGET, - action = "SendCollationMessages", - num_messages = 1usize, - ); + } + } + FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => { 
+ gum::trace!( + target: LOG_TARGET, + action = "BlockFinalized" + ); - match msg { - Versioned::V1(msg) => send_collation_message_v1( - &mut network_service, - peers, - WireMessage::ProtocolMessage(msg), - &metrics, - ), - } - } - NetworkBridgeMessage::SendCollationMessages(msgs) => { - gum::trace!( - target: LOG_TARGET, - action = "SendCollationMessages", - num_messages = %msgs.len(), - ); + debug_assert!(finalized_number < number); + + // we don't send the view updates here, but delay them until the next `ActiveLeaves` + // otherwise it might break assumptions of some of the subsystems + // that we never send the same `ActiveLeavesUpdate` + finalized_number = number; + } + FromOrchestra::Signal(OverseerSignal::Conclude) => { + return Ok(()); + } + FromOrchestra::Communication { msg } => { + handle_incoming(&mut ctx, &mut network_service, msg).await; + } + } + } + Ok(()) +} - for (peers, msg) in msgs { - match msg { - Versioned::V1(msg) => send_collation_message_v1( - &mut network_service, - peers, - WireMessage::ProtocolMessage(msg), - &metrics, - ), - } - } - } - NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => { - gum::trace!( - target: LOG_TARGET, - action = "SendRequests", - num_requests = %reqs.len(), - ); +#[overseer::contextbounds(NetworkBridge, prefix = self::overseer)] +async fn handle_incoming(ctx: &mut Context, network_service: &mut N, msg: NetworkBridgeMessage) { + match msg { + NetworkBridgeMessage::ReportPeer(peer, rep) => { + if !rep.is_benefit() { + gum::debug!( + target: LOG_TARGET, + ?peer, + ?rep, + action = "ReportPeer" + ); + } - for req in reqs { - network_service - .start_request(&mut authority_discovery_service, req, if_disconnected) - .await; - } - } - NetworkBridgeMessage::ConnectToValidators { - validator_ids, - peer_set, - failed, - } => { - gum::trace!( - target: LOG_TARGET, - action = "ConnectToValidators", - peer_set = ?peer_set, - ids = ?validator_ids, - "Received a validator connection request", - ); + 
metrics.on_report_event(); + network_service.report_peer(peer, rep); + } + NetworkBridgeMessage::DisconnectPeer(peer, peer_set) => { + gum::trace!( + target: LOG_TARGET, + action = "DisconnectPeer", + ?peer, + peer_set = ?peer_set, + ); - metrics.note_desired_peer_count(peer_set, validator_ids.len()); + network_service.disconnect_peer(peer, peer_set); + } + NetworkBridgeMessage::SendValidationMessage(peers, msg) => { + gum::trace!( + target: LOG_TARGET, + action = "SendValidationMessages", + num_messages = 1usize, + ); - let (ns, ads) = validator_discovery.on_request( - validator_ids, - peer_set, - failed, - network_service, - authority_discovery_service, - ).await; + match msg { + Versioned::V1(msg) => send_validation_message_v1( + &mut network_service, + peers, + WireMessage::ProtocolMessage(msg), + &metrics, + ), + } + } + NetworkBridgeMessage::SendValidationMessages(msgs) => { + gum::trace!( + target: LOG_TARGET, + action = "SendValidationMessages", + num_messages = %msgs.len(), + ); - network_service = ns; - authority_discovery_service = ads; - } - NetworkBridgeMessage::ConnectToResolvedValidators { - validator_addrs, - peer_set, - } => { - gum::trace!( - target: LOG_TARGET, - action = "ConnectToPeers", - peer_set = ?peer_set, - ?validator_addrs, - "Received a resolved validator connection request", - ); + for (peers, msg) in msgs { + match msg { + Versioned::V1(msg) => send_validation_message_v1( + &mut network_service, + peers, + WireMessage::ProtocolMessage(msg), + &metrics, + ), + } + } + } + NetworkBridgeMessage::SendCollationMessage(peers, msg) => { + gum::trace!( + target: LOG_TARGET, + action = "SendCollationMessages", + num_messages = 1usize, + ); - metrics.note_desired_peer_count(peer_set, validator_addrs.len()); + match msg { + Versioned::V1(msg) => send_collation_message_v1( + &mut network_service, + peers, + WireMessage::ProtocolMessage(msg), + &metrics, + ), + } + } + NetworkBridgeMessage::SendCollationMessages(msgs) => { + gum::trace!( + target: 
LOG_TARGET, + action = "SendCollationMessages", + num_messages = %msgs.len(), + ); - let all_addrs = validator_addrs.into_iter().flatten().collect(); - network_service = validator_discovery.on_resolved_request( - all_addrs, - peer_set, - network_service, - ).await; - } - NetworkBridgeMessage::NewGossipTopology { - session, - our_neighbors_x, - our_neighbors_y, - } => { - gum::debug!( - target: LOG_TARGET, - action = "NewGossipTopology", - neighbors_x = our_neighbors_x.len(), - neighbors_y = our_neighbors_y.len(), - "Gossip topology has changed", - ); + for (peers, msg) in msgs { + match msg { + Versioned::V1(msg) => send_collation_message_v1( + &mut network_service, + peers, + WireMessage::ProtocolMessage(msg), + &metrics, + ), + } + } + } + NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => { + gum::trace!( + target: LOG_TARGET, + action = "SendRequests", + num_requests = %reqs.len(), + ); - let gossip_peers_x = update_gossip_peers_1d( - &mut authority_discovery_service, - our_neighbors_x, - ).await; - - let gossip_peers_y = update_gossip_peers_1d( - &mut authority_discovery_service, - our_neighbors_y, - ).await; - - dispatch_validation_event_to_all_unbounded( - NetworkBridgeEvent::NewGossipTopology( - NewGossipTopology { - session, - our_neighbors_x: gossip_peers_x, - our_neighbors_y: gossip_peers_y, - } - ), - ctx.sender(), - ); + for req in reqs { + network_service + .start_request(&mut authority_discovery_service, req, if_disconnected) + .await; + } + } + NetworkBridgeMessage::ConnectToValidators { + validator_ids, + peer_set, + failed, + } => { + gum::trace!( + target: LOG_TARGET, + action = "ConnectToValidators", + peer_set = ?peer_set, + ids = ?validator_ids, + "Received a validator connection request", + ); + + metrics.note_desired_peer_count(peer_set, validator_ids.len()); + + let (ns, ads) = validator_discovery.on_request( + validator_ids, + peer_set, + failed, + network_service, + authority_discovery_service, + ).await; + + network_service = 
ns; + authority_discovery_service = ads; + } + NetworkBridgeMessage::ConnectToResolvedValidators { + validator_addrs, + peer_set, + } => { + gum::trace!( + target: LOG_TARGET, + action = "ConnectToPeers", + peer_set = ?peer_set, + ?validator_addrs, + "Received a resolved validator connection request", + ); + + metrics.note_desired_peer_count(peer_set, validator_addrs.len()); + + let all_addrs = validator_addrs.into_iter().flatten().collect(); + network_service = validator_discovery.on_resolved_request( + all_addrs, + peer_set, + network_service, + ).await; + } + NetworkBridgeMessage::NewGossipTopology { + session, + our_neighbors_x, + our_neighbors_y, + } => { + gum::debug!( + target: LOG_TARGET, + action = "NewGossipTopology", + neighbors_x = our_neighbors_x.len(), + neighbors_y = our_neighbors_y.len(), + "Gossip topology has changed", + ); + + let gossip_peers_x = update_gossip_peers_1d( + &mut authority_discovery_service, + our_neighbors_x, + ).await; + + let gossip_peers_y = update_gossip_peers_1d( + &mut authority_discovery_service, + our_neighbors_y, + ).await; + + dispatch_validation_event_to_all_unbounded( + NetworkBridgeEvent::NewGossipTopology( + NewGossipTopology { + session, + our_neighbors_x: gossip_peers_x, + our_neighbors_y: gossip_peers_y, } - } - Err(e) => return Err(e.into()), - }, + ), + ctx.sender(), + ); } } } @@ -486,11 +489,11 @@ async fn handle_network_messages( mut authority_discovery_service: AD, metrics: Metrics, shared: Shared, -) -> Result<(), UnexpectedAbort> { +) -> Result<(), Error> { let mut network_stream = network_stream.fuse(); loop { match network_stream.next().await { - None => return Err(UnexpectedAbort::EventStreamConcluded), + None => return Err(Error::EventStreamConcluded), Some(NetworkEvent::Dht(_)) | Some(NetworkEvent::SyncConnected { .. }) | Some(NetworkEvent::SyncDisconnected { .. 
}) => {}, @@ -877,7 +880,7 @@ where .0 { Ok(()) => Ok(()), - Err(UnexpectedAbort::SubsystemError(err)) => { + Err(Error::SubsystemError(err)) => { gum::warn!( target: LOG_TARGET, err = ?err, @@ -889,7 +892,7 @@ where err ))) }, - Err(UnexpectedAbort::EventStreamConcluded) => { + Err(Error::EventStreamConcluded) => { gum::info!( target: LOG_TARGET, "Shutting down Network Bridge: underlying request stream concluded" From 73e410408cb71fda82aba1eb16e7ff61dde4f25b Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Wed, 25 May 2022 17:53:46 +0200 Subject: [PATCH 2/6] futures :: select! --- node/orchestra/proc-macro/src/impl_orchestra.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/orchestra/proc-macro/src/impl_orchestra.rs b/node/orchestra/proc-macro/src/impl_orchestra.rs index 62dc27e84573..66ff6c60939b 100644 --- a/node/orchestra/proc-macro/src/impl_orchestra.rs +++ b/node/orchestra/proc-macro/src/impl_orchestra.rs @@ -112,7 +112,7 @@ pub(crate) fn impl_orchestra_struct(info: &OrchestraInfo) -> proc_macro2::TokenS ).fuse(); loop { - select! { + #support_crate ::futures::select! { _ = self.running_subsystems.next() => if self.running_subsystems.is_empty() { break; From 57f99ed587f6cb5b2873c9a5cf4d9367976794f2 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Wed, 25 May 2022 17:54:02 +0200 Subject: [PATCH 3/6] rolling session window --- .../src/rolling_session_window.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/node/subsystem-util/src/rolling_session_window.rs b/node/subsystem-util/src/rolling_session_window.rs index ba8a62d08118..86557355233d 100644 --- a/node/subsystem-util/src/rolling_session_window.rs +++ b/node/subsystem-util/src/rolling_session_window.rs @@ -31,13 +31,16 @@ use polkadot_node_subsystem::{ use thiserror::Error; /// Sessions unavailable in state to cache. 
-#[derive(Debug, Clone)] +#[derive(Debug, Clone, thiserror::Error)] pub enum SessionsUnavailableReason { /// Runtime API subsystem was unavailable. - RuntimeApiUnavailable(oneshot::Canceled), + #[error(transparent)] + RuntimeApiUnavailable(#[from] oneshot::Canceled), /// The runtime API itself returned an error. - RuntimeApi(RuntimeApiError), + #[error(transparent)] + RuntimeApi(#[from] RuntimeApiError), /// Missing session info from runtime API for given `SessionIndex`. + #[error("Missing session index {0:?}")] Missing(SessionIndex), } @@ -53,20 +56,16 @@ pub struct SessionsUnavailableInfo { } /// Sessions were unavailable to fetch from the state for some reason. -#[derive(Debug, Error, Clone)] +#[derive(Debug, thiserror::Error, Clone)] +#[error("Sessions unavailable: {kind:?}, info: {info:?}")] pub struct SessionsUnavailable { /// The error kind. + #[source] kind: SessionsUnavailableReason, /// The info about the session window, if any. info: Option, } -impl core::fmt::Display for SessionsUnavailable { - fn fmt(&self, f: &mut core::fmt::Formatter) -> Result<(), core::fmt::Error> { - write!(f, "Sessions unavailable: {:?}, info: {:?}", self.kind, self.info) - } -} - /// An indicated update of the rolling session window. 
#[derive(Debug, PartialEq, Clone)] pub enum SessionWindowUpdate { From 5c5517d46cb20b66d72237b48fbfdcd9359da1ca Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Mon, 30 May 2022 13:57:14 +0200 Subject: [PATCH 4/6] fixup --- Cargo.lock | 2 ++ node/network/bridge/Cargo.toml | 2 ++ node/network/bridge/src/lib.rs | 38 +++++++++++++++++++++++----------- 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f3cea4995ffd..6f782e7abe4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6433,6 +6433,7 @@ dependencies = [ "assert_matches", "async-trait", "bytes", + "fatality", "futures 0.3.21", "futures-timer", "parity-scale-codec", @@ -6448,6 +6449,7 @@ dependencies = [ "sp-consensus", "sp-core", "sp-keyring", + "thiserror", "tracing-gum", ] diff --git a/node/network/bridge/Cargo.toml b/node/network/bridge/Cargo.toml index d625c7e94f17..12d5fbc708e1 100644 --- a/node/network/bridge/Cargo.toml +++ b/node/network/bridge/Cargo.toml @@ -19,6 +19,8 @@ polkadot-node-network-protocol = { path = "../protocol" } polkadot-node-subsystem-util = { path = "../../subsystem-util"} parking_lot = "0.12.0" bytes = "1" +fatality = "0.0.6" +thiserror = "1" [dev-dependencies] assert_matches = "1.4.0" diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 8df83243640b..18c32f8c3007 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -128,7 +128,7 @@ impl NetworkBridge { impl NetworkBridge where Net: Network + Sync, - AD: validator_discovery::AuthorityDiscovery + Clone, + AD: validator_discovery::AuthorityDiscovery + Clone + Sync, { fn start(mut self, ctx: Context) -> SpawnedSubsystem { // The stream of networking events has to be created at initialization, otherwise the @@ -204,7 +204,7 @@ where let mut mode = Mode::Syncing(sync_oracle); loop { - match ctx.recv().fuse()? { + match ctx.recv().fuse().await? 
{ FromOrchestra::Signal(OverseerSignal::ActiveLeaves(active_leaves)) => { let ActiveLeavesUpdate { activated, deactivated } = active_leaves; gum::trace!( @@ -262,15 +262,25 @@ where return Ok(()); } FromOrchestra::Communication { msg } => { - handle_incoming(&mut ctx, &mut network_service, msg).await; + (network_service, authority_discovery_service) = handle_incoming(&mut ctx, network_service, &mut validator_discovery, authority_discovery_service.clone(), msg, &metrics).await; } } } - Ok(()) } #[overseer::contextbounds(NetworkBridge, prefix = self::overseer)] -async fn handle_incoming(ctx: &mut Context, network_service: &mut N, msg: NetworkBridgeMessage) { +async fn handle_incoming( + ctx: &mut Context, + mut network_service: N, + validator_discovery: &mut validator_discovery::Service, + mut authority_discovery_service: AD, + msg: NetworkBridgeMessage, + metrics: &Metrics, +) -> (N, AD) +where + N: Network, + AD: validator_discovery::AuthorityDiscovery + Clone, +{ match msg { NetworkBridgeMessage::ReportPeer(peer, rep) => { if !rep.is_benefit() { @@ -391,7 +401,7 @@ async fn handle_incoming(ctx: &mut Context, network_service metrics.note_desired_peer_count(peer_set, validator_ids.len()); - let (ns, ads) = validator_discovery.on_request( + let (network_service, ads) = validator_discovery.on_request( validator_ids, peer_set, failed, @@ -399,8 +409,7 @@ async fn handle_incoming(ctx: &mut Context, network_service authority_discovery_service, ).await; - network_service = ns; - authority_discovery_service = ads; + return (network_service, ads); } NetworkBridgeMessage::ConnectToResolvedValidators { validator_addrs, @@ -417,11 +426,12 @@ async fn handle_incoming(ctx: &mut Context, network_service metrics.note_desired_peer_count(peer_set, validator_addrs.len()); let all_addrs = validator_addrs.into_iter().flatten().collect(); - network_service = validator_discovery.on_resolved_request( + let network_service = validator_discovery.on_resolved_request( all_addrs, peer_set, 
network_service, ).await; + return (network_service, authority_discovery_service); } NetworkBridgeMessage::NewGossipTopology { session, @@ -458,6 +468,7 @@ async fn handle_incoming(ctx: &mut Context, network_service ); } } + (network_service, authority_discovery_service) } async fn update_gossip_peers_1d( @@ -482,14 +493,17 @@ where peers } -async fn handle_network_messages( +async fn handle_network_messages( mut sender: impl overseer::NetworkBridgeSenderTrait, mut network_service: impl Network, network_stream: BoxStream<'static, NetworkEvent>, mut authority_discovery_service: AD, metrics: Metrics, shared: Shared, -) -> Result<(), Error> { +) -> Result<(), Error> +where + AD: validator_discovery::AuthorityDiscovery + Send, +{ let mut network_stream = network_stream.fuse(); loop { match network_stream.next().await { @@ -844,7 +858,7 @@ async fn run_network( ) -> SubsystemResult<()> where N: Network, - AD: validator_discovery::AuthorityDiscovery + Clone, + AD: validator_discovery::AuthorityDiscovery + Clone + Sync, { let shared = Shared::default(); From 32102129bfd49107df1271d2397409d40abbce91 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Mon, 30 May 2022 13:57:21 +0200 Subject: [PATCH 5/6] remove use statement --- node/subsystem-util/src/rolling_session_window.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/node/subsystem-util/src/rolling_session_window.rs b/node/subsystem-util/src/rolling_session_window.rs index 86557355233d..dd9282b50fe5 100644 --- a/node/subsystem-util/src/rolling_session_window.rs +++ b/node/subsystem-util/src/rolling_session_window.rs @@ -28,7 +28,6 @@ use polkadot_node_subsystem::{ messages::{RuntimeApiMessage, RuntimeApiRequest}, overseer, }; -use thiserror::Error; /// Sessions unavailable in state to cache.
#[derive(Debug, Clone, thiserror::Error)] From 90a093bb362541d6c219fa363a8ba5a9aa10a23b Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Mon, 30 May 2022 13:58:34 +0200 Subject: [PATCH 6/6] fmt --- node/network/bridge/src/lib.rs | 123 ++++++++++++++------------------- 1 file changed, 52 insertions(+), 71 deletions(-) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 18c32f8c3007..20c006085cd1 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -244,12 +244,9 @@ where ); } } - } + }, FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => { - gum::trace!( - target: LOG_TARGET, - action = "BlockFinalized" - ); + gum::trace!(target: LOG_TARGET, action = "BlockFinalized"); debug_assert!(finalized_number < number); @@ -257,13 +254,19 @@ where // otherwise it might break assumptions of some of the subsystems // that we never send the same `ActiveLeavesUpdate` finalized_number = number; - } - FromOrchestra::Signal(OverseerSignal::Conclude) => { - return Ok(()); - } + }, + FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()), FromOrchestra::Communication { msg } => { - (network_service, authority_discovery_service) = handle_incoming(&mut ctx, network_service, &mut validator_discovery, authority_discovery_service.clone(), msg, &metrics).await; - } + (network_service, authority_discovery_service) = handle_incoming( + &mut ctx, + network_service, + &mut validator_discovery, + authority_discovery_service.clone(), + msg, + &metrics, + ) + .await; + }, } } } @@ -284,17 +287,12 @@ where match msg { NetworkBridgeMessage::ReportPeer(peer, rep) => { if !rep.is_benefit() { - gum::debug!( - target: LOG_TARGET, - ?peer, - ?rep, - action = "ReportPeer" - ); + gum::debug!(target: LOG_TARGET, ?peer, ?rep, action = "ReportPeer"); } metrics.on_report_event(); network_service.report_peer(peer, rep); - } + }, NetworkBridgeMessage::DisconnectPeer(peer, peer_set) => { gum::trace!( target: 
LOG_TARGET, @@ -304,7 +302,7 @@ where ); network_service.disconnect_peer(peer, peer_set); - } + }, NetworkBridgeMessage::SendValidationMessage(peers, msg) => { gum::trace!( target: LOG_TARGET, @@ -320,7 +318,7 @@ where &metrics, ), } - } + }, NetworkBridgeMessage::SendValidationMessages(msgs) => { gum::trace!( target: LOG_TARGET, @@ -338,7 +336,7 @@ where ), } } - } + }, NetworkBridgeMessage::SendCollationMessage(peers, msg) => { gum::trace!( target: LOG_TARGET, @@ -354,7 +352,7 @@ where &metrics, ), } - } + }, NetworkBridgeMessage::SendCollationMessages(msgs) => { gum::trace!( target: LOG_TARGET, @@ -372,7 +370,7 @@ where ), } } - } + }, NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => { gum::trace!( target: LOG_TARGET, @@ -385,12 +383,8 @@ where .start_request(&mut authority_discovery_service, req, if_disconnected) .await; } - } - NetworkBridgeMessage::ConnectToValidators { - validator_ids, - peer_set, - failed, - } => { + }, + NetworkBridgeMessage::ConnectToValidators { validator_ids, peer_set, failed } => { gum::trace!( target: LOG_TARGET, action = "ConnectToValidators", @@ -401,20 +395,19 @@ where metrics.note_desired_peer_count(peer_set, validator_ids.len()); - let (network_service, ads) = validator_discovery.on_request( - validator_ids, - peer_set, - failed, - network_service, - authority_discovery_service, - ).await; - - return (network_service, ads); - } - NetworkBridgeMessage::ConnectToResolvedValidators { - validator_addrs, - peer_set, - } => { + let (network_service, ads) = validator_discovery + .on_request( + validator_ids, + peer_set, + failed, + network_service, + authority_discovery_service, + ) + .await; + + return (network_service, ads) + }, + NetworkBridgeMessage::ConnectToResolvedValidators { validator_addrs, peer_set } => { gum::trace!( target: LOG_TARGET, action = "ConnectToPeers", @@ -426,18 +419,12 @@ where metrics.note_desired_peer_count(peer_set, validator_addrs.len()); let all_addrs = 
validator_addrs.into_iter().flatten().collect(); - let network_service = validator_discovery.on_resolved_request( - all_addrs, - peer_set, - network_service, - ).await; - return (network_service, authority_discovery_service); - } - NetworkBridgeMessage::NewGossipTopology { - session, - our_neighbors_x, - our_neighbors_y, - } => { + let network_service = validator_discovery + .on_resolved_request(all_addrs, peer_set, network_service) + .await; + return (network_service, authority_discovery_service) + }, + NetworkBridgeMessage::NewGossipTopology { session, our_neighbors_x, our_neighbors_y } => { gum::debug!( target: LOG_TARGET, action = "NewGossipTopology", @@ -446,27 +433,21 @@ where "Gossip topology has changed", ); - let gossip_peers_x = update_gossip_peers_1d( - &mut authority_discovery_service, - our_neighbors_x, - ).await; + let gossip_peers_x = + update_gossip_peers_1d(&mut authority_discovery_service, our_neighbors_x).await; - let gossip_peers_y = update_gossip_peers_1d( - &mut authority_discovery_service, - our_neighbors_y, - ).await; + let gossip_peers_y = + update_gossip_peers_1d(&mut authority_discovery_service, our_neighbors_y).await; dispatch_validation_event_to_all_unbounded( - NetworkBridgeEvent::NewGossipTopology( - NewGossipTopology { - session, - our_neighbors_x: gossip_peers_x, - our_neighbors_y: gossip_peers_y, - } - ), + NetworkBridgeEvent::NewGossipTopology(NewGossipTopology { + session, + our_neighbors_x: gossip_peers_x, + our_neighbors_y: gossip_peers_y, + }), ctx.sender(), ); - } + }, } (network_service, authority_discovery_service) }