diff --git a/Cargo.lock b/Cargo.lock index 5f90fba39664..60defeacbca9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6753,6 +6753,7 @@ dependencies = [ "assert_matches", "fatality", "futures 0.3.21", + "futures-timer", "kvdb", "kvdb-memorydb", "lru 0.7.5", diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index eba4c21e7113..b523bbca39ce 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -950,15 +950,12 @@ async fn handle_actions( dispute_statement, validator_index, } => { - // TODO: Log confirmation results in an efficient way: - // https://github.com/paritytech/polkadot/issues/5156 - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); ctx.send_message(DisputeCoordinatorMessage::ImportStatements { candidate_hash, candidate_receipt, session, statements: vec![(dispute_statement, validator_index)], - pending_confirmation, + pending_confirmation: None, }) .await; }, diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs index 0bb6104dae3f..165b66828ae6 100644 --- a/node/core/approval-voting/src/tests.rs +++ b/node/core/approval-voting/src/tests.rs @@ -25,7 +25,6 @@ use polkadot_node_primitives::{ use polkadot_node_subsystem::{ messages::{ AllMessages, ApprovalVotingMessage, AssignmentCheckResult, AvailabilityRecoveryMessage, - ImportStatementsResult, }, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, }; @@ -605,11 +604,10 @@ async fn check_and_import_approval( overseer_recv(overseer).await, AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::ImportStatements { candidate_hash: c_hash, - pending_confirmation, + pending_confirmation: None, .. }) => { assert_eq!(c_hash, candidate_hash); - let _ = pending_confirmation.send(ImportStatementsResult::ValidImport); } ); } diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs index 9e65e6e1ab98..f7212db27077 100644 --- a/node/core/backing/src/lib.rs +++ b/node/core/backing/src/lib.rs @@ -887,16 +887,13 @@ impl CandidateBackingJob { if let (Some(candidate_receipt), Some(dispute_statement)) = (maybe_candidate_receipt, maybe_signed_dispute_statement) { - // TODO: Log confirmation results in an efficient way: - // https://github.com/paritytech/polkadot/issues/5156 - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); sender .send_message(DisputeCoordinatorMessage::ImportStatements { candidate_hash, candidate_receipt, session: self.session_index, statements: vec![(dispute_statement, validator_index)], - pending_confirmation, + pending_confirmation: None, }) .await; } diff --git a/node/core/backing/src/tests.rs b/node/core/backing/src/tests.rs index eab8b68b3309..5eaee56bcac7 100644 --- a/node/core/backing/src/tests.rs +++ b/node/core/backing/src/tests.rs @@ -28,9 +28,7 @@ use polkadot_primitives::v2::{ ScheduledCore, }; use polkadot_subsystem::{ - messages::{ - CollatorProtocolMessage, ImportStatementsResult, RuntimeApiMessage, RuntimeApiRequest, - }, + messages::{CollatorProtocolMessage, RuntimeApiMessage, RuntimeApiRequest}, ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, LeafStatus, OverseerSignal, }; use sp_application_crypto::AppKey; @@ -284,7 +282,7 @@ async fn test_dispute_coordinator_notifications( candidate_receipt: c_receipt, session: s, statements, - pending_confirmation, + pending_confirmation: None, } ) => { assert_eq!(c_hash, candidate_hash); @@ -292,7 +290,6 @@ async fn test_dispute_coordinator_notifications( assert_eq!(s, session); 
assert_eq!(statements.len(), 1); assert_eq!(statements[0].1, validator_index); - let _ = pending_confirmation.send(ImportStatementsResult::ValidImport); } ) } diff --git a/node/core/dispute-coordinator/Cargo.toml b/node/core/dispute-coordinator/Cargo.toml index 9c207f876cd6..2d8ae793989d 100644 --- a/node/core/dispute-coordinator/Cargo.toml +++ b/node/core/dispute-coordinator/Cargo.toml @@ -29,6 +29,7 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } assert_matches = "1.4.0" test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../../primitives/test-helpers" } +futures-timer = "3.0.2" [features] # If not enabled, the dispute coordinator will do nothing. diff --git a/node/core/dispute-coordinator/src/error.rs b/node/core/dispute-coordinator/src/error.rs index 5eeddf0a9583..e50a550bc605 100644 --- a/node/core/dispute-coordinator/src/error.rs +++ b/node/core/dispute-coordinator/src/error.rs @@ -82,7 +82,7 @@ pub enum Error { #[error(transparent)] Oneshot(#[from] oneshot::Canceled), - #[error("Dispute import confirmation send failed (receiver canceled)")] + #[error("Could not send import confirmation (receiver canceled)")] DisputeImportOneshotSend, #[error(transparent)] @@ -118,7 +118,7 @@ impl JfyiError { pub fn log(self) { match self { // don't spam the log with spurious errors - Self::Runtime(_) | Self::Oneshot(_) | Self::DisputeImportOneshotSend => { + Self::Runtime(_) | Self::Oneshot(_) => { gum::debug!(target: LOG_TARGET, error = ?self) }, // it's worth reporting otherwise diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index f29a2fe96635..b1b4e1c4844e 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -21,22 +21,18 @@ use std::{ sync::Arc, }; -use futures::{ - channel::{mpsc, oneshot}, - FutureExt, StreamExt, -}; -use lru::LruCache; +use futures::{channel::mpsc, FutureExt, StreamExt}; use sc_keystore::LocalKeystore; use polkadot_node_primitives::{ CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement, - DISPUTE_WINDOW, MAX_FINALITY_LAG, + DISPUTE_WINDOW, }; use polkadot_node_subsystem::{ messages::{ BlockDescription, DisputeCoordinatorMessage, DisputeDistributionMessage, - ImportStatementsResult, RuntimeApiMessage, RuntimeApiRequest, + ImportStatementsResult, }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SubsystemContext, }; @@ -52,7 +48,7 @@ use polkadot_primitives::v2::{ use crate::{ error::{log_error, Error, FatalError, FatalResult, JfyiError, JfyiResult, Result}, metrics::Metrics, - real::{ordering::get_finalized_block_number, DisputeCoordinatorSubsystem}, + real::DisputeCoordinatorSubsystem, status::{get_active_with_status, Clock, DisputeStatus, Timestamp}, LOG_TARGET, }; @@ -60,19 +56,15 @@ use crate::{ use super::{ backend::Backend, db, - ordering::{CandidateComparator, OrderingProvider}, participation::{ - self, Participation, ParticipationRequest, ParticipationStatement, WorkerMessageReceiver, + self, Participation, ParticipationPriority, ParticipationRequest, ParticipationStatement, + WorkerMessageReceiver, }, + scraping::ChainScraper, spam_slots::SpamSlots, OverlayedBackend, }; -// The capacity and scrape depth are equal to the maximum allowed unfinalized depth. 
-const LRU_SCRAPED_BLOCKS_CAPACITY: usize = MAX_FINALITY_LAG as usize; -// This is in sync with `MAX_FINALITY_LAG` in relay chain selection & node primitives. -const MAX_BATCH_SCRAPE_ANCESTORS: u32 = MAX_FINALITY_LAG; - /// After the first active leaves update we transition to `Initialized` state. /// /// Before the first active leaves update we can't really do much. We cannot check incoming @@ -84,14 +76,12 @@ pub struct Initialized { highest_session: SessionIndex, spam_slots: SpamSlots, participation: Participation, - ordering_provider: OrderingProvider, + scraper: ChainScraper, participation_receiver: WorkerMessageReceiver, metrics: Metrics, // This tracks only rolling session window failures. // It can be a `Vec` if the need to track more arises. error: Option, - /// Latest relay blocks that have been successfully scraped. - last_scraped_blocks: LruCache, } impl Initialized { @@ -100,7 +90,7 @@ impl Initialized { subsystem: DisputeCoordinatorSubsystem, rolling_session_window: RollingSessionWindow, spam_slots: SpamSlots, - ordering_provider: OrderingProvider, + scraper: ChainScraper, ) -> Self { let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem; @@ -113,12 +103,11 @@ impl Initialized { rolling_session_window, highest_session, spam_slots, - ordering_provider, + scraper, participation, participation_receiver, metrics, error: None, - last_scraped_blocks: LruCache::new(LRU_SCRAPED_BLOCKS_CAPACITY), } } @@ -129,7 +118,8 @@ impl Initialized { mut self, mut ctx: Context, mut backend: B, - mut participations: Vec<(Option, ParticipationRequest)>, + mut participations: Vec<(ParticipationPriority, ParticipationRequest)>, + mut votes: Vec, mut first_leaf: Option, clock: Box, ) -> FatalResult<()> @@ -144,6 +134,7 @@ impl Initialized { &mut ctx, &mut backend, &mut participations, + &mut votes, &mut first_leaf, &*clock, ) @@ -165,7 +156,8 @@ impl Initialized { &mut self, ctx: &mut Context, backend: &mut B, - participations: &mut Vec<(Option, ParticipationRequest)>, + participations: &mut Vec<(ParticipationPriority, ParticipationRequest)>, + on_chain_votes: &mut Vec, first_leaf: &mut Option, clock: &dyn Clock, ) -> Result<()> @@ -174,17 +166,31 @@ impl Initialized { Context: SubsystemContext, B: Backend, { - for (comparator, request) in participations.drain(..) { - self.participation.queue_participation(ctx, comparator, request).await?; + for (priority, request) in participations.drain(..) { + self.participation.queue_participation(ctx, priority, request).await?; } - if let Some(first_leaf) = first_leaf.take() { + + { let mut overlay_db = OverlayedBackend::new(backend); - self.scrape_on_chain_votes(ctx, &mut overlay_db, first_leaf.hash, clock.now()) - .await?; + for votes in on_chain_votes.drain(..) { + let _ = self + .process_on_chain_votes(ctx, &mut overlay_db, votes, clock.now()) + .await + .map_err(|error| { + gum::warn!( + target: LOG_TARGET, + ?error, + "Skipping scraping block due to error", + ); + }); + } if !overlay_db.is_empty() { let ops = overlay_db.into_write_ops(); backend.write(ops)?; } + } + + if let Some(first_leaf) = first_leaf.take() { // Also provide first leaf to participation for good measure. 
self.participation .process_active_leaves_update(ctx, &ActiveLeavesUpdate::start_work(first_leaf)) @@ -230,7 +236,7 @@ impl Initialized { default_confirm }, FromOverseer::Signal(OverseerSignal::BlockFinalized(_, n)) => { - self.ordering_provider.process_finalized_block(&n); + self.scraper.process_finalized_block(&n); default_confirm }, FromOverseer::Communication { msg } => @@ -256,9 +262,8 @@ impl Initialized { update: ActiveLeavesUpdate, now: u64, ) -> Result<()> { - self.ordering_provider - .process_active_leaves_update(ctx.sender(), &update) - .await?; + let on_chain_votes = + self.scraper.process_active_leaves_update(ctx.sender(), &update).await?; self.participation.process_active_leaves_update(ctx, &update).await?; if let Some(new_leaf) = update.activated { @@ -294,76 +299,14 @@ impl Initialized { Ok(SessionWindowUpdate::Unchanged) => {}, }; - // Scrape the head if above rolling session update went well. - if self.error.is_none() { - let _ = self - .scrape_on_chain_votes(ctx, overlay_db, new_leaf.hash, now) - .await - .map_err(|err| { - gum::warn!( - target: LOG_TARGET, - "Skipping scraping block #{}({}) due to error: {}", - new_leaf.number, - new_leaf.hash, - err - ); - }); - } - - // Try to scrape any blocks for which we could not get the current session or did not receive an - // active leaves update. - let ancestors = match get_finalized_block_number(ctx.sender()).await { - Ok(block_number) => { - // Limit our search to last finalized block, or up to max finality lag. - let block_number = std::cmp::max( - block_number, - new_leaf.number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS), - ); - // Fetch ancestry up to and including the last finalized block. - // `get_block_ancestors()` doesn't include the target block in the ancestry, so we'll need to - // pass in it's parent. - OrderingProvider::get_block_ancestors( - ctx.sender(), - new_leaf.hash, - new_leaf.number, - block_number.saturating_sub(1), - &mut self.last_scraped_blocks, - ) - .await - .unwrap_or_else(|err| { - gum::debug!( - target: LOG_TARGET, - activated_leaf = ?new_leaf, - error = ?err, - "Skipping leaf ancestors due to an error", - ); - // We assume this is a spurious error so we'll move forward with an - // empty ancestry. - Vec::new() - }) - }, - Err(err) => { - gum::debug!( - target: LOG_TARGET, - activated_leaf = ?new_leaf, - error = ?err, - "Skipping leaf ancestors scraping", - ); - // We assume this is a spurious error so we'll move forward with an - // empty ancestry. - Vec::new() - }, - }; - // The `runtime-api` subsystem has an internal queue which serializes the execution, // so there is no point in running these in parallel. - for ancestor in ancestors { - let _ = self.scrape_on_chain_votes(ctx, overlay_db, ancestor, now).await.map_err( - |err| { + for votes in on_chain_votes { + let _ = self.process_on_chain_votes(ctx, overlay_db, votes, now).await.map_err( + |error| { gum::warn!( target: LOG_TARGET, - hash = ?ancestor, - error = ?err, + ?error, "Skipping scraping block due to error", ); }, @@ -376,60 +319,17 @@ impl Initialized { /// Scrapes on-chain votes (backing votes and concluded disputes) for a active leaf of the /// relay chain. - async fn scrape_on_chain_votes( + async fn process_on_chain_votes( &mut self, ctx: &mut (impl SubsystemContext + overseer::SubsystemContext), overlay_db: &mut OverlayedBackend<'_, impl Backend>, - new_leaf: Hash, + votes: ScrapedOnChainVotes, now: u64, ) -> Result<()> { - // Avoid scraping twice. 
- if self.last_scraped_blocks.get(&new_leaf).is_some() { - return Ok(()) - } - - // obtain the concluded disputes as well as the candidate backing votes - // from the new leaf - let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = { - let (tx, rx) = oneshot::channel(); - ctx.send_message(RuntimeApiMessage::Request( - new_leaf, - RuntimeApiRequest::FetchOnChainVotes(tx), - )) - .await; - match rx.await { - Ok(Ok(Some(val))) => val, - Ok(Ok(None)) => { - gum::trace!( - target: LOG_TARGET, - relay_parent = ?new_leaf, - "No on chain votes stored for relay chain leaf"); - return Ok(()) - }, - Ok(Err(e)) => { - gum::debug!( - target: LOG_TARGET, - relay_parent = ?new_leaf, - error = ?e, - "Could not retrieve on chain votes due to an API error"); - return Ok(()) - }, - Err(e) => { - gum::debug!( - target: LOG_TARGET, - relay_parent = ?new_leaf, - error = ?e, - "Could not retrieve onchain votes due to oneshot cancellation"); - return Ok(()) - }, - } - }; + let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = votes; if backing_validators_per_candidate.is_empty() && disputes.is_empty() { - // This block is not interesting as it doesnt contain any backing votes or disputes. We'll - // mark it here as scraped to prevent further processing. - self.last_scraped_blocks.put(new_leaf, ()); return Ok(()) } @@ -444,7 +344,6 @@ impl Initialized { } else { gum::warn!( target: LOG_TARGET, - relay_parent = ?new_leaf, ?session, "Could not retrieve session info from rolling session window", ); @@ -454,6 +353,7 @@ impl Initialized { // Scraped on-chain backing votes for the candidates with // the new active leaf as if we received them via gossip. for (candidate_receipt, backers) in backing_validators_per_candidate { + let relay_parent = candidate_receipt.descriptor.relay_parent; let candidate_hash = candidate_receipt.hash(); let statements = backers .into_iter() @@ -463,10 +363,11 @@ impl Initialized { .get(validator_index.0 as usize) .or_else(|| { gum::error!( - target: LOG_TARGET, - relay_parent = ?new_leaf, - "Missing public key for validator {:?}", - &validator_index); + target: LOG_TARGET, + ?session, + ?validator_index, + "Missing public key for validator", + ); None }) .cloned()?; @@ -474,9 +375,9 @@ impl Initialized { let valid_statement_kind = match attestation.to_compact_statement(candidate_hash) { CompactStatement::Seconded(_) => - ValidDisputeStatementKind::BackingSeconded(new_leaf), + ValidDisputeStatementKind::BackingSeconded(relay_parent), CompactStatement::Valid(_) => - ValidDisputeStatementKind::BackingValid(new_leaf), + ValidDisputeStatementKind::BackingValid(relay_parent), }; let signed_dispute_statement = SignedDisputeStatement::new_unchecked_from_trusted_source( @@ -502,22 +403,21 @@ impl Initialized { ) .await?; match import_result { - ImportStatementsResult::ValidImport => gum::trace!(target: LOG_TARGET, - relay_parent = ?new_leaf, - ?session, - "Imported backing vote from on-chain"), - ImportStatementsResult::InvalidImport => gum::warn!(target: LOG_TARGET, - relay_parent = ?new_leaf, - ?session, - "Attempted import of on-chain backing votes failed"), + ImportStatementsResult::ValidImport => gum::trace!( + target: LOG_TARGET, + ?relay_parent, + ?session, + "Imported backing votes from chain" + ), + ImportStatementsResult::InvalidImport => gum::warn!( + target: LOG_TARGET, + ?relay_parent, + ?session, + "Attempted import of on-chain backing votes failed" + ), } } - if disputes.is_empty() { - self.last_scraped_blocks.put(new_leaf, ()); - 
return Ok(()) - } - // Import concluded disputes from on-chain, this already went through a vote so it's assumed // as verified. This will only be stored, gossiping it is not necessary. @@ -535,7 +435,7 @@ impl Initialized { } else { gum::warn!( target: LOG_TARGET, - relay_parent = ?new_leaf, + ?candidate_hash, ?session, "Could not retrieve session info from rolling session window for recently concluded dispute"); return None @@ -547,7 +447,7 @@ impl Initialized { .or_else(|| { gum::error!( target: LOG_TARGET, - relay_parent = ?new_leaf, + ?candidate_hash, ?session, "Missing public key for validator {:?} that participated in concluded dispute", &validator_index); @@ -580,20 +480,21 @@ impl Initialized { ) .await?; match import_result { - ImportStatementsResult::ValidImport => gum::trace!(target: LOG_TARGET, - relay_parent = ?new_leaf, - ?candidate_hash, - ?session, - "Imported statement of concluded dispute from on-chain"), - ImportStatementsResult::InvalidImport => gum::warn!(target: LOG_TARGET, - relay_parent = ?new_leaf, - ?candidate_hash, - ?session, - "Attempted import of on-chain statement of concluded dispute failed"), + ImportStatementsResult::ValidImport => gum::trace!( + target: LOG_TARGET, + ?candidate_hash, + ?session, + "Imported statement of concluded dispute from on-chain" + ), + ImportStatementsResult::InvalidImport => gum::warn!( + target: LOG_TARGET, + ?candidate_hash, + ?session, + "Attempted import of on-chain statement of concluded dispute failed" + ), } } - self.last_scraped_blocks.put(new_leaf, ()); Ok(()) } @@ -623,11 +524,13 @@ impl Initialized { now, ) .await?; - let report = move || { - pending_confirmation + let report = move || match pending_confirmation { + Some(pending_confirmation) => pending_confirmation .send(outcome) - .map_err(|_| JfyiError::DisputeImportOneshotSend) + .map_err(|_| JfyiError::DisputeImportOneshotSend), + None => Ok(()), }; + match outcome { ImportStatementsResult::InvalidImport => { report()?; @@ -811,14 +714,13 @@ impl Initialized { our_votes.retain(|index| controlled_indices.contains(index)); !our_votes.is_empty() }; + let was_confirmed = recent_disputes .get(&(session, candidate_hash)) .map_or(false, |s| s.is_confirmed_concluded()); - let comparator = self - .ordering_provider - .candidate_comparator(ctx.sender(), &candidate_receipt) - .await?; - let is_included = comparator.is_some(); + + let is_included = self.scraper.is_candidate_included(&candidate_receipt.hash()); + let is_local = statements .iter() .find(|(_, index)| controlled_indices.contains(index)) @@ -927,13 +829,14 @@ impl Initialized { // Participate in dispute if the imported vote was not local, we did not vote before either // and we actually have keys to issue a local vote. 
if !is_local && !voted_already && is_disputed && !controlled_indices.is_empty() { + let priority = ParticipationPriority::with_priority_if(is_included); gum::trace!( target: LOG_TARGET, candidate_hash = ?candidate_receipt.hash(), - priority = ?comparator.is_some(), + ?priority, "Queuing participation for candidate" ); - if comparator.is_some() { + if priority.is_priority() { self.metrics.on_queued_priority_participation(); } else { self.metrics.on_queued_best_effort_participation(); @@ -944,7 +847,7 @@ impl Initialized { .participation .queue_participation( ctx, - comparator, + priority, ParticipationRequest::new(candidate_receipt, session, n_validators), ) .await; diff --git a/node/core/dispute-coordinator/src/real/mod.rs b/node/core/dispute-coordinator/src/real/mod.rs index 9304ba620b07..139760b641e4 100644 --- a/node/core/dispute-coordinator/src/real/mod.rs +++ b/node/core/dispute-coordinator/src/real/mod.rs @@ -38,7 +38,7 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_util::{ database::Database, rolling_session_window::RollingSessionWindow, }; -use polkadot_primitives::v2::{ValidatorIndex, ValidatorPair}; +use polkadot_primitives::v2::{ScrapedOnChainVotes, ValidatorIndex, ValidatorPair}; use crate::{ error::{FatalResult, JfyiError, Result}, @@ -50,8 +50,7 @@ use db::v1::DbBackend; use fatality::Split; use self::{ - ordering::CandidateComparator, - participation::ParticipationRequest, + participation::{ParticipationPriority, ParticipationRequest}, spam_slots::{SpamSlots, UnconfirmedDisputes}, }; @@ -62,8 +61,7 @@ pub(crate) mod db; mod initialized; use initialized::Initialized; -/// Provider of an ordering for candidates for dispute participation, see -/// [`participation`] below. +/// Provider of data scraped from chain. /// /// If we have seen a candidate included somewhere, we should treat it as priority and will be able /// to provide an ordering for participation. Thus a dispute for a candidate where we can get some @@ -71,8 +69,8 @@ use initialized::Initialized; /// `participation` based on `relay_parent` block number and other metrics, so each validator will /// participate in disputes in a similar order, which ensures we will be resolving disputes, even /// under heavy load. -mod ordering; -use ordering::OrderingProvider; +mod scraping; +use scraping::ChainScraper; /// When importing votes we will check via the `ordering` module, whether or not we know of the /// candidate to be included somewhere. If not, the votes might be spam, in this case we want to @@ -162,13 +160,15 @@ impl DisputeCoordinatorSubsystem { { let res = self.initialize(&mut ctx, backend, &*clock).await?; - let (participations, first_leaf, initialized, backend) = match res { + let (participations, votes, first_leaf, initialized, backend) = match res { // Concluded: None => return Ok(()), Some(r) => r, }; - initialized.run(ctx, backend, participations, Some(first_leaf), clock).await + initialized + .run(ctx, backend, participations, votes, Some(first_leaf), clock) + .await } /// Make sure to recover participations properly on startup. 
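// Illustrative sketch, not part of the patch: the diff changes callers (approval-voting,
// backing) to pass `pending_confirmation: None` instead of allocating a oneshot channel
// whose receiver they never read. The types below are simplified stand-ins for the real
// `DisputeCoordinatorMessage::ImportStatements` fields (assumed shape, not the actual API).
use futures::channel::oneshot;

#[derive(Debug)]
enum ImportStatementsResult {
    ValidImport,
    InvalidImport,
}

struct ImportStatements {
    // candidate hash, receipt, session and statements elided
    pending_confirmation: Option<oneshot::Sender<ImportStatementsResult>>,
}

// Fire-and-forget import, as the call sites above now do.
fn import_without_confirmation() -> ImportStatements {
    ImportStatements { pending_confirmation: None }
}

// Import where the caller still wants to learn the outcome.
fn import_with_confirmation() -> (ImportStatements, oneshot::Receiver<ImportStatementsResult>) {
    let (tx, rx) = oneshot::channel();
    (ImportStatements { pending_confirmation: Some(tx) }, rx)
}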
@@ -179,7 +179,8 @@ impl DisputeCoordinatorSubsystem { clock: &(dyn Clock), ) -> FatalResult< Option<( - Vec<(Option, ParticipationRequest)>, + Vec<(ParticipationPriority, ParticipationRequest)>, + Vec, ActivatedLeaf, Initialized, B, @@ -203,12 +204,8 @@ impl DisputeCoordinatorSubsystem { }, }; - // Before we move to the initialized state we need to check if we got at - // least on finality notification to prevent large ancestry block scraping, - // when the node is syncing. - let mut overlay_db = OverlayedBackend::new(&mut backend); - let (participations, spam_slots, ordering_provider) = match self + let (participations, votes, spam_slots, ordering_provider) = match self .handle_startup( ctx, first_leaf.clone(), @@ -231,6 +228,7 @@ impl DisputeCoordinatorSubsystem { return Ok(Some(( participations, + votes, first_leaf, Initialized::new(self, rolling_session_window, spam_slots, ordering_provider), backend, @@ -251,9 +249,10 @@ impl DisputeCoordinatorSubsystem { overlay_db: &mut OverlayedBackend<'_, impl Backend>, clock: &dyn Clock, ) -> Result<( - Vec<(Option, ParticipationRequest)>, + Vec<(ParticipationPriority, ParticipationRequest)>, + Vec, SpamSlots, - OrderingProvider, + ChainScraper, )> where Context: overseer::SubsystemContext, @@ -274,7 +273,7 @@ impl DisputeCoordinatorSubsystem { let mut participation_requests = Vec::new(); let mut unconfirmed_disputes: UnconfirmedDisputes = UnconfirmedDisputes::new(); - let mut ordering_provider = OrderingProvider::new(ctx.sender(), initial_head).await?; + let (mut scraper, votes) = ChainScraper::new(ctx.sender(), initial_head).await?; for ((session, ref candidate_hash), status) in active_disputes { let votes: CandidateVotes = match overlay_db.load_candidate_votes(session, candidate_hash) { @@ -322,10 +321,7 @@ impl DisputeCoordinatorSubsystem { .map_or(false, |v| v.is_some()) }); - let candidate_comparator = ordering_provider - .candidate_comparator(ctx.sender(), &votes.candidate_receipt) - .await?; - let is_included = candidate_comparator.is_some(); + let is_included = scraper.is_candidate_included(&votes.candidate_receipt.hash()); if !status.is_confirmed_concluded() && !is_included { unconfirmed_disputes.insert((session, *candidate_hash), voted_indices); @@ -335,7 +331,7 @@ impl DisputeCoordinatorSubsystem { // recorded local statement. if missing_local_statement { participation_requests.push(( - candidate_comparator, + ParticipationPriority::with_priority_if(is_included), ParticipationRequest::new( votes.candidate_receipt.clone(), session, @@ -347,8 +343,9 @@ impl DisputeCoordinatorSubsystem { Ok(( participation_requests, + votes, SpamSlots::recover_from_state(unconfirmed_disputes), - ordering_provider, + scraper, )) } } diff --git a/node/core/dispute-coordinator/src/real/ordering/mod.rs b/node/core/dispute-coordinator/src/real/ordering/mod.rs deleted file mode 100644 index b7250aed561c..000000000000 --- a/node/core/dispute-coordinator/src/real/ordering/mod.rs +++ /dev/null @@ -1,378 +0,0 @@ -// Copyright 2021 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -use std::{ - cmp::{Ord, Ordering, PartialOrd}, - collections::{BTreeMap, HashSet}, -}; - -use futures::channel::oneshot; -use lru::LruCache; - -use polkadot_node_subsystem::{ - messages::ChainApiMessage, ActivatedLeaf, ActiveLeavesUpdate, ChainApiError, SubsystemSender, -}; -use polkadot_node_subsystem_util::runtime::get_candidate_events; -use polkadot_primitives::v2::{BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, Hash}; - -use crate::{ - error::{FatalError, FatalResult, Result}, - LOG_TARGET, -}; - -#[cfg(test)] -mod tests; - -const LRU_OBSERVED_BLOCKS_CAPACITY: usize = 20; - -/// Provider of `CandidateComparator` for candidates. -pub struct OrderingProvider { - /// All candidates we have seen included, which not yet have been finalized. - included_candidates: HashSet, - /// including block -> `CandidateHash` - /// - /// We need this to clean up `included_candidates` on `ActiveLeavesUpdate`. - candidates_by_block_number: BTreeMap>, - /// Latest relay blocks observed by the provider. We assume that ancestors of - /// cached blocks are already processed, i.e. we have saved corresponding - /// included candidates. - last_observed_blocks: LruCache, -} - -/// `Comparator` for ordering of disputes for candidates. -/// -/// This `comparator` makes it possible to order disputes based on age and to ensure some fairness -/// between chains in case of equally old disputes. -/// -/// Objective ordering between nodes is important in case of lots disputes, so nodes will pull in -/// the same direction and work on resolving the same disputes first. This ensures that we will -/// conclude some disputes, even if there are lots of them. While any objective ordering would -/// suffice for this goal, ordering by age ensures we are not only resolving disputes, but also -/// resolve the oldest one first, which are also the most urgent and important ones to resolve. -/// -/// Note: That by `oldest` we mean oldest in terms of relay chain block number, for any block -/// number that has not yet been finalized. If a block has been finalized already it should be -/// treated as low priority when it comes to disputes, as even in the case of a negative outcome, -/// we are already too late. The ordering mechanism here serves to prevent this from happening in -/// the first place. -#[derive(Copy, Clone)] -#[cfg_attr(test, derive(Debug))] -pub struct CandidateComparator { - /// Block number of the relay parent. - /// - /// Important, so we will be participating in oldest disputes first. - /// - /// Note: In theory it would make more sense to use the `BlockNumber` of the including - /// block, as inclusion time is the actual relevant event when it comes to ordering. The - /// problem is, that a candidate can get included multiple times on forks, so the `BlockNumber` - /// of the including block is not unique. We could theoretically work around that problem, by - /// just using the lowest `BlockNumber` of all available including blocks - the problem is, - /// that is not stable. If a new fork appears after the fact, we would start ordering the same - /// candidate differently, which would result in the same candidate getting queued twice. - relay_parent_block_number: BlockNumber, - /// By adding the `CandidateHash`, we can guarantee a unique ordering across candidates. 
- candidate_hash: CandidateHash, -} - -impl PartialEq for CandidateComparator { - fn eq(&self, other: &CandidateComparator) -> bool { - Ordering::Equal == self.cmp(other) - } -} - -impl Eq for CandidateComparator {} - -impl PartialOrd for CandidateComparator { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for CandidateComparator { - fn cmp(&self, other: &Self) -> Ordering { - match self.relay_parent_block_number.cmp(&other.relay_parent_block_number) { - Ordering::Equal => (), - o => return o, - } - self.candidate_hash.cmp(&other.candidate_hash) - } -} - -impl CandidateComparator { - /// Create a candidate comparator based on given (fake) values. - /// - /// Useful for testing. - #[cfg(test)] - pub fn new_dummy(block_number: BlockNumber, candidate_hash: CandidateHash) -> Self { - Self { relay_parent_block_number: block_number, candidate_hash } - } - /// Check whether the given candidate hash belongs to this comparator. - pub fn matches_candidate(&self, candidate_hash: &CandidateHash) -> bool { - &self.candidate_hash == candidate_hash - } -} - -impl OrderingProvider { - /// Limits the number of ancestors received for a single request. - pub(crate) const ANCESTRY_CHUNK_SIZE: usize = 10; - /// Limits the overall number of ancestors walked through for a given head. - pub(crate) const ANCESTRY_SIZE_LIMIT: usize = 1000; - - /// Create a properly initialized `OrderingProvider`. - pub async fn new( - sender: &mut Sender, - initial_head: ActivatedLeaf, - ) -> Result { - let mut s = Self { - included_candidates: HashSet::new(), - candidates_by_block_number: BTreeMap::new(), - last_observed_blocks: LruCache::new(LRU_OBSERVED_BLOCKS_CAPACITY), - }; - let update = - ActiveLeavesUpdate { activated: Some(initial_head), deactivated: Default::default() }; - s.process_active_leaves_update(sender, &update).await?; - Ok(s) - } - - /// Retrieve a candidate `comparator` if available. - /// - /// If not available, we can treat disputes concerning this candidate with low priority and - /// should use spam slots for such disputes. - pub async fn candidate_comparator<'a>( - &mut self, - sender: &mut impl SubsystemSender, - candidate: &CandidateReceipt, - ) -> FatalResult> { - let candidate_hash = candidate.hash(); - if !self.included_candidates.contains(&candidate_hash) { - return Ok(None) - } - let n = match get_block_number(sender, candidate.descriptor().relay_parent).await? { - None => { - gum::warn!( - target: LOG_TARGET, - candidate_hash = ?candidate_hash, - "Candidate's relay_parent could not be found via chain API, but we saw candidate included?!" - ); - return Ok(None) - }, - Some(n) => n, - }; - - Ok(Some(CandidateComparator { relay_parent_block_number: n, candidate_hash })) - } - - /// Query active leaves for any candidate `CandidateEvent::CandidateIncluded` events. - /// - /// and updates current heads, so we can query candidates for all non finalized blocks. - pub async fn process_active_leaves_update( - &mut self, - sender: &mut Sender, - update: &ActiveLeavesUpdate, - ) -> crate::error::Result<()> { - if let Some(activated) = update.activated.as_ref() { - // Fetch last finalized block. - let ancestors = match get_finalized_block_number(sender).await { - Ok(block_number) => { - // Fetch ancestry up to last finalized block. 
- Self::get_block_ancestors( - sender, - activated.hash, - activated.number, - block_number, - &mut self.last_observed_blocks, - ) - .await - .unwrap_or_else(|err| { - gum::debug!( - target: LOG_TARGET, - activated_leaf = ?activated, - error = ?err, - "Skipping leaf ancestors due to an error", - ); - // We assume this is a spurious error so we'll move forward with an - // empty ancestry. - Vec::new() - }) - }, - Err(err) => { - gum::debug!( - target: LOG_TARGET, - activated_leaf = ?activated, - error = ?err, - "Failed to retrieve last finalized block number", - ); - // We assume this is a spurious error so we'll move forward with an - // empty ancestry. - Vec::new() - }, - }; - - // Ancestors block numbers are consecutive in the descending order. - let earliest_block_number = activated.number - ancestors.len() as u32; - let block_numbers = (earliest_block_number..=activated.number).rev(); - - let block_hashes = std::iter::once(activated.hash).chain(ancestors); - for (block_num, block_hash) in block_numbers.zip(block_hashes) { - // Get included events: - let included = get_candidate_events(sender, block_hash) - .await? - .into_iter() - .filter_map(|ev| match ev { - CandidateEvent::CandidateIncluded(receipt, _, _, _) => Some(receipt), - _ => None, - }); - for receipt in included { - let candidate_hash = receipt.hash(); - self.included_candidates.insert(candidate_hash); - self.candidates_by_block_number - .entry(block_num) - .or_default() - .insert(candidate_hash); - } - } - - self.last_observed_blocks.put(activated.hash, ()); - } - - Ok(()) - } - - /// Prune finalized candidates. - /// - /// Once a candidate lives in a relay chain block that's behind the finalized chain/got - /// finalized, we can treat it as low priority. - pub fn process_finalized_block(&mut self, finalized: &BlockNumber) { - let not_finalized = self.candidates_by_block_number.split_off(finalized); - let finalized = std::mem::take(&mut self.candidates_by_block_number); - self.candidates_by_block_number = not_finalized; - // Clean up finalized: - for finalized_candidate in finalized.into_values().flatten() { - self.included_candidates.remove(&finalized_candidate); - } - } - - /// Returns ancestors of `head` in the descending order, stopping - /// either at the block present in cache or at `target_ancestor`. - /// - /// Suited specifically for querying non-finalized chains, thus - /// doesn't rely on block numbers. - /// - /// Both `head` and last are **not** included in the result. - pub async fn get_block_ancestors( - sender: &mut Sender, - mut head: Hash, - mut head_number: BlockNumber, - target_ancestor: BlockNumber, - lookup_cache: &mut LruCache, - ) -> Result> { - let mut ancestors = Vec::new(); - - if lookup_cache.get(&head).is_some() || head_number <= target_ancestor { - return Ok(ancestors) - } - - loop { - let (tx, rx) = oneshot::channel(); - let hashes = { - sender - .send_message( - ChainApiMessage::Ancestors { - hash: head, - k: Self::ANCESTRY_CHUNK_SIZE, - response_channel: tx, - } - .into(), - ) - .await; - - rx.await - .or(Err(FatalError::ChainApiSenderDropped))? - .map_err(FatalError::ChainApiAncestors)? - }; - - let earliest_block_number = match head_number.checked_sub(hashes.len() as u32) { - Some(number) => number, - None => { - // It's assumed that it's impossible to retrieve - // more than N ancestors for block number N. 
- gum::error!( - target: LOG_TARGET, - "Received {} ancestors for block number {} from Chain API", - hashes.len(), - head_number, - ); - return Ok(ancestors) - }, - }; - // The reversed order is parent, grandparent, etc. excluding the head. - let block_numbers = (earliest_block_number..head_number).rev(); - - for (block_number, hash) in block_numbers.zip(&hashes) { - // Return if we either met target/cached block or - // hit the size limit for the returned ancestry of head. - if lookup_cache.get(hash).is_some() || - block_number <= target_ancestor || - ancestors.len() >= Self::ANCESTRY_SIZE_LIMIT - { - return Ok(ancestors) - } - - ancestors.push(*hash); - } - - match hashes.last() { - Some(last_hash) => { - head = *last_hash; - head_number = earliest_block_number; - }, - None => break, - } - } - return Ok(ancestors) - } -} - -async fn send_message_fatal( - sender: &mut Sender, - message: ChainApiMessage, - receiver: oneshot::Receiver>, -) -> FatalResult -where - Sender: SubsystemSender, -{ - sender.send_message(message.into()).await; - - receiver - .await - .map_err(|_| FatalError::ChainApiSenderDropped)? - .map_err(FatalError::ChainApiAncestors) -} - -async fn get_block_number( - sender: &mut impl SubsystemSender, - relay_parent: Hash, -) -> FatalResult> { - let (tx, rx) = oneshot::channel(); - send_message_fatal(sender, ChainApiMessage::BlockNumber(relay_parent, tx), rx).await -} - -pub async fn get_finalized_block_number( - sender: &mut impl SubsystemSender, -) -> FatalResult { - let (number_tx, number_rx) = oneshot::channel(); - send_message_fatal(sender, ChainApiMessage::FinalizedBlockNumber(number_tx), number_rx).await -} diff --git a/node/core/dispute-coordinator/src/real/participation/mod.rs b/node/core/dispute-coordinator/src/real/participation/mod.rs index 7f43a22b11cb..3ed91836b872 100644 --- a/node/core/dispute-coordinator/src/real/participation/mod.rs +++ b/node/core/dispute-coordinator/src/real/participation/mod.rs @@ -15,11 +15,15 @@ // along with Polkadot. If not, see . use std::collections::HashSet; +#[cfg(test)] +use std::time::Duration; use futures::{ channel::{mpsc, oneshot}, FutureExt, SinkExt, }; +#[cfg(test)] +use futures_timer::Delay; use polkadot_node_primitives::{ValidationResult, APPROVAL_EXECUTION_TIMEOUT}; use polkadot_node_subsystem::{ @@ -31,8 +35,7 @@ use polkadot_primitives::v2::{BlockNumber, CandidateHash, CandidateReceipt, Hash use crate::real::LOG_TARGET; -use super::ordering::CandidateComparator; -use crate::error::{FatalError, FatalResult, JfyiError, Result}; +use crate::error::{FatalError, FatalResult, Result}; #[cfg(test)] mod tests; @@ -41,7 +44,7 @@ pub use tests::{participation_full_happy_path, participation_missing_availabilit mod queues; use queues::Queues; -pub use queues::{ParticipationRequest, QueueError}; +pub use queues::{ParticipationPriority, ParticipationRequest, QueueError}; /// How many participation processes do we want to run in parallel the most. /// @@ -144,7 +147,7 @@ impl Participation { pub async fn queue_participation( &mut self, ctx: &mut Context, - comparator: Option, + priority: ParticipationPriority, req: ParticipationRequest, ) -> Result<()> { // Participation already running - we can ignore that request: @@ -159,7 +162,7 @@ impl Participation { } } // Out of capacity/no recent block yet - queue: - Ok(self.queue.queue(comparator, req).map_err(JfyiError::QueueError)?) + self.queue.queue(ctx.sender(), priority, req).await } /// Message from a worker task was received - get the outcome. 
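// Illustrative sketch, not part of the patch: `queue_participation` now takes a
// `ParticipationPriority` (introduced in `queues/mod.rs` further down) instead of an
// `Option<CandidateComparator>`. The priority picks one of two queues; this stand-alone
// model (String requests and u32 comparator keys are stand-ins) shows the resulting
// dequeue order: priority entries first, then best-effort entries by `added_count`.
use std::collections::{BTreeMap, HashMap};

fn dequeue(
    priority: &mut BTreeMap<u32, String>,   // keyed by a comparator stand-in
    best_effort: &mut HashMap<String, u32>, // request -> added_count
) -> Option<String> {
    // Priority queue first: the smallest key (oldest relay parent) wins.
    let first_key = priority.keys().next().copied();
    if let Some(key) = first_key {
        let req = priority.remove(&key).expect("key was just observed; qed");
        // A request that was best-effort before becoming priority must not
        // be handed out twice, so drop any stale duplicate.
        best_effort.remove(&req);
        return Some(req)
    }
    // Otherwise serve the best-effort request that was re-queued most often.
    let best = best_effort
        .iter()
        .max_by_key(|(_, &added_count)| added_count)
        .map(|(req, _)| req.clone());
    if let Some(req) = best {
        best_effort.remove(&req);
        return Some(req)
    }
    None
}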
@@ -249,6 +252,9 @@ async fn participate( block_hash: Hash, req: ParticipationRequest, ) { + #[cfg(test)] + // Hack for tests, so we don't get recovery messages too early. + Delay::new(Duration::from_millis(100)).await; // in order to validate a candidate we need to start by recovering the // available data let (recover_available_data_tx, recover_available_data_rx) = oneshot::channel(); diff --git a/node/core/dispute-coordinator/src/real/participation/queues/mod.rs b/node/core/dispute-coordinator/src/real/participation/queues/mod.rs index 921a0941337a..587f0bf10760 100644 --- a/node/core/dispute-coordinator/src/real/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/real/participation/queues/mod.rs @@ -14,11 +14,19 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see <http://www.gnu.org/licenses/>. -use std::collections::{BTreeMap, HashMap}; +use std::{ + cmp::Ordering, + collections::{BTreeMap, HashMap}, +}; -use polkadot_primitives::v2::{CandidateHash, CandidateReceipt, SessionIndex}; +use futures::channel::oneshot; +use polkadot_node_subsystem::{messages::ChainApiMessage, SubsystemSender}; +use polkadot_primitives::v2::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex}; -use crate::real::ordering::CandidateComparator; +use crate::{ + error::{FatalError, FatalResult, Result}, + LOG_TARGET, +}; #[cfg(test)] mod tests; @@ -73,11 +81,34 @@ pub struct ParticipationRequest { n_validators: usize, } -/// Entry for the best effort queue. -struct BestEffortEntry { - req: ParticipationRequest, - /// How often was the above request added to the queue. - added_count: BestEffortCount, +/// Whether a `ParticipationRequest` should be put on best-effort or the priority queue. +#[derive(Debug)] +pub enum ParticipationPriority { + BestEffort, + Priority, +} + +impl ParticipationPriority { + /// Create `ParticipationPriority` with either `Priority` + /// + /// or `BestEffort`. + pub fn with_priority_if(is_priority: bool) -> Self { + if is_priority { + Self::Priority + } else { + Self::BestEffort + } + } + + /// Whether or not this is a priority entry. + /// + /// If false, it is best effort. + pub fn is_priority(&self) -> bool { + match self { + Self::Priority => true, + Self::BestEffort => false, + } + } } /// What can go wrong when queuing a request. @@ -123,23 +154,47 @@ impl Queues { Self { best_effort: HashMap::new(), priority: BTreeMap::new() } } - /// Will put message in queue, either priority or best effort depending on whether a - /// `CandidateComparator` was provided or not. + /// Will put message in queue, either priority or best effort depending on priority. /// /// If the message was already previously present on best effort, it will be moved to priority - /// if a `CandidateComparator` has been passed now, otherwise the `added_count` on the best - /// effort queue will be bumped. + /// if it is considered priority now, otherwise the `added_count` on the best effort queue will be + /// bumped. /// /// Returns error in case a queue was found full already.
- pub fn queue( + pub async fn queue( &mut self, - comparator: Option<CandidateComparator>, + sender: &mut impl SubsystemSender, + priority: ParticipationPriority, req: ParticipationRequest, - ) -> Result<(), QueueError> { - debug_assert!(comparator - .map(|c| c.matches_candidate(req.candidate_hash())) - .unwrap_or(true)); + ) -> Result<()> { + let comparator = match priority { + ParticipationPriority::BestEffort => None, + ParticipationPriority::Priority => + CandidateComparator::new(sender, &req.candidate_receipt).await?, + }; + self.queue_with_comparator(comparator, req)?; + Ok(()) + } + /// Get the next best request for dispute participation + /// + /// if any. Priority queue is always considered first, then the best effort queue based on + /// `added_count`. + pub fn dequeue(&mut self) -> Option<ParticipationRequest> { + if let Some(req) = self.pop_priority() { + // In case a candidate became best effort over time, we might have it also queued in + // the best effort queue - get rid of any such entry: + self.best_effort.remove(req.candidate_hash()); + return Some(req) + } + self.pop_best_effort() + } + + fn queue_with_comparator( + &mut self, + comparator: Option<CandidateComparator>, + req: ParticipationRequest, + ) -> std::result::Result<(), QueueError> { if let Some(comparator) = comparator { if self.priority.len() >= PRIORITY_QUEUE_SIZE { return Err(QueueError::PriorityFull) @@ -161,20 +216,6 @@ impl Queues { Ok(()) } - /// Get the next best request for dispute participation - /// - /// if any. Priority queue is always considered first, then the best effort queue based on - /// `added_count`. - pub fn dequeue(&mut self) -> Option<ParticipationRequest> { - if let Some(req) = self.pop_priority() { - // In case a candidate became best effort over time, we might have it also queued in - // the best effort queue - get rid of any such entry: - self.best_effort.remove(req.candidate_hash()); - return Some(req) - } - self.pop_best_effort() - } - /// Get the next best from the best effort queue. /// /// If there are multiple best - just pick one. @@ -206,3 +247,115 @@ impl Queues { } } } + +/// Entry for the best effort queue. +struct BestEffortEntry { + req: ParticipationRequest, + /// How often was the above request added to the queue. + added_count: BestEffortCount, +} + +/// `Comparator` for ordering of disputes for candidates. +/// +/// This `comparator` makes it possible to order disputes based on age and to ensure some fairness +/// between chains in case of equally old disputes. +/// +/// Objective ordering between nodes is important in case of lots of disputes, so nodes will pull in +/// the same direction and work on resolving the same disputes first. This ensures that we will +/// conclude some disputes, even if there are lots of them. While any objective ordering would +/// suffice for this goal, ordering by age ensures we are not only resolving disputes, but also +/// resolving the oldest ones first, which are also the most urgent and important ones to resolve. +/// +/// Note: By `oldest` we mean oldest in terms of relay chain block number, for any block +/// number that has not yet been finalized. If a block has been finalized already it should be +/// treated as low priority when it comes to disputes, as even in the case of a negative outcome, +/// we are already too late. The ordering mechanism here serves to prevent this from happening in +/// the first place. +#[derive(Copy, Clone)] +#[cfg_attr(test, derive(Debug))] +struct CandidateComparator { + /// Block number of the relay parent.
+ /// + /// Important, so we will be participating in oldest disputes first. + /// + /// Note: In theory it would make more sense to use the `BlockNumber` of the including + /// block, as inclusion time is the actual relevant event when it comes to ordering. The + /// problem is, that a candidate can get included multiple times on forks, so the `BlockNumber` + /// of the including block is not unique. We could theoretically work around that problem, by + /// just using the lowest `BlockNumber` of all available including blocks - the problem is, + /// that is not stable. If a new fork appears after the fact, we would start ordering the same + /// candidate differently, which would result in the same candidate getting queued twice. + relay_parent_block_number: BlockNumber, + /// By adding the `CandidateHash`, we can guarantee a unique ordering across candidates. + candidate_hash: CandidateHash, +} + +impl CandidateComparator { + /// Create a candidate comparator based on given (fake) values. + /// + /// Useful for testing. + #[cfg(test)] + pub fn new_dummy(block_number: BlockNumber, candidate_hash: CandidateHash) -> Self { + Self { relay_parent_block_number: block_number, candidate_hash } + } + + /// Create a candidate comparator for a given candidate. + /// + /// Returns: + /// `Ok(None)` in case we could not look up the candidate's relay parent, returns a + /// `FatalError` in case the chain API call fails with an unexpected error. + pub async fn new( + sender: &mut impl SubsystemSender, + candidate: &CandidateReceipt, + ) -> FatalResult<Option<CandidateComparator>> { + let candidate_hash = candidate.hash(); + let n = match get_block_number(sender, candidate.descriptor().relay_parent).await? { + None => { + gum::warn!( + target: LOG_TARGET, + candidate_hash = ?candidate_hash, + "Candidate's relay_parent could not be found via chain API - `CandidateComparator` could not be provided!" + ); + return Ok(None) + }, + Some(n) => n, + }; + + Ok(Some(CandidateComparator { relay_parent_block_number: n, candidate_hash })) + } +} + +impl PartialEq for CandidateComparator { + fn eq(&self, other: &CandidateComparator) -> bool { + Ordering::Equal == self.cmp(other) + } +} + +impl Eq for CandidateComparator {} + +impl PartialOrd for CandidateComparator { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for CandidateComparator { + fn cmp(&self, other: &Self) -> Ordering { + match self.relay_parent_block_number.cmp(&other.relay_parent_block_number) { + Ordering::Equal => (), + o => return o, + } + self.candidate_hash.cmp(&other.candidate_hash) + } +} + +async fn get_block_number( + sender: &mut impl SubsystemSender, + relay_parent: Hash, +) -> FatalResult<Option<BlockNumber>> { + let (tx, rx) = oneshot::channel(); + sender.send_message(ChainApiMessage::BlockNumber(relay_parent, tx).into()).await; + rx.await + .map_err(|_| FatalError::ChainApiSenderDropped)?
+ .map_err(FatalError::ChainApiAncestors) +} diff --git a/node/core/dispute-coordinator/src/real/participation/queues/tests.rs b/node/core/dispute-coordinator/src/real/participation/queues/tests.rs index 8d1cc970a75e..03c8e1f3b658 100644 --- a/node/core/dispute-coordinator/src/real/participation/queues/tests.rs +++ b/node/core/dispute-coordinator/src/real/participation/queues/tests.rs @@ -18,9 +18,7 @@ use ::test_helpers::{dummy_candidate_receipt, dummy_hash}; use assert_matches::assert_matches; use polkadot_primitives::v2::{BlockNumber, Hash}; -use crate::real::ordering::CandidateComparator; - -use super::{ParticipationRequest, QueueError, Queues}; +use super::{CandidateComparator, ParticipationRequest, QueueError, Queues}; /// Make a `ParticipationRequest` based on the given commitments hash. fn make_participation_request(hash: Hash) -> ParticipationRequest { @@ -52,21 +50,21 @@ fn ordering_works_as_expected() { let req5 = make_participation_request(Hash::repeat_byte(0x05)); let req_full = make_participation_request(Hash::repeat_byte(0x06)); let req_prio_full = make_participation_request(Hash::repeat_byte(0x07)); - queue.queue(None, req1.clone()).unwrap(); + queue.queue_with_comparator(None, req1.clone()).unwrap(); queue - .queue(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone()) + .queue_with_comparator(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone()) .unwrap(); - queue.queue(None, req3.clone()).unwrap(); + queue.queue_with_comparator(None, req3.clone()).unwrap(); queue - .queue(Some(make_dummy_comparator(&req_prio_2, 2)), req_prio_2.clone()) + .queue_with_comparator(Some(make_dummy_comparator(&req_prio_2, 2)), req_prio_2.clone()) .unwrap(); - queue.queue(None, req3.clone()).unwrap(); - queue.queue(None, req5.clone()).unwrap(); + queue.queue_with_comparator(None, req3.clone()).unwrap(); + queue.queue_with_comparator(None, req5.clone()).unwrap(); assert_matches!( - queue.queue(Some(make_dummy_comparator(&req_prio_full, 3)), req_prio_full), + queue.queue_with_comparator(Some(make_dummy_comparator(&req_prio_full, 3)), req_prio_full), Err(QueueError::PriorityFull) ); - assert_matches!(queue.queue(None, req_full), Err(QueueError::BestEffortFull)); + assert_matches!(queue.queue_with_comparator(None, req_full), Err(QueueError::BestEffortFull)); assert_eq!(queue.dequeue(), Some(req_prio)); assert_eq!(queue.dequeue(), Some(req_prio_2)); @@ -91,22 +89,22 @@ fn candidate_is_only_dequeued_once() { let req_best_effort_then_prio = make_participation_request(Hash::repeat_byte(0x03)); let req_prio_then_best_effort = make_participation_request(Hash::repeat_byte(0x04)); - queue.queue(None, req1.clone()).unwrap(); + queue.queue_with_comparator(None, req1.clone()).unwrap(); queue - .queue(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone()) + .queue_with_comparator(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone()) .unwrap(); // Insert same best effort again: - queue.queue(None, req1.clone()).unwrap(); + queue.queue_with_comparator(None, req1.clone()).unwrap(); // insert same prio again: queue - .queue(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone()) + .queue_with_comparator(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone()) .unwrap(); // Insert first as best effort: - queue.queue(None, req_best_effort_then_prio.clone()).unwrap(); + queue.queue_with_comparator(None, req_best_effort_then_prio.clone()).unwrap(); // Then as prio: queue - .queue( + .queue_with_comparator( Some(make_dummy_comparator(&req_best_effort_then_prio, 2)), 
req_best_effort_then_prio.clone(), ) .unwrap(); @@ -117,13 +115,13 @@ // Insert first as prio: queue - .queue( + .queue_with_comparator( Some(make_dummy_comparator(&req_prio_then_best_effort, 3)), req_prio_then_best_effort.clone(), ) .unwrap(); // Then as best effort: - queue.queue(None, req_prio_then_best_effort.clone()).unwrap(); + queue.queue_with_comparator(None, req_prio_then_best_effort.clone()).unwrap(); assert_eq!(queue.dequeue(), Some(req_best_effort_then_prio)); assert_eq!(queue.dequeue(), Some(req_prio_then_best_effort)); diff --git a/node/core/dispute-coordinator/src/real/participation/tests.rs b/node/core/dispute-coordinator/src/real/participation/tests.rs index d0b38eccbcbd..7fa635482004 100644 --- a/node/core/dispute-coordinator/src/real/participation/tests.rs +++ b/node/core/dispute-coordinator/src/real/participation/tests.rs @@ -76,7 +76,9 @@ async fn participate_with_commitments_hash( let req = ParticipationRequest::new(candidate_receipt, session, n_validators); - participation.queue_participation(ctx, None, req).await + participation + .queue_participation(ctx, ParticipationPriority::BestEffort, req) + .await } async fn activate_leaf( diff --git a/node/core/dispute-coordinator/src/real/scraping/mod.rs b/node/core/dispute-coordinator/src/real/scraping/mod.rs new file mode 100644 index 000000000000..060eaecc0414 --- /dev/null +++ b/node/core/dispute-coordinator/src/real/scraping/mod.rs @@ -0,0 +1,300 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +use std::collections::{BTreeMap, HashSet}; + +use futures::channel::oneshot; +use lru::LruCache; + +use polkadot_node_primitives::MAX_FINALITY_LAG; +use polkadot_node_subsystem::{ + messages::ChainApiMessage, ActivatedLeaf, ActiveLeavesUpdate, ChainApiError, SubsystemSender, +}; +use polkadot_node_subsystem_util::runtime::{get_candidate_events, get_on_chain_votes}; +use polkadot_primitives::v2::{ + BlockNumber, CandidateEvent, CandidateHash, Hash, ScrapedOnChainVotes, +}; + +use crate::{ + error::{FatalError, FatalResult, Result}, + LOG_TARGET, +}; + +#[cfg(test)] +mod tests; + +/// Number of hashes to keep in the LRU. +/// +/// When traversing the ancestry of a block we will stop once we hit a hash that we find in the +/// `last_observed_blocks` LRU. This means this value should at the very least be as large as the +/// number of expected forks for keeping chain scraping efficient. Making the LRU much larger than +/// that has very limited use. +const LRU_OBSERVED_BLOCKS_CAPACITY: usize = 20; + +/// Chain scraper +/// +/// Scrapes the unfinalized chain in order to collect information from blocks. +/// +/// Concretely: +/// +/// - Monitors for inclusion events to keep track of candidates that have been included on chains. +/// - Calls `FetchOnChainVotes` for each block to gather potentially missed votes from chain.
+/// +/// With this information it provides a `CandidateComparator` and as a return value of +/// `process_active_leaves_update` any scraped votes. +pub struct ChainScraper { + /// All candidates we have seen included, which have not yet been finalized. + included_candidates: HashSet<CandidateHash>, + /// including block -> `CandidateHash` + /// + /// We need this to clean up `included_candidates` on finalization. + candidates_by_block_number: BTreeMap<BlockNumber, HashSet<CandidateHash>>, + /// Latest relay blocks observed by the provider. + /// + /// We assume that ancestors of cached blocks are already processed, i.e. we have saved + /// corresponding included candidates. + last_observed_blocks: LruCache<Hash, ()>, +} + +impl ChainScraper { + /// Limits the number of ancestors received for a single request. + pub(crate) const ANCESTRY_CHUNK_SIZE: u32 = 10; + /// Limits the overall number of ancestors walked through for a given head. + /// + /// As long as we have `MAX_FINALITY_LAG` this makes sense as a value. + pub(crate) const ANCESTRY_SIZE_LIMIT: u32 = MAX_FINALITY_LAG; + + /// Create a properly initialized `ChainScraper`. + /// + /// Returns: `Self` and any scraped votes. + pub async fn new<Sender: SubsystemSender>( + sender: &mut Sender, + initial_head: ActivatedLeaf, + ) -> Result<(Self, Vec<ScrapedOnChainVotes>)> { + let mut s = Self { + included_candidates: HashSet::new(), + candidates_by_block_number: BTreeMap::new(), + last_observed_blocks: LruCache::new(LRU_OBSERVED_BLOCKS_CAPACITY), + }; + let update = + ActiveLeavesUpdate { activated: Some(initial_head), deactivated: Default::default() }; + let votes = s.process_active_leaves_update(sender, &update).await?; + Ok((s, votes)) + } + + /// Check whether we have seen a candidate included on any chain. + pub fn is_candidate_included(&mut self, candidate_hash: &CandidateHash) -> bool { + self.included_candidates.contains(candidate_hash) + } + + /// Query active leaves for any candidate `CandidateEvent::CandidateIncluded` events. + /// + /// and updates current heads, so we can query candidates for all non-finalized blocks. + /// + /// Returns: On-chain votes for the leaf and any ancestors we might not yet have seen. + pub async fn process_active_leaves_update<Sender: SubsystemSender>( + &mut self, + sender: &mut Sender, + update: &ActiveLeavesUpdate, + ) -> crate::error::Result<Vec<ScrapedOnChainVotes>> { + let activated = match update.activated.as_ref() { + Some(activated) => activated, + None => return Ok(Vec::new()), + }; + + // Fetch ancestry up to last finalized block. + let ancestors = self + .get_unfinalized_block_ancestors(sender, activated.hash, activated.number) + .await?; + + // Ancestors block numbers are consecutive in the descending order. + let earliest_block_number = activated.number - ancestors.len() as u32; + let block_numbers = (earliest_block_number..=activated.number).rev(); + + let block_hashes = std::iter::once(activated.hash).chain(ancestors); + + let mut on_chain_votes = Vec::new(); + for (block_number, block_hash) in block_numbers.zip(block_hashes) { + gum::trace!(?block_number, ?block_hash, "In ancestor processing."); + + self.process_candidate_events(sender, block_number, block_hash).await?; + + if let Some(votes) = get_on_chain_votes(sender, block_hash).await? { + on_chain_votes.push(votes); + } + } + + self.last_observed_blocks.put(activated.hash, ()); + + Ok(on_chain_votes) + } + + /// Prune finalized candidates. + /// + /// Once a candidate lives in a relay chain block that's behind the finalized chain/got + /// finalized, we can treat it as low priority.
+ pub fn process_finalized_block(&mut self, finalized: &BlockNumber) { + let not_finalized = self.candidates_by_block_number.split_off(finalized); + let finalized = std::mem::take(&mut self.candidates_by_block_number); + self.candidates_by_block_number = not_finalized; + // Clean up finalized: + for finalized_candidate in finalized.into_values().flatten() { + self.included_candidates.remove(&finalized_candidate); + } + } + + /// Process candidate events of a block. + /// + /// Keep track of all included candidates. + async fn process_candidate_events( + &mut self, + sender: &mut impl SubsystemSender, + block_number: BlockNumber, + block_hash: Hash, + ) -> Result<()> { + // Get included events: + let included = + get_candidate_events(sender, block_hash) + .await? + .into_iter() + .filter_map(|ev| match ev { + CandidateEvent::CandidateIncluded(receipt, _, _, _) => Some(receipt), + _ => None, + }); + for receipt in included { + let candidate_hash = receipt.hash(); + gum::trace!( + target: LOG_TARGET, + ?candidate_hash, + ?block_number, + "Processing included event" + ); + self.included_candidates.insert(candidate_hash); + self.candidates_by_block_number + .entry(block_number) + .or_default() + .insert(candidate_hash); + } + Ok(()) + } + + /// Returns ancestors of `head` in the descending order, stopping + /// either at the block present in cache or at the last finalized block. + /// + /// Both `head` and the latest finalized block are **not** included in the result. + async fn get_unfinalized_block_ancestors( + &mut self, + sender: &mut Sender, + mut head: Hash, + mut head_number: BlockNumber, + ) -> Result> { + let target_ancestor = get_finalized_block_number(sender).await?; + + let mut ancestors = Vec::new(); + + // If head_number <= target_ancestor + 1 the ancestry will be empty. + if self.last_observed_blocks.get(&head).is_some() || head_number <= target_ancestor + 1 { + return Ok(ancestors) + } + + loop { + let hashes = get_block_ancestors(sender, head, Self::ANCESTRY_CHUNK_SIZE).await?; + + let earliest_block_number = match head_number.checked_sub(hashes.len() as u32) { + Some(number) => number, + None => { + // It's assumed that it's impossible to retrieve + // more than N ancestors for block number N. + gum::error!( + target: LOG_TARGET, + "Received {} ancestors for block number {} from Chain API", + hashes.len(), + head_number, + ); + return Ok(ancestors) + }, + }; + // The reversed order is parent, grandparent, etc. excluding the head. + let block_numbers = (earliest_block_number..head_number).rev(); + + for (block_number, hash) in block_numbers.zip(&hashes) { + // Return if we either met target/cached block or + // hit the size limit for the returned ancestry of head. 
+ if self.last_observed_blocks.get(hash).is_some() || + block_number <= target_ancestor || + ancestors.len() >= Self::ANCESTRY_SIZE_LIMIT as usize + { + return Ok(ancestors) + } + + ancestors.push(*hash); + } + + match hashes.last() { + Some(last_hash) => { + head = *last_hash; + head_number = earliest_block_number; + }, + None => break, + } + } + return Ok(ancestors) + } +} + +async fn get_finalized_block_number(sender: &mut impl SubsystemSender) -> FatalResult { + let (number_tx, number_rx) = oneshot::channel(); + send_message_fatal(sender, ChainApiMessage::FinalizedBlockNumber(number_tx), number_rx).await +} + +async fn get_block_ancestors( + sender: &mut impl SubsystemSender, + head: Hash, + num_ancestors: BlockNumber, +) -> FatalResult> { + let (tx, rx) = oneshot::channel(); + sender + .send_message( + ChainApiMessage::Ancestors { + hash: head, + k: num_ancestors as usize, + response_channel: tx, + } + .into(), + ) + .await; + + rx.await + .or(Err(FatalError::ChainApiSenderDropped))? + .map_err(FatalError::ChainApiAncestors) +} + +async fn send_message_fatal( + sender: &mut Sender, + message: ChainApiMessage, + receiver: oneshot::Receiver>, +) -> FatalResult +where + Sender: SubsystemSender, +{ + sender.send_message(message.into()).await; + + receiver + .await + .map_err(|_| FatalError::ChainApiSenderDropped)? + .map_err(FatalError::ChainApiAncestors) +} diff --git a/node/core/dispute-coordinator/src/real/ordering/tests.rs b/node/core/dispute-coordinator/src/real/scraping/tests.rs similarity index 78% rename from node/core/dispute-coordinator/src/real/ordering/tests.rs rename to node/core/dispute-coordinator/src/real/scraping/tests.rs index 09a7ef26ee5b..94e5652adda5 100644 --- a/node/core/dispute-coordinator/src/real/ordering/tests.rs +++ b/node/core/dispute-coordinator/src/real/scraping/tests.rs @@ -40,7 +40,9 @@ use polkadot_primitives::v2::{ GroupIndex, Hash, HashT, HeadData, }; -use super::OrderingProvider; +use crate::LOG_TARGET; + +use super::ChainScraper; type VirtualOverseer = TestSubsystemContextHandle; @@ -56,30 +58,32 @@ async fn overseer_recv(virtual_overseer: &mut VirtualOverseer) -> AllMessages { struct TestState { chain: Vec, - ordering: OrderingProvider, + scraper: ChainScraper, ctx: TestSubsystemContext, } impl TestState { async fn new() -> (Self, VirtualOverseer) { let (mut ctx, mut ctx_handle) = make_subsystem_context(TaskExecutor::new()); - let leaf = get_activated_leaf(0); - let chain = vec![get_block_number_hash(0)]; + let chain = vec![get_block_number_hash(0), get_block_number_hash(1)]; + let leaf = get_activated_leaf(1); let finalized_block_number = 0; let overseer_fut = async { assert_finalized_block_number_request(&mut ctx_handle, finalized_block_number).await; - // No requests for ancestors since the block is already finalized. + gum::trace!(target: LOG_TARGET, "After assert_finalized_block_number"); + // No ancestors requests, as list would be empty. 
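The new `ChainScraper` above reduces to two pieces of book-keeping: a set of candidates seen included on any unfinalized chain (queried via `is_candidate_included`, which is what lets participation be prioritised for candidates actually included on chain), and a per-block index that lets finalization prune both structures cheaply. A self-contained, std-only sketch of that book-keeping follows — the simplified type aliases are stand-ins, not the real subsystem code:

```rust
// Illustrative, std-only model of the ChainScraper book-keeping; not the real
// polkadot types or subsystem code.
use std::collections::{BTreeMap, HashSet};

type BlockNumber = u32;
type CandidateHash = u64;

#[derive(Default)]
struct MiniScraper {
    included_candidates: HashSet<CandidateHash>,
    candidates_by_block_number: BTreeMap<BlockNumber, HashSet<CandidateHash>>,
}

impl MiniScraper {
    // Record an inclusion event scraped from `block`.
    fn note_included(&mut self, block: BlockNumber, candidate: CandidateHash) {
        self.included_candidates.insert(candidate);
        self.candidates_by_block_number.entry(block).or_default().insert(candidate);
    }

    // Everything included in blocks below `finalized` is dropped from both
    // indices, mirroring `process_finalized_block` above.
    fn process_finalized_block(&mut self, finalized: BlockNumber) {
        let not_finalized = self.candidates_by_block_number.split_off(&finalized);
        let finalized = std::mem::take(&mut self.candidates_by_block_number);
        self.candidates_by_block_number = not_finalized;
        for candidate in finalized.into_values().flatten() {
            self.included_candidates.remove(&candidate);
        }
    }

    fn is_candidate_included(&self, candidate: &CandidateHash) -> bool {
        self.included_candidates.contains(candidate)
    }
}

fn main() {
    let mut scraper = MiniScraper::default();
    scraper.note_included(5, 0xa);
    scraper.note_included(10, 0xb);

    scraper.process_finalized_block(8);

    assert!(!scraper.is_candidate_included(&0xa)); // included at block 5 < 8: pruned
    assert!(scraper.is_candidate_included(&0xb)); // included at block 10 >= 8: kept
}
```

Keying the secondary index by block number is what makes `process_finalized_block` a single `split_off` rather than a scan over all tracked candidates.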
assert_candidate_events_request(&mut ctx_handle, &chain).await; + assert_chain_vote_request(&mut ctx_handle, &chain).await; }; - let ordering_provider = - join(OrderingProvider::new(ctx.sender(), leaf.clone()), overseer_fut) - .await - .0 - .unwrap(); + let (scraper, _) = join(ChainScraper::new(ctx.sender(), leaf.clone()), overseer_fut) + .await + .0 + .unwrap(); + gum::trace!(target: LOG_TARGET, "After launching chain scraper"); - let test_state = Self { chain, ordering: ordering_provider, ctx }; + let test_state = Self { chain, scraper, ctx }; (test_state, ctx_handle) } @@ -99,10 +103,10 @@ fn next_leaf(chain: &mut Vec) -> ActivatedLeaf { async fn process_active_leaves_update( sender: &mut TestSubsystemSender, - ordering: &mut OrderingProvider, + scraper: &mut ChainScraper, update: ActivatedLeaf, ) { - ordering + scraper .process_active_leaves_update(sender, &ActiveLeavesUpdate::start_work(update)) .await .unwrap(); @@ -166,13 +170,14 @@ async fn assert_candidate_events_request(virtual_overseer: &mut VirtualOverseer, ); } -async fn assert_block_number_request(virtual_overseer: &mut VirtualOverseer, chain: &[Hash]) { +async fn assert_chain_vote_request(virtual_overseer: &mut VirtualOverseer, _chain: &[Hash]) { assert_matches!( overseer_recv(virtual_overseer).await, - AllMessages::ChainApi(ChainApiMessage::BlockNumber(relay_parent, tx)) => { - let maybe_block_number = - chain.iter().position(|hash| *hash == relay_parent).map(|number| number as u32); - tx.send(Ok(maybe_block_number)).unwrap(); + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::FetchOnChainVotes(tx), + )) => { + tx.send(Ok(None)).unwrap(); } ); } @@ -211,27 +216,29 @@ async fn overseer_process_active_leaves_update( // Before walking through ancestors provider requests latest finalized block number. assert_finalized_block_number_request(virtual_overseer, finalized_block).await; // Expect block ancestors requests with respect to the ancestry step. - for _ in (0..expected_ancestry_len).step_by(OrderingProvider::ANCESTRY_CHUNK_SIZE) { + for _ in (0..expected_ancestry_len).step_by(ChainScraper::ANCESTRY_CHUNK_SIZE as usize) { assert_block_ancestors_request(virtual_overseer, chain).await; } // For each ancestry and the head return corresponding candidates inclusions. for _ in 0..expected_ancestry_len { assert_candidate_events_request(virtual_overseer, chain).await; + assert_chain_vote_request(virtual_overseer, chain).await; } } #[test] -fn ordering_provider_provides_ordering_when_initialized() { - let candidate = make_candidate_receipt(get_block_number_hash(1)); +fn scraper_provides_included_state_when_initialized() { + let candidate_1 = make_candidate_receipt(get_block_number_hash(1)); + let candidate_2 = make_candidate_receipt(get_block_number_hash(2)); futures::executor::block_on(async { let (state, mut virtual_overseer) = TestState::new().await; - let TestState { mut chain, mut ordering, mut ctx } = state; + let TestState { mut chain, mut scraper, mut ctx } = state; - let r = ordering.candidate_comparator(ctx.sender(), &candidate).await.unwrap(); - assert_matches!(r, None); + assert!(!scraper.is_candidate_included(&candidate_2.hash())); + assert!(scraper.is_candidate_included(&candidate_1.hash())); - // After next active leaves update we should have a comparator: + // After next active leaves update we should see the candidate included. 
let next_update = next_leaf(&mut chain); let finalized_block_number = 0; @@ -242,30 +249,22 @@ fn ordering_provider_provides_ordering_when_initialized() { finalized_block_number, expected_ancestry_len, ); - join(process_active_leaves_update(ctx.sender(), &mut ordering, next_update), overseer_fut) + join(process_active_leaves_update(ctx.sender(), &mut scraper, next_update), overseer_fut) .await; - let r = join( - ordering.candidate_comparator(ctx.sender(), &candidate), - assert_block_number_request(&mut virtual_overseer, &chain), - ) - .await - .0; - assert_matches!(r, Ok(Some(r2)) => { - assert_eq!(r2.relay_parent_block_number, 1); - }); + assert!(scraper.is_candidate_included(&candidate_2.hash())); }); } #[test] -fn ordering_provider_requests_candidates_of_leaf_ancestors() { +fn scraper_requests_candidates_of_leaf_ancestors() { futures::executor::block_on(async { // How many blocks should we skip before sending a leaf update. const BLOCKS_TO_SKIP: usize = 30; let (state, mut virtual_overseer) = TestState::new().await; - let TestState { mut chain, mut ordering, mut ctx } = state; + let TestState { mut chain, mut scraper, mut ctx } = state; let next_update = (0..BLOCKS_TO_SKIP).map(|_| next_leaf(&mut chain)).last().unwrap(); @@ -276,34 +275,26 @@ fn ordering_provider_requests_candidates_of_leaf_ancestors() { finalized_block_number, BLOCKS_TO_SKIP, ); - join(process_active_leaves_update(ctx.sender(), &mut ordering, next_update), overseer_fut) + join(process_active_leaves_update(ctx.sender(), &mut scraper, next_update), overseer_fut) .await; let next_block_number = next_block_number(&chain); for block_number in 1..next_block_number { let candidate = make_candidate_receipt(get_block_number_hash(block_number)); - let r = join( - ordering.candidate_comparator(ctx.sender(), &candidate), - assert_block_number_request(&mut virtual_overseer, &chain), - ) - .await - .0; - assert_matches!(r, Ok(Some(r2)) => { - assert_eq!(r2.relay_parent_block_number, block_number); - }); + assert!(scraper.is_candidate_included(&candidate.hash())); } }); } #[test] -fn ordering_provider_requests_candidates_of_non_cached_ancestors() { +fn scraper_requests_candidates_of_non_cached_ancestors() { futures::executor::block_on(async { // How many blocks should we skip before sending a leaf update. const BLOCKS_TO_SKIP: &[usize] = &[30, 15]; let (state, mut virtual_overseer) = TestState::new().await; - let TestState { mut chain, mut ordering, mut ctx } = state; + let TestState { mut chain, scraper: mut ordering, mut ctx } = state; let next_update = (0..BLOCKS_TO_SKIP[0]).map(|_| next_leaf(&mut chain)).last().unwrap(); @@ -331,16 +322,17 @@ fn ordering_provider_requests_candidates_of_non_cached_ancestors() { } #[test] -fn ordering_provider_requests_candidates_of_non_finalized_ancestors() { +fn scraper_requests_candidates_of_non_finalized_ancestors() { futures::executor::block_on(async { // How many blocks should we skip before sending a leaf update. const BLOCKS_TO_SKIP: usize = 30; let (state, mut virtual_overseer) = TestState::new().await; - let TestState { mut chain, mut ordering, mut ctx } = state; + let TestState { mut chain, scraper: mut ordering, mut ctx } = state; - let next_update = (0..BLOCKS_TO_SKIP).map(|_| next_leaf(&mut chain)).last().unwrap(); + // 1 because `TestState` starts at leaf 1. 
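The ancestry tests above skip a configurable number of blocks and then expect one `ChainApiMessage::Ancestors` request per `ANCESTRY_CHUNK_SIZE` step, i.e. a ceiling division of the unfinalized ancestry length by the chunk size. A tiny illustrative check of that expectation (not part of the test suite):

```rust
// Illustrative only: the request pattern asserted by the tests above.
fn expected_ancestor_requests(ancestry_len: usize, chunk_size: usize) -> usize {
    // Mirrors the `step_by` loop in `overseer_process_active_leaves_update`.
    (0..ancestry_len).step_by(chunk_size).count()
}

fn main() {
    const ANCESTRY_CHUNK_SIZE: usize = 10;
    assert_eq!(expected_ancestor_requests(30, ANCESTRY_CHUNK_SIZE), 3);
    assert_eq!(expected_ancestor_requests(29, ANCESTRY_CHUNK_SIZE), 3);
    assert_eq!(expected_ancestor_requests(1, ANCESTRY_CHUNK_SIZE), 1);
    assert_eq!(expected_ancestor_requests(0, ANCESTRY_CHUNK_SIZE), 0);
}
```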
+ let next_update = (1..BLOCKS_TO_SKIP).map(|_| next_leaf(&mut chain)).last().unwrap(); let finalized_block_number = 17; let overseer_fut = overseer_process_active_leaves_update( diff --git a/node/core/dispute-coordinator/src/real/tests.rs b/node/core/dispute-coordinator/src/real/tests.rs index 54b952fd9e2b..ff956310340c 100644 --- a/node/core/dispute-coordinator/src/real/tests.rs +++ b/node/core/dispute-coordinator/src/real/tests.rs @@ -29,7 +29,6 @@ use futures::{ future::{self, BoxFuture}, }; -use parity_scale_codec::Encode; use polkadot_node_subsystem_util::database::Database; use polkadot_node_primitives::{SignedDisputeStatement, SignedFullStatement, Statement}; @@ -39,7 +38,7 @@ use polkadot_node_subsystem::{ ImportStatementsResult, }, overseer::FromOverseer, - ChainApiError, OverseerSignal, + OverseerSignal, }; use polkadot_node_subsystem_util::TimeoutExt; use sc_keystore::LocalKeystore; @@ -55,9 +54,9 @@ use polkadot_node_subsystem::{ }; use polkadot_node_subsystem_test_helpers::{make_subsystem_context, TestSubsystemContextHandle}; use polkadot_primitives::v2::{ - BlakeTwo256, BlockNumber, CandidateCommitments, CandidateHash, CandidateReceipt, Hash, HashT, - Header, MultiDisputeStatementSet, ScrapedOnChainVotes, SessionIndex, SessionInfo, - SigningContext, ValidatorId, ValidatorIndex, + BlockNumber, CandidateCommitments, CandidateHash, CandidateReceipt, Hash, Header, + MultiDisputeStatementSet, ScrapedOnChainVotes, SessionIndex, SessionInfo, SigningContext, + ValidatorId, ValidatorIndex, }; use crate::{ @@ -87,12 +86,18 @@ fn make_keystore(seeds: impl Iterator) -> LocalKeystore { store } -fn session_to_hash(session: SessionIndex, extra: impl Encode) -> Hash { - BlakeTwo256::hash_of(&(session, extra)) -} - type VirtualOverseer = TestSubsystemContextHandle; +const OVERSEER_RECEIVE_TIMEOUT: Duration = Duration::from_secs(2); + +async fn overseer_recv(virtual_overseer: &mut VirtualOverseer) -> AllMessages { + virtual_overseer + .recv() + .timeout(OVERSEER_RECEIVE_TIMEOUT) + .await + .expect("overseer `recv` timed out") +} + #[derive(Clone)] struct MockClock { time: Arc, @@ -126,6 +131,9 @@ struct TestState { config: Config, clock: MockClock, headers: HashMap, + last_block: Hash, + // last session the subsystem knows about. 
+ known_session: Option, } impl Default for TestState { @@ -168,6 +176,18 @@ impl Default for TestState { let db = Arc::new(db); let config = Config { col_data: 0 }; + let genesis_header = Header { + parent_hash: Hash::zero(), + number: 0, + digest: dummy_digest(), + state_root: dummy_hash(), + extrinsics_root: dummy_hash(), + }; + let last_block = genesis_header.hash(); + + let mut headers = HashMap::new(); + let _ = headers.insert(last_block, genesis_header.clone()); + TestState { validators: validators.into_iter().map(|(pair, _)| pair).collect(), validator_public, @@ -177,7 +197,9 @@ impl Default for TestState { db, config, clock: MockClock::default(), - headers: HashMap::new(), + headers, + last_block, + known_session: None, } } } @@ -191,9 +213,8 @@ impl TestState { ) { assert!(block_number > 0); - let parent_hash = session_to_hash(session, b"parent"); let block_header = Header { - parent_hash, + parent_hash: self.last_block, number: block_number, digest: dummy_digest(), state_root: dummy_hash(), @@ -202,7 +223,9 @@ impl TestState { let block_hash = block_header.hash(); let _ = self.headers.insert(block_hash, block_header.clone()); + self.last_block = block_hash; + gum::debug!(?block_number, "Activating block in activate_leaf_at_session."); virtual_overseer .send(FromOverseer::Signal(OverseerSignal::ActiveLeaves( ActiveLeavesUpdate::start_work(ActivatedLeaf { @@ -218,84 +241,113 @@ impl TestState { } async fn handle_sync_queries( - &self, + &mut self, virtual_overseer: &mut VirtualOverseer, block_hash: Hash, session: SessionIndex, ) { - assert_matches!( - virtual_overseer.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(tx), - )) => { - assert_eq!(h, block_hash); - let _ = tx.send(Ok(session)); + // Order of messages is not fixed (different on initializing): + struct FinishedSteps { + got_session_information: bool, + got_scraping_information: bool, + } + + impl FinishedSteps { + fn new() -> Self { + Self { got_session_information: false, got_scraping_information: false } } - ); + fn is_done(&self) -> bool { + self.got_session_information && self.got_scraping_information + } + } - loop { - // answer session info queries until the current session is reached. - assert_matches!( - virtual_overseer.recv().await, + let mut finished_steps = FinishedSteps::new(); + + while !finished_steps.is_done() { + match overseer_recv(virtual_overseer).await { AllMessages::RuntimeApi(RuntimeApiMessage::Request( h, - RuntimeApiRequest::SessionInfo(session_index, tx), + RuntimeApiRequest::SessionIndexForChild(tx), )) => { + assert!( + !finished_steps.got_session_information, + "session infos already retrieved" + ); + finished_steps.got_session_information = true; assert_eq!(h, block_hash); - - let _ = tx.send(Ok(Some(self.session_info()))); - if session_index == session { break } - } - ) - } - - // Since the test harness sends active leaves update for each block - // consecutively, walking back for ancestors is not necessary. Sending - // an error to the subsystem will force-skip this procedure, the ordering - // provider will only request for candidates included in the leaf. 
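The reworked harness replaces bare `virtual_overseer.recv()` calls with the timeout-bounded `overseer_recv` above, so a missing message fails the test after `OVERSEER_RECEIVE_TIMEOUT` instead of hanging it. As a rough sketch (illustrative only; the tests rely on the existing `TimeoutExt` helper from `polkadot_node_subsystem_util`), such a bound can be built from the `futures` and `futures-timer` crates:

```rust
// Illustrative sketch of bounding an await with a timeout; not the actual
// TimeoutExt implementation used by the tests.
use std::time::Duration;

use futures::future::{self, Either};
use futures_timer::Delay;

async fn with_timeout<F: std::future::Future>(fut: F, dur: Duration) -> Option<F::Output> {
    let timeout = Delay::new(dur);
    futures::pin_mut!(fut, timeout);
    // Race the future against the timer; `None` signals a timeout.
    match future::select(fut, timeout).await {
        Either::Left((value, _)) => Some(value),
        Either::Right(((), _)) => None,
    }
}

fn main() {
    let got = futures::executor::block_on(with_timeout(async { 42u32 }, Duration::from_secs(2)));
    assert_eq!(got, Some(42));
}
```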
- assert_matches!( - virtual_overseer.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( - tx - )) => { - tx.send(Err(ChainApiError::from(""))).unwrap(); - } - ); - - assert_matches!( - virtual_overseer.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _new_leaf, - RuntimeApiRequest::CandidateEvents(tx), - )) => { - tx.send(Ok(Vec::new())).unwrap(); - } - ); - - assert_matches!( - virtual_overseer.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _new_leaf, - RuntimeApiRequest::FetchOnChainVotes(tx), - )) => { - //add some `BackedCandidates` or resolved disputes here as needed - tx.send(Ok(Some(ScrapedOnChainVotes { - session, - backing_validators_per_candidate: Vec::default(), - disputes: MultiDisputeStatementSet::default(), - }))).unwrap(); + let _ = tx.send(Ok(session)); + // No queries, if subsystem knows about this session already. + if self.known_session == Some(session) { + continue + } + self.known_session = Some(session); + loop { + // answer session info queries until the current session is reached. + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionInfo(session_index, tx), + )) => { + assert_eq!(h, block_hash); + + let _ = tx.send(Ok(Some(self.session_info()))); + if session_index == session { break } + } + ); + } + }, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(tx)) => { + assert!( + !finished_steps.got_scraping_information, + "Scraping info was already retrieved!" + ); + finished_steps.got_scraping_information = true; + tx.send(Ok(0)).unwrap(); + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _new_leaf, + RuntimeApiRequest::CandidateEvents(tx), + )) => { + tx.send(Ok(Vec::new())).unwrap(); + } + ); + gum::info!("After answering runtime api request"); + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _new_leaf, + RuntimeApiRequest::FetchOnChainVotes(tx), + )) => { + //add some `BackedCandidates` or resolved disputes here as needed + tx.send(Ok(Some(ScrapedOnChainVotes { + session, + backing_validators_per_candidate: Vec::default(), + disputes: MultiDisputeStatementSet::default(), + }))).unwrap(); + } + ); + gum::info!("After answering runtime api request (votes)"); + }, + msg => { + panic!("Received unexpected message in `handle_sync_queries`: {:?}", msg); + }, } - ) + } } async fn handle_resume_sync( - &self, + &mut self, virtual_overseer: &mut VirtualOverseer, session: SessionIndex, ) { let leaves: Vec = self.headers.keys().cloned().collect(); for (n, leaf) in leaves.iter().enumerate() { + gum::debug!( + block_number= ?n, + "Activating block in handle resume sync." 
+ ); virtual_overseer .send(FromOverseer::Signal(OverseerSignal::ActiveLeaves( ActiveLeavesUpdate::start_work(ActivatedLeaf { @@ -376,10 +428,11 @@ impl TestState { SignedDisputeStatement::from_backing_statement(&statement, context, validator_id).unwrap() } - fn resume(self, test: F) -> Self + fn resume(mut self, test: F) -> Self where F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, TestState>, { + self.known_session = None; let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new()); let subsystem = DisputeCoordinatorSubsystem::new( self.db.clone(), @@ -411,7 +464,7 @@ async fn participation_with_distribution( ) { participation_full_happy_path(virtual_overseer, expected_commitments_hash).await; assert_matches!( - virtual_overseer.recv().await, + overseer_recv(virtual_overseer).await, AllMessages::DisputeDistribution( DisputeDistributionMessage::SendDispute(msg) ) => { @@ -459,7 +512,6 @@ fn too_many_unconfirmed_statements_are_considered_spam() { .issue_explicit_statement_with_index(1, candidate_hash1, session, false) .await; - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); virtual_overseer .send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::ImportStatements { @@ -470,7 +522,7 @@ fn too_many_unconfirmed_statements_are_considered_spam() { (valid_vote1, ValidatorIndex(3)), (invalid_vote1, ValidatorIndex(1)), ], - pending_confirmation, + pending_confirmation: None, }, }) .await; @@ -514,7 +566,7 @@ fn too_many_unconfirmed_statements_are_considered_spam() { (valid_vote2, ValidatorIndex(3)), (invalid_vote2, ValidatorIndex(1)), ], - pending_confirmation, + pending_confirmation: Some(pending_confirmation), }, }) .await; @@ -577,7 +629,6 @@ fn dispute_gets_confirmed_via_participation() { .issue_explicit_statement_with_index(1, candidate_hash1, session, false) .await; - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); virtual_overseer .send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::ImportStatements { @@ -588,7 +639,7 @@ fn dispute_gets_confirmed_via_participation() { (valid_vote1, ValidatorIndex(3)), (invalid_vote1, ValidatorIndex(1)), ], - pending_confirmation, + pending_confirmation: None, }, }) .await; @@ -636,7 +687,7 @@ fn dispute_gets_confirmed_via_participation() { (valid_vote2, ValidatorIndex(3)), (invalid_vote2, ValidatorIndex(1)), ], - pending_confirmation, + pending_confirmation: Some(pending_confirmation), }, }) .await; @@ -711,7 +762,6 @@ fn dispute_gets_confirmed_at_byzantine_threshold() { .issue_explicit_statement_with_index(1, candidate_hash1, session, false) .await; - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); virtual_overseer .send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::ImportStatements { @@ -724,7 +774,7 @@ fn dispute_gets_confirmed_at_byzantine_threshold() { (valid_vote1a, ValidatorIndex(4)), (invalid_vote1a, ValidatorIndex(5)), ], - pending_confirmation, + pending_confirmation: None, }, }) .await; @@ -767,7 +817,7 @@ fn dispute_gets_confirmed_at_byzantine_threshold() { (valid_vote2, ValidatorIndex(3)), (invalid_vote2, ValidatorIndex(1)), ], - pending_confirmation, + pending_confirmation: Some(pending_confirmation), }, }) .await; @@ -834,7 +884,7 @@ fn backing_statements_import_works_and_no_spam() { (valid_vote1, ValidatorIndex(3)), (valid_vote2, ValidatorIndex(4)), ], - pending_confirmation, + pending_confirmation: Some(pending_confirmation), }, }) .await; @@ -888,7 +938,7 @@ fn backing_statements_import_works_and_no_spam() { (valid_vote1, 
ValidatorIndex(3)), (valid_vote2, ValidatorIndex(4)), ], - pending_confirmation, + pending_confirmation: Some(pending_confirmation), }, }) .await; @@ -931,7 +981,6 @@ fn conflicting_votes_lead_to_dispute_participation() { .issue_explicit_statement_with_index(2, candidate_hash, session, false) .await; - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); virtual_overseer .send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::ImportStatements { @@ -942,7 +991,7 @@ fn conflicting_votes_lead_to_dispute_participation() { (valid_vote, ValidatorIndex(3)), (invalid_vote, ValidatorIndex(1)), ], - pending_confirmation, + pending_confirmation: None, }, }) .await; @@ -979,7 +1028,6 @@ fn conflicting_votes_lead_to_dispute_participation() { assert_eq!(votes.invalid.len(), 1); } - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); virtual_overseer .send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::ImportStatements { @@ -987,7 +1035,7 @@ fn conflicting_votes_lead_to_dispute_participation() { candidate_receipt: candidate_receipt.clone(), session, statements: vec![(invalid_vote_2, ValidatorIndex(2))], - pending_confirmation, + pending_confirmation: None, }, }) .await; @@ -1039,7 +1087,6 @@ fn positive_votes_dont_trigger_participation() { .issue_explicit_statement_with_index(1, candidate_hash, session, true) .await; - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); virtual_overseer .send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::ImportStatements { @@ -1047,7 +1094,7 @@ fn positive_votes_dont_trigger_participation() { candidate_receipt: candidate_receipt.clone(), session, statements: vec![(valid_vote, ValidatorIndex(2))], - pending_confirmation, + pending_confirmation: None, }, }) .await; @@ -1077,7 +1124,6 @@ fn positive_votes_dont_trigger_participation() { assert!(votes.invalid.is_empty()); } - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); virtual_overseer .send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::ImportStatements { @@ -1085,7 +1131,7 @@ fn positive_votes_dont_trigger_participation() { candidate_receipt: candidate_receipt.clone(), session, statements: vec![(valid_vote_2, ValidatorIndex(1))], - pending_confirmation, + pending_confirmation: None, }, }) .await; @@ -1146,7 +1192,6 @@ fn wrong_validator_index_is_ignored() { .issue_explicit_statement_with_index(1, candidate_hash, session, false) .await; - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); virtual_overseer .send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::ImportStatements { @@ -1157,7 +1202,7 @@ fn wrong_validator_index_is_ignored() { (valid_vote, ValidatorIndex(1)), (invalid_vote, ValidatorIndex(2)), ], - pending_confirmation, + pending_confirmation: None, }, }) .await; @@ -1218,7 +1263,6 @@ fn finality_votes_ignore_disputed_candidates() { .issue_explicit_statement_with_index(1, candidate_hash, session, false) .await; - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); virtual_overseer .send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::ImportStatements { @@ -1229,7 +1273,7 @@ fn finality_votes_ignore_disputed_candidates() { (valid_vote, ValidatorIndex(2)), (invalid_vote, ValidatorIndex(1)), ], - pending_confirmation, + pending_confirmation: None, }, }) .await; @@ -1321,7 +1365,6 @@ fn supermajority_valid_dispute_may_be_finalized() { .issue_explicit_statement_with_index(1, candidate_hash, session, false) .await; - let 
(pending_confirmation, _confirmation_rx) = oneshot::channel(); virtual_overseer .send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::ImportStatements { @@ -1332,7 +1375,7 @@ fn supermajority_valid_dispute_may_be_finalized() { (valid_vote, ValidatorIndex(2)), (invalid_vote, ValidatorIndex(1)), ], - pending_confirmation, + pending_confirmation: None, }, }) .await; @@ -1353,7 +1396,6 @@ fn supermajority_valid_dispute_may_be_finalized() { statements.push((vote, ValidatorIndex(i as _))); } - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); virtual_overseer .send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::ImportStatements { @@ -1361,7 +1403,7 @@ fn supermajority_valid_dispute_may_be_finalized() { candidate_receipt: candidate_receipt.clone(), session, statements, - pending_confirmation, + pending_confirmation: None, }, }) .await; @@ -1446,7 +1488,6 @@ fn concluded_supermajority_for_non_active_after_time() { .issue_explicit_statement_with_index(1, candidate_hash, session, false) .await; - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); virtual_overseer .send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::ImportStatements { @@ -1457,7 +1498,7 @@ fn concluded_supermajority_for_non_active_after_time() { (valid_vote, ValidatorIndex(2)), (invalid_vote, ValidatorIndex(1)), ], - pending_confirmation, + pending_confirmation: None, }, }) .await; @@ -1479,7 +1520,6 @@ fn concluded_supermajority_for_non_active_after_time() { statements.push((vote, ValidatorIndex(i as _))); } - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); virtual_overseer .send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::ImportStatements { @@ -1487,7 +1527,7 @@ fn concluded_supermajority_for_non_active_after_time() { candidate_receipt: candidate_receipt.clone(), session, statements, - pending_confirmation, + pending_confirmation: None, }, }) .await; @@ -1560,7 +1600,7 @@ fn concluded_supermajority_against_non_active_after_time() { (valid_vote, ValidatorIndex(2)), (invalid_vote, ValidatorIndex(1)), ], - pending_confirmation, + pending_confirmation: Some(pending_confirmation), }, }) .await; @@ -1586,7 +1626,6 @@ fn concluded_supermajority_against_non_active_after_time() { statements.push((vote, ValidatorIndex(i as _))); } - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); virtual_overseer .send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::ImportStatements { @@ -1594,7 +1633,7 @@ fn concluded_supermajority_against_non_active_after_time() { candidate_receipt: candidate_receipt.clone(), session, statements, - pending_confirmation, + pending_confirmation: None, }, }) .await; @@ -1665,7 +1704,7 @@ fn resume_dispute_without_local_statement() { (valid_vote, ValidatorIndex(1)), (invalid_vote, ValidatorIndex(2)), ], - pending_confirmation, + pending_confirmation: Some(pending_confirmation), }, }) .await; @@ -1695,7 +1734,7 @@ fn resume_dispute_without_local_statement() { }) // Alice should send a DisputeParticiationMessage::Participate on restart since she has no // local statement for the active dispute. 
- .resume(|test_state, mut virtual_overseer| { + .resume(|mut test_state, mut virtual_overseer| { Box::pin(async move { test_state.handle_resume_sync(&mut virtual_overseer, session).await; @@ -1728,7 +1767,6 @@ fn resume_dispute_without_local_statement() { .issue_explicit_statement_with_index(7, candidate_hash, session, true) .await; - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); virtual_overseer .send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::ImportStatements { @@ -1743,7 +1781,7 @@ fn resume_dispute_without_local_statement() { (valid_vote6, ValidatorIndex(6)), (valid_vote7, ValidatorIndex(7)), ], - pending_confirmation, + pending_confirmation: None, }, }) .await; @@ -1809,7 +1847,7 @@ fn resume_dispute_with_local_statement() { (valid_vote, ValidatorIndex(1)), (invalid_vote, ValidatorIndex(2)), ], - pending_confirmation, + pending_confirmation: Some(pending_confirmation), }, }) .await; @@ -1836,7 +1874,7 @@ fn resume_dispute_with_local_statement() { }) // Alice should not send a DisputeParticiationMessage::Participate on restart since she has a // local statement for the active dispute. - .resume(|test_state, mut virtual_overseer| { + .resume(|mut test_state, mut virtual_overseer| { Box::pin(async move { test_state.handle_resume_sync(&mut virtual_overseer, session).await; @@ -1886,7 +1924,7 @@ fn resume_dispute_without_local_statement_or_local_key() { (valid_vote, ValidatorIndex(1)), (invalid_vote, ValidatorIndex(2)), ], - pending_confirmation, + pending_confirmation: Some(pending_confirmation), }, }) .await; @@ -1916,7 +1954,7 @@ fn resume_dispute_without_local_statement_or_local_key() { }) // Two should not send a DisputeParticiationMessage::Participate on restart since she is no // validator in that dispute. - .resume(|test_state, mut virtual_overseer| { + .resume(|mut test_state, mut virtual_overseer| { Box::pin(async move { test_state.handle_resume_sync(&mut virtual_overseer, session).await; @@ -1969,7 +2007,7 @@ fn resume_dispute_with_local_statement_without_local_key() { (valid_vote, ValidatorIndex(1)), (invalid_vote, ValidatorIndex(2)), ], - pending_confirmation, + pending_confirmation: Some(pending_confirmation), }, }) .await; @@ -1999,7 +2037,7 @@ fn resume_dispute_with_local_statement_without_local_key() { make_keystore(vec![Sr25519Keyring::Two.to_seed()].into_iter()).into(); // Two should not send a DisputeParticiationMessage::Participate on restart since we gave // her a non existing key. 
- test_state.resume(|test_state, mut virtual_overseer| { + test_state.resume(|mut test_state, mut virtual_overseer| { Box::pin(async move { test_state.handle_resume_sync(&mut virtual_overseer, session).await; @@ -2049,7 +2087,7 @@ fn issue_local_statement_does_cause_distribution_but_not_duplicate_participation candidate_receipt: candidate_receipt.clone(), session, statements: vec![(other_vote, ValidatorIndex(1))], - pending_confirmation, + pending_confirmation: Some(pending_confirmation), }, }) .await; @@ -2070,7 +2108,7 @@ fn issue_local_statement_does_cause_distribution_but_not_duplicate_participation // Dispute distribution should get notified now: assert_matches!( - virtual_overseer.recv().await, + overseer_recv(&mut virtual_overseer).await, AllMessages::DisputeDistribution( DisputeDistributionMessage::SendDispute(msg) ) => { @@ -2155,7 +2193,7 @@ fn empty_import_still_writes_candidate_receipt() { candidate_receipt: candidate_receipt.clone(), session, statements: Vec::new(), - pending_confirmation: tx, + pending_confirmation: Some(tx), }, }) .await; @@ -2206,7 +2244,7 @@ fn redundant_votes_ignored() { candidate_receipt: candidate_receipt.clone(), session, statements: vec![(valid_vote.clone(), ValidatorIndex(1))], - pending_confirmation: tx, + pending_confirmation: Some(tx), }, }) .await; @@ -2221,7 +2259,7 @@ fn redundant_votes_ignored() { candidate_receipt: candidate_receipt.clone(), session, statements: vec![(valid_vote_2, ValidatorIndex(1))], - pending_confirmation: tx, + pending_confirmation: Some(tx), }, }) .await; diff --git a/node/network/dispute-distribution/src/receiver/mod.rs b/node/network/dispute-distribution/src/receiver/mod.rs index 252ec7c4301a..a75d95b6e4aa 100644 --- a/node/network/dispute-distribution/src/receiver/mod.rs +++ b/node/network/dispute-distribution/src/receiver/mod.rs @@ -271,7 +271,7 @@ where candidate_receipt, session: valid_vote.0.session_index(), statements: vec![valid_vote, invalid_vote], - pending_confirmation, + pending_confirmation: Some(pending_confirmation), }, )) .await; diff --git a/node/network/dispute-distribution/src/tests/mod.rs b/node/network/dispute-distribution/src/tests/mod.rs index 9b0e4037a38e..e3df6c20a6af 100644 --- a/node/network/dispute-distribution/src/tests/mod.rs +++ b/node/network/dispute-distribution/src/tests/mod.rs @@ -526,7 +526,7 @@ async fn nested_network_dispute_request<'a, F, O>( candidate_receipt, session, statements, - pending_confirmation, + pending_confirmation: Some(pending_confirmation), } ) => { assert_eq!(session, MOCK_SESSION_INDEX); diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 69b2baf0af12..62bb97c66c06 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -254,7 +254,7 @@ pub enum DisputeCoordinatorMessage { /// The validator index passed alongside each statement should correspond to the index /// of the validator in the set. statements: Vec<(SignedDisputeStatement, ValidatorIndex)>, - /// Inform the requester once we finished importing. + /// Inform the requester once we finished importing (if a sender was provided). /// /// This is: /// - we discarded the votes because @@ -268,7 +268,7 @@ pub enum DisputeCoordinatorMessage { /// - or other explicit votes on that candidate already recorded /// - or recovered availability for the candidate /// - or the imported statements are backing/approval votes, which are always accepted. 
- pending_confirmation: oneshot::Sender, + pending_confirmation: Option>, }, /// Fetch a list of all recent disputes the co-ordinator is aware of. /// These are disputes which have occurred any time in recent sessions, diff --git a/node/subsystem-util/src/lib.rs b/node/subsystem-util/src/lib.rs index 8e969fc110e1..b249fa11f0eb 100644 --- a/node/subsystem-util/src/lib.rs +++ b/node/subsystem-util/src/lib.rs @@ -52,8 +52,9 @@ use pin_project::pin_project; use polkadot_primitives::v2::{ AuthorityDiscoveryId, CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, GroupIndex, GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption, - PersistedValidationData, SessionIndex, SessionInfo, Signed, SigningContext, ValidationCode, - ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, + PersistedValidationData, ScrapedOnChainVotes, SessionIndex, SessionInfo, Signed, + SigningContext, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, + ValidatorSignature, }; pub use rand; use sp_application_crypto::AppKey; @@ -217,6 +218,7 @@ specialize_requests! { fn request_session_info(index: SessionIndex) -> Option; SessionInfo; fn request_validation_code_hash(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option; ValidationCodeHash; + fn request_on_chain_votes() -> Option; FetchOnChainVotes; } /// From the given set of validators, find the first key we can sign with, if any. diff --git a/node/subsystem-util/src/runtime/mod.rs b/node/subsystem-util/src/runtime/mod.rs index 31a67a6859c7..4286eba89a54 100644 --- a/node/subsystem-util/src/runtime/mod.rs +++ b/node/subsystem-util/src/runtime/mod.rs @@ -28,13 +28,14 @@ use sp_keystore::{CryptoStore, SyncCryptoStorePtr}; use polkadot_node_subsystem::{SubsystemContext, SubsystemSender}; use polkadot_primitives::v2::{ CandidateEvent, CoreState, EncodeAs, GroupIndex, GroupRotationInfo, Hash, OccupiedCore, - SessionIndex, SessionInfo, Signed, SigningContext, UncheckedSigned, ValidationCode, - ValidationCodeHash, ValidatorId, ValidatorIndex, + ScrapedOnChainVotes, SessionIndex, SessionInfo, Signed, SigningContext, UncheckedSigned, + ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, }; use crate::{ - request_availability_cores, request_candidate_events, request_session_index_for_child, - request_session_info, request_validation_code_by_hash, request_validator_groups, + request_availability_cores, request_candidate_events, request_on_chain_votes, + request_session_index_for_child, request_session_info, request_validation_code_by_hash, + request_validator_groups, }; /// Errors that can happen on runtime fetches. @@ -314,6 +315,17 @@ where recv_runtime(request_candidate_events(relay_parent, sender).await).await } +/// Get on chain votes. +pub async fn get_on_chain_votes( + sender: &mut Sender, + relay_parent: Hash, +) -> Result> +where + Sender: SubsystemSender, +{ + recv_runtime(request_on_chain_votes(relay_parent, sender).await).await +} + /// Fetch `ValidationCode` by hash from the runtime. pub async fn get_validation_code_by_hash( sender: &mut Sender,
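Taken together, the message change makes the confirmation channel opt-in: callers that only fire statements at the dispute coordinator pass `None`, while callers that need the result — like the dispute-distribution receiver above — pass `Some(tx)` and await it. A minimal sketch of that pattern, using a local stand-in enum rather than the real `ImportStatementsResult` handling:

```rust
// Minimal sketch of the optional-confirmation pattern; `ImportResult` is a
// local stand-in, not the real ImportStatementsResult.
use futures::channel::oneshot;

#[derive(Debug, PartialEq)]
enum ImportResult {
    ValidImport,
    InvalidImport,
}

fn finish_import(accepted: bool, pending_confirmation: Option<oneshot::Sender<ImportResult>>) {
    let result = if accepted { ImportResult::ValidImport } else { ImportResult::InvalidImport };
    // Only report back if the caller actually asked for a confirmation.
    if let Some(tx) = pending_confirmation {
        // A dropped receiver is ignored rather than treated as an error.
        let _ = tx.send(result);
    }
}

fn main() {
    // Caller that needs the result passes `Some(tx)` and awaits it.
    let (tx, rx) = oneshot::channel();
    finish_import(true, Some(tx));
    let confirmation = futures::executor::block_on(rx).expect("sender was not dropped");
    assert_eq!(confirmation, ImportResult::ValidImport);

    // Fire-and-forget callers (approval voting, backing) simply pass `None`.
    finish_import(true, None);
}
```

This avoids allocating and tracking a oneshot channel for every fire-and-forget import, while keeping the confirmation available where it is actually consumed.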