-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Fix a storage leak in parachains db #5594
Changes from 10 commits
7601247
834f308
a2b43ee
e31ecbd
85ced82
b6b0b37
2fccece
c1415da
37f8a19
07b0431
a83b172
5513608
1253ab1
dfcc91b
7077ddb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,12 +31,30 @@ use crate::{ | |
| backend::{Backend, BackendWriteOp, OverlayedBackend}, | ||
| error::{FatalError, FatalResult}, | ||
| status::DisputeStatus, | ||
| DISPUTE_WINDOW, | ||
| DISPUTE_WINDOW, LOG_TARGET, | ||
| }; | ||
|
|
||
| const RECENT_DISPUTES_KEY: &[u8; 15] = b"recent-disputes"; | ||
| const EARLIEST_SESSION_KEY: &[u8; 16] = b"earliest-session"; | ||
| const CANDIDATE_VOTES_SUBKEY: &[u8; 15] = b"candidate-votes"; | ||
| /// Until what session have votes been cleaned up already? | ||
| const CLEANED_VOTES_WATERMARK_KEY: &[u8; 23] = b"cleaned-votes-watermark"; | ||
|
|
||
| /// Restrict number of cleanup operations. | ||
| /// | ||
| /// On the first run we are starting at session 0 going up all the way to the current session - | ||
| /// this should not be done at once, but rather in smaller batches so nodes won't get stalled by | ||
| /// this. | ||
| /// | ||
| /// 300 is with session duration of 1h and 30 parachains around <3_000_000 key purges in the worst | ||
| /// case. Which is already quite a lot, at the same time we have around 21_000 sessions on | ||
| /// Kusama. This means at 300 purged sessions per session, cleaning everything up will take | ||
| /// around 3 days. Depending on how severe disk usage becomes, we might want to bump the batch | ||
| /// size, at the cost of risking issues at session boundaries (performance). | ||
| #[cfg(test)] | ||
| const MAX_CLEAN_BATCH_SIZE: u32 = 10; | ||
| #[cfg(not(test))] | ||
| const MAX_CLEAN_BATCH_SIZE: u32 = 300; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the impact of doing this many items on each earliest session update? (for nodes who have a lot of dangling storage items to clean)
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From what I have seen so far, it seems to be pretty fast - although that might have been mostly empty sessions. Worst thing that could happen is that a validator is heavily loaded at a session boundary and fails to do some work. I also don't have any good data yet about how much wasted storage we are actually talking about; if it is tiny we can go with smaller batch sizes, as then it does not matter if it takes forever.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. burnin on Kusama will tell. |
||
|
|
||
| pub struct DbBackend { | ||
| inner: Arc<dyn Database>, | ||
|
|
@@ -47,6 +65,44 @@ impl DbBackend { | |
| pub fn new(db: Arc<dyn Database>, config: ColumnConfiguration) -> Self { | ||
| Self { inner: db, config } | ||
| } | ||
|
|
||
| /// Cleanup old votes. | ||
| /// | ||
| /// Should be called whenever a new earliest session gets written. | ||
eskimor marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| fn add_vote_cleanup_tx( | ||
| &mut self, | ||
| tx: &mut DBTransaction, | ||
| earliest_session: SessionIndex, | ||
| ) -> FatalResult<()> { | ||
| // Cleanup old votes in db: | ||
| let watermark = load_cleaned_votes_watermark(&*self.inner, &self.config)?.unwrap_or(0); | ||
| let clean_until = if earliest_session.saturating_sub(watermark) > MAX_CLEAN_BATCH_SIZE { | ||
| watermark + MAX_CLEAN_BATCH_SIZE | ||
| } else { | ||
| earliest_session | ||
| }; | ||
| gum::trace!( | ||
| target: LOG_TARGET, | ||
| ?watermark, | ||
| ?clean_until, | ||
| ?earliest_session, | ||
| ?MAX_CLEAN_BATCH_SIZE, | ||
| "WriteEarliestSession" | ||
| ); | ||
|
|
||
| for index in watermark..clean_until { | ||
| gum::trace!( | ||
| target: LOG_TARGET, | ||
| ?index, | ||
| encoded = ?candidate_votes_session_prefix(index), | ||
| "Cleaning votes for session index" | ||
| ); | ||
| tx.delete_prefix(self.config.col_data, &candidate_votes_session_prefix(index)); | ||
| } | ||
| // New watermark: | ||
| tx.put_vec(self.config.col_data, CLEANED_VOTES_WATERMARK_KEY, clean_until.encode()); | ||
| Ok(()) | ||
| } | ||
| } | ||
|
|
||
| impl Backend for DbBackend { | ||
|
|
@@ -71,6 +127,8 @@ impl Backend for DbBackend { | |
|
|
||
| /// Atomically writes the list of operations, with later operations taking precedence over | ||
| /// prior. | ||
| /// | ||
| /// This also takes care of purging old votes (of obsolete sessions). | ||
| fn write<I>(&mut self, ops: I) -> FatalResult<()> | ||
| where | ||
| I: IntoIterator<Item = BackendWriteOp>, | ||
|
|
@@ -79,12 +137,16 @@ impl Backend for DbBackend { | |
| for op in ops { | ||
| match op { | ||
| BackendWriteOp::WriteEarliestSession(session) => { | ||
| self.add_vote_cleanup_tx(&mut tx, session)?; | ||
|
|
||
| // Actually write the earliest session. | ||
| tx.put_vec(self.config.col_data, EARLIEST_SESSION_KEY, session.encode()); | ||
| }, | ||
| BackendWriteOp::WriteRecentDisputes(recent_disputes) => { | ||
| tx.put_vec(self.config.col_data, RECENT_DISPUTES_KEY, recent_disputes.encode()); | ||
| }, | ||
| BackendWriteOp::WriteCandidateVotes(session, candidate_hash, votes) => { | ||
| gum::trace!(target: LOG_TARGET, ?session, "Writing candidate votes"); | ||
| tx.put_vec( | ||
| self.config.col_data, | ||
| &candidate_votes_key(session, &candidate_hash), | ||
|
|
@@ -112,6 +174,15 @@ fn candidate_votes_key(session: SessionIndex, candidate_hash: &CandidateHash) -> | |
| buf | ||
| } | ||
|
|
||
| fn candidate_votes_session_prefix(session: SessionIndex) -> [u8; 15 + 4] { | ||
| let mut buf = [0u8; 15 + 4]; | ||
| buf[..15].copy_from_slice(CANDIDATE_VOTES_SUBKEY); | ||
|
|
||
| // big-endian encoding is used to ensure lexicographic ordering. | ||
| buf[15..][..4].copy_from_slice(&session.to_be_bytes()); | ||
| buf | ||
| } | ||
|
|
||
| /// Column configuration information for the DB. | ||
| #[derive(Debug, Clone)] | ||
| pub struct ColumnConfiguration { | ||
|
|
@@ -244,9 +315,7 @@ pub(crate) fn note_current_session( | |
|
|
||
| if pruned_disputes.len() != 0 { | ||
| overlay_db.write_recent_disputes(new_recent_disputes); | ||
| for ((session, candidate_hash), _) in pruned_disputes { | ||
| overlay_db.delete_candidate_votes(session, candidate_hash); | ||
| } | ||
| // Note: Deleting old candidate votes is handled in `write` based on the earliest session. | ||
| } | ||
| } | ||
| }, | ||
|
|
@@ -258,20 +327,116 @@ pub(crate) fn note_current_session( | |
| Ok(()) | ||
| } | ||
|
|
||
| /// Until what session votes have been cleaned up already. | ||
| /// | ||
| /// That is the db has already been purged of votes for sessions older than the returned | ||
| /// `SessionIndex`. | ||
| fn load_cleaned_votes_watermark( | ||
| db: &dyn Database, | ||
| config: &ColumnConfiguration, | ||
| ) -> FatalResult<Option<SessionIndex>> { | ||
| load_decode(db, config.col_data, CLEANED_VOTES_WATERMARK_KEY) | ||
| .map_err(|e| FatalError::DbReadFailed(e)) | ||
eskimor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
|
|
||
| use super::*; | ||
| use ::test_helpers::{dummy_candidate_receipt, dummy_hash}; | ||
| use polkadot_primitives::v2::{Hash, Id as ParaId}; | ||
|
|
||
| fn make_db() -> DbBackend { | ||
| let db = kvdb_memorydb::create(1); | ||
| let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]); | ||
| let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[0]); | ||
| let store = Arc::new(db); | ||
| let config = ColumnConfiguration { col_data: 0 }; | ||
| DbBackend::new(store, config) | ||
| } | ||
|
|
||
| #[test] | ||
| fn max_clean_batch_size_is_honored() { | ||
| let mut backend = make_db(); | ||
|
|
||
| let mut overlay_db = OverlayedBackend::new(&backend); | ||
| let current_session = MAX_CLEAN_BATCH_SIZE + DISPUTE_WINDOW.get() + 3; | ||
| let earliest_session = current_session - DISPUTE_WINDOW.get(); | ||
|
|
||
| overlay_db.write_earliest_session(0); | ||
| let candidate_hash = CandidateHash(Hash::repeat_byte(1)); | ||
|
|
||
| for session in 0..current_session + 1 { | ||
| overlay_db.write_candidate_votes( | ||
| session, | ||
| candidate_hash, | ||
| CandidateVotes { | ||
| candidate_receipt: dummy_candidate_receipt(dummy_hash()), | ||
| valid: Vec::new(), | ||
| invalid: Vec::new(), | ||
| }, | ||
| ); | ||
| } | ||
| assert!(overlay_db.load_candidate_votes(0, &candidate_hash).unwrap().is_some()); | ||
| assert!(overlay_db | ||
| .load_candidate_votes(MAX_CLEAN_BATCH_SIZE - 1, &candidate_hash) | ||
| .unwrap() | ||
| .is_some()); | ||
| assert!(overlay_db | ||
| .load_candidate_votes(MAX_CLEAN_BATCH_SIZE, &candidate_hash) | ||
| .unwrap() | ||
| .is_some()); | ||
|
|
||
| // Cleanup only works for votes that have been written already - so write. | ||
| let write_ops = overlay_db.into_write_ops(); | ||
| backend.write(write_ops).unwrap(); | ||
|
|
||
| let mut overlay_db = OverlayedBackend::new(&backend); | ||
|
|
||
| gum::trace!(target: LOG_TARGET, ?current_session, "Noting current session"); | ||
| note_current_session(&mut overlay_db, current_session).unwrap(); | ||
|
|
||
| let write_ops = overlay_db.into_write_ops(); | ||
| backend.write(write_ops).unwrap(); | ||
|
|
||
| let mut overlay_db = OverlayedBackend::new(&backend); | ||
|
|
||
| assert!(overlay_db | ||
| .load_candidate_votes(MAX_CLEAN_BATCH_SIZE - 1, &candidate_hash) | ||
| .unwrap() | ||
| .is_none()); | ||
| // After batch size votes should still be there: | ||
| assert!(overlay_db | ||
| .load_candidate_votes(MAX_CLEAN_BATCH_SIZE, &candidate_hash) | ||
| .unwrap() | ||
| .is_some()); | ||
|
|
||
| let current_session = current_session + 1; | ||
| let earliest_session = earliest_session + 1; | ||
|
|
||
| note_current_session(&mut overlay_db, current_session).unwrap(); | ||
|
|
||
| let write_ops = overlay_db.into_write_ops(); | ||
| backend.write(write_ops).unwrap(); | ||
|
|
||
| let overlay_db = OverlayedBackend::new(&backend); | ||
|
|
||
| // All should be gone now: | ||
| assert!(overlay_db | ||
| .load_candidate_votes(earliest_session - 1, &candidate_hash) | ||
| .unwrap() | ||
| .is_none()); | ||
| // Earliest session should still be there: | ||
| assert!(overlay_db | ||
| .load_candidate_votes(earliest_session, &candidate_hash) | ||
| .unwrap() | ||
| .is_some()); | ||
| // Old current session should still be there as well: | ||
| assert!(overlay_db | ||
| .load_candidate_votes(current_session - 1, &candidate_hash) | ||
| .unwrap() | ||
| .is_some()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn overlay_pre_and_post_commit_consistency() { | ||
| let mut backend = make_db(); | ||
|
|
@@ -368,57 +533,40 @@ mod tests { | |
| let mut backend = make_db(); | ||
|
|
||
| let mut overlay_db = OverlayedBackend::new(&backend); | ||
| overlay_db.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1))); | ||
|
|
||
| overlay_db.write_candidate_votes( | ||
| 1, | ||
| CandidateHash(Hash::repeat_byte(1)), | ||
| CandidateVotes { | ||
| candidate_receipt: dummy_candidate_receipt(dummy_hash()), | ||
| candidate_receipt: dummy_candidate_receipt(Hash::random()), | ||
| valid: Vec::new(), | ||
| invalid: Vec::new(), | ||
| }, | ||
| ); | ||
|
|
||
| let write_ops = overlay_db.into_write_ops(); | ||
| backend.write(write_ops).unwrap(); | ||
|
|
||
| assert_eq!( | ||
| backend | ||
| .load_candidate_votes(1, &CandidateHash(Hash::repeat_byte(1))) | ||
| .unwrap() | ||
| .unwrap() | ||
| .candidate_receipt | ||
| .descriptor | ||
| .para_id, | ||
| ParaId::from(1), | ||
| ); | ||
| let receipt = dummy_candidate_receipt(dummy_hash()); | ||
|
|
||
| let mut overlay_db = OverlayedBackend::new(&backend); | ||
| overlay_db.write_candidate_votes( | ||
| 1, | ||
| CandidateHash(Hash::repeat_byte(1)), | ||
| CandidateVotes { | ||
| candidate_receipt: { | ||
| let mut receipt = dummy_candidate_receipt(dummy_hash()); | ||
| receipt.descriptor.para_id = ParaId::from(5_u32); | ||
|
|
||
| receipt | ||
| }, | ||
| candidate_receipt: receipt.clone(), | ||
| valid: Vec::new(), | ||
| invalid: Vec::new(), | ||
| }, | ||
| ); | ||
|
|
||
| overlay_db.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1))); | ||
|
|
||
| let write_ops = overlay_db.into_write_ops(); | ||
| backend.write(write_ops).unwrap(); | ||
|
|
||
| assert!(backend | ||
| .load_candidate_votes(1, &CandidateHash(Hash::repeat_byte(1))) | ||
| .unwrap() | ||
| .is_none()); | ||
| assert_eq!( | ||
| backend | ||
| .load_candidate_votes(1, &CandidateHash(Hash::repeat_byte(1))) | ||
| .unwrap() | ||
| .unwrap() | ||
| .candidate_receipt, | ||
| receipt, | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
|
|
@@ -434,6 +582,7 @@ mod tests { | |
| let new_earliest_session = 5; | ||
| let current_session = 5 + DISPUTE_WINDOW.get(); | ||
|
|
||
| let super_old_no_dispute = 1; | ||
| let very_old = 3; | ||
| let slightly_old = 4; | ||
| let very_recent = current_session - 1; | ||
|
|
@@ -457,6 +606,7 @@ mod tests { | |
| .collect(), | ||
| ); | ||
|
|
||
| overlay_db.write_candidate_votes(super_old_no_dispute, hash_a, blank_candidate_votes()); | ||
| overlay_db.write_candidate_votes(very_old, hash_a, blank_candidate_votes()); | ||
|
|
||
| overlay_db.write_candidate_votes(slightly_old, hash_b, blank_candidate_votes()); | ||
|
|
@@ -483,6 +633,16 @@ mod tests { | |
| .collect(), | ||
| ); | ||
|
|
||
| // Votes are only cleaned up after actual write: | ||
| let write_ops = overlay_db.into_write_ops(); | ||
| backend.write(write_ops).unwrap(); | ||
|
|
||
| let overlay_db = OverlayedBackend::new(&backend); | ||
|
|
||
| assert!(overlay_db | ||
| .load_candidate_votes(super_old_no_dispute, &hash_a) | ||
| .unwrap() | ||
| .is_none()); | ||
| assert!(overlay_db.load_candidate_votes(very_old, &hash_a).unwrap().is_none()); | ||
| assert!(overlay_db.load_candidate_votes(slightly_old, &hash_b).unwrap().is_none()); | ||
| assert!(overlay_db | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.