Closed
135 commits
c4105ee
Made it so that it writes stts for default
fedacking Sep 17, 2025
87cfc01
format
fedacking Sep 17, 2025
b7f29e7
Update utils.rs
fedacking Sep 17, 2025
72f00b7
compare
fedacking Sep 17, 2025
999115d
moved function outside
fedacking Sep 17, 2025
2384772
Update sync.rs
fedacking Sep 17, 2025
ae963c8
Update Cargo.toml
fedacking Sep 17, 2025
b60cd75
Update Cargo.toml
fedacking Sep 17, 2025
29c4b4c
Update sync.rs
fedacking Sep 17, 2025
287da4e
Sorted trie
fedacking Sep 18, 2025
dded7b7
Compiles
fedacking Sep 18, 2025
1dc3e74
dedup
fedacking Sep 18, 2025
500be72
Update Cargo.toml
fedacking Sep 18, 2025
5a49a83
Update sync.rs
fedacking Sep 18, 2025
79b6795
par iter
fedacking Sep 18, 2025
278da8a
Update sync.rs
fedacking Sep 18, 2025
b9e0f83
Merge remote-tracking branch 'origin/main' into perf/fast-insert-snap
fedacking Sep 19, 2025
90f0752
Fixed compile
fedacking Sep 22, 2025
0a38b20
added logging for error
fedacking Sep 23, 2025
85cd817
Fixed edge case and pushed fix
fedacking Sep 23, 2025
211a6b6
print the account with an error
fedacking Sep 23, 2025
141019c
what's going on
fedacking Sep 23, 2025
6fa1883
Fix attempt
fedacking Sep 23, 2025
20b1a7c
Update sync.rs
fedacking Sep 23, 2025
0126919
Update trie_sorted.rs
fedacking Sep 23, 2025
33f750a
Added inc
fedacking Sep 23, 2025
34eebd1
rocksdb iterator
fedacking Sep 23, 2025
80557ad
fixed bytecodes
fedacking Sep 23, 2025
dc8d0ec
returned par iter
fedacking Sep 23, 2025
4298c21
added logging
fedacking Sep 24, 2025
74f2e5a
Scoped thread
fedacking Sep 24, 2025
46bb452
Removed logging
fedacking Sep 24, 2025
5a5d9a0
Don't spawn that many threads
fedacking Sep 24, 2025
0e53046
Improved error handling
fedacking Sep 24, 2025
8add39d
Fixed race condition
fedacking Sep 24, 2025
edc2f91
perf(l1): use BTree for tracking storages during snap sync
Oppen Sep 24, 2025
df2811d
fast put batch
fedacking Sep 24, 2025
13d6659
Update rocksdb.rs
fedacking Sep 24, 2025
8896db3
threadpool
fedacking Sep 24, 2025
86869ec
perf(l1): compress storage trie downloads
Oppen Sep 25, 2025
2ec688b
fix order
Oppen Sep 25, 2025
f0d1e0e
fix insertion
Oppen Sep 25, 2025
bccc632
Back into par iter
fedacking Sep 25, 2025
f7a4702
fix typo + time roots
Oppen Sep 25, 2025
9ea9ddf
extra data + reuse storages
Oppen Sep 25, 2025
e66cd62
Make lint
fedacking Sep 25, 2025
6756108
Merge remote-tracking branch 'origin/main' into perf/fast-insert-snap
fedacking Sep 25, 2025
37c1d79
Update db.rs
fedacking Sep 25, 2025
9473681
Merge remote-tracking branch 'origin/main' into perf/fast-insert-snap
fedacking Sep 25, 2025
6c8e0fb
WIP: scoped thread pool
pablodeymo Sep 25, 2025
2ebcafa
Finished scoped threadpool
fedacking Sep 25, 2025
677bf5c
Merge remote-tracking branch 'origin/thread_pool' into perf/fast-inse…
fedacking Sep 25, 2025
d650922
Adopted thread pool
fedacking Sep 25, 2025
6718148
Pool arc
fedacking Sep 25, 2025
c301529
Update sync.rs
fedacking Sep 25, 2025
8b5e934
naming threads
fedacking Sep 25, 2025
aa6fdbe
Update sync.rs
fedacking Sep 25, 2025
53b8163
Executed separatedly last task
fedacking Sep 25, 2025
8e704cc
Added crossbeam
fedacking Sep 26, 2025
ab9df8b
Merged the snap sync optimizations that improve storage leaves downlo…
jrchatruc Sep 26, 2025
af4c4e4
2 pools
fedacking Sep 26, 2025
888472a
Comment out debug! on insertion for now
jrchatruc Sep 26, 2025
b75034f
Testing priority queue
fedacking Sep 26, 2025
a7e2084
Add TODO
jrchatruc Sep 26, 2025
08fbdf6
Update sync.rs
fedacking Sep 26, 2025
07b8ea5
Update sync.rs
fedacking Sep 26, 2025
5795636
Merge branch 'perf/fast-insert-snap-pool' into merge_optimizations_fa…
fedacking Sep 26, 2025
ec81b86
Compiles
fedacking Sep 26, 2025
7a7e0f3
Update sync.rs
fedacking Sep 26, 2025
4e3621b
Simple semaphore
fedacking Sep 26, 2025
193327d
Update sync.rs
fedacking Sep 26, 2025
fa9ac95
Changed to fixed amount of buffers
fedacking Sep 29, 2025
e0ec836
Removed unneeded compaction
fedacking Sep 29, 2025
c93aa46
cleanup
fedacking Sep 29, 2025
9680d09
Merge remote-tracking branch 'origin/main' into merge_optimizations_f…
fedacking Sep 29, 2025
303f9e0
Compiles
fedacking Sep 29, 2025
adaf0b0
fixed tomls
fedacking Sep 29, 2025
1e0910a
Update code_collector.rs
fedacking Sep 30, 2025
ac7a8b9
Update rocksdb.rs
fedacking Sep 30, 2025
6d0e8d8
Removed unnecesaty alloc in put_batch_no_alloc
fedacking Sep 30, 2025
ca3942a
Removed cfg_if
fedacking Sep 30, 2025
ebcccd6
Update db.rs
fedacking Sep 30, 2025
f47bcb3
Fixed buffer count
fedacking Sep 30, 2025
60b9372
Update trie_sorted.rs
fedacking Sep 30, 2025
2c79fe0
Update sync.rs
fedacking Sep 30, 2025
7a14f42
Optimized insertion to disk
fedacking Sep 30, 2025
c02f069
fixed potential bug
fedacking Sep 30, 2025
60f0bc6
added debug logging
fedacking Sep 30, 2025
4528ac0
Update utils.rs
fedacking Sep 30, 2025
c8c00bc
fix
fedacking Sep 30, 2025
939ac99
Lint
fedacking Sep 30, 2025
dda32e4
Cleaned up code
fedacking Sep 30, 2025
fd55d47
Update Cargo.lock
fedacking Sep 30, 2025
558c5f0
simplified
fedacking Sep 30, 2025
3bf7b01
Lowered memory usage further (hopefully)
fedacking Sep 30, 2025
a659670
Update Cargo.lock
fedacking Sep 30, 2025
b9c0138
Perf changelog
fedacking Sep 30, 2025
21cef0a
Merge branch 'main' into merge_optimizations_fast_insert_semaphore
jrchatruc Sep 30, 2025
01f10bf
Merge branch 'main' into merge_optimizations_fast_insert_semaphore
jrchatruc Oct 1, 2025
1886bf0
Merge branch 'main' into merge_optimizations_fast_insert_semaphore
jrchatruc Oct 1, 2025
c079384
Remove comments and commented code
jrchatruc Oct 1, 2025
a497870
Update crates/networking/p2p/peer_handler.rs
jrchatruc Oct 1, 2025
31652d8
Update CHANGELOG.md
jrchatruc Oct 1, 2025
bbbb137
initial chunk test
rodrigo-o Oct 1, 2025
a1397ad
reduce trie sorted size to write and buffer count
rodrigo-o Oct 1, 2025
7f441cf
size_to_write and buffer_count changes didn't affect trie_sroted memo…
rodrigo-o Oct 1, 2025
57d9631
handle a case where snapshots become empty after a stale pivot
rodrigo-o Oct 1, 2025
ce53e50
go back to previous peer-level parallelism
rodrigo-o Oct 2, 2025
3475ee5
Unconditionally heal big accounts
jrchatruc Oct 2, 2025
30c3dc9
Added some comments and cleanup to the high level chunking
rodrigo-o Oct 2, 2025
c87fbcf
Added some comment and a bit of extra cleanup
rodrigo-o Oct 2, 2025
1d54144
formatting
rodrigo-o Oct 2, 2025
42a297e
Added one more comment
rodrigo-o Oct 2, 2025
c0ceeff
Update utils.rs
fedacking Oct 2, 2025
5580ea1
Update trie_sorted.rs
fedacking Oct 3, 2025
4bc928e
Fixed failing test
fedacking Oct 3, 2025
f9fcccd
Only call request_storage_ranges twice as a temporary solution while …
jrchatruc Oct 3, 2025
6656c32
Update server_runner.py
fedacking Oct 2, 2025
b50703d
Fix bug where we would get stuck trying to insert storage tries for a…
jrchatruc Oct 5, 2025
57a7d52
Merge branch 'main' into merge_optimizations_fast_insert_semaphore
jrchatruc Oct 6, 2025
612a8d0
Readd rocksdb feature for p2p
jrchatruc Oct 6, 2025
1d0b580
Merge branch 'main' into merge_optimizations_fast_insert_semaphore
jrchatruc Oct 6, 2025
cd4d1a3
Merge branch 'main' into merge_optimizations_fast_insert_semaphore
jrchatruc Oct 6, 2025
dce6c08
Change log showing accounts sent for storage healing after request st…
jrchatruc Oct 6, 2025
259eb83
Merge branch 'main' into merge_optimizations_fast_insert_semaphore
jrchatruc Oct 6, 2025
d51a7de
Merge branch 'main' into merge_optimizations_fast_insert_semaphore
jrchatruc Oct 6, 2025
5110a89
fixed empty accounts case
fedacking Oct 6, 2025
35fd6be
Merge branch 'main' into merge_optimizations_fast_insert_semaphore
jrchatruc Oct 6, 2025
508022e
Removed unneeded remove
fedacking Oct 6, 2025
d92e42b
Merge branch 'main' into merge_optimizations_fast_insert_semaphore
jrchatruc Oct 6, 2025
34aebb2
Merge branch 'main' into merge_optimizations_fast_insert_semaphore
jrchatruc Oct 6, 2025
dbc92d1
Merge branch 'main' into merge_optimizations_fast_insert_semaphore
jrchatruc Oct 7, 2025
bfdb207
Merge remote-tracking branch 'origin/merge_optimizations_fast_insert_…
rodrigo-o Oct 7, 2025
c2bacdd
remove guardrails for empty snapshots
rodrigo-o Oct 7, 2025
ffb17eb
Merge branch 'main' into chunk_storage_requests
rodrigo-o Oct 8, 2025
143 changes: 103 additions & 40 deletions crates/networking/p2p/peer_handler.rs
@@ -65,6 +65,10 @@ pub const SNAP_LIMIT: usize = 128;
// increasing them may be the cause of peers disconnection
pub const MAX_BLOCK_BODIES_TO_REQUEST: usize = 128;

const STORAGE_ROOTS_PER_CHUNK: usize = 10_000;
// How many storage roots we include in a single task sent to a peer.
const STORAGE_ROOTS_PER_TASK: usize = 300;

/// An abstraction over the [Kademlia] containing logic to make requests to peers
#[derive(Debug, Clone)]
pub struct PeerHandler {
@@ -1278,43 +1282,88 @@ impl PeerHandler {
.current_step
.set(CurrentStepValue::RequestingStorageRanges);
debug!("Starting request_storage_ranges function");
// 1) split the range in chunks of same length
let mut accounts_by_root_hash: BTreeMap<_, Vec<_>> = BTreeMap::new();
for (account, (maybe_root_hash, _)) in &account_storage_roots.accounts_with_storage_root {
match maybe_root_hash {
Some(root) => {
accounts_by_root_hash
.entry(*root)
.or_default()
.push(*account);
}
// 1) collect pairs of (account_hash, storage_root)
let account_root_pairs: Vec<(H256, Option<H256>)> = account_storage_roots
.accounts_with_storage_root
.iter()
.map(|(account, (maybe_root_hash, _))| (*account, *maybe_root_hash))
.collect();
let mut chunk_groups: BTreeMap<H256, Vec<H256>> = BTreeMap::new();

// 2) group accounts by storage root and process them in chunks of STORAGE_ROOTS_PER_CHUNK
for (account, maybe_root_hash) in account_root_pairs {
// 2.1) Make sure we have the storage root for the account
let root = match maybe_root_hash {
Some(root) => root,
None => {
let root = store
.get_account_state_by_acc_hash(pivot_header.hash(), *account)
store
.get_account_state_by_acc_hash(pivot_header.hash(), account)
.expect("Failed to get account in state trie")
.expect("Could not find account that should have been downloaded or healed")
.storage_root;
accounts_by_root_hash
.entry(root)
.or_default()
.push(*account);
.storage_root
}
};

chunk_groups.entry(root).or_default().push(account);

// 2.2) If we have enough roots, process them
if chunk_groups.len() >= STORAGE_ROOTS_PER_CHUNK {
let chunk_accounts = Vec::from_iter(chunk_groups.into_iter());
self.process_storage_chunk(
chunk_accounts,
account_storage_roots,
account_storages_snapshots_dir,
&mut chunk_index,
pivot_header,
)
.await?;
chunk_groups = BTreeMap::new();
}
}
let mut accounts_by_root_hash = Vec::from_iter(accounts_by_root_hash);
// TODO: Turn this into a stable sort for binary search.

// 2.3) Process remaining roots if any
if !chunk_groups.is_empty() {
let chunk_accounts = Vec::from_iter(chunk_groups.into_iter());
self.process_storage_chunk(
chunk_accounts,
account_storage_roots,
account_storages_snapshots_dir,
&mut chunk_index,
pivot_header,
)
.await?;
}

Ok(chunk_index)
}

async fn process_storage_chunk(
&mut self,
mut accounts_by_root_hash: Vec<(H256, Vec<H256>)>,
account_storage_roots: &mut AccountStorageRoots,
account_storages_snapshots_dir: &Path,
chunk_index: &mut u64,
pivot_header: &mut BlockHeader,
) -> Result<(), PeerHandlerError> {
if accounts_by_root_hash.is_empty() {
return Ok(());
}

// Maintain previous prioritization of busy roots
accounts_by_root_hash.sort_unstable_by_key(|(_, accounts)| !accounts.len());
let chunk_size = 300;
let chunk_count = (accounts_by_root_hash.len() / chunk_size) + 1;
let total_roots = accounts_by_root_hash.len();
let task_span = STORAGE_ROOTS_PER_TASK.min(STORAGE_ROOTS_PER_CHUNK);
// how many fully-populated task_span slices fit in
let task_partition_count = (total_roots + task_span - 1) / task_span;

// list of tasks to be executed
// Types are (start_index, end_index, starting_hash)
// NOTE: end_index is NOT inclusive

let mut tasks_queue_not_started = VecDeque::<StorageTask>::new();
for i in 0..chunk_count {
let chunk_start = chunk_size * i;
let chunk_end = (chunk_start + chunk_size).min(accounts_by_root_hash.len());
for i in 0..task_partition_count {
let chunk_start = task_span * i;
let chunk_end = ((i + 1) * task_span).min(total_roots);
tasks_queue_not_started.push_back(StorageTask {
start_index: chunk_start,
end_index: chunk_end,
@@ -1340,7 +1389,7 @@ impl PeerHandler {
// vector of hashed storage keys and storage values.
let mut current_account_storages: BTreeMap<H256, AccountsWithStorage> = BTreeMap::new();

debug!("Starting request_storage_ranges loop");
debug!(chunk = chunk_index, "Starting request_storage_ranges loop");
loop {
if current_account_storages
.values()
@@ -1371,15 +1420,16 @@ impl PeerHandler {
})
.map_err(PeerHandlerError::DumpError)?;
}
let file_index = *chunk_index;
disk_joinset.spawn(async move {
let path = get_account_storages_snapshot_file(
&account_storages_snapshots_dir_cloned,
chunk_index,
file_index,
);
dump_storages_to_file(&path, snapshot)
});

chunk_index += 1;
*chunk_index += 1;
}

if let Ok(result) = task_receiver.try_recv() {
@@ -1397,9 +1447,7 @@ impl PeerHandler {

for (_, accounts) in accounts_by_root_hash[start_index..remaining_start].iter() {
for account in accounts {
if !accounts_done.contains_key(account) {
accounts_done.insert(*account, vec![]);
}
accounts_done.entry(*account).or_insert_with(Vec::new);
}
}

@@ -1430,7 +1478,11 @@ impl PeerHandler {
let acc_hash = accounts_by_root_hash[remaining_start].1[0];
let (_, old_intervals) = account_storage_roots
.accounts_with_storage_root
.get_mut(&acc_hash).ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
.get_mut(&acc_hash)
.ok_or(PeerHandlerError::UnrecoverableError(
"Tried to get the old download intervals for an account but did not find them"
.to_owned(),
))?;
for (old_start, end) in old_intervals {
if end == &hash_end {
*old_start = hash_start;
@@ -1452,8 +1504,6 @@ impl PeerHandler {
if !old_intervals.is_empty() {
acc_hash = *account;
}
} else {
continue;
}
}
if acc_hash.is_zero() {
@@ -1462,7 +1512,10 @@ impl PeerHandler {
let (_, old_intervals) = account_storage_roots
.accounts_with_storage_root
.get_mut(&acc_hash)
.ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
.ok_or(PeerHandlerError::UnrecoverableError(
"Tried to get the old download intervals for an account but did not find them"
.to_owned(),
))?;
old_intervals.remove(
old_intervals
.iter()
@@ -1534,7 +1587,10 @@ impl PeerHandler {
let (_, intervals) = account_storage_roots
.accounts_with_storage_root
.get_mut(&accounts_by_root_hash[remaining_start].1[0])
.ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
.ok_or(PeerHandlerError::UnrecoverableError(
"Tried to get the old download intervals for an account but did not find them"
.to_owned(),
))?;

for i in 0..chunk_count {
let start_hash_u256 = start_hash_u256 + chunk_size * i;
@@ -1570,7 +1626,10 @@ impl PeerHandler {
let (_, intervals) = account_storage_roots
.accounts_with_storage_root
.get_mut(&accounts_by_root_hash[remaining_start].1[0])
.ok_or(PeerHandlerError::UnrecoverableError("Trie to get the old download intervals for an account but did not find them".to_owned()))?;
.ok_or(PeerHandlerError::UnrecoverableError(
"Trie to get the old download intervals for an account but did not find them"
.to_owned(),
))?;

for i in 0..chunk_count {
let start_hash_u256 = start_hash_u256 + chunk_size * i;
@@ -1714,10 +1773,14 @@ impl PeerHandler {
std::fs::create_dir_all(account_storages_snapshots_dir)
.map_err(|_| PeerHandlerError::CreateStorageSnapshotsDir)?;
}
let path =
get_account_storages_snapshot_file(account_storages_snapshots_dir, chunk_index);

let path = get_account_storages_snapshot_file(
account_storages_snapshots_dir,
*chunk_index,
);
dump_storages_to_file(&path, snapshot)
.map_err(|_| PeerHandlerError::WriteStorageSnapshotsDir(chunk_index))?;
.map_err(|_| PeerHandlerError::WriteStorageSnapshotsDir(*chunk_index))?;
*chunk_index += 1;
}
disk_joinset
.join_all()
@@ -1745,7 +1808,7 @@ impl PeerHandler {
self.peer_table.free_peer(&result.peer_id).await?;
}

Ok(chunk_index + 1)
Ok(())
}

async fn request_storage_ranges_worker(
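
For context, here is a minimal self-contained sketch of the chunking arithmetic this diff introduces: accounts are grouped by storage root, a chunk is flushed once it accumulates STORAGE_ROOTS_PER_CHUNK distinct roots, and each chunk is then split into per-peer tasks of at most STORAGE_ROOTS_PER_TASK roots via ceiling division. The helper names (chunk_by_root, tasks_for_chunk) and the u64 stand-ins for H256 hashes are illustrative only and are not part of the PR.

use std::collections::BTreeMap;

const STORAGE_ROOTS_PER_CHUNK: usize = 10_000;
const STORAGE_ROOTS_PER_TASK: usize = 300;

// Group (account, root) pairs by root and emit chunks holding at most
// STORAGE_ROOTS_PER_CHUNK distinct roots (mirrors step 2 of request_storage_ranges).
fn chunk_by_root(pairs: Vec<(u64, u64)>) -> Vec<Vec<(u64, Vec<u64>)>> {
    let mut chunks = Vec::new();
    let mut group: BTreeMap<u64, Vec<u64>> = BTreeMap::new();
    for (account, root) in pairs {
        group.entry(root).or_default().push(account);
        if group.len() >= STORAGE_ROOTS_PER_CHUNK {
            chunks.push(std::mem::take(&mut group).into_iter().collect());
        }
    }
    if !group.is_empty() {
        chunks.push(group.into_iter().collect());
    }
    chunks
}

// Number of per-peer tasks one chunk becomes: ceiling division by
// STORAGE_ROOTS_PER_TASK (mirrors task_partition_count in process_storage_chunk).
fn tasks_for_chunk(total_roots: usize) -> usize {
    let task_span = STORAGE_ROOTS_PER_TASK.min(STORAGE_ROOTS_PER_CHUNK);
    (total_roots + task_span - 1) / task_span
}

fn main() {
    // 25_000 accounts, each with a distinct storage root.
    let pairs: Vec<(u64, u64)> = (0..25_000u64).map(|i| (i, i)).collect();
    let chunks = chunk_by_root(pairs);
    // Two full chunks of 10_000 roots plus a tail of 5_000.
    let sizes: Vec<usize> = chunks.iter().map(|c| c.len()).collect();
    assert_eq!(sizes, vec![10_000, 10_000, 5_000]);
    // A full chunk splits into ceil(10_000 / 300) = 34 tasks;
    // 33 tasks of 300 roots and a final task covering the remaining 100.
    assert_eq!(tasks_for_chunk(10_000), 34);
}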