diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index ebe742ac8f37..a5a5ab962f5a 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -14,7 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use std::{cmp::Ordering, collections::BTreeMap}; +use std::{ + cmp::Ordering, + collections::{btree_map::Entry, BTreeMap}, +}; use futures::channel::oneshot; use polkadot_node_subsystem::{messages::ChainApiMessage, overseer}; @@ -70,7 +73,7 @@ pub struct ParticipationRequest { candidate_hash: CandidateHash, candidate_receipt: CandidateReceipt, session: SessionIndex, - _request_timer: Option, // Sends metric data when request is dropped + request_timer: Option, // Sends metric data when request is dropped } /// Whether a `ParticipationRequest` should be put on best-effort or the priority queue. @@ -119,12 +122,7 @@ impl ParticipationRequest { session: SessionIndex, request_timer: Option, ) -> Self { - Self { - candidate_hash: candidate_receipt.hash(), - candidate_receipt, - session, - _request_timer: request_timer, - } + Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session, request_timer } } pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt { @@ -147,15 +145,11 @@ impl ParticipationRequest { #[cfg(test)] impl PartialEq for ParticipationRequest { fn eq(&self, other: &Self) -> bool { - let ParticipationRequest { - candidate_receipt, - candidate_hash, - session: _session, - _request_timer, - } = self; + let ParticipationRequest { candidate_receipt, candidate_hash, session, request_timer: _ } = + self; candidate_receipt == other.candidate_receipt() && candidate_hash == other.candidate_hash() && - self.session == other.session() + *session == other.session() } } #[cfg(test)] @@ -227,19 +221,46 @@ impl Queues { Ok(()) } + /// Will put message in queue, either priority or best effort depending on priority. + /// + /// If the message was already previously present on best effort, it will be moved to priority + /// if it is considered priority now. + /// + /// Returns error in case a queue was found full already. + /// + /// # Request timers + /// + /// [`ParticipationRequest`]s contain request timers. + /// Where an old request would be replaced by a new one, we keep the old request. + /// This prevents request timers from resetting on each new request. fn queue_with_comparator( &mut self, comparator: CandidateComparator, priority: ParticipationPriority, - req: ParticipationRequest, + mut req: ParticipationRequest, ) -> std::result::Result<(), QueueError> { if priority.is_priority() { if self.priority.len() >= PRIORITY_QUEUE_SIZE { return Err(QueueError::PriorityFull) } - // Remove any best effort entry: - self.best_effort.remove(&comparator); - self.priority.insert(comparator, req); + // Remove any best effort entry, using it to replace our new + // request. + if let Some(older_request) = self.best_effort.remove(&comparator) { + if let Some(timer) = req.request_timer { + timer.stop_and_discard(); + } + req = older_request; + } + // Keeping old request if any. + match self.priority.entry(comparator) { + Entry::Occupied(_) => + if let Some(timer) = req.request_timer { + timer.stop_and_discard(); + }, + Entry::Vacant(vac) => { + vac.insert(req); + }, + } self.metrics.report_priority_queue_size(self.priority.len() as u64); self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); } else { @@ -251,7 +272,16 @@ impl Queues { if self.best_effort.len() >= BEST_EFFORT_QUEUE_SIZE { return Err(QueueError::BestEffortFull) } - self.best_effort.insert(comparator, req); + // Keeping old request if any. + match self.best_effort.entry(comparator) { + Entry::Occupied(_) => + if let Some(timer) = req.request_timer { + timer.stop_and_discard(); + }, + Entry::Vacant(vac) => { + vac.insert(req); + }, + } self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); } Ok(()) diff --git a/node/core/dispute-coordinator/src/participation/queues/tests.rs b/node/core/dispute-coordinator/src/participation/queues/tests.rs index f4623e76389c..e4ccd0de8d9f 100644 --- a/node/core/dispute-coordinator/src/participation/queues/tests.rs +++ b/node/core/dispute-coordinator/src/participation/queues/tests.rs @@ -46,7 +46,7 @@ fn clone_request(request: &ParticipationRequest) -> ParticipationRequest { candidate_receipt: request.candidate_receipt.clone(), candidate_hash: request.candidate_hash.clone(), session: request.session, - _request_timer: None, + request_timer: None, } }