138 changes: 100 additions & 38 deletions crates/networking/p2p/peer_handler.rs
@@ -65,6 +65,10 @@ pub const SNAP_LIMIT: usize = 128;
// increasing them may be the cause of peers disconnection
pub const MAX_BLOCK_BODIES_TO_REQUEST: usize = 128;

// How many distinct storage roots we accumulate before processing them as one chunk.
const STORAGE_ROOTS_PER_CHUNK: usize = 10_000;
// How many storage roots we include in a single task sent to a peer.
const STORAGE_ROOTS_PER_TASK: usize = 300;

/// An abstraction over the [Kademlia] containing logic to make requests to peers
#[derive(Debug, Clone)]
pub struct PeerHandler {
@@ -1277,43 +1281,87 @@ impl PeerHandler {
.current_step
.set(CurrentStepValue::RequestingStorageRanges);
debug!("Starting request_storage_ranges function");
// 1) split the range in chunks of same length
let mut accounts_by_root_hash: BTreeMap<_, Vec<_>> = BTreeMap::new();
for (account, (maybe_root_hash, _)) in &account_storage_roots.accounts_with_storage_root {
match maybe_root_hash {
Some(root) => {
accounts_by_root_hash
.entry(*root)
.or_default()
.push(*account);
}
// 1) collect pairs of (account_hash, storage_root)
let account_root_pairs: Vec<(H256, Option<H256>)> = account_storage_roots
.accounts_with_storage_root
.iter()
.map(|(account, (maybe_root_hash, _))| (*account, *maybe_root_hash))
.collect();
let mut chunk_groups: BTreeMap<H256, Vec<H256>> = BTreeMap::new();

// 2) group accounts by storage root and process them in chunks of STORAGE_ROOTS_PER_CHUNK
for (account, maybe_root_hash) in account_root_pairs {
// 2.1) Make sure we have the storage root for the account
let root = match maybe_root_hash {
Some(root) => root,
None => {
let root = store
.get_account_state_by_acc_hash(pivot_header.hash(), *account)
store
.get_account_state_by_acc_hash(pivot_header.hash(), account)
.expect("Failed to get account in state trie")
.expect("Could not find account that should have been downloaded or healed")
.storage_root;
accounts_by_root_hash
.entry(root)
.or_default()
.push(*account);
.storage_root
}
};

chunk_groups.entry(root).or_default().push(account);

// 2.2) If we have enough roots, process them
if chunk_groups.len() >= STORAGE_ROOTS_PER_CHUNK {
let chunk_accounts = Vec::from_iter(chunk_groups.into_iter());
self.process_storage_chunk(
chunk_accounts,
account_storage_roots,
account_storages_snapshots_dir,
&mut chunk_index,
pivot_header,
)
.await?;
chunk_groups = BTreeMap::new();
}
}
let mut accounts_by_root_hash = Vec::from_iter(accounts_by_root_hash);
// TODO: Turn this into a stable sort for binary search.

// 2.3) Process remaining roots if any
if !chunk_groups.is_empty() {
let chunk_accounts = Vec::from_iter(chunk_groups.into_iter());
self.process_storage_chunk(
chunk_accounts,
account_storage_roots,
account_storages_snapshots_dir,
&mut chunk_index,
pivot_header,
)
.await?;
}

Ok(chunk_index)
}

async fn process_storage_chunk(
&mut self,
mut accounts_by_root_hash: Vec<(H256, Vec<H256>)>,
account_storage_roots: &mut AccountStorageRoots,
account_storages_snapshots_dir: &Path,
chunk_index: &mut u64,
pivot_header: &mut BlockHeader,
) -> Result<(), PeerHandlerError> {
if accounts_by_root_hash.is_empty() {
return Ok(());
}

// Maintain previous prioritization of busy roots: sort descending by account
// count (`!len` bitwise-inverts the key, reversing the ascending sort).
accounts_by_root_hash.sort_unstable_by_key(|(_, accounts)| !accounts.len());
let chunk_size = 300;
let chunk_count = (accounts_by_root_hash.len() / chunk_size) + 1;
let total_roots = accounts_by_root_hash.len();
let task_span = STORAGE_ROOTS_PER_TASK.min(STORAGE_ROOTS_PER_CHUNK);
// how many task_span-sized slices are needed to cover every root (the last may be partial)
let task_partition_count = total_roots.div_ceil(task_span);

// list of tasks to be executed
// Types are (start_index, end_index, starting_hash)
// NOTE: end_index is NOT inclusive

let mut tasks_queue_not_started = VecDeque::<StorageTask>::new();
for i in 0..chunk_count {
let chunk_start = chunk_size * i;
let chunk_end = (chunk_start + chunk_size).min(accounts_by_root_hash.len());
for i in 0..task_partition_count {
let chunk_start = task_span * i;
let chunk_end = ((i + 1) * task_span).min(total_roots);
Comment on lines +1354 to +1364

Copilot AI commented on Oct 8, 2025:


[nitpick] The variable name task_span is unclear. Consider renaming it to roots_per_task or task_size to better indicate it represents the number of storage roots processed per task.
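A quick illustration of the suggestion (hypothetical sketch, not code from this PR): with the rename applied, the partitioning lines would read roughly as follows.

```rust
// Hypothetical rename of `task_span` to `roots_per_task`, per the review suggestion.
let roots_per_task = STORAGE_ROOTS_PER_TASK.min(STORAGE_ROOTS_PER_CHUNK);
// Number of tasks needed to cover all roots; the last task may hold fewer.
let task_partition_count = total_roots.div_ceil(roots_per_task);
```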


tasks_queue_not_started.push_back(StorageTask {
start_index: chunk_start,
end_index: chunk_end,
@@ -1339,7 +1387,7 @@ impl PeerHandler {
// vector of hashed storage keys and storage values.
let mut current_account_storages: BTreeMap<H256, AccountsWithStorage> = BTreeMap::new();

debug!("Starting request_storage_ranges loop");
debug!(chunk = chunk_index, "Starting request_storage_ranges loop");
loop {
if current_account_storages
.values()
@@ -1370,15 +1418,16 @@
})
.map_err(PeerHandlerError::DumpError)?;
}
let file_index = *chunk_index;
disk_joinset.spawn(async move {
let path = get_account_storages_snapshot_file(
&account_storages_snapshots_dir_cloned,
chunk_index,
file_index,
);
dump_storages_to_file(&path, snapshot)
});

chunk_index += 1;
*chunk_index += 1;
}

if let Ok(result) = task_receiver.try_recv() {
@@ -1396,9 +1445,7 @@

for (_, accounts) in accounts_by_root_hash[start_index..remaining_start].iter() {
for account in accounts {
if !accounts_done.contains_key(account) {
accounts_done.insert(*account, vec![]);
}
accounts_done.entry(*account).or_default();
}
}

@@ -1429,7 +1476,11 @@
let acc_hash = accounts_by_root_hash[remaining_start].1[0];
let (_, old_intervals) = account_storage_roots
.accounts_with_storage_root
.get_mut(&acc_hash).ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
.get_mut(&acc_hash)
.ok_or(PeerHandlerError::UnrecoverableError(
"Tried to get the old download intervals for an account but did not find them"
.to_owned(),
))?;
for (old_start, end) in old_intervals {
if end == &hash_end {
*old_start = hash_start;
@@ -1461,7 +1512,10 @@
let (_, old_intervals) = account_storage_roots
.accounts_with_storage_root
.get_mut(&acc_hash)
.ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
.ok_or(PeerHandlerError::UnrecoverableError(
"Tried to get the old download intervals for an account but did not find them"
.to_owned(),
))?;
old_intervals.remove(
old_intervals
.iter()
@@ -1533,7 +1587,10 @@
let (_, intervals) = account_storage_roots
.accounts_with_storage_root
.get_mut(&accounts_by_root_hash[remaining_start].1[0])
.ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
.ok_or(PeerHandlerError::UnrecoverableError(
"Tried to get the old download intervals for an account but did not find them"
.to_owned(),
))?;

for i in 0..chunk_count {
let start_hash_u256 = start_hash_u256 + chunk_size * i;
@@ -1569,7 +1626,10 @@
let (_, intervals) = account_storage_roots
.accounts_with_storage_root
.get_mut(&accounts_by_root_hash[remaining_start].1[0])
.ok_or(PeerHandlerError::UnrecoverableError("Trie to get the old download intervals for an account but did not find them".to_owned()))?;
.ok_or(PeerHandlerError::UnrecoverableError(
"Tried to get the old download intervals for an account but did not find them"
.to_owned(),
))?;

for i in 0..chunk_count {
let start_hash_u256 = start_hash_u256 + chunk_size * i;
@@ -1713,10 +1773,12 @@ impl PeerHandler {
std::fs::create_dir_all(account_storages_snapshots_dir)
.map_err(|_| PeerHandlerError::CreateStorageSnapshotsDir)?;
}

let path =
get_account_storages_snapshot_file(account_storages_snapshots_dir, chunk_index);
get_account_storages_snapshot_file(account_storages_snapshots_dir, *chunk_index);
dump_storages_to_file(&path, snapshot)
.map_err(|_| PeerHandlerError::WriteStorageSnapshotsDir(chunk_index))?;
.map_err(|_| PeerHandlerError::WriteStorageSnapshotsDir(*chunk_index))?;
*chunk_index += 1;
}
disk_joinset
.join_all()
@@ -1744,7 +1806,7 @@ impl PeerHandler {
self.peer_table.free_peer(&result.peer_id).await?;
}

Ok(chunk_index + 1)
Ok(())
}

async fn request_storage_ranges_worker(
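For reviewers who want the new control flow at a glance, here is a minimal, self-contained sketch of the chunk/task partitioning this PR introduces. It is illustrative only: `split_into_tasks` and `main` are not part of the PR; the constants mirror the ones added at the top of the file, and the end index is exclusive, matching the `StorageTask` note above.

```rust
const STORAGE_ROOTS_PER_CHUNK: usize = 10_000;
const STORAGE_ROOTS_PER_TASK: usize = 300;

/// Illustrative helper (not in the PR): split `total_roots` storage roots into
/// (start, end) index ranges the way the task queue is built. `end` is exclusive.
fn split_into_tasks(total_roots: usize) -> Vec<(usize, usize)> {
    let task_span = STORAGE_ROOTS_PER_TASK.min(STORAGE_ROOTS_PER_CHUNK);
    let task_count = total_roots.div_ceil(task_span);
    (0..task_count)
        .map(|i| (i * task_span, ((i + 1) * task_span).min(total_roots)))
        .collect()
}

fn main() {
    // 650 roots split into tasks of at most 300: [0, 300), [300, 600), [600, 650).
    assert_eq!(split_into_tasks(650), vec![(0, 300), (300, 600), (600, 650)]);
}
```

Accumulating at most STORAGE_ROOTS_PER_CHUNK roots before each process_storage_chunk call bounds peak memory, while STORAGE_ROOTS_PER_TASK caps the work handed to any single peer.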