diff --git a/node/network/collator-protocol/src/collator_side.rs b/node/network/collator-protocol/src/collator_side.rs index 297ccce7e389..ba00796742b2 100644 --- a/node/network/collator-protocol/src/collator_side.rs +++ b/node/network/collator-protocol/src/collator_side.rs @@ -29,6 +29,7 @@ use polkadot_subsystem::{ AllMessages, CollatorProtocolMessage, RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeMessage, }, + metrics::{self, prometheus}, }; use polkadot_node_network_protocol::{ v1 as protocol_v1, View, PeerId, PeerSet, NetworkBridgeEvent, RequestId, @@ -38,6 +39,54 @@ use polkadot_node_subsystem_util::{ request_validator_groups_ctx, }; +#[derive(Clone, Default)] +pub(super) struct Metrics(Option); + +impl Metrics { + fn on_advertisment_made(&self) { + if let Some(metrics) = &self.0 { + metrics.advertisments_made.inc(); + } + } + + fn on_collation_sent(&self) { + if let Some(metrics) = &self.0 { + metrics.collations_sent.inc(); + } + } +} + +#[derive(Clone)] +struct MetricsInner { + advertisments_made: prometheus::Counter, + collations_sent: prometheus::Counter, +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) + -> std::result::Result + { + let metrics = MetricsInner { + advertisments_made: prometheus::register( + prometheus::Counter::new( + "parachain_advertisments_made_total", + "A number of advertisments sent to validators.", + )?, + registry, + )?, + collations_sent: prometheus::register( + prometheus::Counter::new( + "parachain_collations_sent_total", + "A number of collations sent to validators.", + )?, + registry, + )?, + }; + + Ok(Metrics(Some(metrics))) + } +} + #[derive(Default)] struct State { /// Our id. @@ -69,6 +118,9 @@ struct State { /// Entries in this map will be cleared as validator groups in `our_validator_groups` /// go out of scope with their respective deactivated leafs. known_validators: HashMap, + + /// Metrics. + metrics: Metrics, } /// Distribute a collation. @@ -287,6 +339,8 @@ where ) )).await?; + state.metrics.on_advertisment_made(); + Ok(()) } @@ -367,6 +421,7 @@ where /// Issue a response to a previously requested collation. async fn send_collation( ctx: &mut Context, + state: &mut State, request_id: RequestId, origin: PeerId, receipt: CandidateReceipt, @@ -388,6 +443,8 @@ where ) )).await?; + state.metrics.on_collation_sent(); + Ok(()) } @@ -421,7 +478,7 @@ where Some(our_para_id) => { if our_para_id == para_id { if let Some(collation) = state.collations.get(&relay_parent).cloned() { - send_collation(ctx, request_id, origin, collation.0, collation.1).await?; + send_collation(ctx, state, request_id, origin, collation.0, collation.1).await?; } } else { warn!( @@ -555,14 +612,21 @@ async fn handle_our_view_change( } /// The collator protocol collator side main loop. -pub(crate) async fn run(mut ctx: Context, our_id: CollatorId) -> Result<()> +pub(crate) async fn run( + mut ctx: Context, + our_id: CollatorId, + metrics: Metrics, +) -> Result<()> where Context: SubsystemContext { use FromOverseer::*; use OverseerSignal::*; - let mut state = State::default(); + let mut state = State { + metrics, + ..Default::default() + }; state.our_id = our_id; @@ -597,7 +661,7 @@ mod tests { }; use polkadot_subsystem::ActiveLeavesUpdate; use polkadot_node_subsystem_util::TimeoutExt; - use polkadot_subsystem_testhelpers::{self as test_helpers}; + use polkadot_subsystem_testhelpers as test_helpers; use polkadot_node_network_protocol::ObservedRole; #[derive(Default)] @@ -719,7 +783,7 @@ mod tests { let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); - let subsystem = run(context, collator_id); + let subsystem = run(context, collator_id, Metrics::default()); let test_fut = test(TestHarness { virtual_overseer }); diff --git a/node/network/collator-protocol/src/lib.rs b/node/network/collator-protocol/src/lib.rs index bf1c009b44f8..aa261fedf158 100644 --- a/node/network/collator-protocol/src/lib.rs +++ b/node/network/collator-protocol/src/lib.rs @@ -53,13 +53,15 @@ enum Error { RuntimeApi(RuntimeApiError), #[from] UtilError(util::Error), + #[from] + Prometheus(prometheus::PrometheusError), } type Result = std::result::Result; enum ProtocolSide { - Validator, - Collator(CollatorId), + Validator(validator_side::Metrics), + Collator(CollatorId, collator_side::Metrics), } /// The collator protocol subsystem. @@ -71,10 +73,12 @@ impl CollatorProtocolSubsystem { /// Start the collator protocol. /// If `id` is `Some` this is a collator side of the protocol. /// If `id` is `None` this is a validator side of the protocol. - pub fn new(id: Option) -> Self { + /// Caller must provide a registry for prometheus metrics. + pub fn new(id: Option, registry: Option<&prometheus::Registry>) -> Self { + use metrics::Metrics; let protocol_side = match id { - Some(id) => ProtocolSide::Collator(id), - None => ProtocolSide::Validator, + Some(id) => ProtocolSide::Collator(id, collator_side::Metrics::register(registry)), + None => ProtocolSide::Validator(validator_side::Metrics::register(registry)), }; Self { @@ -87,28 +91,26 @@ impl CollatorProtocolSubsystem { Context: SubsystemContext, { match self.protocol_side { - ProtocolSide::Validator => validator_side::run(ctx, REQUEST_TIMEOUT).await, - ProtocolSide::Collator(id) => collator_side::run(ctx, id).await, + ProtocolSide::Validator(metrics) => validator_side::run( + ctx, + REQUEST_TIMEOUT, + metrics, + ).await, + ProtocolSide::Collator(id, metrics) => collator_side::run( + ctx, + id, + metrics, + ).await, } } } -/// Collator protocol metrics. -#[derive(Default, Clone)] -pub struct Metrics; - -impl metrics::Metrics for Metrics { - fn try_register(_registry: &prometheus::Registry) - -> std::result::Result { - Ok(Metrics) - } -} - impl Subsystem for CollatorProtocolSubsystem where Context: SubsystemContext + Sync + Send, { - type Metrics = Metrics; + // The actual `Metrics` type depends on whether we're on the collator or validator side. + type Metrics = (); fn start(self, ctx: Context) -> SpawnedSubsystem { SpawnedSubsystem { diff --git a/node/network/collator-protocol/src/validator_side.rs b/node/network/collator-protocol/src/validator_side.rs index a41d2628b58a..39466ad66acb 100644 --- a/node/network/collator-protocol/src/validator_side.rs +++ b/node/network/collator-protocol/src/validator_side.rs @@ -34,6 +34,7 @@ use polkadot_subsystem::{ messages::{ AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, NetworkBridgeMessage, }, + metrics::{self, prometheus}, }; use polkadot_node_network_protocol::{ v1 as protocol_v1, View, PeerId, ReputationChange as Rep, RequestId, @@ -48,6 +49,46 @@ const COST_REQUEST_TIMED_OUT: Rep = Rep::new(-20, "A collation request has timed const COST_REPORT_BAD: Rep = Rep::new(-50, "A collator was reported by another subsystem"); const BENEFIT_NOTIFY_GOOD: Rep = Rep::new(50, "A collator was noted good by another subsystem"); +#[derive(Clone, Default)] +pub(super) struct Metrics(Option); + +impl Metrics { + fn on_request(&self, succeeded: std::result::Result<(), ()>) { + if let Some(metrics) = &self.0 { + match succeeded { + Ok(()) => metrics.collation_requests.with_label_values(&["succeeded"]).inc(), + Err(()) => metrics.collation_requests.with_label_values(&["failed"]).inc(), + } + } + } +} + +#[derive(Clone)] +struct MetricsInner { + collation_requests: prometheus::CounterVec, +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) + -> std::result::Result + { + let metrics = MetricsInner { + collation_requests: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "parachain_collation_requests_total", + "Number of collations requested from Collators.", + ), + &["succeeded", "failed"], + )?, + registry, + )? + }; + + Ok(Metrics(Some(metrics))) + } +} + #[derive(Debug)] enum CollationRequestResult { Received(RequestId), @@ -134,6 +175,14 @@ struct State { /// Possessed collations. collations: HashMap<(Hash, ParaId), Vec<(CollatorId, CandidateReceipt, PoV)>>, + + /// Leaves have recently moved out of scope. + /// These are looked into when we receive previously requested collations that we + /// are no longer interested in. + recently_removed_heads: HashSet, + + /// Metrics. + metrics: Metrics, } /// Another subsystem has requested to fetch collations on a particular leaf for some para. @@ -291,6 +340,7 @@ where let _ = per_request.received.send(()); if let Some(collator_id) = state.known_collators.get(&origin) { let _ = per_request.result.send((receipt.clone(), pov.clone())); + state.metrics.on_request(Ok(())); state.collations .entry((relay_parent, para_id)) @@ -300,11 +350,11 @@ where } } } else { - // TODO: https://github.com/paritytech/polkadot/issues/1694 - // This is tricky. If our chain has moved on, we have already canceled - // the relevant request and removed it from the map; so and we are not expecting - // this reply although technically it is not a malicious behaviur. - modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await?; + // If this collation is not just a delayed one that we were expecting, + // but our view has moved on, in that case modify peer's reputation. + if !state.recently_removed_heads.contains(&relay_parent) { + modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await?; + } } Ok(()) @@ -481,7 +531,11 @@ async fn handle_our_view_change( .cloned() .collect::>(); + // Update the set of recently removed chain heads. + state.recently_removed_heads.clear(); + for removed in removed.into_iter() { + state.recently_removed_heads.insert(removed.clone()); remove_relay_parent(state, removed).await?; } @@ -497,6 +551,8 @@ async fn request_timed_out( where Context: SubsystemContext { + state.metrics.on_request(Err(())); + // We have to go backwards in the map, again. if let Some(key) = find_val_in_map(&state.requested_collations, &id) { if let Some(_) = state.requested_collations.remove(&key) { @@ -595,7 +651,11 @@ where } /// The main run loop. -pub(crate) async fn run(mut ctx: Context, request_timeout: Duration) -> Result<()> +pub(crate) async fn run( + mut ctx: Context, + request_timeout: Duration, + metrics: Metrics, + ) -> Result<()> where Context: SubsystemContext { @@ -604,6 +664,7 @@ where let mut state = State { request_timeout, + metrics, ..Default::default() }; @@ -707,7 +768,7 @@ mod tests { let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); - let subsystem = run(context, Duration::from_millis(50)); + let subsystem = run(context, Duration::from_millis(50), Metrics::default()); let test_fut = test(TestHarness { virtual_overseer });