diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs
index 7b2d8d6a40d7c0..e6edaf4a65832b 100644
--- a/accounts-db/src/accounts_db.rs
+++ b/accounts-db/src/accounts_db.rs
@@ -2311,8 +2311,10 @@ impl AccountsDb {
         accounts_hash_cache_path: Option<PathBuf>,
     ) -> Self {
         let num_threads = get_thread_count();
-        // 400M bytes
-        const MAX_READ_ONLY_CACHE_DATA_SIZE: usize = 400_000_000;
+        // The high and low watermark sizes for the accounts read cache. If the cache size exceeds
+        // MAX_SIZE_HI, it'll evict entries until the size is <= MAX_SIZE_LO.
+        const MAX_READ_ONLY_CACHE_DATA_SIZE_LO: usize = 400_000_000;
+        const MAX_READ_ONLY_CACHE_DATA_SIZE_HI: usize = 400_000_000;

         // read only cache does not update lru on read of an entry unless it has been at least this many ms since the last lru update
         const READ_ONLY_CACHE_MS_TO_SKIP_LRU_UPDATE: u32 = 100;
@@ -2352,7 +2354,8 @@ impl AccountsDb {
             accounts_cache: AccountsCache::default(),
             sender_bg_hasher: None,
             read_only_accounts_cache: ReadOnlyAccountsCache::new(
-                MAX_READ_ONLY_CACHE_DATA_SIZE,
+                MAX_READ_ONLY_CACHE_DATA_SIZE_LO,
+                MAX_READ_ONLY_CACHE_DATA_SIZE_HI,
                 READ_ONLY_CACHE_MS_TO_SKIP_LRU_UPDATE,
             ),
             uncleaned_pubkeys: DashMap::new(),
diff --git a/accounts-db/src/read_only_accounts_cache.rs b/accounts-db/src/read_only_accounts_cache.rs
index 5d10608e266ad9..18089255627c11 100644
--- a/accounts-db/src/read_only_accounts_cache.rs
+++ b/accounts-db/src/read_only_accounts_cache.rs
@@ -3,6 +3,7 @@ use {
     dashmap::{mapref::entry::Entry, DashMap},
     index_list::{Index, IndexList},
+    log::*,
     solana_measure::{measure::Measure, measure_us},
     solana_sdk::{
         account::{AccountSharedData, ReadableAccount},
@@ -10,9 +11,12 @@ use {
         pubkey::Pubkey,
         timing::timestamp,
     },
-    std::sync::{
-        atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering},
-        Mutex,
+    std::{
+        sync::{
+            atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering},
+            Arc, Mutex,
+        },
+        thread,
     },
 };

@@ -64,33 +68,66 @@ impl ReadOnlyCacheStats {

 #[derive(Debug)]
 pub(crate) struct ReadOnlyAccountsCache {
-    cache: DashMap<ReadOnlyCacheKey, ReadOnlyAccountCacheEntry>,
+    cache: Arc<DashMap<ReadOnlyCacheKey, ReadOnlyAccountCacheEntry>>,
     /// When an item is first entered into the cache, it is added to the end of
     /// the queue. Also each time an entry is looked up from the cache it is
     /// moved to the end of the queue. As a result, items in the queue are
     /// always sorted in the order that they have last been accessed. When doing
     /// LRU eviction, cache entries are evicted from the front of the queue.
-    queue: Mutex<IndexList<ReadOnlyCacheKey>>,
-    max_data_size: usize,
-    data_size: AtomicUsize,
+    queue: Arc<Mutex<IndexList<ReadOnlyCacheKey>>>,
+    _max_data_size_lo: usize,
+    max_data_size_hi: usize,
+    data_size: Arc<AtomicUsize>,
     // read only cache does not update lru on read of an entry unless it has been at least this many ms since the last lru update
     ms_to_skip_lru_update: u32,

     // Performance statistics
-    stats: ReadOnlyCacheStats,
+    stats: Arc<ReadOnlyCacheStats>,
     highest_slot_stored: AtomicU64,
+
+    /// Channel to send eviction requests
+    ///
+    /// NOTE: This field must be above `evictor` to ensure it is dropped before `evictor`.
+    evict_sender: crossbeam_channel::Sender<()>,
+    /// To the evictor goes the spoiled [sic]
+    ///
+    /// Evict from the cache in the background.
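+    ///
+    /// The evictor thread runs until `evict_sender` is dropped: dropping the sender
+    /// disconnects the channel, which is what tells the background thread to shut down.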
+    _evictor: thread::JoinHandle<()>,
 }

 impl ReadOnlyAccountsCache {
-    pub(crate) fn new(max_data_size: usize, ms_to_skip_lru_update: u32) -> Self {
+    pub(crate) fn new(
+        max_data_size_lo: usize,
+        max_data_size_hi: usize,
+        ms_to_skip_lru_update: u32,
+    ) -> Self {
+        assert!(max_data_size_lo <= max_data_size_hi);
+        let cache = Arc::new(DashMap::default());
+        let queue = Arc::new(Mutex::<IndexList<ReadOnlyCacheKey>>::default());
+        let data_size = Arc::new(AtomicUsize::default());
+        let stats = Arc::new(ReadOnlyCacheStats::default());
+        let (evict_sender, evict_receiver) = crossbeam_channel::bounded::<()>(1);
+        let evictor = Self::spawn_evictor(
+            evict_receiver,
+            max_data_size_lo,
+            max_data_size_hi,
+            data_size.clone(),
+            cache.clone(),
+            queue.clone(),
+            stats.clone(),
+        );
+
         Self {
             highest_slot_stored: AtomicU64::default(),
-            max_data_size,
-            cache: DashMap::default(),
-            queue: Mutex::<IndexList<ReadOnlyCacheKey>>::default(),
-            data_size: AtomicUsize::default(),
+            _max_data_size_lo: max_data_size_lo,
+            max_data_size_hi,
+            cache,
+            queue,
+            data_size,
             ms_to_skip_lru_update,
-            stats: ReadOnlyCacheStats::default(),
+            stats,
+            evict_sender,
+            _evictor: evictor,
         }
     }

@@ -137,7 +174,7 @@ impl ReadOnlyAccountsCache {
         account
     }

-    fn account_size(&self, account: &AccountSharedData) -> usize {
+    fn account_size(account: &AccountSharedData) -> usize {
         CACHE_ENTRY_SIZE + account.data().len()
     }

@@ -145,7 +182,7 @@ impl ReadOnlyAccountsCache {
         let measure_store = Measure::start("");
         self.highest_slot_stored.fetch_max(slot, Ordering::Release);
         let key = (pubkey, slot);
-        let account_size = self.account_size(&account);
+        let account_size = Self::account_size(&account);
         self.data_size.fetch_add(account_size, Ordering::Relaxed);
         // self.queue is modified while holding a reference to the cache entry;
         // so that another thread cannot write to the same key.
@@ -158,7 +195,7 @@ impl ReadOnlyAccountsCache {
             }
             Entry::Occupied(mut entry) => {
                 let entry = entry.get_mut();
-                let account_size = self.account_size(&entry.account);
+                let account_size = Self::account_size(&entry.account);
                 self.data_size.fetch_sub(account_size, Ordering::Relaxed);
                 entry.account = account;
                 // Move the entry to the end of the queue.
@@ -167,20 +204,11 @@ impl ReadOnlyAccountsCache {
                 entry.set_index(queue.insert_last(key));
             }
         };
-        // Evict entries from the front of the queue.
-        let mut num_evicts = 0;
-        let (_, evict_us) = measure_us!({
-            while self.data_size.load(Ordering::Relaxed) > self.max_data_size {
-                let Some(&(pubkey, slot)) = self.queue.lock().unwrap().get_first() else {
-                    break;
-                };
-                num_evicts += 1;
-                self.remove(pubkey, slot);
-            }
-        });
+
+        if self.data_size() > self.max_data_size_hi {
+            self.send_evict();
+        }
         let store_us = measure_store.end_as_us();
-        self.stats.evicts.fetch_add(num_evicts, Ordering::Relaxed);
-        self.stats.evict_us.fetch_add(evict_us, Ordering::Relaxed);
         self.stats.store_us.fetch_add(store_us, Ordering::Relaxed);
     }

@@ -202,13 +230,23 @@ impl ReadOnlyAccountsCache {
     }

     pub(crate) fn remove(&self, pubkey: Pubkey, slot: Slot) -> Option<AccountSharedData> {
-        let (_, entry) = self.cache.remove(&(pubkey, slot))?;
+        Self::do_remove(&(pubkey, slot), &self.cache, &self.queue, &self.data_size)
+    }
+
+    /// Removes `key` from the cache, if present, and returns the removed account
+    fn do_remove(
+        key: &ReadOnlyCacheKey,
+        cache: &DashMap<ReadOnlyCacheKey, ReadOnlyAccountCacheEntry>,
+        queue: &Mutex<IndexList<ReadOnlyCacheKey>>,
+        data_size: &AtomicUsize,
+    ) -> Option<AccountSharedData> {
+        let (_, entry) = cache.remove(key)?;
         // self.queue should be modified only after removing the entry from the
         // cache, so that this is still safe if another thread writes to the
         // same key.
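+        // Both the foreground `remove()` and the background evictor funnel through
+        // here; if another thread already removed `key`, then `cache.remove` above
+        // returned `None` and there is nothing left to do.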
-        self.queue.lock().unwrap().remove(entry.index());
-        let account_size = self.account_size(&entry.account);
-        self.data_size.fetch_sub(account_size, Ordering::Relaxed);
+        queue.lock().unwrap().remove(entry.index());
+        let account_size = Self::account_size(&entry.account);
+        data_size.fetch_sub(account_size, Ordering::Relaxed);
         Some(entry.account)
     }

@@ -223,6 +261,80 @@ impl ReadOnlyAccountsCache {
     pub(crate) fn get_and_reset_stats(&self) -> (u64, u64, u64, u64, u64, u64) {
         self.stats.get_and_reset_stats()
     }
+
+    /// Sends a message to the evictor to trigger evictions
+    fn send_evict(&self) {
+        let res = self.evict_sender.try_send(());
+        if let Err(err) = res {
+            // It's possible multiple threads tried to send the evict message at the same time.
+            // Since the channel's size is bounded to 1, only a single message will be sent,
+            // which is fine.
+            trace!("Failed to send accounts read cache eviction request: {err}");
+        }
+    }
+
+    /// Spawns the background thread to handle evictions
+    fn spawn_evictor(
+        receiver: crossbeam_channel::Receiver<()>,
+        max_data_size_lo: usize,
+        max_data_size_hi: usize,
+        data_size: Arc<AtomicUsize>,
+        cache: Arc<DashMap<ReadOnlyCacheKey, ReadOnlyAccountCacheEntry>>,
+        queue: Arc<Mutex<IndexList<ReadOnlyCacheKey>>>,
+        stats: Arc<ReadOnlyCacheStats>,
+    ) -> thread::JoinHandle<()> {
+        thread::Builder::new()
+            .name("solAcctReadCache".to_string())
+            .spawn(move || {
+                info!("AccountsReadCacheEvictor has started");
+                loop {
+                    let res = receiver.recv();
+                    if let Err(err) = res {
+                        // The only error is when the channel is empty and disconnected.
+                        // Disconnecting the channel is the intended way to stop the evictor.
+                        trace!("AccountsReadCacheEvictor is shutting down... {err}");
+                        break;
+                    };
+
+                    // If a message was sent to the channel *while we were already evicting*, then
+                    // when we loop around we'll find a message telling us to evict again. However,
+                    // the current data size is likely no longer above the high watermark, so check
+                    // the current size to see if this was a spurious wakeup.
+                    if data_size.load(Ordering::Relaxed) <= max_data_size_hi {
+                        continue;
+                    }
+
+                    let (num_evicts, evict_us) =
+                        measure_us!(Self::evict(max_data_size_lo, &data_size, &cache, &queue));
+                    stats.evicts.fetch_add(num_evicts, Ordering::Relaxed);
+                    stats.evict_us.fetch_add(evict_us, Ordering::Relaxed);
+                }
+                info!("AccountsReadCacheEvictor has stopped");
+            })
+            .expect("spawn accounts read cache evictor thread")
+    }
+
+    /// Evicts entries until the cache's size is <= `target_data_size`
+    ///
+    /// Oldest entries are evicted first.
+    /// Returns the number of entries evicted.
+    fn evict(
+        target_data_size: usize,
+        data_size: &AtomicUsize,
+        cache: &DashMap<ReadOnlyCacheKey, ReadOnlyAccountCacheEntry>,
+        queue: &Mutex<IndexList<ReadOnlyCacheKey>>,
+    ) -> u64 {
+        let mut num_evicts = 0;
+        while data_size.load(Ordering::Relaxed) > target_data_size {
+            let Some(&key) = queue.lock().unwrap().get_first() else {
+                // if there are no more entries, we're done
+                break;
+            };
+            Self::do_remove(&key, cache, queue, data_size);
+            num_evicts += 1;
+        }
+        num_evicts
+    }
 }

 impl ReadOnlyAccountCacheEntry {
@@ -269,8 +381,26 @@ mod tests {
         },
         rand_chacha::ChaChaRng,
         solana_sdk::account::{accounts_equal, Account, WritableAccount},
-        std::{collections::HashMap, iter::repeat_with, sync::Arc},
+        std::{
+            collections::HashMap,
+            iter::repeat_with,
+            sync::Arc,
+            time::{Duration, Instant},
+        },
     };
+
+    impl ReadOnlyAccountsCache {
+        // Evict entries, but in the foreground
+        //
+        // Evicting in the background is non-deterministic w.r.t. when the evictor runs,
+        // which can make asserting invariants difficult in tests.
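+        //
+        // This helper evicts down to `_max_data_size_lo`, the same target the
+        // background evictor uses, so tests observe the same post-eviction state.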
+        fn evict_in_foreground(&self) {
+            #[allow(clippy::used_underscore_binding)]
+            let target_data_size = self._max_data_size_lo;
+            Self::evict(target_data_size, &self.data_size, &self.cache, &self.queue);
+        }
+    }

     #[test]
     fn test_accountsdb_sizeof() {
         // size_of(arc(x)) does not return the size of x
@@ -279,13 +409,16 @@ mod tests {
     }

     #[test]
-    fn test_read_only_accounts_cache() {
+    fn test_read_only_accounts_cache_deterministic() {
         solana_logger::setup();
         let per_account_size = CACHE_ENTRY_SIZE;
         let data_size = 100;
         let max = data_size + per_account_size;
-        let cache =
-            ReadOnlyAccountsCache::new(max, READ_ONLY_CACHE_MS_TO_SKIP_LRU_UPDATE_FOR_TESTS);
+        let cache = ReadOnlyAccountsCache::new(
+            max,
+            usize::MAX, // <-- do not evict in the background
+            READ_ONLY_CACHE_MS_TO_SKIP_LRU_UPDATE_FOR_TESTS,
+        );
         let slot = 0;
         assert!(cache.load(Pubkey::default(), slot).is_none());
         assert_eq!(0, cache.cache_len());
@@ -303,14 +436,17 @@ mod tests {
         let mut account3 = account1.clone();
         account3.checked_add_lamports(4).unwrap(); // so they compare differently
         cache.store(key1, slot, account1.clone());
+        cache.evict_in_foreground();
         assert_eq!(100 + per_account_size, cache.data_size());
         assert!(accounts_equal(&cache.load(key1, slot).unwrap(), &account1));
         assert_eq!(1, cache.cache_len());
         cache.store(key2, slot, account2.clone());
+        cache.evict_in_foreground();
         assert_eq!(100 + per_account_size, cache.data_size());
         assert!(accounts_equal(&cache.load(key2, slot).unwrap(), &account2));
         assert_eq!(1, cache.cache_len());
         cache.store(key2, slot, account1.clone()); // overwrite key2 with account1
+        cache.evict_in_foreground();
         assert_eq!(100 + per_account_size, cache.data_size());
         assert!(accounts_equal(&cache.load(key2, slot).unwrap(), &account1));
         assert_eq!(1, cache.cache_len());
@@ -320,23 +456,30 @@ mod tests {

         // can store 2 items, 3rd item kicks oldest item out
         let max = (data_size + per_account_size) * 2;
-        let cache =
-            ReadOnlyAccountsCache::new(max, READ_ONLY_CACHE_MS_TO_SKIP_LRU_UPDATE_FOR_TESTS);
+        let cache = ReadOnlyAccountsCache::new(
+            max,
+            usize::MAX, // <-- do not evict in the background
+            READ_ONLY_CACHE_MS_TO_SKIP_LRU_UPDATE_FOR_TESTS,
+        );
         cache.store(key1, slot, account1.clone());
+        cache.evict_in_foreground();
         assert_eq!(100 + per_account_size, cache.data_size());
         assert!(accounts_equal(&cache.load(key1, slot).unwrap(), &account1));
         assert_eq!(1, cache.cache_len());
         cache.store(key2, slot, account2.clone());
+        cache.evict_in_foreground();
         assert_eq!(max, cache.data_size());
         assert!(accounts_equal(&cache.load(key1, slot).unwrap(), &account1));
         assert!(accounts_equal(&cache.load(key2, slot).unwrap(), &account2));
         assert_eq!(2, cache.cache_len());
         cache.store(key2, slot, account1.clone()); // overwrite key2 with account1
+        cache.evict_in_foreground();
         assert_eq!(max, cache.data_size());
         assert!(accounts_equal(&cache.load(key1, slot).unwrap(), &account1));
         assert!(accounts_equal(&cache.load(key2, slot).unwrap(), &account1));
         assert_eq!(2, cache.cache_len());
         cache.store(key3, slot, account3.clone());
+        cache.evict_in_foreground();
         assert_eq!(max, cache.data_size());
         assert!(cache.load(key1, slot).is_none()); // was lru purged
         assert!(accounts_equal(&cache.load(key2, slot).unwrap(), &account1));
@@ -355,6 +498,7 @@ mod tests {
         let mut rng = ChaChaRng::from_seed(SEED);
         let cache = ReadOnlyAccountsCache::new(
             MAX_CACHE_SIZE,
+            usize::MAX, // <-- do not evict in the background
             READ_ONLY_CACHE_MS_TO_SKIP_LRU_UPDATE_FOR_TESTS,
         );
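+        // Note: `store()` only signals the evictor once the cache size exceeds the
+        // high watermark, so passing `usize::MAX` above guarantees the background
+        // evictor never runs during this test.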
         let slots: Vec<Slot> = repeat_with(|| rng.gen_range(0..1000)).take(5).collect();
@@ -389,6 +533,7 @@ mod tests {
                 let key = (pubkey, slot);
                 hash_map.insert(key, (account.clone(), ix));
                 cache.store(pubkey, slot, account);
+                cache.evict_in_foreground();
             }
         }
         assert_eq!(cache.cache_len(), 17);
@@ -407,4 +552,49 @@ mod tests {
             );
         }
     }
+
+    #[test]
+    fn test_evict_in_background() {
+        const ACCOUNT_DATA_SIZE: usize = 200;
+        const MAX_ENTRIES: usize = 7;
+        const MAX_CACHE_SIZE: usize = MAX_ENTRIES * (CACHE_ENTRY_SIZE + ACCOUNT_DATA_SIZE);
+        let cache = ReadOnlyAccountsCache::new(
+            MAX_CACHE_SIZE,
+            MAX_CACHE_SIZE,
+            READ_ONLY_CACHE_MS_TO_SKIP_LRU_UPDATE_FOR_TESTS,
+        );
+
+        for i in 0..MAX_ENTRIES {
+            let pubkey = Pubkey::new_unique();
+            let account = AccountSharedData::new(i as u64, ACCOUNT_DATA_SIZE, &Pubkey::default());
+            cache.store(pubkey, i as Slot, account);
+        }
+        // we haven't exceeded the max cache size yet, so no evictions should've happened
+        assert_eq!(cache.cache_len(), MAX_ENTRIES);
+        assert_eq!(cache.data_size(), MAX_CACHE_SIZE);
+        assert_eq!(cache.stats.evicts.load(Ordering::Relaxed), 0);
+
+        // store another account to trigger evictions
+        let slot = MAX_ENTRIES as Slot;
+        let pubkey = Pubkey::new_unique();
+        let account = AccountSharedData::new(42, ACCOUNT_DATA_SIZE, &Pubkey::default());
+        cache.store(pubkey, slot, account.clone());
+
+        // wait for the evictor to run...
+        let timer = Instant::now();
+        while cache.stats.evicts.load(Ordering::Relaxed) == 0 {
+            assert!(
+                timer.elapsed() < Duration::from_secs(5),
+                "timed out waiting for the evictor to run",
+            );
+            thread::sleep(Duration::from_millis(1));
+        }
+
+        // ...now ensure the cache size is right
+        assert_eq!(cache.cache_len(), MAX_ENTRIES);
+        assert_eq!(cache.data_size(), MAX_CACHE_SIZE);
+
+        // and the most recent account we stored should still be in the cache
+        assert_eq!(cache.load(pubkey, slot).unwrap(), account);
+    }
 }