diff --git a/Cargo.lock b/Cargo.lock
index f3cea4995ffd..6f782e7abe4c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6433,6 +6433,7 @@ dependencies = [
  "assert_matches",
  "async-trait",
  "bytes",
+ "fatality",
  "futures 0.3.21",
  "futures-timer",
  "parity-scale-codec",
@@ -6448,6 +6449,7 @@ dependencies = [
  "sp-consensus",
  "sp-core",
  "sp-keyring",
+ "thiserror",
  "tracing-gum",
 ]
 
diff --git a/node/network/bridge/Cargo.toml b/node/network/bridge/Cargo.toml
index d625c7e94f17..12d5fbc708e1 100644
--- a/node/network/bridge/Cargo.toml
+++ b/node/network/bridge/Cargo.toml
@@ -19,6 +19,8 @@ polkadot-node-network-protocol = { path = "../protocol" }
 polkadot-node-subsystem-util = { path = "../../subsystem-util"}
 parking_lot = "0.12.0"
 bytes = "1"
+fatality = "0.0.6"
+thiserror = "1"
 
 [dev-dependencies]
 assert_matches = "1.4.0"
diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs
index f10f7fa5b67a..20c006085cd1 100644
--- a/node/network/bridge/src/lib.rs
+++ b/node/network/bridge/src/lib.rs
@@ -128,7 +128,7 @@ impl<Net, AD> NetworkBridge<Net, AD> {
 impl<Net, AD, Context> NetworkBridge<Net, AD>
 where
 	Net: Network + Sync,
-	AD: validator_discovery::AuthorityDiscovery + Clone,
+	AD: validator_discovery::AuthorityDiscovery + Clone + Sync,
 {
 	fn start(mut self, ctx: Context) -> SpawnedSubsystem {
 		// The stream of networking events has to be created at initialization, otherwise the
@@ -150,23 +150,21 @@ struct PeerData {
 	version: ProtocolVersion,
 }
 
-#[derive(Debug)]
-enum UnexpectedAbort {
+#[fatality::fatality(splitable)]
+enum Error {
 	/// Received error from overseer:
-	SubsystemError(SubsystemError),
+	#[fatal]
+	#[error(transparent)]
+	SubsystemError(#[from] SubsystemError),
 
 	/// The stream of incoming events concluded.
+	#[fatal]
+	#[error("Event stream closed unexpectedly")]
 	EventStreamConcluded,
 }
 
-impl From<SubsystemError> for UnexpectedAbort {
-	fn from(e: SubsystemError) -> Self {
-		UnexpectedAbort::SubsystemError(e)
-	}
-}
-
-impl From<OverseerError> for UnexpectedAbort {
+impl From<OverseerError> for Error {
 	fn from(e: OverseerError) -> Self {
-		UnexpectedAbort::SubsystemError(SubsystemError::from(e))
+		Error::SubsystemError(SubsystemError::from(e))
 	}
 }
 
@@ -193,7 +191,7 @@ async fn handle_subsystem_messages<Context, N, AD>(
 	shared: Shared,
 	sync_oracle: Box<dyn SyncOracle + Send>,
 	metrics: Metrics,
-) -> Result<(), UnexpectedAbort>
+) -> Result<(), Error>
 where
 	N: Network,
 	AD: validator_discovery::AuthorityDiscovery + Clone,
@@ -206,255 +204,252 @@ where
 	let mut mode = Mode::Syncing(sync_oracle);
 
 	loop {
-		futures::select! {
-			msg = ctx.recv().fuse() => match msg {
-				Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(active_leaves))) => {
-					let ActiveLeavesUpdate { activated, deactivated } = active_leaves;
-					gum::trace!(
-						target: LOG_TARGET,
-						action = "ActiveLeaves",
-						has_activated = activated.is_some(),
-						num_deactivated = %deactivated.len(),
-					);
-
-					for activated in activated {
-						let pos = live_heads
-							.binary_search_by(|probe| probe.number.cmp(&activated.number).reverse())
-							.unwrap_or_else(|i| i);
-
-						live_heads.insert(pos, activated);
-					}
-					live_heads.retain(|h| !deactivated.contains(&h.hash));
-
-					// if we're done syncing, set the mode to `Mode::Active`.
-					// Otherwise, we don't need to send view updates.
-					{
-						let is_done_syncing = match mode {
-							Mode::Active => true,
-							Mode::Syncing(ref mut sync_oracle) => !sync_oracle.is_major_syncing(),
-						};
-
-						if is_done_syncing {
-							mode = Mode::Active;
-
-							update_our_view(
-								&mut network_service,
-								&mut ctx,
-								&live_heads,
-								&shared,
-								finalized_number,
-								&metrics,
-							);
-						}
-					}
-				}
-				Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number))) => {
-					gum::trace!(
-						target: LOG_TARGET,
-						action = "BlockFinalized"
-					);
-
-					debug_assert!(finalized_number < number);
-
-					// we don't send the view updates here, but delay them until the next `ActiveLeaves`
-					// otherwise it might break assumptions of some of the subsystems
-					// that we never send the same `ActiveLeavesUpdate`
-					finalized_number = number;
-				}
-				Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => {
-					return Ok(());
-				}
-				Ok(FromOrchestra::Communication { msg }) => match msg {
-					NetworkBridgeMessage::ReportPeer(peer, rep) => {
-						if !rep.is_benefit() {
-							gum::debug!(
-								target: LOG_TARGET,
-								?peer,
-								?rep,
-								action = "ReportPeer"
-							);
-						}
-
-						metrics.on_report_event();
-						network_service.report_peer(peer, rep);
-					}
-					NetworkBridgeMessage::DisconnectPeer(peer, peer_set) => {
-						gum::trace!(
-							target: LOG_TARGET,
-							action = "DisconnectPeer",
-							?peer,
-							peer_set = ?peer_set,
-						);
-
-						network_service.disconnect_peer(peer, peer_set);
-					}
-					NetworkBridgeMessage::SendValidationMessage(peers, msg) => {
-						gum::trace!(
-							target: LOG_TARGET,
-							action = "SendValidationMessages",
-							num_messages = 1usize,
-						);
-
-						match msg {
-							Versioned::V1(msg) => send_validation_message_v1(
-								&mut network_service,
-								peers,
-								WireMessage::ProtocolMessage(msg),
-								&metrics,
-							),
-						}
-					}
-					NetworkBridgeMessage::SendValidationMessages(msgs) => {
-						gum::trace!(
-							target: LOG_TARGET,
-							action = "SendValidationMessages",
-							num_messages = %msgs.len(),
-						);
-
-						for (peers, msg) in msgs {
-							match msg {
-								Versioned::V1(msg) => send_validation_message_v1(
-									&mut network_service,
-									peers,
-									WireMessage::ProtocolMessage(msg),
-									&metrics,
-								),
-							}
-						}
-					}
-					NetworkBridgeMessage::SendCollationMessage(peers, msg) => {
-						gum::trace!(
-							target: LOG_TARGET,
-							action = "SendCollationMessages",
-							num_messages = 1usize,
-						);
-
-						match msg {
-							Versioned::V1(msg) => send_collation_message_v1(
-								&mut network_service,
-								peers,
-								WireMessage::ProtocolMessage(msg),
-								&metrics,
-							),
-						}
-					}
-					NetworkBridgeMessage::SendCollationMessages(msgs) => {
-						gum::trace!(
-							target: LOG_TARGET,
-							action = "SendCollationMessages",
-							num_messages = %msgs.len(),
-						);
-
-						for (peers, msg) in msgs {
-							match msg {
-								Versioned::V1(msg) => send_collation_message_v1(
-									&mut network_service,
-									peers,
-									WireMessage::ProtocolMessage(msg),
-									&metrics,
-								),
-							}
-						}
-					}
-					NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => {
-						gum::trace!(
-							target: LOG_TARGET,
-							action = "SendRequests",
-							num_requests = %reqs.len(),
-						);
-
-						for req in reqs {
-							network_service
-								.start_request(&mut authority_discovery_service, req, if_disconnected)
-								.await;
-						}
-					}
-					NetworkBridgeMessage::ConnectToValidators {
-						validator_ids,
-						peer_set,
-						failed,
-					} => {
-						gum::trace!(
-							target: LOG_TARGET,
-							action = "ConnectToValidators",
-							peer_set = ?peer_set,
-							ids = ?validator_ids,
-							"Received a validator connection request",
-						);
-
-						metrics.note_desired_peer_count(peer_set, validator_ids.len());
-
-						let (ns, ads) = validator_discovery.on_request(
-							validator_ids,
-							peer_set,
-							failed,
-							network_service,
-							authority_discovery_service,
-						).await;
-
-						network_service = ns;
-						authority_discovery_service = ads;
-					}
-					NetworkBridgeMessage::ConnectToResolvedValidators {
-						validator_addrs,
-						peer_set,
-					} => {
-						gum::trace!(
-							target: LOG_TARGET,
-							action = "ConnectToPeers",
-							peer_set = ?peer_set,
-							?validator_addrs,
-							"Received a resolved validator connection request",
-						);
-
-						metrics.note_desired_peer_count(peer_set, validator_addrs.len());
-
-						let all_addrs = validator_addrs.into_iter().flatten().collect();
-						network_service = validator_discovery.on_resolved_request(
-							all_addrs,
-							peer_set,
-							network_service,
-						).await;
-					}
-					NetworkBridgeMessage::NewGossipTopology {
-						session,
-						our_neighbors_x,
-						our_neighbors_y,
-					} => {
-						gum::debug!(
-							target: LOG_TARGET,
-							action = "NewGossipTopology",
-							neighbors_x = our_neighbors_x.len(),
-							neighbors_y = our_neighbors_y.len(),
-							"Gossip topology has changed",
-						);
-
-						let gossip_peers_x = update_gossip_peers_1d(
-							&mut authority_discovery_service,
-							our_neighbors_x,
-						).await;
-
-						let gossip_peers_y = update_gossip_peers_1d(
-							&mut authority_discovery_service,
-							our_neighbors_y,
-						).await;
-
-						dispatch_validation_event_to_all_unbounded(
-							NetworkBridgeEvent::NewGossipTopology(
-								NewGossipTopology {
-									session,
-									our_neighbors_x: gossip_peers_x,
-									our_neighbors_y: gossip_peers_y,
-								}
-							),
-							ctx.sender(),
-						);
-					}
-				}
-				Err(e) => return Err(e.into()),
-			},
-		}
-	}
+		match ctx.recv().fuse().await? {
+			FromOrchestra::Signal(OverseerSignal::ActiveLeaves(active_leaves)) => {
+				let ActiveLeavesUpdate { activated, deactivated } = active_leaves;
+				gum::trace!(
+					target: LOG_TARGET,
+					action = "ActiveLeaves",
+					has_activated = activated.is_some(),
+					num_deactivated = %deactivated.len(),
+				);
+
+				for activated in activated {
+					let pos = live_heads
+						.binary_search_by(|probe| probe.number.cmp(&activated.number).reverse())
+						.unwrap_or_else(|i| i);
+
+					live_heads.insert(pos, activated);
+				}
+				live_heads.retain(|h| !deactivated.contains(&h.hash));
+
+				// if we're done syncing, set the mode to `Mode::Active`.
+				// Otherwise, we don't need to send view updates.
+				{
+					let is_done_syncing = match mode {
+						Mode::Active => true,
+						Mode::Syncing(ref mut sync_oracle) => !sync_oracle.is_major_syncing(),
+					};
+
+					if is_done_syncing {
+						mode = Mode::Active;
+
+						update_our_view(
+							&mut network_service,
+							&mut ctx,
+							&live_heads,
+							&shared,
+							finalized_number,
+							&metrics,
+						);
+					}
+				}
+			},
+			FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
+				gum::trace!(target: LOG_TARGET, action = "BlockFinalized");
+
+				debug_assert!(finalized_number < number);
+
+				// we don't send the view updates here, but delay them until the next `ActiveLeaves`
+				// otherwise it might break assumptions of some of the subsystems
+				// that we never send the same `ActiveLeavesUpdate`
+				finalized_number = number;
+			},
+			FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
+			FromOrchestra::Communication { msg } => {
+				(network_service, authority_discovery_service) = handle_incoming(
+					&mut ctx,
+					network_service,
+					&mut validator_discovery,
+					authority_discovery_service.clone(),
+					msg,
+					&metrics,
+				)
+				.await;
+			},
+		}
+	}
+}
+
+#[overseer::contextbounds(NetworkBridge, prefix = self::overseer)]
+async fn handle_incoming<Context, N, AD>(
+	ctx: &mut Context,
+	mut network_service: N,
+	validator_discovery: &mut validator_discovery::Service<N, AD>,
+	mut authority_discovery_service: AD,
+	msg: NetworkBridgeMessage,
+	metrics: &Metrics,
+) -> (N, AD)
+where
+	N: Network,
+	AD: validator_discovery::AuthorityDiscovery + Clone,
+{
+	match msg {
+		NetworkBridgeMessage::ReportPeer(peer, rep) => {
+			if !rep.is_benefit() {
+				gum::debug!(target: LOG_TARGET, ?peer, ?rep, action = "ReportPeer");
+			}
+
+			metrics.on_report_event();
+			network_service.report_peer(peer, rep);
+		},
+		NetworkBridgeMessage::DisconnectPeer(peer, peer_set) => {
+			gum::trace!(
+				target: LOG_TARGET,
+				action = "DisconnectPeer",
+				?peer,
+				peer_set = ?peer_set,
+			);
+
+			network_service.disconnect_peer(peer, peer_set);
+		},
+		NetworkBridgeMessage::SendValidationMessage(peers, msg) => {
+			gum::trace!(
+				target: LOG_TARGET,
+				action = "SendValidationMessages",
+				num_messages = 1usize,
+			);
+
+			match msg {
+				Versioned::V1(msg) => send_validation_message_v1(
+					&mut network_service,
+					peers,
+					WireMessage::ProtocolMessage(msg),
+					&metrics,
+				),
+			}
+		},
+		NetworkBridgeMessage::SendValidationMessages(msgs) => {
+			gum::trace!(
+				target: LOG_TARGET,
+				action = "SendValidationMessages",
+				num_messages = %msgs.len(),
+			);
+
+			for (peers, msg) in msgs {
+				match msg {
+					Versioned::V1(msg) => send_validation_message_v1(
+						&mut network_service,
+						peers,
+						WireMessage::ProtocolMessage(msg),
+						&metrics,
+					),
+				}
+			}
+		},
+		NetworkBridgeMessage::SendCollationMessage(peers, msg) => {
+			gum::trace!(
+				target: LOG_TARGET,
+				action = "SendCollationMessages",
+				num_messages = 1usize,
+			);
+
+			match msg {
+				Versioned::V1(msg) => send_collation_message_v1(
+					&mut network_service,
+					peers,
+					WireMessage::ProtocolMessage(msg),
+					&metrics,
+				),
+			}
+		},
+		NetworkBridgeMessage::SendCollationMessages(msgs) => {
+			gum::trace!(
+				target: LOG_TARGET,
+				action = "SendCollationMessages",
+				num_messages = %msgs.len(),
+			);
+
+			for (peers, msg) in msgs {
+				match msg {
+					Versioned::V1(msg) => send_collation_message_v1(
+						&mut network_service,
+						peers,
+						WireMessage::ProtocolMessage(msg),
+						&metrics,
+					),
+				}
+			}
+		},
+		NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => {
+			gum::trace!(
+				target: LOG_TARGET,
+				action = "SendRequests",
+				num_requests = %reqs.len(),
+			);
+
+			for req in reqs {
+				network_service
+					.start_request(&mut authority_discovery_service, req, if_disconnected)
+					.await;
+			}
+		},
+		NetworkBridgeMessage::ConnectToValidators { validator_ids, peer_set, failed } => {
+			gum::trace!(
+				target: LOG_TARGET,
+				action = "ConnectToValidators",
+				peer_set = ?peer_set,
+				ids = ?validator_ids,
+				"Received a validator connection request",
+			);
+
+			metrics.note_desired_peer_count(peer_set, validator_ids.len());
+
+			let (network_service, ads) = validator_discovery
+				.on_request(
+					validator_ids,
+					peer_set,
+					failed,
+					network_service,
+					authority_discovery_service,
+				)
+				.await;
+
+			return (network_service, ads)
+		},
+		NetworkBridgeMessage::ConnectToResolvedValidators { validator_addrs, peer_set } => {
+			gum::trace!(
+				target: LOG_TARGET,
+				action = "ConnectToPeers",
+				peer_set = ?peer_set,
+				?validator_addrs,
+				"Received a resolved validator connection request",
+			);
+
+			metrics.note_desired_peer_count(peer_set, validator_addrs.len());
+
+			let all_addrs = validator_addrs.into_iter().flatten().collect();
+			let network_service = validator_discovery
+				.on_resolved_request(all_addrs, peer_set, network_service)
+				.await;
+			return (network_service, authority_discovery_service)
+		},
+		NetworkBridgeMessage::NewGossipTopology { session, our_neighbors_x, our_neighbors_y } => {
+			gum::debug!(
+				target: LOG_TARGET,
+				action = "NewGossipTopology",
+				neighbors_x = our_neighbors_x.len(),
+				neighbors_y = our_neighbors_y.len(),
+				"Gossip topology has changed",
+			);
+
+			let gossip_peers_x =
+				update_gossip_peers_1d(&mut authority_discovery_service, our_neighbors_x).await;
+
+			let gossip_peers_y =
+				update_gossip_peers_1d(&mut authority_discovery_service, our_neighbors_y).await;
+
+			dispatch_validation_event_to_all_unbounded(
+				NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
+					session,
+					our_neighbors_x: gossip_peers_x,
+					our_neighbors_y: gossip_peers_y,
+				}),
+				ctx.sender(),
+			);
+		},
+	}
+	(network_service, authority_discovery_service)
 }
 
 async fn update_gossip_peers_1d<AD, N>(
@@ -479,18 +474,21 @@ where
 	peers
 }
 
-async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
+async fn handle_network_messages<AD>(
 	mut sender: impl overseer::NetworkBridgeSenderTrait,
 	mut network_service: impl Network,
 	network_stream: BoxStream<'static, NetworkEvent>,
 	mut authority_discovery_service: AD,
 	metrics: Metrics,
 	shared: Shared,
-) -> Result<(), UnexpectedAbort> {
+) -> Result<(), Error>
+where
+	AD: validator_discovery::AuthorityDiscovery + Send,
+{
 	let mut network_stream = network_stream.fuse();
 	loop {
 		match network_stream.next().await {
-			None => return Err(UnexpectedAbort::EventStreamConcluded),
+			None => return Err(Error::EventStreamConcluded),
 			Some(NetworkEvent::Dht(_)) |
 			Some(NetworkEvent::SyncConnected { .. }) |
 			Some(NetworkEvent::SyncDisconnected { .. }) => {},
@@ -841,7 +839,7 @@ async fn run_network<N, AD, Context>(
 ) -> SubsystemResult<()>
 where
 	N: Network,
-	AD: validator_discovery::AuthorityDiscovery + Clone,
+	AD: validator_discovery::AuthorityDiscovery + Clone + Sync,
 {
 	let shared = Shared::default();
 
@@ -877,7 +875,7 @@ where
 	.0
 	{
 		Ok(()) => Ok(()),
-		Err(UnexpectedAbort::SubsystemError(err)) => {
+		Err(Error::SubsystemError(err)) => {
 			gum::warn!(
 				target: LOG_TARGET,
 				err = ?err,
@@ -889,7 +887,7 @@ where
 				err
 			)))
 		},
-		Err(UnexpectedAbort::EventStreamConcluded) => {
+		Err(Error::EventStreamConcluded) => {
 			gum::info!(
 				target: LOG_TARGET,
 				"Shutting down Network Bridge: underlying request stream concluded"
diff --git a/node/orchestra/proc-macro/src/impl_orchestra.rs b/node/orchestra/proc-macro/src/impl_orchestra.rs
index 62dc27e84573..66ff6c60939b 100644
--- a/node/orchestra/proc-macro/src/impl_orchestra.rs
+++ b/node/orchestra/proc-macro/src/impl_orchestra.rs
@@ -112,7 +112,7 @@ pub(crate) fn impl_orchestra_struct(info: &OrchestraInfo) -> proc_macro2::TokenS
 			).fuse();
 
 			loop {
-				select! {
+				#support_crate ::futures::select! {
 					_ = self.running_subsystems.next() =>
 						if self.running_subsystems.is_empty() {
 							break;
diff --git a/node/subsystem-util/src/rolling_session_window.rs b/node/subsystem-util/src/rolling_session_window.rs
index ba8a62d08118..dd9282b50fe5 100644
--- a/node/subsystem-util/src/rolling_session_window.rs
+++ b/node/subsystem-util/src/rolling_session_window.rs
@@ -28,16 +28,18 @@ use polkadot_node_subsystem::{
 	messages::{RuntimeApiMessage, RuntimeApiRequest},
 	overseer,
 };
-use thiserror::Error;
 
 /// Sessions unavailable in state to cache.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, thiserror::Error)]
 pub enum SessionsUnavailableReason {
 	/// Runtime API subsystem was unavailable.
-	RuntimeApiUnavailable(oneshot::Canceled),
+	#[error(transparent)]
+	RuntimeApiUnavailable(#[from] oneshot::Canceled),
 	/// The runtime API itself returned an error.
-	RuntimeApi(RuntimeApiError),
+	#[error(transparent)]
+	RuntimeApi(#[from] RuntimeApiError),
 	/// Missing session info from runtime API for given `SessionIndex`.
+	#[error("Missing session index {0:?}")]
 	Missing(SessionIndex),
 }
 
@@ -53,20 +55,16 @@ pub struct SessionsUnavailableInfo {
 }
 
 /// Sessions were unavailable to fetch from the state for some reason.
-#[derive(Debug, Error, Clone)]
+#[derive(Debug, thiserror::Error, Clone)]
+#[error("Sessions unavailable: {kind:?}, info: {info:?}")]
 pub struct SessionsUnavailable {
 	/// The error kind.
+	#[source]
 	kind: SessionsUnavailableReason,
 	/// The info about the session window, if any.
 	info: Option<SessionsUnavailableInfo>,
 }
 
-impl core::fmt::Display for SessionsUnavailable {
-	fn fmt(&self, f: &mut core::fmt::Formatter) -> Result<(), core::fmt::Error> {
-		write!(f, "Sessions unavailable: {:?}, info: {:?}", self.kind, self.info)
-	}
-}
-
 /// An indicated update of the rolling session window.
 #[derive(Debug, PartialEq, Clone)]
 pub enum SessionWindowUpdate {
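
Note on the `fatality` migration above: `#[fatality::fatality(splitable)]` replaces the hand-rolled `UnexpectedAbort` enum with a `thiserror`-backed error whose variants are tagged `#[fatal]` or left as non-fatal "just fyi" information. The sketch below is a hand-written approximation of what the attribute generates for the two variants in this patch; the generated names (`FatalError`, `JfyiError`) and the exact shape of `split()` are assumptions based on fatality 0.0.6, not something this diff shows.

```rust
// Hand-written approximation of the `#[fatality::fatality(splitable)]`
// expansion for the network bridge's `Error` enum. `FatalError`,
// `JfyiError` and `split()` are assumed names, not taken from the patch.

// Stand-in for `polkadot_node_subsystem::SubsystemError`, so the sketch
// compiles on its own with only the `thiserror` crate.
#[derive(Debug, thiserror::Error)]
#[error("subsystem error")]
pub struct SubsystemError;

#[derive(Debug, thiserror::Error)]
pub enum Error {
	// Both variants are marked `#[fatal]` in the patch, so both end up
	// on the fatal side of the split.
	#[error(transparent)]
	SubsystemError(#[from] SubsystemError),
	#[error("Event stream closed unexpectedly")]
	EventStreamConcluded,
}

/// Fatal-only view of `Error`, as produced by the `splitable` option.
#[derive(Debug, thiserror::Error)]
pub enum FatalError {
	#[error(transparent)]
	SubsystemError(SubsystemError),
	#[error("Event stream closed unexpectedly")]
	EventStreamConcluded,
}

/// Non-fatal ("just fyi") view; empty here because every variant is fatal.
#[derive(Debug, thiserror::Error)]
pub enum JfyiError {}

impl Error {
	/// Split into the non-fatal (`Ok`) and fatal (`Err`) halves, so a
	/// caller can log the former and tear the subsystem down on the latter.
	pub fn split(self) -> Result<JfyiError, FatalError> {
		match self {
			Error::SubsystemError(e) => Err(FatalError::SubsystemError(e)),
			Error::EventStreamConcluded => Err(FatalError::EventStreamConcluded),
		}
	}
}

fn main() {
	match Error::EventStreamConcluded.split() {
		Ok(jfyi) => println!("non-fatal: {jfyi}"),
		Err(fatal) => println!("fatal, shutting down: {fatal}"),
	}
}
```

Because both variants here are fatal, `run_network` in the last lib.rs hunk can keep matching on the plain `Error` and treat every case as a shutdown; the `splitable` form presumably just keeps the bridge consistent with subsystems that do carry non-fatal variants.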