Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
First draft of all metric code
  • Loading branch information
BradleyOlson64 committed Mar 7, 2023
commit a8186fc7f26c229ecbf78ff3fb2f2768ce573834
18 changes: 18 additions & 0 deletions node/core/dispute-coordinator/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ struct MetricsInner {
refrained_participations: prometheus::Counter<prometheus::U64>,
/// Distribution of participation durations.
participation_durations: prometheus::Histogram,
/// Measures the duration of the full participation pipeline: From when
/// a participation request is first queued to when participation in the
/// requested dispute is complete.
participation_pipeline_durations: prometheus::Histogram,
/// Size of participation priority queue
priority_queue_size: prometheus::Gauge<prometheus::U64>,
/// Size of participation best effort queue
Expand Down Expand Up @@ -108,6 +112,11 @@ impl Metrics {
self.0.as_ref().map(|metrics| metrics.participation_durations.start_timer())
}

/// Start a timer for the full participation pipeline.
///
/// The returned timer records its measurement into the
/// `participation_pipeline_durations` histogram when it is dropped, so the
/// caller should keep it alive for as long as the pipeline is in progress.
///
/// Returns `None` when this `Metrics` handle holds no inner metrics
/// (presumably because metrics were not registered; confirm against the
/// constructor).
pub(crate) fn time_participation_pipeline(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
    let inner = self.0.as_ref()?;
    Some(inner.participation_pipeline_durations.start_timer())
}

/// Set the priority_queue_size metric
pub fn report_priority_queue_size(&self, size: u64) {
if let Some(metrics) = &self.0 {
Expand Down Expand Up @@ -197,6 +206,15 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
participation_pipeline_durations: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_dispute_participation_pipeline_durations",
"Measures the duration of the full participation pipeline: From when a participation request is first queued to when participation in the requested dispute is complete.",
)
)?,
registry,
)?,
priority_queue_size: prometheus::register(
prometheus::Gauge::new("polkadot_parachain_dispute_priority_queue_size",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should the metric names be updated to match the new variable names? e.g. "polkadot_parachain_dispute_participation_priority_queue_size"

"Number of disputes waiting for local participation in the priority queue.")?,
Expand Down
16 changes: 11 additions & 5 deletions node/core/dispute-coordinator/src/participation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use queues::Queues;
pub use queues::{ParticipationPriority, ParticipationRequest, QueueError};

use crate::metrics::Metrics;
use polkadot_node_subsystem_util::metrics::prometheus::prometheus;

/// The maximum number of participation processes we want to run in parallel.
///
Expand Down Expand Up @@ -157,19 +158,21 @@ impl Participation {
req: ParticipationRequest,
metrics: &Metrics,
) -> Result<()> {
let request_timer = metrics.time_participation_pipeline();

// Participation already running - we can ignore that request:
if self.running_participations.contains(req.candidate_hash()) {
return Ok(())
}
// Available capacity - participate right away (if we already have a recent block):
if let Some((_, h)) = self.recent_block {
if self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS {
self.fork_participation(ctx, req, h, metrics)?;
self.fork_participation(ctx, req, h, request_timer, metrics)?;
return Ok(())
}
}
// Out of capacity/no recent block yet - queue:
self.queue.queue(ctx.sender(), priority, req, metrics).await
self.queue.queue(ctx.sender(), priority, req, request_timer, metrics).await
}

/// Message from a worker task was received - get the outcome.
Expand Down Expand Up @@ -242,8 +245,9 @@ impl Participation {
metrics: &Metrics,
) -> FatalResult<()> {
while self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS {
if let Some(req) = self.queue.dequeue(metrics) {
self.fork_participation(ctx, req, recent_head, metrics)?;
let (maybe_req, maybe_timer) = self.queue.dequeue(metrics);
if let Some(req) = maybe_req {
self.fork_participation(ctx, req, recent_head, maybe_timer, metrics)?;
} else {
break
}
Expand All @@ -257,13 +261,14 @@ impl Participation {
ctx: &mut Context,
req: ParticipationRequest,
recent_head: Hash,
request_timer: Option<prometheus::HistogramTimer>,
metrics: &Metrics,
) -> FatalResult<()> {
if self.running_participations.insert(*req.candidate_hash()) {
let sender = ctx.sender().clone();
ctx.spawn(
"participation-worker",
participate(self.worker_sender.clone(), sender, recent_head, req, metrics.clone()).boxed(),
participate(self.worker_sender.clone(), sender, recent_head, req, request_timer, metrics.clone()).boxed(),
)
.map_err(FatalError::SpawnFailed)?;
}
Expand All @@ -276,6 +281,7 @@ async fn participate(
mut sender: impl overseer::DisputeCoordinatorSenderTrait,
block_hash: Hash,
req: ParticipationRequest,
_request_timer: Option<prometheus::HistogramTimer>, // Stops timer and sends metric data when dropped
metrics: Metrics,
) {
let _measure_duration = metrics.time_participation();
Expand Down
38 changes: 29 additions & 9 deletions node/core/dispute-coordinator/src/participation/queues/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
};

use crate::metrics::Metrics;
use polkadot_node_subsystem_util::metrics::prometheus::prometheus;

#[cfg(test)]
mod tests;
Expand Down Expand Up @@ -58,6 +59,10 @@ pub struct Queues {

/// Priority queue.
priority: BTreeMap<CandidateComparator, ParticipationRequest>,

/// Timer handle for each participation request. Stored to measure full request
/// completion time.
request_timers: BTreeMap<CandidateComparator, Option<prometheus::HistogramTimer>>,
}

/// A dispute participation request that can be queued.
Expand Down Expand Up @@ -131,7 +136,7 @@ impl ParticipationRequest {
impl Queues {
/// Create new `Queues`.
pub fn new() -> Self {
Self { best_effort: BTreeMap::new(), priority: BTreeMap::new(), }
Self { best_effort: BTreeMap::new(), priority: BTreeMap::new(), request_timers: BTreeMap::new(), }
}

/// Will put message in queue, either priority or best effort depending on priority.
Expand All @@ -145,24 +150,34 @@ impl Queues {
sender: &mut impl overseer::DisputeCoordinatorSenderTrait,
priority: ParticipationPriority,
req: ParticipationRequest,
timer: Option<prometheus::HistogramTimer>,
metrics: &Metrics,
) -> Result<()> {
let comparator = CandidateComparator::new(sender, &req.candidate_receipt).await?;

self.queue_with_comparator(comparator, priority, req, metrics)?;
self.queue_with_comparator(comparator, priority, req, timer, metrics)?;
Ok(())
}

/// Get the next best request for dispute participation if any.
/// First the priority queue is considered and then the best effort one.
pub fn dequeue(&mut self, metrics: &Metrics) -> Option<ParticipationRequest> {
if let Some(req) = self.pop_priority() {
/// We also get the corresponding request timer, if any.
pub fn dequeue(&mut self, metrics: &Metrics) -> (Option<ParticipationRequest>, Option<prometheus::HistogramTimer>) {
if let Some((comp, req)) = self.pop_priority() {
metrics.report_priority_queue_size(self.priority.len() as u64);
return Some(req.1)
if let Some(maybe_timer) = self.request_timers.remove(&comp) {
return (Some(req), maybe_timer);
}
return (Some(req), None);
}
if let Some((comp, req)) = self.pop_best_effort() {
metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
if let Some(maybe_timer) = self.request_timers.remove(&comp) {
return (Some(req), maybe_timer);
}
return (Some(req), None);
}
let request = self.pop_best_effort().map(|d| d.1);
metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
request
(None, None)
}

/// Reprioritizes any participation requests pertaining to the
Expand Down Expand Up @@ -200,14 +215,18 @@ impl Queues {
comparator: CandidateComparator,
priority: ParticipationPriority,
req: ParticipationRequest,
timer: Option<prometheus::HistogramTimer>,
metrics: &Metrics,
) -> std::result::Result<(), QueueError> {
if priority.is_priority() {
if self.priority.len() >= PRIORITY_QUEUE_SIZE {
return Err(QueueError::PriorityFull)
}
// Remove any best effort entry:
self.best_effort.remove(&comparator);
if let None = self.best_effort.remove(&comparator) {
// Only insert new timer if request wasn't in either queue
self.request_timers.insert(comparator, timer);
}
self.priority.insert(comparator, req);
metrics.report_priority_queue_size(self.priority.len() as u64);
metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
Expand All @@ -221,6 +240,7 @@ impl Queues {
return Err(QueueError::BestEffortFull)
}
self.best_effort.insert(comparator, req);
self.request_timers.insert(comparator, timer);
metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
}
Ok(())
Expand Down