Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor, placing timer in ParticipationRequest
  • Loading branch information
BradleyOlson64 committed Mar 9, 2023
commit 4185d4d55b1a035d535d5d5154bd5c9f0bd1ec47
3 changes: 2 additions & 1 deletion node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -916,12 +916,13 @@ impl Initialized {
} else {
self.metrics.on_queued_best_effort_participation();
}
let request_timer = Arc::new(self.metrics.time_participation_pipeline());
let r = self
.participation
.queue_participation(
ctx,
priority,
ParticipationRequest::new(new_state.candidate_receipt().clone(), session),
ParticipationRequest::new(new_state.candidate_receipt().clone(), session, request_timer),
)
.await;
log_error(r)?;
Expand Down
2 changes: 2 additions & 0 deletions node/core/dispute-coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,13 @@ impl DisputeCoordinatorSubsystem {
?candidate_hash,
"Found valid dispute, with no vote from us on startup - participating."
);
let request_timer = Arc::new(self.metrics.time_participation_pipeline());
participation_requests.push((
ParticipationPriority::with_priority_if(is_included),
ParticipationRequest::new(
vote_state.votes().candidate_receipt.clone(),
session,
request_timer,
),
));
}
Expand Down
15 changes: 5 additions & 10 deletions node/core/dispute-coordinator/src/participation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,21 +160,19 @@ impl Participation {
priority: ParticipationPriority,
req: ParticipationRequest,
) -> Result<()> {
let request_timer = self.metrics.time_participation_pipeline();

// Participation already running - we can ignore that request:
if self.running_participations.contains(req.candidate_hash()) {
return Ok(())
}
// Available capacity - participate right away (if we already have a recent block):
if let Some((_, h)) = self.recent_block {
if self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS {
self.fork_participation(ctx, req, h, request_timer)?;
self.fork_participation(ctx, req, h)?;
return Ok(())
}
}
// Out of capacity/no recent block yet - queue:
self.queue.queue(ctx.sender(), priority, req, request_timer).await
self.queue.queue(ctx.sender(), priority, req).await
}

/// Message from a worker task was received - get the outcome.
Expand Down Expand Up @@ -243,9 +241,9 @@ impl Participation {
recent_head: Hash,
) -> FatalResult<()> {
while self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS {
let (maybe_req, maybe_timer) = self.queue.dequeue();
let maybe_req = self.queue.dequeue();
if let Some(req) = maybe_req {
self.fork_participation(ctx, req, recent_head, maybe_timer)?;
self.fork_participation(ctx, req, recent_head)?;
} else {
break
}
Expand All @@ -259,7 +257,6 @@ impl Participation {
ctx: &mut Context,
req: ParticipationRequest,
recent_head: Hash,
request_timer: Option<prometheus::HistogramTimer>,
) -> FatalResult<()> {
let participation_timer = self.metrics.time_participation();
if self.running_participations.insert(*req.candidate_hash()) {
Expand All @@ -271,7 +268,6 @@ impl Participation {
sender,
recent_head,
req,
request_timer,
participation_timer,
)
.boxed(),
Expand All @@ -286,8 +282,7 @@ async fn participate(
mut result_sender: WorkerMessageSender,
mut sender: impl overseer::DisputeCoordinatorSenderTrait,
block_hash: Hash,
req: ParticipationRequest,
_request_timer: Option<prometheus::HistogramTimer>, // Sends metric data when dropped
req: ParticipationRequest, // Sends metric data via request_timer field when dropped
_participation_timer: Option<prometheus::HistogramTimer>, // Sends metric data when dropped
) {
#[cfg(test)]
Expand Down
58 changes: 27 additions & 31 deletions node/core/dispute-coordinator/src/participation/queues/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{cmp::Ordering, collections::BTreeMap};
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};

use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::ChainApiMessage, overseer};
Expand Down Expand Up @@ -60,21 +60,17 @@ pub struct Queues {
/// Priority queue.
priority: BTreeMap<CandidateComparator, ParticipationRequest>,

/// Timer handle for each participation request. Stored to measure full request
/// completion time. Optimally these would have been stored in the participation
/// request itself, but HistogramTimer doesn't implement the Clone trait.
request_timers: BTreeMap<CandidateComparator, Option<prometheus::HistogramTimer>>,

/// Handle for recording queues data in metrics
metrics: Metrics,
}

/// A dispute participation request that can be queued.
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, Clone)]
pub struct ParticipationRequest {
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
_request_timer: Arc<Option<prometheus::HistogramTimer>> // Sends metric data when request is dropped
}

/// Whether a `ParticipationRequest` should be put on best-effort or the priority queue.
Expand Down Expand Up @@ -118,8 +114,8 @@ pub enum QueueError {

impl ParticipationRequest {
/// Create a new `ParticipationRequest` to be queued.
pub fn new(candidate_receipt: CandidateReceipt, session: SessionIndex) -> Self {
Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session }
pub fn new(candidate_receipt: CandidateReceipt, session: SessionIndex, request_timer: Arc<Option<prometheus::HistogramTimer>>) -> Self {
Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session, _request_timer: request_timer }
}

pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt {
Expand All @@ -135,6 +131,20 @@ impl ParticipationRequest {
let Self { candidate_hash, candidate_receipt, .. } = self;
(candidate_hash, candidate_receipt)
}
// For tests we want to check whether requests are equal, but the
// request_timer field of ParticipationRequest doesn't implement
// eq. This helper checks whether all other fields are equal,
// which is sufficient.
#[cfg(test)]
pub fn functionally_equal(&self, other: ParticipationRequest) -> bool {
if &self.candidate_receipt == other.candidate_receipt() &&
&self.candidate_hash == other.candidate_hash() &&
self.session == other.session()
{
return true;
}
false
}
}

impl Queues {
Expand All @@ -143,7 +153,6 @@ impl Queues {
Self {
best_effort: BTreeMap::new(),
priority: BTreeMap::new(),
request_timers: BTreeMap::new(),
metrics,
}
}
Expand All @@ -159,35 +168,27 @@ impl Queues {
sender: &mut impl overseer::DisputeCoordinatorSenderTrait,
priority: ParticipationPriority,
req: ParticipationRequest,
timer: Option<prometheus::HistogramTimer>,
) -> Result<()> {
let comparator = CandidateComparator::new(sender, &req.candidate_receipt).await?;

self.queue_with_comparator(comparator, priority, req, timer)?;
self.queue_with_comparator(comparator, priority, req)?;
Ok(())
}

/// Get the next best request for dispute participation if any.
/// First the priority queue is considered and then the best effort one.
/// We also get the corresponding request timer, if any.
pub fn dequeue(
&mut self,
) -> (Option<ParticipationRequest>, Option<prometheus::HistogramTimer>) {
if let Some((comp, req)) = self.pop_priority() {
) -> Option<ParticipationRequest> {
if let Some(req) = self.pop_priority() {
self.metrics.report_priority_queue_size(self.priority.len() as u64);
if let Some(maybe_timer) = self.request_timers.remove(&comp) {
return (Some(req), maybe_timer)
}
return (Some(req), None)
return Some(req.1)
}
if let Some((comp, req)) = self.pop_best_effort() {
if let Some(req) = self.pop_best_effort() {
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
if let Some(maybe_timer) = self.request_timers.remove(&comp) {
return (Some(req), maybe_timer)
}
return (Some(req), None)
return Some(req.1)
}
(None, None)
None
}

/// Reprioritizes any participation requests pertaining to the
Expand Down Expand Up @@ -223,17 +224,13 @@ impl Queues {
comparator: CandidateComparator,
priority: ParticipationPriority,
req: ParticipationRequest,
timer: Option<prometheus::HistogramTimer>,
) -> std::result::Result<(), QueueError> {
if priority.is_priority() {
if self.priority.len() >= PRIORITY_QUEUE_SIZE {
return Err(QueueError::PriorityFull)
}
// Remove any best effort entry:
if let None = self.best_effort.remove(&comparator) {
// Only insert new timer if request wasn't in either queue
self.request_timers.insert(comparator, timer);
}
self.best_effort.remove(&comparator);
self.priority.insert(comparator, req);
self.metrics.report_priority_queue_size(self.priority.len() as u64);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
Expand All @@ -247,7 +244,6 @@ impl Queues {
return Err(QueueError::BestEffortFull)
}
self.best_effort.insert(comparator, req);
self.request_timers.insert(comparator, timer);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
}
Ok(())
Expand Down
Loading