-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Handling timers for repeat dispute participation requests #6901
Changes from 24 commits
107f971
a8186fc
b6cb2c7
faa7eb5
bc7049e
7b0bfc0
56e7723
4185d4d
20ceda9
02e5608
6444e6d
008063f
207bb2a
3fcb639
ef611aa
1a4eb45
eed5535
27fd59f
d8e838f
9bd4dcf
fea1997
a51c115
0ac7f5b
e7f8112
70b21f3
81de5d3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -70,7 +70,7 @@ pub struct ParticipationRequest { | |
| candidate_hash: CandidateHash, | ||
| candidate_receipt: CandidateReceipt, | ||
| session: SessionIndex, | ||
| _request_timer: Option<prometheus::HistogramTimer>, // Sends metric data when request is dropped | ||
| request_timer: Option<prometheus::HistogramTimer>, // Sends metric data when request is dropped | ||
| } | ||
|
|
||
| /// Whether a `ParticipationRequest` should be put on best-effort or the priority queue. | ||
|
|
@@ -119,12 +119,7 @@ impl ParticipationRequest { | |
| session: SessionIndex, | ||
| request_timer: Option<prometheus::HistogramTimer>, | ||
| ) -> Self { | ||
| Self { | ||
| candidate_hash: candidate_receipt.hash(), | ||
| candidate_receipt, | ||
| session, | ||
| _request_timer: request_timer, | ||
| } | ||
| Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session, request_timer } | ||
| } | ||
|
|
||
| pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt { | ||
|
|
@@ -147,15 +142,11 @@ impl ParticipationRequest { | |
| #[cfg(test)] | ||
| impl PartialEq for ParticipationRequest { | ||
| fn eq(&self, other: &Self) -> bool { | ||
| let ParticipationRequest { | ||
| candidate_receipt, | ||
| candidate_hash, | ||
| session: _session, | ||
| _request_timer, | ||
| } = self; | ||
| let ParticipationRequest { candidate_receipt, candidate_hash, session, request_timer: _ } = | ||
| self; | ||
| candidate_receipt == other.candidate_receipt() && | ||
| candidate_hash == other.candidate_hash() && | ||
| self.session == other.session() | ||
| *session == other.session() | ||
| } | ||
| } | ||
| #[cfg(test)] | ||
|
|
@@ -227,19 +218,44 @@ impl Queues { | |
| Ok(()) | ||
| } | ||
|
|
||
| /// Will put message in queue, either priority or best effort depending on priority. | ||
| /// | ||
| /// If the message was already previously present on best effort, it will be moved to priority | ||
| /// if it is considered priority now. | ||
| /// | ||
| /// Returns error in case a queue was found full already. | ||
| /// | ||
| /// # Request timers | ||
| /// | ||
| /// [`ParticipationRequest`]s contain request timers. | ||
| /// Where an old request would be replaced by a new one, we keep the old request. | ||
| /// This prevents request timers from resetting on each new request. | ||
| fn queue_with_comparator( | ||
| &mut self, | ||
| comparator: CandidateComparator, | ||
| priority: ParticipationPriority, | ||
| req: ParticipationRequest, | ||
| mut req: ParticipationRequest, | ||
| ) -> std::result::Result<(), QueueError> { | ||
| if priority.is_priority() { | ||
| if self.priority.len() >= PRIORITY_QUEUE_SIZE { | ||
| return Err(QueueError::PriorityFull) | ||
| } | ||
| // Remove any best effort entry: | ||
| self.best_effort.remove(&comparator); | ||
| self.priority.insert(comparator, req); | ||
| // Remove any best effort entry, using it to replace our new | ||
| // request. | ||
| if let Some(older_request) = self.best_effort.remove(&comparator) { | ||
| if let Some(timer) = req.request_timer { | ||
| timer.stop_and_discard(); | ||
| } | ||
| req = older_request; | ||
| } | ||
| // Keeping old request if any. | ||
| if self.priority.contains_key(&comparator) { | ||
| if let Some(timer) = req.request_timer { | ||
| timer.stop_and_discard(); | ||
| } | ||
| } else { | ||
| self.priority.insert(comparator, req); | ||
| } | ||
| self.metrics.report_priority_queue_size(self.priority.len() as u64); | ||
| self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); | ||
| } else { | ||
|
|
@@ -251,7 +267,14 @@ impl Queues { | |
| if self.best_effort.len() >= BEST_EFFORT_QUEUE_SIZE { | ||
| return Err(QueueError::BestEffortFull) | ||
| } | ||
| self.best_effort.insert(comparator, req); | ||
| // Keeping old request if any. | ||
| if self.best_effort.contains_key(&comparator) { | ||
| if let Some(timer) = req.request_timer { | ||
| timer.stop_and_discard(); | ||
| } | ||
| } else { | ||
| self.best_effort.insert(comparator, req); | ||
|
||
| } | ||
| self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); | ||
| } | ||
| Ok(()) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.