Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
renanme unload_blocks to uncached_blocks
Signed-off-by: linning <linningde25@gmail.com>
  • Loading branch information
NingLin-P committed Aug 9, 2022
commit d0c7710a3b4a76f072101819ac4db728b27e0b96
108 changes: 55 additions & 53 deletions client/state-db/src/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ enum DeathRowQueue<BlockHash: Hash, Key: Hash, D: MetaDb> {
/// successive and ordered by block number
cache: VecDeque<DeathRow<BlockHash, Key>>,
/// The number of blocks that not loaded into `cache`
unload_blocks: usize,
uncached_blocks: usize,
},
}

Expand Down Expand Up @@ -109,26 +109,26 @@ impl<BlockHash: Hash, Key: Hash, D: MetaDb> DeathRowQueue<BlockHash, Key, D> {
fn new_db_backed(
db: D,
base: u64,
mut unload_blocks: usize,
mut uncached_blocks: usize,
) -> Result<DeathRowQueue<BlockHash, Key, D>, Error<D::Error>> {
let mut cache = VecDeque::with_capacity(CACHE_BATCH_SIZE);
trace!(target: "state-db", "Reading pruning journal for the database-backed queue. Pending #{}", base);
// Load block from db
DeathRowQueue::load_batch_from_db(&db, &mut unload_blocks, &mut cache, base)?;
Ok(DeathRowQueue::DbBacked { db, cache, unload_blocks })
DeathRowQueue::load_batch_from_db(&db, &mut uncached_blocks, &mut cache, base)?;
Ok(DeathRowQueue::DbBacked { db, cache, uncached_blocks })
}

/// import a new block to the back of the queue
fn import(&mut self, base: u64, journal_record: JournalRecord<BlockHash, Key>) {
let JournalRecord { hash, inserted, deleted } = journal_record;
match self {
DeathRowQueue::DbBacked { unload_blocks, cache, .. } => {
// `unload_blocks` is zero means currently all block are loaded into `cache`
DeathRowQueue::DbBacked { uncached_blocks, cache, .. } => {
// `uncached_blocks` is zero means currently all block are loaded into `cache`
// thus if `cache` is not full, load the next block into `cache` too
if *unload_blocks == 0 && cache.len() < CACHE_BATCH_SIZE {
if *uncached_blocks == 0 && cache.len() < CACHE_BATCH_SIZE {
cache.push_back(DeathRow { hash, deleted: deleted.into_iter().collect() });
} else {
*unload_blocks += 1;
*uncached_blocks += 1;
}
},
DeathRowQueue::Mem { death_rows, death_index } => {
Expand All @@ -155,10 +155,10 @@ impl<BlockHash: Hash, Key: Hash, D: MetaDb> DeathRowQueue<BlockHash, Key, D> {
base: u64,
) -> Result<Option<DeathRow<BlockHash, Key>>, Error<D::Error>> {
match self {
DeathRowQueue::DbBacked { db, unload_blocks, cache } => {
if cache.is_empty() && *unload_blocks != 0 {
DeathRowQueue::DbBacked { db, uncached_blocks, cache } => {
if cache.is_empty() && *uncached_blocks != 0 {
// load more blocks from db since there are still blocks in it
DeathRowQueue::load_batch_from_db(db, unload_blocks, cache, base)?;
DeathRowQueue::load_batch_from_db(db, uncached_blocks, cache, base)?;
}
Ok(cache.pop_front())
},
Expand All @@ -179,15 +179,15 @@ impl<BlockHash: Hash, Key: Hash, D: MetaDb> DeathRowQueue<BlockHash, Key, D> {
fn revert_recent_add(&mut self, base: u64, amout: usize) {
debug_assert!(amout <= self.len());
match self {
DeathRowQueue::DbBacked { unload_blocks, cache, .. } => {
// remove from `unload_blocks` if it can cover
if *unload_blocks >= amout {
*unload_blocks -= amout;
DeathRowQueue::DbBacked { uncached_blocks, cache, .. } => {
// remove from `uncached_blocks` if it can cover
if *uncached_blocks >= amout {
*uncached_blocks -= amout;
return
}
// reset `unload_blocks` and remove remain blocks from `cache`
let remain = amout - *unload_blocks;
*unload_blocks = 0;
// reset `uncached_blocks` and remove remain blocks from `cache`
let remain = amout - *uncached_blocks;
*uncached_blocks = 0;
cache.truncate(cache.len() - remain);
},
DeathRowQueue::Mem { death_rows, death_index } => {
Expand All @@ -208,17 +208,17 @@ impl<BlockHash: Hash, Key: Hash, D: MetaDb> DeathRowQueue<BlockHash, Key, D> {
/// of the queue
fn load_batch_from_db(
db: &D,
unload_blocks: &mut usize,
uncached_blocks: &mut usize,
cache: &mut VecDeque<DeathRow<BlockHash, Key>>,
base: u64,
) -> Result<(), Error<D::Error>> {
// return if all blocks already loaded into `cache` and there are no other
// blocks in the backend database
if *unload_blocks == 0 {
if *uncached_blocks == 0 {
return Ok(())
}
let start = base + cache.len() as u64;
let batch_size = cmp::min(*unload_blocks, CACHE_BATCH_SIZE);
let batch_size = cmp::min(*uncached_blocks, CACHE_BATCH_SIZE);
let mut loaded = 0;
for i in 0..batch_size as u64 {
match load_death_row_from_db::<BlockHash, Key, D>(db, start + i)? {
Expand All @@ -232,7 +232,7 @@ impl<BlockHash: Hash, Key: Hash, D: MetaDb> DeathRowQueue<BlockHash, Key, D> {
// `loaded` should be the same as what we expect, if there are missing blocks
// `load_death_row_from_db` should return a db error
debug_assert_eq!(batch_size, loaded);
*unload_blocks -= loaded;
*uncached_blocks -= loaded;
Ok(())
}

Expand All @@ -244,12 +244,12 @@ impl<BlockHash: Hash, Key: Hash, D: MetaDb> DeathRowQueue<BlockHash, Key, D> {
index: usize,
) -> Result<Option<DeathRow<BlockHash, Key>>, Error<D::Error>> {
match self {
DeathRowQueue::DbBacked { db, unload_blocks, cache } => {
DeathRowQueue::DbBacked { db, uncached_blocks, cache } => {
// check if `index` target a block reside on disk
if index >= cache.len() && index < cache.len() + *unload_blocks {
if index >= cache.len() && index < cache.len() + *uncached_blocks {
// if `index` target the next batch of `DeathRow`, load a batch from db
if index - cache.len() < cmp::min(*unload_blocks, CACHE_BATCH_SIZE) {
DeathRowQueue::load_batch_from_db(db, unload_blocks, cache, base)?;
if index - cache.len() < cmp::min(*uncached_blocks, CACHE_BATCH_SIZE) {
DeathRowQueue::load_batch_from_db(db, uncached_blocks, cache, base)?;
} else {
// load a single `DeathRow` from db, but do not insert it to `cache`
// because `cache` is a queue of successive `DeathRow`
Expand Down Expand Up @@ -298,7 +298,8 @@ impl<BlockHash: Hash, Key: Hash, D: MetaDb> DeathRowQueue<BlockHash, Key, D> {
/// Return the number of block in the pruning window
fn len(&self) -> usize {
match self {
DeathRowQueue::DbBacked { unload_blocks, cache, .. } => cache.len() + *unload_blocks,
DeathRowQueue::DbBacked { uncached_blocks, cache, .. } =>
cache.len() + *uncached_blocks,
DeathRowQueue::Mem { death_rows, .. } => death_rows.len(),
}
}
Expand All @@ -316,7 +317,8 @@ impl<BlockHash: Hash, Key: Hash, D: MetaDb> DeathRowQueue<BlockHash, Key, D> {
#[cfg(test)]
fn get_db_backed_queue_state(&self) -> Option<(&VecDeque<DeathRow<BlockHash, Key>>, usize)> {
match self {
DeathRowQueue::DbBacked { cache, unload_blocks, .. } => Some((cache, *unload_blocks)),
DeathRowQueue::DbBacked { cache, uncached_blocks, .. } =>
Some((cache, *uncached_blocks)),
DeathRowQueue::Mem { .. } => None,
}
}
Expand Down Expand Up @@ -800,9 +802,9 @@ mod tests {
RefWindow::new(&make_db(&[]), false).unwrap();

// start as an empty queue
let (cache, unload_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
assert_eq!(cache.len(), 0);
assert_eq!(unload_blocks, 0);
assert_eq!(uncached_blocks, 0);

// import blocks
// queue size and content should match
Expand All @@ -812,20 +814,20 @@ mod tests {
push_last_canonicalized(i as u64, &mut commit);
pruning.queue.get_db().unwrap().commit(&commit);
// block will fill in cache first
let (cache, unload_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
if i < CACHE_BATCH_SIZE {
assert_eq!(cache.len(), i + 1);
assert_eq!(unload_blocks, 0);
assert_eq!(uncached_blocks, 0);
} else {
assert_eq!(cache.len(), CACHE_BATCH_SIZE);
assert_eq!(unload_blocks, i - CACHE_BATCH_SIZE + 1);
assert_eq!(uncached_blocks, i - CACHE_BATCH_SIZE + 1);
}
}
pruning.apply_pending();
assert_eq!(pruning.queue.len(), CACHE_BATCH_SIZE + 10);
let (cache, unload_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
assert_eq!(cache.len(), CACHE_BATCH_SIZE);
assert_eq!(unload_blocks, 10);
assert_eq!(uncached_blocks, 10);
for i in 0..CACHE_BATCH_SIZE {
assert_eq!(cache[i].hash, i as u64);
}
Expand All @@ -835,17 +837,17 @@ mod tests {
let mut commit = CommitSet::default();
pruning.note_canonical(&(CACHE_BATCH_SIZE as u64 + 10), &mut commit);
assert_eq!(pruning.queue.len(), CACHE_BATCH_SIZE + 11);
let (cache, unload_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
assert_eq!(cache.len(), CACHE_BATCH_SIZE);
assert_eq!(unload_blocks, 11);
assert_eq!(uncached_blocks, 11);

// revert the last add that no apply yet
// NOTE: do not commit the previous `CommitSet` to db
pruning.queue.revert_recent_add(0, 1);
assert_eq!(pruning.queue.len(), CACHE_BATCH_SIZE + 10);
let (cache, unload_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
assert_eq!(cache.len(), CACHE_BATCH_SIZE);
assert_eq!(unload_blocks, 10);
assert_eq!(uncached_blocks, 10);

// remove one block from the start of the queue
// block is removed from the head of cache
Expand All @@ -854,9 +856,9 @@ mod tests {
pruning.queue.get_db().unwrap().commit(&commit);
pruning.apply_pending();
assert_eq!(pruning.queue.len(), CACHE_BATCH_SIZE + 9);
let (cache, unload_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
assert_eq!(cache.len(), CACHE_BATCH_SIZE - 1);
assert_eq!(unload_blocks, 10);
assert_eq!(uncached_blocks, 10);
for i in 0..(CACHE_BATCH_SIZE - 1) {
assert_eq!(cache[i].hash, (i + 1) as u64);
}
Expand All @@ -866,9 +868,9 @@ mod tests {
let pruning: RefWindow<u64, H256, TestDb> =
RefWindow::new(pruning.queue.get_db().unwrap(), false).unwrap();
assert_eq!(pruning.queue.len(), CACHE_BATCH_SIZE + 9);
let (cache, unload_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
assert_eq!(cache.len(), CACHE_BATCH_SIZE);
assert_eq!(unload_blocks, 9);
assert_eq!(uncached_blocks, 9);
for i in 0..CACHE_BATCH_SIZE {
assert_eq!(cache[i].hash, (i + 1) as u64);
}
Expand Down Expand Up @@ -896,15 +898,15 @@ mod tests {
CACHE_BATCH_SIZE as u64 - 1
);
assert_eq!(pruning.queue.get(0, CACHE_BATCH_SIZE * 2 + 10).unwrap(), None);
let (cache, unload_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
assert_eq!(cache.len(), CACHE_BATCH_SIZE);
assert_eq!(unload_blocks, CACHE_BATCH_SIZE + 10);
assert_eq!(uncached_blocks, CACHE_BATCH_SIZE + 10);

// getting a block not in cache will triger loading block from db
assert_eq!(pruning.queue.get(0, index).unwrap().unwrap().hash, CACHE_BATCH_SIZE as u64);
let (cache, unload_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
assert_eq!(cache.len(), CACHE_BATCH_SIZE * 2);
assert_eq!(unload_blocks, 10);
assert_eq!(uncached_blocks, 10);

// clear all block loaded in cache
for _ in 0..CACHE_BATCH_SIZE * 2 {
Expand All @@ -913,28 +915,28 @@ mod tests {
pruning.queue.get_db().unwrap().commit(&commit);
}
pruning.apply_pending();
let (cache, unload_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
assert!(cache.is_empty());
assert_eq!(unload_blocks, 10);
assert_eq!(uncached_blocks, 10);

// getting the hash of block that not in cache will also triger loading
// the remaining blocks from db
assert_eq!(
pruning.queue.get(pruning.pending_number, 0).unwrap().unwrap().hash,
(CACHE_BATCH_SIZE * 2) as u64
);
let (cache, unload_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
assert_eq!(cache.len(), 10);
assert_eq!(unload_blocks, 0);
assert_eq!(uncached_blocks, 0);

// load a new queue from db
// `cache` should be the same
let pruning: RefWindow<u64, H256, TestDb> =
RefWindow::new(pruning.queue.get_db().unwrap(), false).unwrap();
assert_eq!(pruning.queue.len(), 10);
let (cache, unload_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap();
assert_eq!(cache.len(), 10);
assert_eq!(unload_blocks, 0);
assert_eq!(uncached_blocks, 0);
for i in 0..10 {
assert_eq!(cache[i].hash, (CACHE_BATCH_SIZE * 2 + i) as u64);
}
Expand Down