diff --git a/node/core/dispute-coordinator/src/backend.rs b/node/core/dispute-coordinator/src/backend.rs
index 92e63fba1ad9..9e8252f01176 100644
--- a/node/core/dispute-coordinator/src/backend.rs
+++ b/node/core/dispute-coordinator/src/backend.rs
@@ -149,13 +149,6 @@ impl<'a, B: 'a + Backend> OverlayedBackend<'a, B> {
 		self.candidate_votes.insert((session, candidate_hash), Some(votes));
 	}
 
-	/// Prepare a deletion of the candidate votes under the indicated candidate.
-	///
-	/// Later calls to this function for the same candidate will override earlier ones.
-	pub fn delete_candidate_votes(&mut self, session: SessionIndex, candidate_hash: CandidateHash) {
-		self.candidate_votes.insert((session, candidate_hash), None);
-	}
-
 	/// Transform this backend into a set of write-ops to be written to the inner backend.
 	pub fn into_write_ops(self) -> impl Iterator<Item = BackendWriteOp> {
 		let earliest_session_ops = self
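Note: `delete_candidate_votes` can go because vote pruning now happens wholesale per session, via key-prefix deletion in `DbBackend::write` (see `db/v1.rs` below). A minimal sketch of the key-ordering property this relies on; `candidate_votes_session_prefix` mirrors the helper added in `db/v1.rs`, and the test is illustrative only:

    // Keys are "candidate-votes" ++ big-endian session index ++ candidate hash,
    // so all candidates of one session share a common 19-byte prefix.
    fn candidate_votes_session_prefix(session: u32) -> [u8; 15 + 4] {
        let mut buf = [0u8; 15 + 4];
        buf[..15].copy_from_slice(b"candidate-votes");
        // Big-endian, so lexicographic key order matches numeric session order.
        buf[15..].copy_from_slice(&session.to_be_bytes());
        buf
    }

    #[test]
    fn session_keys_sort_contiguously() {
        // Even the largest possible key of session 7 sorts before any key of
        // session 8; with little-endian encoding, sessions would interleave and
        // a single prefix delete could not cover exactly one session.
        let mut max_key_s7 = candidate_votes_session_prefix(7).to_vec();
        max_key_s7.extend_from_slice(&[0xff; 32]);
        let min_key_s8 = candidate_votes_session_prefix(8).to_vec();
        assert!(max_key_s7 < min_key_s8);
    }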
diff --git a/node/core/dispute-coordinator/src/db/v1.rs b/node/core/dispute-coordinator/src/db/v1.rs
index 4810123d4f17..0f0883649b5b 100644
--- a/node/core/dispute-coordinator/src/db/v1.rs
+++ b/node/core/dispute-coordinator/src/db/v1.rs
@@ -30,22 +30,80 @@ use parity_scale_codec::{Decode, Encode};
 use crate::{
 	backend::{Backend, BackendWriteOp, OverlayedBackend},
 	error::{FatalError, FatalResult},
+	metrics::Metrics,
 	status::DisputeStatus,
-	DISPUTE_WINDOW,
+	DISPUTE_WINDOW, LOG_TARGET,
 };
 
 const RECENT_DISPUTES_KEY: &[u8; 15] = b"recent-disputes";
 const EARLIEST_SESSION_KEY: &[u8; 16] = b"earliest-session";
 const CANDIDATE_VOTES_SUBKEY: &[u8; 15] = b"candidate-votes";
+/// Until what session have votes been cleaned up already?
+const CLEANED_VOTES_WATERMARK_KEY: &[u8; 23] = b"cleaned-votes-watermark";
+
+/// Restrict the number of cleanup operations.
+///
+/// On the first run we start at session 0 and go all the way up to the current session. This
+/// should not be done all at once, but rather in smaller batches so nodes won't get stalled by
+/// it.
+///
+/// With a session duration of 1 hour and 30 parachains, a batch size of 300 amounts to fewer
+/// than 3_000_000 key purges in the worst case. That is already quite a lot; at the same time we
+/// have around 21_000 sessions on Kusama, so at 300 purged sessions per session, cleaning
+/// everything up will take around 3 days. Depending on how severe disk usage becomes, we might
+/// want to bump the batch size, at the cost of risking issues at session boundaries
+/// (performance).
+#[cfg(test)]
+const MAX_CLEAN_BATCH_SIZE: u32 = 10;
+#[cfg(not(test))]
+const MAX_CLEAN_BATCH_SIZE: u32 = 300;
 
 pub struct DbBackend {
 	inner: Arc<dyn Database>,
 	config: ColumnConfiguration,
+	metrics: Metrics,
 }
 
 impl DbBackend {
-	pub fn new(db: Arc<dyn Database>, config: ColumnConfiguration) -> Self {
-		Self { inner: db, config }
+	pub fn new(db: Arc<dyn Database>, config: ColumnConfiguration, metrics: Metrics) -> Self {
+		Self { inner: db, config, metrics }
+	}
+
+	/// Cleanup old votes.
+	///
+	/// Should be called whenever a new earliest session gets written.
+	fn add_vote_cleanup_tx(
+		&mut self,
+		tx: &mut DBTransaction,
+		earliest_session: SessionIndex,
+	) -> FatalResult<()> {
+		// Cleanup old votes in db:
+		let watermark = load_cleaned_votes_watermark(&*self.inner, &self.config)?.unwrap_or(0);
+		let clean_until = if earliest_session.saturating_sub(watermark) > MAX_CLEAN_BATCH_SIZE {
+			watermark + MAX_CLEAN_BATCH_SIZE
+		} else {
+			earliest_session
+		};
+		gum::trace!(
+			target: LOG_TARGET,
+			?watermark,
+			?clean_until,
+			?earliest_session,
+			?MAX_CLEAN_BATCH_SIZE,
+			"WriteEarliestSession"
+		);
+
+		for index in watermark..clean_until {
+			gum::trace!(
+				target: LOG_TARGET,
+				?index,
+				encoded = ?candidate_votes_session_prefix(index),
+				"Cleaning votes for session index"
+			);
+			tx.delete_prefix(self.config.col_data, &candidate_votes_session_prefix(index));
+		}
+		// New watermark:
+		tx.put_vec(self.config.col_data, CLEANED_VOTES_WATERMARK_KEY, clean_until.encode());
+		Ok(())
 	}
 }
 
@@ -71,20 +129,32 @@ impl Backend for DbBackend {
 	/// Atomically writes the list of operations, with later operations taking precedence over
 	/// prior.
+	///
+	/// This also takes care of purging old votes (of obsolete sessions).
 	fn write<I>(&mut self, ops: I) -> FatalResult<()>
 	where
 		I: IntoIterator<Item = BackendWriteOp>,
 	{
 		let mut tx = DBTransaction::new();
+		// Make sure the whole process is timed, including the actual transaction flush:
+		let mut cleanup_timer = None;
 		for op in ops {
 			match op {
 				BackendWriteOp::WriteEarliestSession(session) => {
+					cleanup_timer = match cleanup_timer.take() {
+						None => Some(self.metrics.time_vote_cleanup()),
+						Some(t) => Some(t),
+					};
+					self.add_vote_cleanup_tx(&mut tx, session)?;
+
+					// Actually write the earliest session.
 					tx.put_vec(self.config.col_data, EARLIEST_SESSION_KEY, session.encode());
 				},
 				BackendWriteOp::WriteRecentDisputes(recent_disputes) => {
 					tx.put_vec(self.config.col_data, RECENT_DISPUTES_KEY, recent_disputes.encode());
 				},
 				BackendWriteOp::WriteCandidateVotes(session, candidate_hash, votes) => {
+					gum::trace!(target: LOG_TARGET, ?session, "Writing candidate votes");
 					tx.put_vec(
 						self.config.col_data,
 						&candidate_votes_key(session, &candidate_hash),
@@ -112,6 +182,15 @@ fn candidate_votes_key(session: SessionIndex, candidate_hash: &CandidateHash) -
 	buf
 }
 
+fn candidate_votes_session_prefix(session: SessionIndex) -> [u8; 15 + 4] {
+	let mut buf = [0u8; 15 + 4];
+	buf[..15].copy_from_slice(CANDIDATE_VOTES_SUBKEY);
+
+	// big-endian encoding is used to ensure lexicographic ordering.
+	buf[15..][..4].copy_from_slice(&session.to_be_bytes());
+	buf
+}
+
 /// Column configuration information for the DB.
 #[derive(Debug, Clone)]
 pub struct ColumnConfiguration {
@@ -244,9 +323,7 @@ pub(crate) fn note_current_session(
 
 					if pruned_disputes.len() != 0 {
 						overlay_db.write_recent_disputes(new_recent_disputes);
-						for ((session, candidate_hash), _) in pruned_disputes {
-							overlay_db.delete_candidate_votes(session, candidate_hash);
-						}
+						// Note: Deleting old candidate votes is handled in `write` based on the earliest session.
 					}
 				}
 			},
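For reference, the batch limiting in `add_vote_cleanup_tx` above is a pure function of the stored watermark and the new earliest session. A standalone sketch of that rule, using the numbers from the `MAX_CLEAN_BATCH_SIZE` doc comment (function name hypothetical):

    /// How far a single cleanup batch may advance (the rule from `add_vote_cleanup_tx`).
    fn clean_until(watermark: u32, earliest_session: u32, max_batch: u32) -> u32 {
        if earliest_session.saturating_sub(watermark) > max_batch {
            watermark + max_batch
        } else {
            earliest_session
        }
    }

    #[test]
    fn first_run_is_batch_limited() {
        // Fresh db: the watermark defaults to 0, so even at session 21_000 only
        // the first 300 sessions get purged; each later `WriteEarliestSession`
        // advances the watermark by at most another 300.
        assert_eq!(clean_until(0, 21_000, 300), 300);
        assert_eq!(clean_until(300, 21_000, 300), 600);
        // Once caught up, cleaning proceeds exactly to the new earliest session.
        assert_eq!(clean_until(20_900, 21_000, 300), 21_000);
    }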
@@ -258,18 +335,114 @@ pub(crate) fn note_current_session(
 	Ok(())
 }
 
+/// Until what session votes have been cleaned up already.
+///
+/// That is, the db has already been purged of votes for sessions older than the returned
+/// `SessionIndex`.
+fn load_cleaned_votes_watermark(
+	db: &dyn Database,
+	config: &ColumnConfiguration,
+) -> FatalResult<Option<SessionIndex>> {
+	load_decode(db, config.col_data, CLEANED_VOTES_WATERMARK_KEY)
+		.map_err(|e| FatalError::DbReadFailed(e))
+}
+
 #[cfg(test)]
 mod tests {
+	use super::*;
 	use ::test_helpers::{dummy_candidate_receipt, dummy_hash};
 	use polkadot_primitives::v2::{Hash, Id as ParaId};
 
 	fn make_db() -> DbBackend {
 		let db = kvdb_memorydb::create(1);
-		let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
+		let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[0]);
 		let store = Arc::new(db);
 		let config = ColumnConfiguration { col_data: 0 };
-		DbBackend::new(store, config)
+		DbBackend::new(store, config, Metrics::default())
+	}
+
+	#[test]
+	fn max_clean_batch_size_is_honored() {
+		let mut backend = make_db();
+
+		let mut overlay_db = OverlayedBackend::new(&backend);
+		let current_session = MAX_CLEAN_BATCH_SIZE + DISPUTE_WINDOW.get() + 3;
+		let earliest_session = current_session - DISPUTE_WINDOW.get();
+
+		overlay_db.write_earliest_session(0);
+		let candidate_hash = CandidateHash(Hash::repeat_byte(1));
+
+		for session in 0..current_session + 1 {
+			overlay_db.write_candidate_votes(
+				session,
+				candidate_hash,
+				CandidateVotes {
+					candidate_receipt: dummy_candidate_receipt(dummy_hash()),
+					valid: Vec::new(),
+					invalid: Vec::new(),
+				},
+			);
+		}
+		assert!(overlay_db.load_candidate_votes(0, &candidate_hash).unwrap().is_some());
+		assert!(overlay_db
+			.load_candidate_votes(MAX_CLEAN_BATCH_SIZE - 1, &candidate_hash)
+			.unwrap()
+			.is_some());
+		assert!(overlay_db
+			.load_candidate_votes(MAX_CLEAN_BATCH_SIZE, &candidate_hash)
+			.unwrap()
+			.is_some());
+
+		// Cleanup only works for votes that have been written already - so write.
+		let write_ops = overlay_db.into_write_ops();
+		backend.write(write_ops).unwrap();
+
+		let mut overlay_db = OverlayedBackend::new(&backend);
+
+		gum::trace!(target: LOG_TARGET, ?current_session, "Noting current session");
+		note_current_session(&mut overlay_db, current_session).unwrap();
+
+		let write_ops = overlay_db.into_write_ops();
+		backend.write(write_ops).unwrap();
+
+		let mut overlay_db = OverlayedBackend::new(&backend);
+
+		assert!(overlay_db
+			.load_candidate_votes(MAX_CLEAN_BATCH_SIZE - 1, &candidate_hash)
+			.unwrap()
+			.is_none());
+		// After batch size votes should still be there:
+		assert!(overlay_db
+			.load_candidate_votes(MAX_CLEAN_BATCH_SIZE, &candidate_hash)
+			.unwrap()
+			.is_some());
+
+		let current_session = current_session + 1;
+		let earliest_session = earliest_session + 1;
+
+		note_current_session(&mut overlay_db, current_session).unwrap();
+
+		let write_ops = overlay_db.into_write_ops();
+		backend.write(write_ops).unwrap();
+
+		let overlay_db = OverlayedBackend::new(&backend);
+
+		// All should be gone now:
+		assert!(overlay_db
+			.load_candidate_votes(earliest_session - 1, &candidate_hash)
+			.unwrap()
+			.is_none());
+		// Earliest session should still be there:
+		assert!(overlay_db
+			.load_candidate_votes(earliest_session, &candidate_hash)
+			.unwrap()
+			.is_some());
+		// Old current session should still be there as well:
+		assert!(overlay_db
+			.load_candidate_votes(current_session - 1, &candidate_hash)
+			.unwrap()
+			.is_some());
+	}
 
 	#[test]
@@ -368,57 +541,40 @@
 		let mut backend = make_db();
 
 		let mut overlay_db = OverlayedBackend::new(&backend);
-		overlay_db.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1)));
 		overlay_db.write_candidate_votes(
 			1,
 			CandidateHash(Hash::repeat_byte(1)),
 			CandidateVotes {
-				candidate_receipt: dummy_candidate_receipt(dummy_hash()),
+				candidate_receipt: dummy_candidate_receipt(Hash::random()),
 				valid: Vec::new(),
 				invalid: Vec::new(),
 			},
 		);
-		let write_ops = overlay_db.into_write_ops();
-		backend.write(write_ops).unwrap();
-
-		assert_eq!(
-			backend
-				.load_candidate_votes(1, &CandidateHash(Hash::repeat_byte(1)))
-				.unwrap()
-				.unwrap()
-				.candidate_receipt
-				.descriptor
-				.para_id,
-			ParaId::from(1),
-		);
+		let receipt = dummy_candidate_receipt(dummy_hash());
 
-		let mut overlay_db = OverlayedBackend::new(&backend);
 		overlay_db.write_candidate_votes(
 			1,
 			CandidateHash(Hash::repeat_byte(1)),
 			CandidateVotes {
-				candidate_receipt: {
-					let mut receipt = dummy_candidate_receipt(dummy_hash());
-					receipt.descriptor.para_id = ParaId::from(5_u32);
-
-					receipt
-				},
+				candidate_receipt: receipt.clone(),
 				valid: Vec::new(),
 				invalid: Vec::new(),
 			},
 		);
 
-		overlay_db.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1)));
-
 		let write_ops = overlay_db.into_write_ops();
 		backend.write(write_ops).unwrap();
 
-		assert!(backend
-			.load_candidate_votes(1, &CandidateHash(Hash::repeat_byte(1)))
-			.unwrap()
-			.is_none());
+		assert_eq!(
+			backend
+				.load_candidate_votes(1, &CandidateHash(Hash::repeat_byte(1)))
+				.unwrap()
+				.unwrap()
+				.candidate_receipt,
+			receipt,
+		);
 	}
 
 	#[test]
@@ -434,6 +590,7 @@
 		let new_earliest_session = 5;
 		let current_session = 5 + DISPUTE_WINDOW.get();
+		let super_old_no_dispute = 1;
 		let very_old = 3;
 		let slightly_old = 4;
 		let very_recent = current_session - 1;
@@ -457,6 +614,7 @@
 				.collect(),
 		);
 
+		overlay_db.write_candidate_votes(super_old_no_dispute, hash_a, blank_candidate_votes());
 		overlay_db.write_candidate_votes(very_old, hash_a, blank_candidate_votes());
 		overlay_db.write_candidate_votes(slightly_old, hash_b, blank_candidate_votes());
@@ -483,6 +641,16 @@
 				.collect(),
 		);
 
+		// Votes are only cleaned up after actual write:
+		let write_ops = overlay_db.into_write_ops();
+		backend.write(write_ops).unwrap();
+
+		let overlay_db = OverlayedBackend::new(&backend);
+
+		assert!(overlay_db
+			.load_candidate_votes(super_old_no_dispute, &hash_a)
+			.unwrap()
+			.is_none());
 		assert!(overlay_db.load_candidate_votes(very_old, &hash_a).unwrap().is_none());
 		assert!(overlay_db.load_candidate_votes(slightly_old, &hash_b).unwrap().is_none());
 		assert!(overlay_db
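One subtlety the tests above depend on: pruning is a side effect of `DbBackend::write`, not of the overlay, so deletions become visible only once the staged ops are flushed. Condensed, the round trip they exercise looks roughly like this (types from this diff, assertions elided):

    let mut backend = make_db();
    let mut overlay_db = OverlayedBackend::new(&backend);
    // Advances the earliest session, staging a `WriteEarliestSession` op.
    note_current_session(&mut overlay_db, current_session).unwrap();
    // Votes of outdated sessions are still readable at this point...
    backend.write(overlay_db.into_write_ops()).unwrap();
    // ...and are only now purged, up to the batch-limited watermark.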
diff --git a/node/core/dispute-coordinator/src/error.rs b/node/core/dispute-coordinator/src/error.rs
index dbef9c29167e..4306413a6ab9 100644
--- a/node/core/dispute-coordinator/src/error.rs
+++ b/node/core/dispute-coordinator/src/error.rs
@@ -20,7 +20,7 @@ use futures::channel::oneshot;
 use polkadot_node_subsystem::{errors::ChainApiError, SubsystemError};
 use polkadot_node_subsystem_util::{rolling_session_window::SessionsUnavailable, runtime};
 
-use crate::{participation, LOG_TARGET};
+use crate::{db, participation, LOG_TARGET};
 use parity_scale_codec::Error as CodecError;
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -53,6 +53,10 @@ pub enum Error {
 	#[error("Writing to database failed: {0}")]
 	DbWriteFailed(std::io::Error),
 
+	#[fatal]
+	#[error("Reading from database failed: {0}")]
+	DbReadFailed(db::v1::Error),
+
 	#[fatal]
 	#[error("Oneshot for receiving block number from chain API got cancelled")]
 	CanceledBlockNumber,
diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs
index a4cff2f1385c..d04cbf29ca58 100644
--- a/node/core/dispute-coordinator/src/lib.rs
+++ b/node/core/dispute-coordinator/src/lib.rs
@@ -127,7 +127,11 @@ impl Config {
 impl<Context: Send> DisputeCoordinatorSubsystem {
 	fn start(self, ctx: Context) -> SpawnedSubsystem {
 		let future = async {
-			let backend = DbBackend::new(self.store.clone(), self.config.column_config());
+			let backend = DbBackend::new(
+				self.store.clone(),
+				self.config.column_config(),
+				self.metrics.clone(),
+			);
 			self.run(ctx, backend, Box::new(SystemClock))
 				.await
 				.map_err(|e| SubsystemError::with_origin("dispute-coordinator", e))
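The `cleanup_timer` handling in `DbBackend::write` above relies on a prometheus `HistogramTimer` recording its observation when dropped; holding the timer across the transaction flush therefore includes the flush in the measurement, as the in-code comment says. A runnable sketch of that drop semantic with the plain `prometheus` crate (the subsystem metrics in `metrics.rs` below wrap the same API):

    use prometheus::{Histogram, HistogramOpts};

    fn main() {
        let hist = Histogram::with_opts(HistogramOpts::new("vote_cleanup_demo", "demo")).unwrap();
        let timer = hist.start_timer();
        // ... build the DB transaction and flush it to disk here ...
        drop(timer); // the duration, including the flush, is observed at drop
        assert_eq!(hist.get_sample_count(), 1);
    }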
diff --git a/node/core/dispute-coordinator/src/metrics.rs b/node/core/dispute-coordinator/src/metrics.rs
index 83810df67a2b..40503428c1c8 100644
--- a/node/core/dispute-coordinator/src/metrics.rs
+++ b/node/core/dispute-coordinator/src/metrics.rs
@@ -26,6 +26,8 @@ struct MetricsInner {
 	concluded: prometheus::CounterVec,
 	/// Number of participations that have been queued.
 	queued_participations: prometheus::CounterVec,
+	/// How long vote cleanup batches take.
+	vote_cleanup_time: prometheus::Histogram,
 }
 
 /// Candidate validation metrics.
@@ -74,6 +76,10 @@ impl Metrics {
 			metrics.queued_participations.with_label_values(&["best-effort"]).inc();
 		}
 	}
+
+	pub(crate) fn time_vote_cleanup(&self) -> Option<prometheus::prometheus::HistogramTimer> {
+		self.0.as_ref().map(|metrics| metrics.vote_cleanup_time.start_timer())
+	}
 }
 
 impl metrics::Metrics for Metrics {
@@ -116,6 +122,16 @@ impl metrics::Metrics for Metrics {
 				)?,
 				registry,
 			)?,
+			vote_cleanup_time: prometheus::register(
+				prometheus::Histogram::with_opts(
+					prometheus::HistogramOpts::new(
+						"polkadot_parachain_dispute_coordinator_vote_cleanup",
+						"Time spent cleaning up old votes per batch.",
+					)
+					.buckets([0.01, 0.1, 0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0].into()),
+				)?,
+				registry,
+			)?,
 		};
 		Ok(Metrics(Some(metrics)))
 	}
diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs
index e8c983ddb977..f1dbde642c22 100644
--- a/node/core/dispute-coordinator/src/tests.rs
+++ b/node/core/dispute-coordinator/src/tests.rs
@@ -438,7 +438,8 @@ impl TestState {
 			self.subsystem_keystore.clone(),
 			Metrics::default(),
 		);
-		let backend = DbBackend::new(self.db.clone(), self.config.column_config());
+		let backend =
+			DbBackend::new(self.db.clone(), self.config.column_config(), Metrics::default());
 		let subsystem_task = subsystem.run(ctx, backend, Box::new(self.clock.clone()));
 		let test_task = test(self, ctx_handle);
 
@@ -2150,7 +2151,11 @@ fn negative_issue_local_statement_only_triggers_import() {
 		})
 		.await;
 
-		let backend = DbBackend::new(test_state.db.clone(), test_state.config.column_config());
+		let backend = DbBackend::new(
+			test_state.db.clone(),
+			test_state.config.column_config(),
+			Metrics::default(),
+		);
 		let votes = backend.load_candidate_votes(session, &candidate_hash).unwrap().unwrap();
 		assert_eq!(votes.invalid.len(), 1);
 
@@ -2198,7 +2203,11 @@ fn empty_import_still_writes_candidate_receipt() {
 		rx.await.unwrap();
 
-		let backend = DbBackend::new(test_state.db.clone(), test_state.config.column_config());
+		let backend = DbBackend::new(
+			test_state.db.clone(),
+			test_state.config.column_config(),
+			Metrics::default(),
+		);
 		let votes = backend.load_candidate_votes(session, &candidate_hash).unwrap().unwrap();
 		assert_eq!(votes.invalid.len(), 0);
 
@@ -2264,7 +2273,11 @@ fn redundant_votes_ignored() {
 		rx.await.unwrap();
 
-		let backend = DbBackend::new(test_state.db.clone(), test_state.config.column_config());
+		let backend = DbBackend::new(
+			test_state.db.clone(),
+			test_state.config.column_config(),
+			Metrics::default(),
+		);
 		let votes = backend.load_candidate_votes(session, &candidate_hash).unwrap().unwrap();
 		assert_eq!(votes.invalid.len(), 0);
diff --git a/node/service/src/parachains_db/mod.rs b/node/service/src/parachains_db/mod.rs
index d80341e918d3..f6f6864a0e78 100644
--- a/node/service/src/parachains_db/mod.rs
+++ b/node/service/src/parachains_db/mod.rs
@@ -33,7 +33,8 @@ pub(crate) mod columns {
 	pub const COL_APPROVAL_DATA: u32 = 2;
 	pub const COL_CHAIN_SELECTION_DATA: u32 = 3;
 	pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4;
-	pub const ORDERED_COL: &[u32] = &[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA];
+	pub const ORDERED_COL: &[u32] =
+		&[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA, COL_DISPUTE_COORDINATOR_DATA];
 }
 
 /// Columns used by different subsystems.
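Finally, adding `COL_DISPUTE_COORDINATOR_DATA` to `ORDERED_COL` (and `&[0]` in the test `DbAdapter` above) is what makes `delete_prefix` usable on the dispute coordinator column: prefix deletion needs in-order key iteration, which the parity-db adapter supports only for columns configured as ordered (btree-indexed). A small demonstration of the `delete_prefix` semantics relied upon, using `kvdb-memorydb` as the unit tests do (keys shortened for readability):

    use kvdb::KeyValueDB;

    fn main() {
        let db = kvdb_memorydb::create(1);

        let mut tx = db.transaction();
        tx.put(0, b"votes\x00\x07aaaa", b"session 7, candidate a");
        tx.put(0, b"votes\x00\x07bbbb", b"session 7, candidate b");
        tx.put(0, b"votes\x00\x08cccc", b"session 8, candidate c");
        db.write(tx).unwrap();

        // One prefix delete removes all of session 7, leaving session 8 intact.
        let mut tx = db.transaction();
        tx.delete_prefix(0, b"votes\x00\x07");
        db.write(tx).unwrap();

        assert!(db.get(0, b"votes\x00\x07aaaa").unwrap().is_none());
        assert!(db.get(0, b"votes\x00\x07bbbb").unwrap().is_none());
        assert!(db.get(0, b"votes\x00\x08cccc").unwrap().is_some());
    }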