diff --git a/Cargo.lock b/Cargo.lock index d79c9e2ec6d1..5629bd933ee5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5144,6 +5144,7 @@ dependencies = [ "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", + "polkadot-node-subsystem-util", "polkadot-primitives", "sp-core", "sp-runtime", @@ -5499,6 +5500,7 @@ dependencies = [ "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", + "polkadot-node-subsystem-util", "polkadot-primitives", "sp-core", "sp-keyring", diff --git a/node/core/candidate-selection/src/lib.rs b/node/core/candidate-selection/src/lib.rs index 004df6639169..667963d8afb2 100644 --- a/node/core/candidate-selection/src/lib.rs +++ b/node/core/candidate-selection/src/lib.rs @@ -282,18 +282,18 @@ impl CandidateSelectionJob { candidate_receipt ); - let succeeded = + let result = if let Err(err) = forward_invalidity_note(received_from, &mut self.sender).await { log::warn!( target: TARGET, "failed to forward invalidity note: {:?}", err ); - false + Err(()) } else { - true + Ok(()) }; - self.metrics.on_invalid_selection(succeeded); + self.metrics.on_invalid_selection(result); } } @@ -363,11 +363,11 @@ async fn second_candidate( { Err(err) => { log::warn!(target: TARGET, "failed to send a seconding message"); - metrics.on_second(false); + metrics.on_second(Err(())); Err(err.into()) } Ok(_) => { - metrics.on_second(true); + metrics.on_second(Ok(())); Ok(()) } } @@ -391,21 +391,21 @@ struct MetricsInner { invalid_selections: prometheus::CounterVec, } -/// Candidate backing metrics. +/// Candidate selection metrics. 
#[derive(Default, Clone)] pub struct Metrics(Option); impl Metrics { - fn on_second(&self, succeeded: bool) { + fn on_second(&self, result: Result<(), ()>) { if let Some(metrics) = &self.0 { - let label = if succeeded { "succeeded" } else { "failed" }; + let label = if result.is_ok() { "succeeded" } else { "failed" }; metrics.seconds.with_label_values(&[label]).inc(); } } - fn on_invalid_selection(&self, succeeded: bool) { + fn on_invalid_selection(&self, result: Result<(), ()>) { if let Some(metrics) = &self.0 { - let label = if succeeded { "succeeded" } else { "failed" }; + let label = if result.is_ok() { "succeeded" } else { "failed" }; metrics.invalid_selections.with_label_values(&[label]).inc(); } } diff --git a/node/core/provisioner/src/lib.rs b/node/core/provisioner/src/lib.rs index 6579b95d9122..92f16c462be2 100644 --- a/node/core/provisioner/src/lib.rs +++ b/node/core/provisioner/src/lib.rs @@ -196,9 +196,9 @@ impl ProvisioningJob { .await { log::warn!(target: "provisioner", "failed to assemble or send inherent data: {:?}", err); - self.metrics.on_inherent_data_request(false); + self.metrics.on_inherent_data_request(Err(())); } else { - self.metrics.on_inherent_data_request(true); + self.metrics.on_inherent_data_request(Ok(())); } } ToJob::Provisioner(RequestBlockAuthorshipData(_, sender)) => { @@ -467,17 +467,16 @@ struct MetricsInner { inherent_data_requests: prometheus::CounterVec, } -/// Candidate backing metrics. +/// Provisioner metrics. 
#[derive(Default, Clone)] pub struct Metrics(Option); impl Metrics { - fn on_inherent_data_request(&self, succeeded: bool) { + fn on_inherent_data_request(&self, response: Result<(), ()>) { if let Some(metrics) = &self.0 { - if succeeded { - metrics.inherent_data_requests.with_label_values(&["succeded"]).inc(); - } else { - metrics.inherent_data_requests.with_label_values(&["failed"]).inc(); + match response { + Ok(()) => metrics.inherent_data_requests.with_label_values(&["succeded"]).inc(), + Err(()) => metrics.inherent_data_requests.with_label_values(&["failed"]).inc(), } } } diff --git a/node/network/availability-distribution/src/lib.rs b/node/network/availability-distribution/src/lib.rs index 2a5a08606575..be742368393f 100644 --- a/node/network/availability-distribution/src/lib.rs +++ b/node/network/availability-distribution/src/lib.rs @@ -49,6 +49,9 @@ use polkadot_subsystem::{ ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, }; +use polkadot_node_subsystem_util::{ + metrics::{self, prometheus}, +}; use polkadot_node_network_protocol::{ v1 as protocol_v1, View, ReputationChange as Rep, PeerId, NetworkBridgeEvent, @@ -292,6 +295,7 @@ async fn handle_network_msg( ctx: &mut Context, keystore: KeyStorePtr, state: &mut ProtocolState, + metrics: &Metrics, bridge_message: NetworkBridgeEvent, ) -> Result<()> where @@ -307,10 +311,10 @@ where state.peer_views.remove(&peerid); } NetworkBridgeEvent::PeerViewChange(peerid, view) => { - handle_peer_view_change(ctx, state, peerid, view).await?; + handle_peer_view_change(ctx, state, peerid, view, metrics).await?; } NetworkBridgeEvent::OurViewChange(view) => { - handle_our_view_change(ctx, keystore, state, view).await?; + handle_our_view_change(ctx, keystore, state, view, metrics).await?; } NetworkBridgeEvent::PeerMessage(remote, msg) => { let gossiped_availability = match msg { @@ -318,7 +322,7 @@ where AvailabilityGossipMessage { candidate_hash, erasure_chunk: 
chunk } }; - process_incoming_peer_message(ctx, state, remote, gossiped_availability).await?; + process_incoming_peer_message(ctx, state, remote, gossiped_availability, metrics).await?; } } Ok(()) @@ -331,6 +335,7 @@ async fn handle_our_view_change( keystore: KeyStorePtr, state: &mut ProtocolState, view: View, + metrics: &Metrics, ) -> Result<()> where Context: SubsystemContext, @@ -426,7 +431,7 @@ where erasure_chunk, }; - send_tracked_gossip_message_to_peers(ctx, per_candidate, peers, message).await?; + send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await?; } } @@ -442,31 +447,34 @@ where async fn send_tracked_gossip_message_to_peers( ctx: &mut Context, per_candidate: &mut PerCandidate, + metrics: &Metrics, peers: Vec, message: AvailabilityGossipMessage, ) -> Result<()> where Context: SubsystemContext, { - send_tracked_gossip_messages_to_peers(ctx, per_candidate, peers, iter::once(message)).await + send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await } #[inline(always)] async fn send_tracked_gossip_messages_to_peer( ctx: &mut Context, per_candidate: &mut PerCandidate, + metrics: &Metrics, peer: PeerId, message_iter: impl IntoIterator, ) -> Result<()> where Context: SubsystemContext, { - send_tracked_gossip_messages_to_peers(ctx, per_candidate, vec![peer], message_iter).await + send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![peer], message_iter).await } async fn send_tracked_gossip_messages_to_peers( ctx: &mut Context, per_candidate: &mut PerCandidate, + metrics: &Metrics, peers: Vec, message_iter: impl IntoIterator, ) -> Result<()> @@ -503,6 +511,8 @@ where )) .await .map_err::(Into::into)?; + + metrics.on_chunk_distributed(); } Ok(()) @@ -515,6 +525,7 @@ async fn handle_peer_view_change( state: &mut ProtocolState, origin: PeerId, view: View, + metrics: &Metrics, ) -> Result<()> where Context: SubsystemContext, @@ -560,7 +571,7 @@ where .cloned() 
.collect::>(); - send_tracked_gossip_messages_to_peer(ctx, per_candidate, origin.clone(), messages).await?; + send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages).await?; } Ok(()) } @@ -588,6 +599,7 @@ async fn process_incoming_peer_message( state: &mut ProtocolState, origin: PeerId, message: AvailabilityGossipMessage, + metrics: &Metrics, ) -> Result<()> where Context: SubsystemContext, @@ -694,13 +706,15 @@ where .collect::>(); // gossip that message to interested peers - send_tracked_gossip_message_to_peers(ctx, per_candidate, peers, message).await + send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await } /// The bitfield distribution subsystem. pub struct AvailabilityDistributionSubsystem { /// Pointer to a keystore, which is required for determining this nodes validator index. keystore: KeyStorePtr, + /// Prometheus metrics. + metrics: Metrics, } impl AvailabilityDistributionSubsystem { @@ -708,8 +722,8 @@ impl AvailabilityDistributionSubsystem { const K: usize = 3; /// Create a new instance of the availability distribution. - pub fn new(keystore: KeyStorePtr) -> Self { - Self { keystore } + pub fn new(keystore: KeyStorePtr, metrics: Metrics) -> Self { + Self { keystore, metrics } } /// Start processing work as passed on from the Overseer. @@ -729,7 +743,8 @@ impl AvailabilityDistributionSubsystem { &mut ctx, self.keystore.clone(), &mut state, - event + &self.metrics, + event, ).await { warn!( target: TARGET, @@ -1073,5 +1088,37 @@ where } +#[derive(Clone)] +struct MetricsInner { + gossipped_availability_chunks: prometheus::Counter, +} + +/// Availability Distribution metrics. 
+#[derive(Default, Clone)] +pub struct Metrics(Option); + +impl Metrics { + fn on_chunk_distributed(&self) { + if let Some(metrics) = &self.0 { + metrics.gossipped_availability_chunks.inc(); + } + } +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) -> std::result::Result { + let metrics = MetricsInner { + gossipped_availability_chunks: prometheus::register( + prometheus::Counter::new( + "parachain_gossipped_availability_chunks_total", + "Number of availability chunks gossipped to other peers." + )?, + registry, + )?, + }; + Ok(Metrics(Some(metrics))) + } +} + #[cfg(test)] mod tests; diff --git a/node/network/availability-distribution/src/tests.rs b/node/network/availability-distribution/src/tests.rs index 0c27e4c21264..293bd1fffff8 100644 --- a/node/network/availability-distribution/src/tests.rs +++ b/node/network/availability-distribution/src/tests.rs @@ -72,7 +72,7 @@ fn test_harness>( let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); - let subsystem = AvailabilityDistributionSubsystem::new(keystore); + let subsystem = AvailabilityDistributionSubsystem::new(keystore, Default::default()); let subsystem = subsystem.run(context); let test_fut = test(TestHarness { virtual_overseer }); diff --git a/node/network/bitfield-distribution/Cargo.toml b/node/network/bitfield-distribution/Cargo.toml index b8b19cbecbda..908bc1654420 100644 --- a/node/network/bitfield-distribution/Cargo.toml +++ b/node/network/bitfield-distribution/Cargo.toml @@ -13,7 +13,7 @@ codec = { package="parity-scale-codec", version = "1.3.4" } node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" } polkadot-primitives = { path = "../../../primitives" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } -polkadot-node-subsystem-util = { package = "polkadot-node-subsystem-util", path = "../../subsystem-util" } +polkadot-node-subsystem-util = { path = 
"../../subsystem-util" } polkadot-network-bridge = { path = "../../network/bridge" } polkadot-node-network-protocol = { path = "../../network/protocol" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/node/network/bitfield-distribution/src/lib.rs b/node/network/bitfield-distribution/src/lib.rs index 7b05b88b1787..0cb71ea2d8c8 100644 --- a/node/network/bitfield-distribution/src/lib.rs +++ b/node/network/bitfield-distribution/src/lib.rs @@ -28,6 +28,9 @@ use polkadot_subsystem::messages::*; use polkadot_subsystem::{ ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult, }; +use polkadot_node_subsystem_util::{ + metrics::{self, prometheus}, +}; use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId}; use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, NetworkBridgeEvent, View, ReputationChange}; use std::collections::{HashMap, HashSet}; @@ -130,11 +133,18 @@ impl PerRelayParentData { const TARGET: &'static str = "bitd"; /// The bitfield distribution subsystem. -pub struct BitfieldDistribution; +pub struct BitfieldDistribution { + metrics: Metrics, +} impl BitfieldDistribution { + /// Create a new instance of the `BitfieldDistribution` subsystem. + pub fn new(metrics: Metrics) -> Self { + Self { metrics } + } + /// Start processing work as passed on from the Overseer. 
- async fn run(mut ctx: Context) -> SubsystemResult<()> + async fn run(self, mut ctx: Context) -> SubsystemResult<()> where Context: SubsystemContext, { @@ -147,7 +157,7 @@ impl BitfieldDistribution { msg: BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability), } => { trace!(target: TARGET, "Processing DistributeBitfield"); - handle_bitfield_distribution(&mut ctx, &mut state, hash, signed_availability) + handle_bitfield_distribution(&mut ctx, &mut state, &self.metrics, hash, signed_availability) .await?; } FromOverseer::Communication { @@ -155,7 +165,7 @@ impl BitfieldDistribution { } => { trace!(target: TARGET, "Processing NetworkMessage"); // a network message was received - if let Err(e) = handle_network_msg(&mut ctx, &mut state, event).await { + if let Err(e) = handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await { warn!(target: TARGET, "Failed to handle incomming network messages: {:?}", e); } } @@ -221,6 +231,7 @@ where async fn handle_bitfield_distribution( ctx: &mut Context, state: &mut ProtocolState, + metrics: &Metrics, relay_parent: Hash, signed_availability: SignedAvailabilityBitfield, ) -> SubsystemResult<()> @@ -262,6 +273,8 @@ where relay_message(ctx, job_data, peer_views, validator, msg).await?; + metrics.on_own_bitfield_gossipped(); + Ok(()) } @@ -330,6 +343,7 @@ where async fn process_incoming_peer_message( ctx: &mut Context, state: &mut ProtocolState, + metrics: &Metrics, origin: PeerId, message: BitfieldGossipMessage, ) -> SubsystemResult<()> @@ -388,6 +402,7 @@ where .check_signature(&signing_context, &validator) .is_ok() { + metrics.on_bitfield_received(); let one_per_validator = &mut (job_data.one_per_validator); // only relay_message a message of a validator once @@ -415,6 +430,7 @@ where async fn handle_network_msg( ctx: &mut Context, state: &mut ProtocolState, + metrics: &Metrics, bridge_message: NetworkBridgeEvent, ) -> SubsystemResult<()> where @@ -443,7 +459,7 @@ where relay_parent, 
signed_availability: bitfield, }; - process_incoming_peer_message(ctx, state, remote, gossiped_bitfield).await?; + process_incoming_peer_message(ctx, state, metrics, remote, gossiped_bitfield).await?; } } } @@ -564,7 +580,7 @@ where fn start(self, ctx: C) -> SpawnedSubsystem { SpawnedSubsystem { name: "bitfield-distribution-subsystem", - future: Box::pin(async move { Self::run(ctx) }.map(|_| ())), + future: Box::pin(async move { Self::run(self, ctx) }.map(|_| ())), } } } @@ -605,6 +621,53 @@ where } } +#[derive(Clone)] +struct MetricsInner { + gossipped_own_availability_bitfields: prometheus::Counter, + received_availability_bitfields: prometheus::Counter, +} + +/// Bitfield Distribution metrics. +#[derive(Default, Clone)] +pub struct Metrics(Option); + +impl Metrics { + fn on_own_bitfield_gossipped(&self) { + if let Some(metrics) = &self.0 { + metrics.gossipped_own_availability_bitfields.inc(); + } + } + + fn on_bitfield_received(&self) { + if let Some(metrics) = &self.0 { + metrics.received_availability_bitfields.inc(); + } + } +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) -> Result { + let metrics = MetricsInner { + gossipped_own_availability_bitfields: prometheus::register( + prometheus::Counter::new( + "parachain_gossipped_own_availabilty_bitfields_total", + "Number of own availability bitfields sent to other peers." + )?, + registry, + )?, + received_availability_bitfields: prometheus::register( + prometheus::Counter::new( + "parachain_received_availabilty_bitfields_total", + "Number of valid availability bitfields received from other peers." 
+ )?, + registry, + )?, + }; + Ok(Metrics(Some(metrics))) + } +} + + #[cfg(test)] mod test { use super::*; @@ -748,6 +811,7 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, + &Default::default(), NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.into_network_message()), )); @@ -801,6 +865,7 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, + &Default::default(), NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.into_network_message()), )); @@ -854,6 +919,7 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, + &Default::default(), NetworkBridgeEvent::PeerMessage( peer_b.clone(), msg.clone().into_network_message(), @@ -887,6 +953,7 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, + &Default::default(), NetworkBridgeEvent::PeerMessage( peer_a.clone(), msg.clone().into_network_message(), @@ -907,6 +974,7 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, + &Default::default(), NetworkBridgeEvent::PeerMessage( peer_b.clone(), msg.clone().into_network_message(), @@ -960,6 +1028,7 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, + &Default::default(), NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full), )); @@ -967,6 +1036,7 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, + &Default::default(), NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a, hash_b]), )); @@ -976,6 +1046,7 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, + &Default::default(), NetworkBridgeEvent::PeerMessage( peer_b.clone(), msg.clone().into_network_message(), @@ -1018,6 +1089,7 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, + &Default::default(), NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![]), )); @@ -1032,6 +1104,7 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, + &Default::default(), NetworkBridgeEvent::PeerMessage( peer_b.clone(), msg.clone().into_network_message(), @@ -1052,6 +1125,7 @@ mod test { 
launch!(handle_network_msg( &mut ctx, &mut state, + &Default::default(), NetworkBridgeEvent::PeerDisconnected(peer_b.clone()), )); @@ -1063,6 +1137,7 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, + &Default::default(), NetworkBridgeEvent::PeerMessage( peer_a.clone(), msg.clone().into_network_message(), diff --git a/node/network/pov-distribution/Cargo.toml b/node/network/pov-distribution/Cargo.toml index cedc94732cfb..24f2107ca18f 100644 --- a/node/network/pov-distribution/Cargo.toml +++ b/node/network/pov-distribution/Cargo.toml @@ -14,6 +14,7 @@ node-primitives = { package = "polkadot-node-primitives", path = "../../primitiv parity-scale-codec = "1.3.4" sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } +polkadot-node-subsystem-util = { path = "../../subsystem-util" } polkadot-node-network-protocol = { path = "../../network/protocol" } [dev-dependencies] diff --git a/node/network/pov-distribution/src/lib.rs b/node/network/pov-distribution/src/lib.rs index 994090c0d588..ccce091c1af2 100644 --- a/node/network/pov-distribution/src/lib.rs +++ b/node/network/pov-distribution/src/lib.rs @@ -22,9 +22,12 @@ use polkadot_primitives::v1::{Hash, PoV, CandidateDescriptor}; use polkadot_subsystem::{ ActiveLeavesUpdate, OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, FromOverseer, SpawnedSubsystem, + messages::{ + PoVDistributionMessage, RuntimeApiMessage, RuntimeApiRequest, AllMessages, NetworkBridgeMessage, + }, }; -use polkadot_subsystem::messages::{ - PoVDistributionMessage, RuntimeApiMessage, RuntimeApiRequest, AllMessages, NetworkBridgeMessage, +use polkadot_node_subsystem_util::{ + metrics::{self, prometheus}, }; use polkadot_node_network_protocol::{ v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View, @@ -46,7 +49,10 @@ const BENEFIT_LATE_POV: Rep = Rep::new(10, "Peer supplied us 
with an awaited PoV but was not the first to do so"); /// The PoV Distribution Subsystem. -pub struct PoVDistribution; +pub struct PoVDistribution { + // Prometheus metrics + metrics: Metrics, +} impl Subsystem for PoVDistribution where C: SubsystemContext @@ -56,7 +62,7 @@ impl Subsystem for PoVDistribution // within `run`. SpawnedSubsystem { name: "pov-distribution-subsystem", - future: run(ctx).map(|_| ()).boxed(), + future: self.run(ctx).map(|_| ()).boxed(), } } } @@ -65,6 +71,7 @@ struct State { relay_parent_state: HashMap, peer_state: HashMap, our_view: View, + metrics: Metrics, } struct BlockBasedState { @@ -206,6 +213,7 @@ async fn notify_one_we_are_awaiting_many( async fn distribute_to_awaiting( peers: &mut HashMap, ctx: &mut impl SubsystemContext, + metrics: &Metrics, relay_parent: Hash, pov_hash: Hash, pov: &PoV, @@ -230,7 +238,11 @@ async fn distribute_to_awaiting( ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers_to_send, payload, - ))).await + ))).await?; + + metrics.on_pov_distributed(); + + Ok(()) } /// Handles a `FetchPoV` message. @@ -307,6 +319,7 @@ async fn handle_distribute( distribute_to_awaiting( &mut state.peer_state, ctx, + &state.metrics, relay_parent, descriptor.pov_hash, &*pov, @@ -436,6 +449,7 @@ async fn handle_incoming_pov( distribute_to_awaiting( &mut state.peer_state, ctx, + &state.metrics, relay_parent, pov_hash, &*pov, @@ -508,48 +522,91 @@ async fn handle_network_update( } } -async fn run( - mut ctx: impl SubsystemContext, -) -> SubsystemResult<()> { - let mut state = State { - relay_parent_state: HashMap::new(), - peer_state: HashMap::new(), - our_view: View(Vec::new()), - }; +impl PoVDistribution { + /// Create a new instance of `PovDistribution`. + pub fn new(metrics: Metrics) -> Self { + Self { metrics } + } - loop { - match ctx.recv().await? { - FromOverseer::Signal(signal) => if handle_signal(&mut state, &mut ctx, signal).await? 
{ - return Ok(()); - }, - FromOverseer::Communication { msg } => match msg { - PoVDistributionMessage::FetchPoV(relay_parent, descriptor, response_sender) => - handle_fetch( - &mut state, - &mut ctx, - relay_parent, - descriptor, - response_sender, - ).await?, - PoVDistributionMessage::DistributePoV(relay_parent, descriptor, pov) => - handle_distribute( - &mut state, - &mut ctx, - relay_parent, - descriptor, - pov, - ).await?, - PoVDistributionMessage::NetworkBridgeUpdateV1(event) => - handle_network_update( - &mut state, - &mut ctx, - event, - ).await?, - }, + async fn run( + self, + mut ctx: impl SubsystemContext, + ) -> SubsystemResult<()> { + let mut state = State { + relay_parent_state: HashMap::new(), + peer_state: HashMap::new(), + our_view: View(Vec::new()), + metrics: self.metrics, + }; + + loop { + match ctx.recv().await? { + FromOverseer::Signal(signal) => if handle_signal(&mut state, &mut ctx, signal).await? { + return Ok(()); + }, + FromOverseer::Communication { msg } => match msg { + PoVDistributionMessage::FetchPoV(relay_parent, descriptor, response_sender) => + handle_fetch( + &mut state, + &mut ctx, + relay_parent, + descriptor, + response_sender, + ).await?, + PoVDistributionMessage::DistributePoV(relay_parent, descriptor, pov) => + handle_distribute( + &mut state, + &mut ctx, + relay_parent, + descriptor, + pov, + ).await?, + PoVDistributionMessage::NetworkBridgeUpdateV1(event) => + handle_network_update( + &mut state, + &mut ctx, + event, + ).await?, + }, + } + } + } +} + + +#[derive(Clone)] +struct MetricsInner { + povs_distributed: prometheus::Counter, +} + +/// PoV Distribution metrics. 
+#[derive(Default, Clone)] +pub struct Metrics(Option); + +impl Metrics { + fn on_pov_distributed(&self) { + if let Some(metrics) = &self.0 { + metrics.povs_distributed.inc(); + } + } +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) -> std::result::Result { + let metrics = MetricsInner { + povs_distributed: prometheus::register( + prometheus::Counter::new( + "parachain_povs_distributed_total", + "Number of PoVs distributed to other peers." + )?, + registry, + )?, + }; + Ok(Metrics(Some(metrics))) + } +} + #[cfg(test)] mod tests { use super::*; @@ -619,6 +676,7 @@ mod tests { s }, our_view: View(vec![hash_a, hash_b]), + metrics: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -698,6 +756,7 @@ mod tests { s }, our_view: View(vec![hash_a]), + metrics: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -775,6 +834,7 @@ mod tests { s }, our_view: View(vec![hash_a]), + metrics: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -846,6 +906,7 @@ mod tests { s }, our_view: View(vec![hash_a]), + metrics: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -934,6 +995,7 @@ mod tests { s }, our_view: View(vec![hash_a]), + metrics: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -997,6 +1059,7 @@ mod tests { s }, our_view: View(vec![hash_a]), + metrics: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -1058,6 +1121,7 @@ mod tests { s }, our_view: View(vec![hash_a]), + metrics: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -1116,6 +1180,7 @@ mod tests { s }, our_view: View(vec![hash_a]), + metrics: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -1201,6 +1266,7 @@ mod tests { s }, our_view: View(vec![hash_a, hash_b]), + metrics: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -1263,6 +1329,7 @@ 
mod tests { s }, our_view: View(vec![hash_a]), + metrics: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -1340,6 +1407,7 @@ mod tests { s }, our_view: View(vec![hash_a]), + metrics: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); @@ -1422,6 +1490,7 @@ mod tests { s }, our_view: View(vec![hash_a]), + metrics: Default::default(), }; let pool = sp_core::testing::TaskExecutor::new(); diff --git a/node/network/statement-distribution/Cargo.toml b/node/network/statement-distribution/Cargo.toml index c92fdcf6db90..d729162c1f46 100644 --- a/node/network/statement-distribution/Cargo.toml +++ b/node/network/statement-distribution/Cargo.toml @@ -16,6 +16,7 @@ parity-scale-codec = "1.3.4" sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } +polkadot-node-subsystem-util = { path = "../../subsystem-util" } polkadot-node-network-protocol = { path = "../../network/protocol" } arrayvec = "0.5.1" indexmap = "1.4.0" diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index d16a58d08b7f..4dac16bcc81c 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -22,10 +22,13 @@ use polkadot_subsystem::{ Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem, ActiveLeavesUpdate, FromOverseer, OverseerSignal, + messages::{ + AllMessages, NetworkBridgeMessage, StatementDistributionMessage, CandidateBackingMessage, + RuntimeApiMessage, RuntimeApiRequest, + }, }; -use polkadot_subsystem::messages::{ - AllMessages, NetworkBridgeMessage, StatementDistributionMessage, CandidateBackingMessage, - RuntimeApiMessage, RuntimeApiRequest, +use polkadot_node_subsystem_util::{ + 
metrics::{self, prometheus}, }; use node_primitives::SignedFullStatement; use polkadot_primitives::v1::{ @@ -59,8 +62,13 @@ const BENEFIT_VALID_STATEMENT_FIRST: Rep = Rep::new( /// Typically we will only keep 1, but when a validator equivocates we will need to track 2. const VC_THRESHOLD: usize = 2; +const LOG_TARGET: &str = "statement_distribution"; + /// The statement distribution subsystem. -pub struct StatementDistribution; +pub struct StatementDistribution { + // Prometheus metrics + metrics: Metrics, +} impl Subsystem for StatementDistribution where C: SubsystemContext @@ -70,7 +78,7 @@ impl Subsystem for StatementDistribution // within `run`. SpawnedSubsystem { name: "statement-distribution-subsystem", - future: run(ctx).map(|_| ()).boxed(), + future: self.run(ctx).map(|_| ()).boxed(), } } } @@ -111,14 +119,7 @@ fn note_hash( ) -> bool { if observed.contains(&h) { return true; } - if observed.is_full() { - false - } else { - observed.try_push(h).expect("length of storage guarded above; \ - only panics if length exceeds capacity; qed"); - - true - } + observed.try_push(h).is_ok() } /// knowledge that a peer has about goings-on in a relay parent. 
@@ -502,6 +503,7 @@ async fn circulate_statement_and_dependents( ctx: &mut impl SubsystemContext, relay_parent: Hash, statement: SignedFullStatement, + metrics: &Metrics, ) -> SubsystemResult<()> { if let Some(active_head)= active_heads.get_mut(&relay_parent) { @@ -529,7 +531,8 @@ async fn circulate_statement_and_dependents( ctx, relay_parent, candidate_hash, - &*active_head + &*active_head, + metrics, ).await?; } } @@ -589,6 +592,7 @@ async fn send_statements_about( relay_parent: Hash, candidate_hash: Hash, active_head: &ActiveHeadData, + metrics: &Metrics, ) -> SubsystemResult<()> { for statement in active_head.statements_about(candidate_hash) { if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() { @@ -600,6 +604,8 @@ async fn send_statements_about( ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload) )).await?; + + metrics.on_statement_distributed(); } } @@ -612,7 +618,8 @@ async fn send_statements( peer_data: &mut PeerData, ctx: &mut impl SubsystemContext, relay_parent: Hash, - active_head: &ActiveHeadData + active_head: &ActiveHeadData, + metrics: &Metrics, ) -> SubsystemResult<()> { for statement in active_head.statements() { if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() { @@ -624,6 +631,8 @@ async fn send_statements( ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload) )).await?; + + metrics.on_statement_distributed(); } } @@ -652,6 +661,7 @@ async fn handle_incoming_message<'a>( active_heads: &'a mut HashMap, ctx: &mut impl SubsystemContext, message: protocol_v1::StatementDistributionMessage, + metrics: &Metrics, ) -> SubsystemResult> { let (relay_parent, statement) = match message { protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s), @@ -697,6 +707,7 @@ async fn handle_incoming_message<'a>( relay_parent, fingerprint.0.candidate_hash().clone(), &*active_head, + metrics, 
).await? } Ok(false) => {} @@ -724,6 +735,7 @@ async fn update_peer_view_and_send_unlocked( ctx: &mut impl SubsystemContext, active_heads: &HashMap, new_view: View, + metrics: &Metrics, ) -> SubsystemResult<()> { let old_view = std::mem::replace(&mut peer_data.view, new_view); @@ -745,6 +757,7 @@ async fn update_peer_view_and_send_unlocked( ctx, new, active_head, + metrics, ).await?; } } @@ -758,6 +771,7 @@ async fn handle_network_update( ctx: &mut impl SubsystemContext, our_view: &mut View, update: NetworkBridgeEvent, + metrics: &Metrics, ) -> SubsystemResult<()> { match update { NetworkBridgeEvent::PeerConnected(peer, _role) => { @@ -782,6 +796,7 @@ async fn handle_network_update( active_heads, ctx, message, + metrics, ).await?; if let Some((relay_parent, new)) = new_stored { @@ -808,6 +823,7 @@ async fn handle_network_update( ctx, &*active_heads, view, + metrics, ).await } None => Ok(()), @@ -819,7 +835,7 @@ async fn handle_network_update( for new in our_view.difference(&old_view) { if !active_heads.contains_key(&new) { - log::warn!(target: "statement_distribution", "Our network bridge view update \ + log::warn!(target: LOG_TARGET, "Our network bridge view update \ inconsistent with `StartWork` messages we have received from overseer. \ Contains unknown hash {}", new); } @@ -831,94 +847,132 @@ async fn handle_network_update( } -async fn run( - mut ctx: impl SubsystemContext, -) -> SubsystemResult<()> { - let mut peers: HashMap = HashMap::new(); - let mut our_view = View::default(); - let mut active_heads: HashMap = HashMap::new(); - let mut statement_listeners: Vec> = Vec::new(); - - loop { - let message = ctx.recv().await?; - match message { - FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. 
})) => { - for relay_parent in activated { - let (validators, session_index) = { - let (val_tx, val_rx) = oneshot::channel(); - let (session_tx, session_rx) = oneshot::channel(); - - let val_message = AllMessages::RuntimeApi( - RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Validators(val_tx), - ), - ); - let session_message = AllMessages::RuntimeApi( - RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionIndexForChild(session_tx), - ), - ); - - ctx.send_messages( - std::iter::once(val_message).chain(std::iter::once(session_message)) - ).await?; - - match (val_rx.await?, session_rx.await?) { - (Ok(v), Ok(s)) => (v, s), - (Err(e), _) | (_, Err(e)) => { - log::warn!( - target: "statement_distribution", - "Failed to fetch runtime API data for active leaf: {:?}", - e, - ); - - // Lacking this bookkeeping might make us behave funny, although - // not in any slashable way. But we shouldn't take down the node - // on what are likely spurious runtime API errors. - continue; +impl StatementDistribution { + async fn run( + self, + mut ctx: impl SubsystemContext, + ) -> SubsystemResult<()> { + let mut peers: HashMap = HashMap::new(); + let mut our_view = View::default(); + let mut active_heads: HashMap = HashMap::new(); + let mut statement_listeners: Vec> = Vec::new(); + let metrics = self.metrics; + + loop { + let message = ctx.recv().await?; + match message { + FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. 
})) => { + for relay_parent in activated { + let (validators, session_index) = { + let (val_tx, val_rx) = oneshot::channel(); + let (session_tx, session_rx) = oneshot::channel(); + + let val_message = AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(val_tx), + ), + ); + let session_message = AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(session_tx), + ), + ); + + ctx.send_messages( + std::iter::once(val_message).chain(std::iter::once(session_message)) + ).await?; + + match (val_rx.await?, session_rx.await?) { + (Ok(v), Ok(s)) => (v, s), + (Err(e), _) | (_, Err(e)) => { + log::warn!( + target: LOG_TARGET, + "Failed to fetch runtime API data for active leaf: {:?}", + e, + ); + + // Lacking this bookkeeping might make us behave funny, although + // not in any slashable way. But we shouldn't take down the node + // on what are likely spurious runtime API errors. + continue; + } } - } - }; + }; - active_heads.entry(relay_parent) - .or_insert(ActiveHeadData::new(validators, session_index)); + active_heads.entry(relay_parent) + .or_insert(ActiveHeadData::new(validators, session_index)); + } } - } - FromOverseer::Signal(OverseerSignal::BlockFinalized(_block_hash)) => { - // do nothing - } - FromOverseer::Signal(OverseerSignal::Conclude) => break, - FromOverseer::Communication { msg } => match msg { - StatementDistributionMessage::Share(relay_parent, statement) => { - inform_statement_listeners( - &statement, - &mut statement_listeners, - ).await; - circulate_statement_and_dependents( - &mut peers, - &mut active_heads, - &mut ctx, - relay_parent, - statement, - ).await?; + FromOverseer::Signal(OverseerSignal::BlockFinalized(_block_hash)) => { + // do nothing } - StatementDistributionMessage::NetworkBridgeUpdateV1(event) => - handle_network_update( - &mut peers, - &mut active_heads, - &mut ctx, - &mut our_view, - event, - ).await?, - 
StatementDistributionMessage::RegisterStatementListener(tx) => { - statement_listeners.push(tx); + FromOverseer::Signal(OverseerSignal::Conclude) => break, + FromOverseer::Communication { msg } => match msg { + StatementDistributionMessage::Share(relay_parent, statement) => { + inform_statement_listeners( + &statement, + &mut statement_listeners, + ).await; + circulate_statement_and_dependents( + &mut peers, + &mut active_heads, + &mut ctx, + relay_parent, + statement, + &metrics, + ).await?; + } + StatementDistributionMessage::NetworkBridgeUpdateV1(event) => + handle_network_update( + &mut peers, + &mut active_heads, + &mut ctx, + &mut our_view, + event, + &metrics, + ).await?, + StatementDistributionMessage::RegisterStatementListener(tx) => { + statement_listeners.push(tx); + } } } } + Ok(()) + } +} + +#[derive(Clone)] +struct MetricsInner { + statements_distributed: prometheus::Counter, +} + +/// Statement Distribution metrics. +#[derive(Default, Clone)] +pub struct Metrics(Option); + +impl Metrics { + fn on_statement_distributed(&self) { + if let Some(metrics) = &self.0 { + metrics.statements_distributed.inc(); + } + } +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) -> std::result::Result { + let metrics = MetricsInner { + statements_distributed: prometheus::register( + prometheus::Counter::new( + "parachain_statements_distributed_total", + "Number of candidate validity statements distributed to other peers." 
+ )?, + registry, + )?, + }; + Ok(Metrics(Some(metrics))) } - Ok(()) } #[cfg(test)] @@ -1256,6 +1310,7 @@ mod tests { &mut ctx, &active_heads, new_view.clone(), + &Default::default(), ).await.unwrap(); assert_eq!(peer_data.view, new_view); diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 4589ee553c91..73453c5d2753 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -470,6 +470,7 @@ pub struct AllSubsystems, deactivated_heads_total: prometheus::Counter, + messages_relayed_total: prometheus::Counter, } #[derive(Default, Clone)] @@ -487,6 +488,12 @@ impl Metrics { metrics.deactivated_heads_total.inc(); } } + + fn on_message_relayed(&self) { + if let Some(metrics) = &self.0 { + metrics.messages_relayed_total.inc(); + } + } } impl metrics::Metrics for Metrics { @@ -506,6 +513,13 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + messages_relayed_total: prometheus::register( + prometheus::Counter::new( + "parachain_messages_relayed_total", + "Number of messages relayed by Overseer." 
+ )?, + registry, + )?, }; Ok(Metrics(Some(metrics))) } @@ -1046,10 +1060,11 @@ where } async fn route_message(&mut self, msg: AllMessages) { + self.metrics.on_message_relayed(); match msg { AllMessages::CandidateValidation(msg) => { if let Some(ref mut s) = self.candidate_validation_subsystem.instance { - let _= s.tx.send(FromOverseer::Communication { msg }).await; + let _ = s.tx.send(FromOverseer::Communication { msg }).await; } } AllMessages::CandidateBacking(msg) => { @@ -1209,6 +1224,7 @@ fn spawn( #[cfg(test)] mod tests { use std::sync::atomic; + use std::collections::HashMap; use futures::{executor, pin_mut, select, channel::mpsc, FutureExt}; use polkadot_primitives::v1::{BlockData, CollatorPair, PoV}; @@ -1435,27 +1451,35 @@ mod tests { handler.block_imported(second_block).await.unwrap(); handler.block_imported(third_block).await.unwrap(); + handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await.unwrap(); handler.stop().await.unwrap(); select! { res = overseer_fut => { assert!(res.is_ok()); - let (activated, deactivated) = extract_metrics(®istry); - assert_eq!(activated, 3); - assert_eq!(deactivated, 2); + let metrics = extract_metrics(®istry); + assert_eq!(metrics["activated"], 3); + assert_eq!(metrics["deactivated"], 2); + assert_eq!(metrics["relayed"], 1); }, complete => (), } }); } - fn extract_metrics(registry: &prometheus::Registry) -> (u64, u64) { + fn extract_metrics(registry: &prometheus::Registry) -> HashMap<&'static str, u64> { let gather = registry.gather(); assert_eq!(gather[0].get_name(), "parachain_activated_heads_total"); assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total"); + assert_eq!(gather[2].get_name(), "parachain_messages_relayed_total"); let activated = gather[0].get_metric()[0].get_counter().get_value() as u64; let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64; - (activated, deactivated) + let relayed = gather[2].get_metric()[0].get_counter().get_value() 
as u64; + let mut result = HashMap::new(); + result.insert("activated", activated); + result.insert("deactivated", deactivated); + result.insert("relayed", relayed); + result } // Spawn a subsystem that immediately exits.