diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs
index 9328229705..3d2bb43a7d 100644
--- a/crates/networking/p2p/peer_handler.rs
+++ b/crates/networking/p2p/peer_handler.rs
@@ -65,6 +65,10 @@ pub const SNAP_LIMIT: usize = 128;
 // increasing them may be the cause of peers disconnection
 pub const MAX_BLOCK_BODIES_TO_REQUEST: usize = 128;
 
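+// How many storage roots we accumulate in memory before processing them as a chunk.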
+const STORAGE_ROOTS_PER_CHUNK: usize = 10_000;
+// How many storage roots we include in a single task sent to a peer.
+const STORAGE_ROOTS_PER_TASK: usize = 300;
+
 /// An abstraction over the [Kademlia] containing logic to make requests to peers
 #[derive(Debug, Clone)]
 pub struct PeerHandler {
@@ -1278,43 +1282,88 @@ impl PeerHandler {
             .current_step
             .set(CurrentStepValue::RequestingStorageRanges);
         debug!("Starting request_storage_ranges function");
-        // 1) split the range in chunks of same length
-        let mut accounts_by_root_hash: BTreeMap<_, Vec<_>> = BTreeMap::new();
-        for (account, (maybe_root_hash, _)) in &account_storage_roots.accounts_with_storage_root {
-            match maybe_root_hash {
-                Some(root) => {
-                    accounts_by_root_hash
-                        .entry(*root)
-                        .or_default()
-                        .push(*account);
-                }
+        // 1) collect pairs of (account_hash, storage_root)
+        let account_root_pairs: Vec<(H256, Option<H256>)> = account_storage_roots
+            .accounts_with_storage_root
+            .iter()
+            .map(|(account, (maybe_root_hash, _))| (*account, *maybe_root_hash))
+            .collect();
+        let mut chunk_groups: BTreeMap<H256, Vec<H256>> = BTreeMap::new();
+
+        // 2) group accounts by storage root and process them in chunks of STORAGE_ROOTS_PER_CHUNK
+        for (account, maybe_root_hash) in account_root_pairs {
+            // 2.1) Make sure we have the storage root for the account
+            let root = match maybe_root_hash {
+                Some(root) => root,
                 None => {
-                    let root = store
-                        .get_account_state_by_acc_hash(pivot_header.hash(), *account)
+                    store
+                        .get_account_state_by_acc_hash(pivot_header.hash(), account)
                         .expect("Failed to get account in state trie")
                         .expect("Could not find account that should have been downloaded or healed")
-                        .storage_root;
-                    accounts_by_root_hash
-                        .entry(root)
-                        .or_default()
-                        .push(*account);
+                        .storage_root
                 }
+            };
+
+            chunk_groups.entry(root).or_default().push(account);
+
+            // 2.2) If we have enough roots, process them
+            if chunk_groups.len() >= STORAGE_ROOTS_PER_CHUNK {
+                let chunk_accounts = Vec::from_iter(chunk_groups.into_iter());
+                self.process_storage_chunk(
+                    chunk_accounts,
+                    account_storage_roots,
+                    account_storages_snapshots_dir,
+                    &mut chunk_index,
+                    pivot_header,
+                )
+                .await?;
+                chunk_groups = BTreeMap::new();
+            }
         }
-        let mut accounts_by_root_hash = Vec::from_iter(accounts_by_root_hash);
-        // TODO: Turn this into a stable sort for binary search.
+
+        // 2.3) Process remaining roots if any
+        if !chunk_groups.is_empty() {
+            let chunk_accounts = Vec::from_iter(chunk_groups.into_iter());
+            self.process_storage_chunk(
+                chunk_accounts,
+                account_storage_roots,
+                account_storages_snapshots_dir,
+                &mut chunk_index,
+                pivot_header,
+            )
+            .await?;
+        }
+
+        Ok(chunk_index)
+    }
+
+    async fn process_storage_chunk(
+        &mut self,
+        mut accounts_by_root_hash: Vec<(H256, Vec<H256>)>,
+        account_storage_roots: &mut AccountStorageRoots,
+        account_storages_snapshots_dir: &Path,
+        chunk_index: &mut u64,
+        pivot_header: &mut BlockHeader,
+    ) -> Result<(), PeerHandlerError> {
+        if accounts_by_root_hash.is_empty() {
+            return Ok(());
+        }
+
+        // Maintain previous prioritization of busy roots
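+        // (`!accounts.len()` is the bitwise NOT of the count, so an ascending
+        // sort on this key puts the roots shared by the most accounts first.)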
         accounts_by_root_hash.sort_unstable_by_key(|(_, accounts)| !accounts.len());
-        let chunk_size = 300;
-        let chunk_count = (accounts_by_root_hash.len() / chunk_size) + 1;
+        let total_roots = accounts_by_root_hash.len();
+        let task_span = STORAGE_ROOTS_PER_TASK.min(STORAGE_ROOTS_PER_CHUNK);
+        // how many task_span-sized slices are needed to cover total_roots (ceiling division)
+        let task_partition_count = (total_roots + task_span - 1) / task_span;
         // list of tasks to be executed
         // Types are (start_index, end_index, starting_hash)
         // NOTE: end_index is NOT inclusive
         let mut tasks_queue_not_started = VecDeque::<StorageTask>::new();
-        for i in 0..chunk_count {
-            let chunk_start = chunk_size * i;
-            let chunk_end = (chunk_start + chunk_size).min(accounts_by_root_hash.len());
+        for i in 0..task_partition_count {
+            let chunk_start = task_span * i;
+            let chunk_end = ((i + 1) * task_span).min(total_roots);
             tasks_queue_not_started.push_back(StorageTask {
                 start_index: chunk_start,
                 end_index: chunk_end,
@@ -1340,7 +1389,7 @@ impl PeerHandler {
         // vector of hashed storage keys and storage values.
         let mut current_account_storages: BTreeMap<H256, Vec<(H256, U256)>> = BTreeMap::new();
 
-        debug!("Starting request_storage_ranges loop");
+        debug!(chunk = chunk_index, "Starting request_storage_ranges loop");
         loop {
             if current_account_storages
                 .values()
@@ -1371,15 +1420,16 @@ impl PeerHandler {
                 })
                 .map_err(PeerHandlerError::DumpError)?;
             }
+            let file_index = *chunk_index;
             disk_joinset.spawn(async move {
                 let path = get_account_storages_snapshot_file(
                     &account_storages_snapshots_dir_cloned,
-                    chunk_index,
+                    file_index,
                 );
                 dump_storages_to_file(&path, snapshot)
             });
-            chunk_index += 1;
+            *chunk_index += 1;
         }
 
         if let Ok(result) = task_receiver.try_recv() {
@@ -1397,9 +1447,7 @@ impl PeerHandler {
             for (_, accounts) in accounts_by_root_hash[start_index..remaining_start].iter() {
                 for account in accounts {
-                    if !accounts_done.contains_key(account) {
-                        accounts_done.insert(*account, vec![]);
-                    }
+                    accounts_done.entry(*account).or_insert_with(Vec::new);
                 }
             }
@@ -1430,7 +1478,11 @@ impl PeerHandler {
                 let acc_hash = accounts_by_root_hash[remaining_start].1[0];
                 let (_, old_intervals) = account_storage_roots
                     .accounts_with_storage_root
-                    .get_mut(&acc_hash).ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
+                    .get_mut(&acc_hash)
+                    .ok_or(PeerHandlerError::UnrecoverableError(
+                        "Tried to get the old download intervals for an account but did not find them"
+                            .to_owned(),
+                    ))?;
                 for (old_start, end) in old_intervals {
                     if end == &hash_end {
                         *old_start = hash_start;
@@ -1452,8 +1504,6 @@ impl PeerHandler {
                     if !old_intervals.is_empty() {
                         acc_hash = *account;
                     }
-                } else {
-                    continue;
                 }
             }
             if acc_hash.is_zero() {
@@ -1462,7 +1512,10 @@ impl PeerHandler {
             let (_, old_intervals) = account_storage_roots
                 .accounts_with_storage_root
                 .get_mut(&acc_hash)
-                .ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
+                .ok_or(PeerHandlerError::UnrecoverableError(
+                    "Tried to get the old download intervals for an account but did not find them"
+                        .to_owned(),
+                ))?;
             old_intervals.remove(
                 old_intervals
                     .iter()
@@ -1534,7 +1587,10 @@ impl PeerHandler {
             let (_, intervals) = account_storage_roots
                 .accounts_with_storage_root
                 .get_mut(&accounts_by_root_hash[remaining_start].1[0])
-                .ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
+                .ok_or(PeerHandlerError::UnrecoverableError(
+                    "Tried to get the old download intervals for an account but did not find them"
+                        .to_owned(),
+                ))?;
 
             for i in 0..chunk_count {
                 let start_hash_u256 = start_hash_u256 + chunk_size * i;
@@ -1570,7 +1626,10 @@ impl PeerHandler {
             let (_, intervals) = account_storage_roots
                 .accounts_with_storage_root
                 .get_mut(&accounts_by_root_hash[remaining_start].1[0])
-                .ok_or(PeerHandlerError::UnrecoverableError("Trie to get the old download intervals for an account but did not find them".to_owned()))?;
+                .ok_or(PeerHandlerError::UnrecoverableError(
+                    "Tried to get the old download intervals for an account but did not find them"
+                        .to_owned(),
+                ))?;
 
             for i in 0..chunk_count {
                 let start_hash_u256 = start_hash_u256 + chunk_size * i;
@@ -1714,10 +1773,14 @@ impl PeerHandler {
                 std::fs::create_dir_all(account_storages_snapshots_dir)
                     .map_err(|_| PeerHandlerError::CreateStorageSnapshotsDir)?;
             }
-            let path =
-                get_account_storages_snapshot_file(account_storages_snapshots_dir, chunk_index);
+
+            let path = get_account_storages_snapshot_file(
+                account_storages_snapshots_dir,
+                *chunk_index,
+            );
             dump_storages_to_file(&path, snapshot)
-                .map_err(|_| PeerHandlerError::WriteStorageSnapshotsDir(chunk_index))?;
+                .map_err(|_| PeerHandlerError::WriteStorageSnapshotsDir(*chunk_index))?;
+            *chunk_index += 1;
         }
         disk_joinset
            .join_all()
@@ -1745,7 +1808,7 @@ impl PeerHandler {
             self.peer_table.free_peer(&result.peer_id).await?;
         }
 
-        Ok(chunk_index + 1)
+        Ok(())
     }
 
     async fn request_storage_ranges_worker(