Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Filter unconfirmed disputes in provisioner - random_selection
* Rework dispute coordinator to return `DisputeStatus` with
`ActiveDisputes` message.
* Rework the random_selection implementation of `select_disptues` in
  `provisioner` to return only confirmed disputes.
  • Loading branch information
tdimitrov committed Nov 15, 2022
commit 2a0d5982d11ba1eb45ebcd6a0336c9c0f44b7895
4 changes: 3 additions & 1 deletion node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,9 @@ impl Initialized {

let _ = tx.send(
get_active_with_status(recent_disputes.into_iter(), now)
.map(|(k, _)| k)
.map(|((session_idx, candidate_hash), dispute_status)| {
(session_idx, candidate_hash, dispute_status)
})
.collect(),
);
},
Expand Down
29 changes: 23 additions & 6 deletions node/core/dispute-coordinator/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use futures::{

use polkadot_node_subsystem_util::database::Database;

use polkadot_node_primitives::{SignedDisputeStatement, SignedFullStatement, Statement};
use polkadot_node_primitives::{
DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement,
};
use polkadot_node_subsystem::{
messages::{
ApprovalVotingMessage, ChainApiMessage, DisputeCoordinatorMessage,
Expand Down Expand Up @@ -682,7 +684,10 @@ fn too_many_unconfirmed_statements_are_considered_spam() {
})
.await;

assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Active)]
);

let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down Expand Up @@ -812,7 +817,10 @@ fn approval_vote_import_works() {
})
.await;

assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Active)]
);

let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down Expand Up @@ -934,7 +942,10 @@ fn dispute_gets_confirmed_via_participation() {
})
.await;

assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Active)]
);

let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down Expand Up @@ -1099,7 +1110,10 @@ fn dispute_gets_confirmed_at_byzantine_threshold() {
})
.await;

assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Confirmed)]
);

let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down Expand Up @@ -1358,7 +1372,10 @@ fn conflicting_votes_lead_to_dispute_participation() {
})
.await;

assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash, DisputeStatus::Active)]
);

let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down
64 changes: 24 additions & 40 deletions node/core/provisioner/src/disputes/random_selection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,51 +42,35 @@ enum RequestType {
}

/// Request open disputes identified by `CandidateHash` and the `SessionIndex`.
async fn request_disputes(
/// Returns only confirmed/concluded disputes. The rest are filtered out.
async fn request_confirmed_disputes(
sender: &mut impl overseer::ProvisionerSenderTrait,
active_or_recent: RequestType,
) -> Vec<(SessionIndex, CandidateHash)> {
let disputes = match active_or_recent {
RequestType::Recent => {
let (tx, rx) = oneshot::channel();
let msg = DisputeCoordinatorMessage::RecentDisputes(tx);
sender.send_unbounded_message(msg);
let recent_disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
"Channel closed: unable to gather {:?} disputes",
active_or_recent
);
Vec::new()
},
};
recent_disputes
.into_iter()
.map(|(session_idx, candidate_hash, _)| (session_idx, candidate_hash))
.collect::<Vec<_>>()
},
RequestType::Active => {
let (tx, rx) = oneshot::channel();
let msg = DisputeCoordinatorMessage::ActiveDisputes(tx);
sender.send_unbounded_message(msg);
let active_disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
"Unable to gather {:?} disputes",
active_or_recent
);
Vec::new()
},
};
active_disputes
let (tx, rx) = oneshot::channel();
let msg = match active_or_recent {
RequestType::Recent => DisputeCoordinatorMessage::RecentDisputes(tx),
RequestType::Active => DisputeCoordinatorMessage::ActiveDisputes(tx),
};

sender.send_unbounded_message(msg);
let disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
"Channel closed: unable to gather {:?} disputes",
active_or_recent
);
Vec::new()
},
};

disputes
.into_iter()
.filter(|d| d.2.is_confirmed_concluded())
.map(|d| (d.0, d.1))
.collect()
}

/// Extend `acc` by `n` random, picks of not-yet-present in `acc` items of `recent` without repetition and additions of recent.
Expand Down Expand Up @@ -132,15 +116,15 @@ where
// In case of an overload condition, we limit ourselves to active disputes, and fill up to the
// upper bound of disputes to pass to wasm `fn create_inherent_data`.
// If the active ones are already exceeding the bounds, randomly select a subset.
let recent = request_disputes(sender, RequestType::Recent).await;
let recent = request_confirmed_disputes(sender, RequestType::Recent).await;
let disputes = if recent.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
gum::warn!(
target: LOG_TARGET,
"Recent disputes are excessive ({} > {}), reduce to active ones, and selected",
recent.len(),
MAX_DISPUTES_FORWARDED_TO_RUNTIME
);
let mut active = request_disputes(sender, RequestType::Active).await;
let mut active = request_confirmed_disputes(sender, RequestType::Active).await;
let n_active = active.len();
let active = if active.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
let mut picked = Vec::with_capacity(MAX_DISPUTES_FORWARDED_TO_RUNTIME);
Expand Down
4 changes: 3 additions & 1 deletion node/network/dispute-distribution/src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,9 @@ async fn get_active_disputes<Context>(

// Caller scope is in `update_leaves` and this is bounded by fork count.
ctx.send_unbounded_message(DisputeCoordinatorMessage::ActiveDisputes(tx));
rx.await.map_err(|_| JfyiError::AskActiveDisputesCanceled)
rx.await
.map_err(|_| JfyiError::AskActiveDisputesCanceled)
.map(|disputes| disputes.into_iter().map(|d| (d.0, d.1)).collect())
}

/// Get all locally available dispute votes for a given dispute.
Expand Down
2 changes: 1 addition & 1 deletion node/subsystem-types/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ pub enum DisputeCoordinatorMessage {
RecentDisputes(oneshot::Sender<Vec<(SessionIndex, CandidateHash, DisputeStatus)>>),
/// Fetch a list of all active disputes that the coordinator is aware of.
/// These disputes are either not yet concluded or recently concluded.
ActiveDisputes(oneshot::Sender<Vec<(SessionIndex, CandidateHash)>>),
ActiveDisputes(oneshot::Sender<Vec<(SessionIndex, CandidateHash, DisputeStatus)>>),
/// Get candidate votes for a candidate.
QueryCandidateVotes(
Vec<(SessionIndex, CandidateHash)>,
Expand Down