diff --git a/Cargo.lock b/Cargo.lock index a4df609663..133da1e1cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3505,6 +3505,7 @@ dependencies = [ "ethrex-sdk", "ethrex-storage 0.1.0", "ethrex-storage-rollup", + "ethrex-trie 0.1.0", "ethrex-vm", "eyre", "genesis-tool", diff --git a/cmd/ethrex/Cargo.toml b/cmd/ethrex/Cargo.toml index 407bd2dab5..bb14f257e5 100644 --- a/cmd/ethrex/Cargo.toml +++ b/cmd/ethrex/Cargo.toml @@ -22,6 +22,7 @@ ethrex-l2.workspace = true ethrex-l2-common.workspace = true ethrex-sdk.workspace = true ethrex-l2-rpc.workspace = true +ethrex-trie.workspace = true tikv-jemallocator = { version = "0.6.0", optional = true, features = ["stats", "unprefixed_malloc_on_supported_platforms"] } bytes.workspace = true hex.workspace = true diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index 4916f3a5f6..c1b683c717 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -21,7 +21,9 @@ use ethrex_p2p::{ types::{Node, NodeRecord}, utils::public_key_from_signing_key, }; +use ethrex_rlp::decode::RLPDecode; use ethrex_storage::{EngineType, Store}; +use ethrex_trie::EMPTY_TRIE_HASH; use local_ip_address::{local_ip, local_ipv6}; use rand::rngs::OsRng; use secp256k1::SecretKey; @@ -376,6 +378,43 @@ async fn set_sync_block(store: &Store) { } } +async fn reset_to_head(store: &Store) -> eyre::Result<()> { + let trie = store.open_state_trie(*EMPTY_TRIE_HASH)?; + let Some(root) = trie.db().get(Default::default())? else { + return Ok(()); + }; + let root = ethrex_trie::Node::decode(&root)?; + let state_root = root.compute_hash().finalize(); + + // TODO: store latest state metadata in the DB to avoid this loop + for block_number in (0..=store.get_latest_block_number().await?).rev() { + if let Some(header) = store.get_block_header(block_number)? { + if header.state_root == state_root { + info!("Resetting head to {block_number}"); + let last_kept_block = block_number; + let mut block_to_delete = last_kept_block + 1; + while store + .get_canonical_block_hash(block_to_delete) + .await? + .is_some() + { + debug!("Deleting block {block_to_delete}"); + store.remove_block(block_to_delete).await?; + block_to_delete += 1; + } + let last_kept_header = store + .get_block_header(last_kept_block)? + .ok_or_else(|| eyre::eyre!("Block number {} not found", last_kept_block))?; + store + .forkchoice_update(None, last_kept_block, last_kept_header.hash(), None, None) + .await?; + break; + } + } + } + Ok(()) +} + pub async fn init_l1( opts: Options, log_filter_handler: Option>, @@ -394,6 +433,8 @@ pub async fn init_l1( display_chain_initialization(&genesis); let store = init_store(datadir, genesis).await; + reset_to_head(&store).await?; + #[cfg(feature = "sync-test")] set_sync_block(&store).await; diff --git a/crates/blockchain/tracing.rs b/crates/blockchain/tracing.rs index b10e94533d..445b98ffe8 100644 --- a/crates/blockchain/tracing.rs +++ b/crates/blockchain/tracing.rs @@ -135,7 +135,7 @@ async fn get_missing_state_parents( let Some(parent_block) = store.get_block_by_hash(parent_hash).await? else { return Err(ChainError::Custom("Parent Block not Found".to_string())); }; - if store.contains_state_node(parent_block.header.state_root)? { + if store.has_state_root(parent_block.header.state_root)? 
{ break; } parent_hash = parent_block.header.parent_hash; diff --git a/crates/common/trie/db.rs b/crates/common/trie/db.rs index 0222b22dbc..d3ae250b55 100644 --- a/crates/common/trie/db.rs +++ b/crates/common/trie/db.rs @@ -1,15 +1,15 @@ use ethereum_types::H256; -use crate::{NodeHash, NodeRLP, Trie, error::TrieError}; +use crate::{Nibbles, NodeRLP, Trie, error::TrieError}; use std::{ collections::BTreeMap, sync::{Arc, Mutex}, }; pub trait TrieDB: Send + Sync { - fn get(&self, key: NodeHash) -> Result>, TrieError>; - fn put_batch(&self, key_values: Vec<(NodeHash, Vec)>) -> Result<(), TrieError>; - fn put(&self, key: NodeHash, value: Vec) -> Result<(), TrieError> { + fn get(&self, key: Nibbles) -> Result>, TrieError>; + fn put_batch(&self, key_values: Vec<(Nibbles, Vec)>) -> Result<(), TrieError>; + fn put(&self, key: Nibbles, value: Vec) -> Result<(), TrieError> { self.put_batch(vec![(key, value)]) } } @@ -17,11 +17,11 @@ pub trait TrieDB: Send + Sync { /// InMemory implementation for the TrieDB trait, with get and put operations. #[derive(Default)] pub struct InMemoryTrieDB { - pub inner: Arc>>>, + pub inner: Arc>>>, } impl InMemoryTrieDB { - pub const fn new(map: Arc>>>) -> Self { + pub const fn new(map: Arc>>>) -> Self { Self { inner: map } } pub fn new_empty() -> Self { @@ -35,10 +35,13 @@ impl InMemoryTrieDB { state_nodes: &BTreeMap, ) -> Result { let mut embedded_root = Trie::get_embedded_root(state_nodes, root_hash)?; - let mut hashed_nodes: Vec<(NodeHash, Vec)> = vec![]; - embedded_root.commit(&mut hashed_nodes); + let mut hashed_nodes = vec![]; + embedded_root.commit(Nibbles::default(), &mut hashed_nodes); - let hashed_nodes = hashed_nodes.into_iter().collect(); + let hashed_nodes = hashed_nodes + .into_iter() + .map(|(k, v)| (nibbles_to_fixed_size(k), v)) + .collect(); let in_memory_trie = Arc::new(Mutex::new(hashed_nodes)); Ok(Self::new(in_memory_trie)) @@ -46,22 +49,34 @@ impl InMemoryTrieDB { } impl TrieDB for InMemoryTrieDB { - fn get(&self, key: NodeHash) -> Result>, TrieError> { + fn get(&self, key: Nibbles) -> Result>, TrieError> { Ok(self .inner .lock() .map_err(|_| TrieError::LockError)? - .get(&key) + .get(&nibbles_to_fixed_size(key)) .cloned()) } - fn put_batch(&self, key_values: Vec<(NodeHash, Vec)>) -> Result<(), TrieError> { + fn put_batch(&self, key_values: Vec<(Nibbles, Vec)>) -> Result<(), TrieError> { let mut db = self.inner.lock().map_err(|_| TrieError::LockError)?; for (key, value) in key_values { - db.insert(key, value); + db.insert(nibbles_to_fixed_size(key), value); } Ok(()) } } + +pub fn nibbles_to_fixed_size(nibbles: Nibbles) -> [u8; 33] { + let node_hash_ref = nibbles.to_bytes(); + let original_len = node_hash_ref.len(); + + let mut buffer = [0u8; 33]; + + // Encode the node as [original_len, node_hash...] 
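+    // (Here `original_len` is the packed-byte length of the path: those bytes occupy
+    // buffer[..original_len], zero-padded up to 32 bytes, while buffer[32] stores the
+    // nibble count, so paths that differ only in nibble count get distinct keys.)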
+ buffer[32] = nibbles.len() as u8; + buffer[..original_len].copy_from_slice(&node_hash_ref); + buffer +} diff --git a/crates/common/trie/logger.rs b/crates/common/trie/logger.rs index bc13adc98a..d0cf36af94 100644 --- a/crates/common/trie/logger.rs +++ b/crates/common/trie/logger.rs @@ -5,7 +5,7 @@ use std::{ use ethrex_rlp::decode::RLPDecode; -use crate::{Node, NodeHash, Trie, TrieDB, TrieError}; +use crate::{Nibbles, Node, Trie, TrieDB, TrieError}; pub type TrieWitness = Arc>>>; @@ -33,7 +33,7 @@ impl TrieLogger { } impl TrieDB for TrieLogger { - fn get(&self, key: NodeHash) -> Result>, TrieError> { + fn get(&self, key: Nibbles) -> Result>, TrieError> { let result = self.inner_db.get(key)?; if let Some(result) = result.as_ref() { if let Ok(decoded) = Node::decode(result) { @@ -44,11 +44,11 @@ impl TrieDB for TrieLogger { Ok(result) } - fn put(&self, key: NodeHash, value: Vec) -> Result<(), TrieError> { + fn put(&self, key: Nibbles, value: Vec) -> Result<(), TrieError> { self.inner_db.put(key, value) } - fn put_batch(&self, key_values: Vec<(NodeHash, Vec)>) -> Result<(), TrieError> { + fn put_batch(&self, key_values: Vec<(Nibbles, Vec)>) -> Result<(), TrieError> { self.inner_db.put_batch(key_values) } } diff --git a/crates/common/trie/nibbles.rs b/crates/common/trie/nibbles.rs index cc3aaf6150..5a93d88381 100644 --- a/crates/common/trie/nibbles.rs +++ b/crates/common/trie/nibbles.rs @@ -7,10 +7,35 @@ use ethrex_rlp::{ structs::{Decoder, Encoder}, }; +// TODO: move path-tracking logic somewhere else +// PERF: try using a stack-allocated array /// Struct representing a list of nibbles (half-bytes) -#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, Default)] pub struct Nibbles { data: Vec, + /// Parts of the path that have already been consumed (used for tracking + /// current position when visiting nodes). See `current()`. 
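+    /// E.g. after `next()` consumes nibble 0x1 from [0x1, 0x2], `data` is [0x2]
+    /// and `current()` returns [0x1]. Deliberately excluded from the `PartialEq`,
+    /// `Ord` and `Hash` impls below, which compare `data` only.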
+ already_consumed: Vec, +} + +impl PartialEq for Nibbles { + fn eq(&self, other: &Nibbles) -> bool { + self.data == other.data + } +} + +impl Eq for Nibbles {} + +impl PartialOrd for Nibbles { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Nibbles { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.data.cmp(&other.data) + } } impl std::hash::Hash for Nibbles { @@ -22,7 +47,10 @@ impl std::hash::Hash for Nibbles { impl Nibbles { /// Create `Nibbles` from hex-encoded nibbles pub const fn from_hex(hex: Vec) -> Self { - Self { data: hex } + Self { + data: hex, + already_consumed: vec![], + } } /// Splits incoming bytes into nibbles and appends the leaf flag (a 16 nibble at the end) @@ -40,7 +68,10 @@ impl Nibbles { data.push(16); } - Self { data } + Self { + data, + already_consumed: vec![], + } } /// Returns the amount of nibbles @@ -58,6 +89,7 @@ impl Nibbles { pub fn skip_prefix(&mut self, prefix: &Nibbles) -> bool { if self.len() >= prefix.len() && &self.data[..prefix.len()] == prefix.as_ref() { self.data = self.data[prefix.len()..].to_vec(); + self.already_consumed.extend(&prefix.data); true } else { false @@ -85,7 +117,10 @@ impl Nibbles { /// Removes and returns the first nibble #[allow(clippy::should_implement_trait)] pub fn next(&mut self) -> Option { - (!self.is_empty()).then(|| self.data.remove(0)) + (!self.is_empty()).then(|| { + self.already_consumed.push(self.data[0]); + self.data.remove(0) + }) } /// Removes and returns the first nibble if it is a suitable choice index (aka < 16) @@ -95,7 +130,9 @@ impl Nibbles { /// Returns the nibbles after the given offset pub fn offset(&self, offset: usize) -> Nibbles { - self.slice(offset, self.len()) + let mut ret = self.slice(offset, self.len()); + ret.already_consumed = [&self.already_consumed, &self.data[0..offset]].concat(); + ret } /// Returns the nibbles beween the start and end indexes @@ -190,6 +227,7 @@ impl Nibbles { pub fn concat(&self, other: Nibbles) -> Nibbles { Nibbles { data: [self.data.clone(), other.data].concat(), + already_consumed: self.already_consumed.clone(), } } @@ -197,6 +235,15 @@ impl Nibbles { pub fn append_new(&self, nibble: u8) -> Nibbles { Nibbles { data: [self.data.clone(), vec![nibble]].concat(), + already_consumed: self.already_consumed.clone(), + } + } + + /// Return already consumed parts of path + pub fn current(&self) -> Nibbles { + Nibbles { + data: self.already_consumed.clone(), + already_consumed: vec![], } } } @@ -217,7 +264,13 @@ impl RLPDecode for Nibbles { fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> { let decoder = Decoder::new(rlp)?; let (data, decoder) = decoder.decode_field("data")?; - Ok((Self { data }, decoder.finish()?)) + Ok(( + Self { + data, + already_consumed: vec![], + }, + decoder.finish()?, + )) } } diff --git a/crates/common/trie/node.rs b/crates/common/trie/node.rs index 8818c54f8a..d3022e461d 100644 --- a/crates/common/trie/node.rs +++ b/crates/common/trie/node.rs @@ -31,15 +31,40 @@ pub enum NodeRef { } impl NodeRef { - pub fn get_node(&self, db: &dyn TrieDB) -> Result, TrieError> { + pub fn get_node(&self, db: &dyn TrieDB, path: Nibbles) -> Result, TrieError> { match *self { NodeRef::Node(ref node, _) => Ok(Some(node.as_ref().clone())), NodeRef::Hash(NodeHash::Inline((data, len))) => { Ok(Some(Node::decode_raw(&data[..len as usize])?)) } - NodeRef::Hash(hash @ NodeHash::Hashed(_)) => db - .get(hash)? 
- .map(|rlp| Node::decode(&rlp).map_err(TrieError::RLPDecode)) + NodeRef::Hash(hash) => db + .get(path)? + .filter(|rlp| !rlp.is_empty()) + .and_then(|rlp| match Node::decode(&rlp) { + Ok(node) => (node.compute_hash() == hash).then_some(Ok(node)), + Err(err) => Some(Err(TrieError::RLPDecode(err))), + }) + .transpose(), + } + } + + pub fn get_node_checked( + &self, + db: &dyn TrieDB, + path: Nibbles, + ) -> Result, TrieError> { + match *self { + NodeRef::Node(ref node, _) => Ok(Some((true, node.as_ref().clone()))), + NodeRef::Hash(NodeHash::Inline((data, len))) => { + Ok(Some((true, Node::decode_raw(&data[..len as usize])?))) + } + NodeRef::Hash(hash) => db + .get(path)? + .filter(|rlp| !rlp.is_empty()) + .map(|rlp| match Node::decode(&rlp) { + Ok(node) => Ok((node.compute_hash() == hash, node)), + Err(err) => Err(TrieError::RLPDecode(err)), + }) .transpose(), } } @@ -51,23 +76,28 @@ impl NodeRef { } } - pub fn commit(&mut self, acc: &mut Vec<(NodeHash, Vec)>) -> NodeHash { + pub fn commit(&mut self, path: Nibbles, acc: &mut Vec<(Nibbles, Vec)>) -> NodeHash { match *self { NodeRef::Node(ref mut node, ref mut hash) => { match Arc::make_mut(node) { Node::Branch(node) => { - for node in &mut node.choices { - node.commit(acc); + for (choice, node) in &mut node.choices.iter_mut().enumerate() { + node.commit(path.append_new(choice as u8), acc); } } Node::Extension(node) => { - node.child.commit(acc); + node.child.commit(path.concat(node.prefix.clone()), acc); + } + Node::Leaf(_) => { + //path.extend(&node.partial); } - Node::Leaf(_) => {} } - + //println!("commit {path:?} => {node:?}"); let hash = hash.get_or_init(|| node.compute_hash()); - acc.push((*hash, node.encode_to_vec())); + acc.push((path.clone(), node.encode_to_vec())); + if let Node::Leaf(leaf) = node.as_ref() { + acc.push((path.concat(leaf.partial.clone()), leaf.value.clone())); + } let hash = *hash; *self = hash.into(); diff --git a/crates/common/trie/node/branch.rs b/crates/common/trie/node/branch.rs index edcf0da174..93cc1964ea 100644 --- a/crates/common/trie/node/branch.rs +++ b/crates/common/trie/node/branch.rs @@ -44,7 +44,9 @@ impl BranchNode { // Delegate to children if present let child_ref = &self.choices[choice]; if child_ref.is_valid() { - let child_node = child_ref.get_node(db)?.ok_or(TrieError::InconsistentTree)?; + let child_node = child_ref + .get_node(db, path.current())? + .ok_or(TrieError::InconsistentTree)?; child_node.get(db, path) } else { Ok(None) @@ -74,7 +76,8 @@ impl BranchNode { // Insert into existing child and then update it (choice_ref, ValueOrHash::Value(value)) => { let child_node = choice_ref - .get_node(db)? + .get_node(db, path.current())? + .or_else(|| panic!("{:?} not found ref {choice_ref:?}", path)) .ok_or(TrieError::InconsistentTree)?; *choice_ref = child_node.insert(db, path, value)?.into(); @@ -89,7 +92,7 @@ impl BranchNode { )); } else { *choice_ref = choice_ref - .get_node(db)? + .get_node(db, path.current())? .ok_or(TrieError::InconsistentTree)? .insert(db, path, value)? .into(); @@ -98,6 +101,7 @@ impl BranchNode { } } else if let ValueOrHash::Value(value) = value { // Insert into self + self.update(value); } else { todo!("handle override case (error?)") @@ -130,13 +134,14 @@ impl BranchNode { [+1 children] Branch { [childA, childB, ... ], None } -> Branch { [childA, childB, ... 
], None } */ + let base_path = path.clone(); // Step 1: Remove value // Check if the value is located in a child subtrie let value = if let Some(choice_index) = path.next_choice() { if self.choices[choice_index].is_valid() { let child_node = self.choices[choice_index] - .get_node(db)? + .get_node(db, path.current())? .ok_or(TrieError::InconsistentTree)?; // Remove value from child node let (child_node, old_value) = child_node.remove(db, path.clone())?; @@ -176,7 +181,9 @@ impl BranchNode { // If this node doesn't have a value and has only one child, replace it with its child node (1, false) => { let (choice_index, child_ref) = children[0]; - let child = child_ref.get_node(db)?.ok_or(TrieError::InconsistentTree)?; + let child = child_ref + .get_node(db, base_path.current().append_new(choice_index as u8))? + .ok_or(TrieError::InconsistentTree)?; match child { // Replace self with an extension node leading to the child Node::Branch(_) => ExtensionNode::new( @@ -243,7 +250,9 @@ impl BranchNode { // Continue to child let child_ref = &self.choices[choice]; if child_ref.is_valid() { - let child_node = child_ref.get_node(db)?.ok_or(TrieError::InconsistentTree)?; + let child_node = child_ref + .get_node(db, path.current())? + .ok_or(TrieError::InconsistentTree)?; child_node.get_path(db, path, node_path)?; } } diff --git a/crates/common/trie/node/extension.rs b/crates/common/trie/node/extension.rs index 9878a6f5b4..0024899004 100644 --- a/crates/common/trie/node/extension.rs +++ b/crates/common/trie/node/extension.rs @@ -28,7 +28,7 @@ impl ExtensionNode { if path.skip_prefix(&self.prefix) { let child_node = self .child - .get_node(db)? + .get_node(db, path.current())? .ok_or(TrieError::InconsistentTree)?; child_node.get(db, path) @@ -56,12 +56,13 @@ impl ExtensionNode { */ let match_index = path.count_prefix(&self.prefix); if match_index == self.prefix.len() { + let path = path.offset(match_index); // Insert into child node let child_node = self .child - .get_node(db)? + .get_node(db, path.current())? .ok_or(TrieError::InconsistentTree)?; - let new_child_node = child_node.insert(db, path.offset(match_index), value)?; + let new_child_node = child_node.insert(db, path, value)?; self.child = new_child_node.into(); Ok(self.into()) } else if match_index == 0 { @@ -72,7 +73,7 @@ impl ExtensionNode { }; let mut choices = BranchNode::EMPTY_CHOICES; let branch_node = if self.prefix.at(0) == 16 { - match new_node.get_node(db)? { + match new_node.get_node(db, path.current())? { Some(Node::Leaf(leaf)) => BranchNode::new_with_value(choices, leaf.value), _ => return Err(TrieError::InconsistentTree), } @@ -107,7 +108,7 @@ impl ExtensionNode { if path.skip_prefix(&self.prefix) { let child_node = self .child - .get_node(db)? + .get_node(db, path.current())? .ok_or(TrieError::InconsistentTree)?; // Remove value from child subtrie let (child_node, old_value) = child_node.remove(db, path)?; @@ -174,7 +175,7 @@ impl ExtensionNode { if path.skip_prefix(&self.prefix) { let child_node = self .child - .get_node(db)? + .get_node(db, path.current())? .ok_or(TrieError::InconsistentTree)?; child_node.get_path(db, path, node_path)?; } diff --git a/crates/common/trie/trie.rs b/crates/common/trie/trie.rs index cda570fcad..f883951cf0 100644 --- a/crates/common/trie/trie.rs +++ b/crates/common/trie/trie.rs @@ -47,12 +47,13 @@ pub type ValueRLP = Vec; /// RLP-encoded trie node pub type NodeRLP = Vec; /// Represents a node in the Merkle Patricia Trie. 
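+/// The `Nibbles` component is the node's traversal path from the root, which
+/// doubles as its storage key in `TrieDB` (the root lives at the empty path).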
-pub type TrieNode = (NodeHash, NodeRLP); +pub type TrieNode = (Nibbles, NodeRLP); /// Libmdx-based Ethereum Compatible Merkle Patricia Trie pub struct Trie { db: Box, pub root: NodeRef, + pending_removal: HashSet, } impl Default for Trie { @@ -67,6 +68,7 @@ impl Trie { Self { db, root: NodeRef::default(), + pending_removal: HashSet::new(), } } @@ -79,6 +81,7 @@ impl Trie { } else { Default::default() }, + pending_removal: HashSet::new(), } } @@ -91,14 +94,19 @@ impl Trie { } /// Retrieve an RLP-encoded value from the trie given its RLP-encoded path. - pub fn get(&self, path: &PathRLP) -> Result, TrieError> { + pub fn get(&self, pathrlp: &PathRLP) -> Result, TrieError> { + let path = Nibbles::from_bytes(pathrlp); + Ok(match self.root { - NodeRef::Node(ref node, _) => node.get(self.db.as_ref(), Nibbles::from_bytes(path))?, - NodeRef::Hash(hash) if hash.is_valid() => { - let rlp = self.db.get(hash)?.ok_or(TrieError::InconsistentTree)?; - let node = Node::decode(&rlp).map_err(TrieError::RLPDecode)?; - node.get(self.db.as_ref(), Nibbles::from_bytes(path))? - } + NodeRef::Node(ref node, _) => node.get(self.db.as_ref(), path)?, + NodeRef::Hash(hash) if hash.is_valid() => Node::decode( + &self + .db + .get(Nibbles::default())? + .ok_or(TrieError::InconsistentTree)?, + ) + .map_err(TrieError::RLPDecode)? + .get(self.db.as_ref(), path)?, _ => None, }) } @@ -106,11 +114,12 @@ impl Trie { /// Insert an RLP-encoded value into the trie. pub fn insert(&mut self, path: PathRLP, value: ValueRLP) -> Result<(), TrieError> { let path = Nibbles::from_bytes(&path); + self.pending_removal.remove(&path); self.root = if self.root.is_valid() { // If the trie is not empty, call the root node's insertion logic. self.root - .get_node(self.db.as_ref())? + .get_node(self.db.as_ref(), Nibbles::default())? .ok_or(TrieError::InconsistentTree)? .insert(self.db.as_ref(), path, value)? .into() @@ -128,11 +137,14 @@ impl Trie { if !self.root.is_valid() { return Ok(None); } + if path.len() == 32 { + self.pending_removal.insert(Nibbles::from_bytes(path)); + } // If the trie is not empty, call the root node's removal logic. let (node, value) = self .root - .get_node(self.db.as_ref())? + .get_node(self.db.as_ref(), Nibbles::default())? .ok_or(TrieError::InconsistentTree)? .remove(self.db.as_ref(), Nibbles::from_bytes(path))?; self.root = node.map(Into::into).unwrap_or_default(); @@ -174,11 +186,8 @@ impl Trie { /// This method will also compute the hash of all internal nodes indirectly. It will not clear /// the cached nodes. pub fn commit(&mut self) -> Result<(), TrieError> { - if self.root.is_valid() { - let mut acc = Vec::new(); - self.root.commit(&mut acc); - self.db.put_batch(acc)?; // we'll try to avoid calling this for every commit - } + let acc = self.commit_without_storing(); + self.db.put_batch(acc)?; Ok(()) } @@ -188,8 +197,12 @@ impl Trie { pub fn commit_without_storing(&mut self) -> Vec { let mut acc = Vec::new(); if self.root.is_valid() { - self.root.commit(&mut acc); + self.root.commit(Nibbles::default(), &mut acc); + } + if self.root.compute_hash() == NodeHash::Hashed(*EMPTY_TRIE_HASH) { + acc.push((Nibbles::default(), vec![RLP_NULL])) } + acc.extend(self.pending_removal.drain().map(|nib| (nib, vec![]))); acc } @@ -210,7 +223,7 @@ impl Trie { node_path.push(data[..len as usize].to_vec()); } - let root = match self.root.get_node(self.db.as_ref())? { + let root = match self.root.get_node(self.db.as_ref(), Nibbles::default())? 
{ Some(x) => x, None => return Ok(Vec::new()), }; @@ -232,7 +245,7 @@ impl Trie { if self.root.is_valid() { let encoded_root = self .root - .get_node(self.db.as_ref())? + .get_node(self.db.as_ref(), Nibbles::default())? .ok_or(TrieError::InconsistentTree)? .encode_raw(); @@ -345,7 +358,7 @@ impl Trie { struct NullTrieDB; impl TrieDB for NullTrieDB { - fn get(&self, _key: NodeHash) -> Result>, TrieError> { + fn get(&self, _key: Nibbles) -> Result>, TrieError> { Ok(None) } @@ -359,7 +372,9 @@ impl Trie { /// Obtain the encoded node given its path. /// Allows usage of full paths (byte slice of 32 bytes) or compact-encoded nibble slices (with length lower than 32) - pub fn get_node(&self, partial_path: &PathRLP) -> Result, TrieError> { + pub fn get_node(&self, _partial_path: &PathRLP) -> Result, TrieError> { + todo!(); + /* // Convert compact-encoded nibbles into a byte slice if necessary let partial_path = match partial_path.len() { // Compact-encoded nibbles @@ -422,13 +437,14 @@ impl Trie { } else { Ok(Vec::new()) } + */ } pub fn root_node(&self) -> Result, TrieError> { if self.hash_no_commit() == *EMPTY_TRIE_HASH { return Ok(None); } - self.root.get_node(self.db.as_ref()) + self.root.get_node(self.db.as_ref(), Nibbles::default()) } /// Creates a new Trie based on a temporary InMemory DB @@ -437,7 +453,7 @@ impl Trie { use std::sync::Arc; use std::sync::Mutex; - let hmap: BTreeMap> = BTreeMap::new(); + let hmap: BTreeMap<[u8; 33], Vec> = BTreeMap::new(); let map = Arc::new(Mutex::new(hmap)); let db = InMemoryTrieDB::new(map); Trie::new(Box::new(db)) @@ -466,7 +482,7 @@ impl ProofTrie { // If the trie is not empty, call the root node's insertion logic. self.0 .root - .get_node(self.0.db.as_ref())? + .get_node(self.0.db.as_ref(), Nibbles::default())? .ok_or(TrieError::InconsistentTree)? .insert(self.0.db.as_ref(), partial_path, external_ref)? 
.into() diff --git a/crates/common/trie/trie_iter.rs b/crates/common/trie/trie_iter.rs index 45aded9f6f..3635b9ed04 100644 --- a/crates/common/trie/trie_iter.rs +++ b/crates/common/trie/trie_iter.rs @@ -42,7 +42,7 @@ impl TrieIterator { node: NodeRef, new_stack: &mut Vec<(Nibbles, NodeRef)>, ) -> Result<(), TrieError> { - let Some(next_node) = node.get_node(db).ok().flatten() else { + let Some(next_node) = node.get_node(db, prefix_nibbles.clone()).ok().flatten() else { return Ok(()); }; match &next_node { @@ -139,7 +139,10 @@ impl Iterator for TrieIterator { }; // Fetch the last node in the stack let (mut path, next_node_ref) = self.stack.pop()?; - let next_node = next_node_ref.get_node(self.db.as_ref()).ok().flatten()?; + let next_node = next_node_ref + .get_node(self.db.as_ref(), path.clone()) + .ok() + .flatten()?; match &next_node { Node::Branch(branch_node) => { // Add all children to the stack (in reverse order so we process first child frist) diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index 887381ef81..3becf19190 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -2009,12 +2009,13 @@ pub enum PeerHandlerError { PeerTableError(#[from] PeerTableError), } -#[derive(Debug, Clone, std::hash::Hash)] +#[derive(Debug, Clone)] pub struct RequestMetadata { pub hash: H256, pub path: Nibbles, /// What node is the parent of this node pub parent_path: Nibbles, + pub previous: Option, } #[derive(Debug, thiserror::Error)] diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 98509958e8..c34322e10c 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -24,7 +24,7 @@ use ethrex_common::{ }; use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode, error::RLPDecodeError}; use ethrex_storage::{EngineType, STATE_TRIE_SEGMENTS, Store, error::StoreError}; -use ethrex_trie::{NodeHash, Trie, TrieError}; +use ethrex_trie::{Nibbles, Node, Trie, TrieError}; use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; use std::collections::{BTreeMap, HashSet}; use std::path::PathBuf; @@ -686,13 +686,14 @@ impl FullBlockSyncState { .await { if let Some(batch_failure) = batch_failure { - warn!("Failed to add block during FullSync: {err}"); + let failed_block_hash = batch_failure.failed_block_hash; + warn!(%err, block=%failed_block_hash, "Failed to add block during FullSync"); // Since running the batch failed we set the failing block and it's descendants with having an invalid ancestor on the following cases. if let ChainError::InvalidBlock(_) = err { let mut block_hashes_with_invalid_ancestor: Vec = vec![]; if let Some(index) = block_batch_hashes .iter() - .position(|x| x == &batch_failure.failed_block_hash) + .position(|x| x == &failed_block_hash) { block_hashes_with_invalid_ancestor = block_batch_hashes[index..].to_vec(); } @@ -929,7 +930,7 @@ impl Syncer { let store_clone = store.clone(); let current_state_root = tokio::task::spawn_blocking(move || -> Result { - let mut trie = store_clone.open_state_trie(computed_state_root)?; + let mut trie = store_clone.open_direct_state_trie(computed_state_root)?; for (account_hash, account) in account_states_snapshot { METRICS @@ -940,7 +941,11 @@ impl Syncer { METRICS .current_step .set(crate::metrics::CurrentStepValue::InsertingAccountRanges); - let current_state_root = trie.hash()?; + let (current_state_root, mut changes) = + trie.collect_changes_since_last_hash(); + // TODO: do we need this? 
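+                    // (Presumably yes: `commit` also emits each leaf's value under its
+                    // full 64-nibble key path, so this retain keeps only the trie-node
+                    // entries in the batch.)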
+ changes.retain(|(path, _)| path.len() < 64); + trie.db().put_batch(changes)?; Ok(current_state_root) }) .await??; @@ -1028,9 +1033,6 @@ impl Syncer { ); *METRICS.storage_tries_download_end_time.lock().await = Some(SystemTime::now()); - let maybe_big_account_storage_state_roots: Arc>> = - Arc::new(Mutex::new(HashMap::new())); - *METRICS.storage_tries_insert_start_time.lock().await = Some(SystemTime::now()); METRICS .current_step @@ -1053,8 +1055,6 @@ impl Syncer { RLPDecode::decode(&snapshot_contents) .map_err(|_| SyncError::SnapshotDecodeError(snapshot_path.clone()))?; - let maybe_big_account_storage_state_roots_clone = - maybe_big_account_storage_state_roots.clone(); let store_clone = store.clone(); let pivot_hash_moved = pivot_header.hash(); info!("Starting compute of account_storages_snapshot"); @@ -1068,7 +1068,6 @@ impl Syncer { .filter(|(_account_hash, storage)| !storage.is_empty()) .map(|(account_hash, key_value_pairs)| { compute_storage_roots( - maybe_big_account_storage_state_roots_clone.clone(), store.clone(), account_hash, key_value_pairs, @@ -1096,15 +1095,16 @@ impl Syncer { let mut healing_done = false; while !healing_done { // This if is an edge case for the skip snap sync scenario - if block_is_stale(&pivot_header) { - pivot_header = update_pivot( - pivot_header.number, - pivot_header.timestamp, - &mut self.peers, - block_sync_state, - ) - .await?; - } + // TODO: UNCOMMENT CONDITION + //if block_is_stale(&pivot_header) { + pivot_header = update_pivot( + pivot_header.number, + pivot_header.timestamp, + &mut self.peers, + block_sync_state, + ) + .await?; + //} healing_done = heal_state_trie_wrap( pivot_header.state_root, store.clone(), @@ -1115,6 +1115,7 @@ impl Syncer { &mut code_hash_collector, ) .await?; + free_peers_and_log_if_not_empty(&mut self.peers).await?; if !healing_done { continue; } @@ -1135,6 +1136,7 @@ impl Syncer { debug_assert!(validate_state_root(store.clone(), pivot_header.state_root).await); debug_assert!(validate_storage_root(store.clone(), pivot_header.state_root).await); + info!("Finished healing"); // Finish code hash collection @@ -1235,25 +1237,20 @@ impl Syncer { } } -type StorageRoots = (H256, Vec<(NodeHash, Vec)>); +type StorageRoots = (H256, Vec<(Nibbles, Vec)>); fn compute_storage_roots( - maybe_big_account_storage_state_roots: Arc>>, store: Store, account_hash: H256, key_value_pairs: Vec<(H256, U256)>, pivot_hash: H256, ) -> Result { - let account_storage_root = match maybe_big_account_storage_state_roots - .lock() - .map_err(|_| SyncError::MaybeBigAccount)? - .entry(account_hash) - { - Entry::Occupied(occupied_entry) => *occupied_entry.get(), - Entry::Vacant(_vacant_entry) => *EMPTY_TRIE_HASH, + let storage_trie = store.open_direct_storage_trie(account_hash, *EMPTY_TRIE_HASH)?; + let trie_hash = match storage_trie.db().get(Nibbles::default())? { + Some(noderlp) => Node::decode(&noderlp)?.compute_hash().finalize(), + None => *EMPTY_TRIE_HASH, }; - - let mut storage_trie = store.open_storage_trie(account_hash, account_storage_root)?; + let mut storage_trie = store.open_direct_storage_trie(account_hash, trie_hash)?; for (hashed_key, value) in key_value_pairs { if let Err(err) = storage_trie.insert(hashed_key.0.to_vec(), value.encode_to_vec()) { @@ -1263,20 +1260,16 @@ fn compute_storage_roots( } } - let (computed_storage_root, changes) = storage_trie.collect_changes_since_last_hash(); + let (computed_storage_root, mut changes) = storage_trie.collect_changes_since_last_hash(); + // TODO: do we need this? 
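+    // (Same as in the state-trie case above: hashed slot keys are 64 nibbles,
+    // so this drops the leaf-value entries and keeps node entries only.)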
+ changes.retain(|(path, _)| path.len() < 64); let account_state = store .get_account_state_by_acc_hash(pivot_hash, account_hash)? .ok_or(SyncError::AccountState(pivot_hash, account_hash))?; if computed_storage_root == account_state.storage_root { METRICS.storage_tries_state_roots_computed.inc(); - } else { - maybe_big_account_storage_state_roots - .lock() - .map_err(|_| SyncError::MaybeBigAccount)? - .insert(account_hash, computed_storage_root); } - Ok((account_hash, changes)) } diff --git a/crates/networking/p2p/sync/state_healing.rs b/crates/networking/p2p/sync/state_healing.rs index ff74426376..71525eeb7d 100644 --- a/crates/networking/p2p/sync/state_healing.rs +++ b/crates/networking/p2p/sync/state_healing.rs @@ -10,7 +10,7 @@ use std::{ cmp::min, - collections::HashMap, + collections::{BTreeMap, HashMap}, sync::atomic::Ordering, time::{Duration, Instant}, }; @@ -18,7 +18,7 @@ use std::{ use ethrex_common::{H256, constants::EMPTY_KECCACK_HASH, types::AccountState}; use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode}; use ethrex_storage::Store; -use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Node, NodeHash, TrieDB, TrieError}; +use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Node, TrieDB, TrieError}; use tracing::{debug, error, info}; use crate::{ @@ -43,6 +43,7 @@ pub struct MembatchEntryValue { node: Node, children_not_in_storage_count: u64, parent_path: Nibbles, + previous: Option, } pub async fn heal_state_trie_wrap( @@ -98,6 +99,7 @@ async fn heal_state_trie( hash: state_root, path: Nibbles::default(), // We need to be careful, the root parent is a special case parent_path: Nibbles::default(), + previous: None, // None => can't assume anything }]; let mut last_update = Instant::now(); let mut inflight_tasks: u64 = 0; @@ -107,7 +109,8 @@ async fn heal_state_trie( let mut downloads_fail = 0; let mut leafs_healed = 0; let mut empty_try_recv: u64 = 0; - let mut nodes_to_write: Vec = Vec::new(); + let mut heals_per_cycle: u64 = 0; + let mut nodes_to_write: Vec<(Nibbles, Node)> = Vec::new(); let mut db_joinset = tokio::task::JoinSet::new(); // channel to send the tasks to the peers @@ -135,21 +138,13 @@ async fn heal_state_trie( METRICS .healing_empty_try_recv .store(empty_try_recv, Ordering::Relaxed); - if is_stale { - debug!( - "State Healing stopping due to staleness, snap peers available {num_peers}, inflight_tasks: {inflight_tasks}, Maximum depth reached on loop {longest_path_seen}, leafs healed {leafs_healed}, global leafs healed {}, Download success rate {downloads_rate}, Paths to go {}, Membatch size {}", - global_leafs_healed, - paths.len(), - membatch.len() - ); - } else { - debug!( - "State Healing in Progress, snap peers available {num_peers}, inflight_tasks: {inflight_tasks}, Maximum depth reached on loop {longest_path_seen}, leafs healed {leafs_healed}, global leafs healed {}, Download success rate {downloads_rate}, Paths to go {}, Membatch size {}", - global_leafs_healed, - paths.len(), - membatch.len() - ); - } + debug!( + "State Healing {}, snap peers available {num_peers}, inflight_tasks: {inflight_tasks}, Maximum depth reached on loop {longest_path_seen}, leafs healed {leafs_healed}, global leafs healed {}, Download success rate {downloads_rate}, Paths to go {}, Membatch size {}, Processing per cycle {heals_per_cycle}", + if is_stale { "stopping" } else { "in progress" }, + global_leafs_healed, + paths.len(), + membatch.len() + ); downloads_success = 0; downloads_fail = 0; } @@ -260,6 +255,7 @@ async fn heal_state_trie( // If there is at least one "batch" of nodes to 
heal, heal it if let Some((nodes, batch)) = nodes_to_heal.pop() { + heals_per_cycle += 1; let return_paths = heal_state_batch( batch, nodes, @@ -277,29 +273,33 @@ async fn heal_state_trie( let is_done = paths.is_empty() && nodes_to_heal.is_empty() && inflight_tasks == 0; if nodes_to_write.len() > 100_000 || is_done || is_stale { - let to_write = nodes_to_write; - nodes_to_write = Vec::new(); + let to_write = std::mem::take(&mut nodes_to_write); let store = store.clone(); - if db_joinset.len() > 3 { + if db_joinset.len() > 0 { db_joinset.join_next().await; } db_joinset.spawn_blocking(|| { spawned_rt::tasks::block_on(async move { // TODO: replace put batch with the async version + let mut encoded_to_write = BTreeMap::new(); + for (path, node) in to_write { + for i in 0..path.len() { + encoded_to_write.insert(path.slice(0, i), vec![]); + } + if let Node::Leaf(leaf) = &node { + encoded_to_write + .insert(path.concat(leaf.partial.clone()), leaf.value.clone()); + } + encoded_to_write.insert(path, node.encode_to_vec()); + } let trie_db = store - .open_state_trie(*EMPTY_TRIE_HASH) + .open_direct_state_trie(*EMPTY_TRIE_HASH) .expect("Store should open"); let db = trie_db.db(); - db.put_batch( - to_write - .into_iter() - .filter_map(|node| match node.compute_hash() { - hash @ NodeHash::Hashed(_) => Some((hash, node.encode_to_vec())), - NodeHash::Inline(_) => None, - }) - .collect(), - ) - .expect("The put batch on the store failed"); + // TODO: do we need this? + encoded_to_write.retain(|path, _| path.len() < 32); + db.put_batch(encoded_to_write.into_iter().collect()) + .expect("The put batch on the store failed"); }) }); } @@ -338,9 +338,9 @@ async fn heal_state_batch( nodes: Vec, store: Store, membatch: &mut HashMap, - nodes_to_write: &mut Vec, // TODO: change tuple to struct + nodes_to_write: &mut Vec<(Nibbles, Node)>, // TODO: change tuple to struct ) -> Result, SyncError> { - let trie = store.open_state_trie(*EMPTY_TRIE_HASH)?; + let trie = store.open_direct_state_trie(*EMPTY_TRIE_HASH)?; for node in nodes.into_iter() { let path = batch.remove(0); let (missing_children_count, missing_children) = @@ -348,17 +348,20 @@ async fn heal_state_batch( batch.extend(missing_children); if missing_children_count == 0 { commit_node( + &store, node, &path.path, &path.parent_path, membatch, nodes_to_write, - ); + ) + .await; } else { let entry = MembatchEntryValue { node: node.clone(), children_not_in_storage_count: missing_children_count, parent_path: path.parent_path.clone(), + previous: path.previous, }; membatch.insert(path.path.clone(), entry); } @@ -366,14 +369,15 @@ async fn heal_state_batch( Ok(batch) } -fn commit_node( +async fn commit_node( + store: &Store, node: Node, path: &Nibbles, parent_path: &Nibbles, membatch: &mut HashMap, - nodes_to_write: &mut Vec, + nodes_to_write: &mut Vec<(Nibbles, Node)>, ) { - nodes_to_write.push(node); + nodes_to_write.push((path.clone(), node)); if parent_path == path { return; // Case where we're saving the root @@ -385,13 +389,15 @@ fn commit_node( membatch_entry.children_not_in_storage_count -= 1; if membatch_entry.children_not_in_storage_count == 0 { - commit_node( + Box::pin(commit_node( + store, membatch_entry.node, parent_path, &membatch_entry.parent_path, membatch, nodes_to_write, - ); + )) + .await; } else { membatch.insert(parent_path.clone(), membatch_entry); } @@ -408,27 +414,56 @@ pub fn node_missing_children( match &node { Node::Branch(node) => { for (index, child) in node.choices.iter().enumerate() { - if child.is_valid() && 
child.get_node(trie_state)?.is_none() { - missing_children_count += 1; - paths.extend(vec![RequestMetadata { - hash: child.compute_hash().finalize(), - path: path.clone().append_new(index as u8), - parent_path: path.clone(), - }]); + let child_path = path.clone().append_new(index as u8); + if !child.is_valid() { + continue; + } + let (validity, previous) = match child + .get_node_checked(trie_state, child_path.clone()) + .inspect_err(|_| { + error!("Malformed data when doing get child of a branch node") + })? { + Some((validity, previous)) => (validity, Some(previous)), + None => (false, None), + }; + if validity { + continue; } - } - } - Node::Extension(node) => { - if node.child.is_valid() && node.child.get_node(trie_state)?.is_none() { - missing_children_count += 1; + missing_children_count += 1; paths.extend(vec![RequestMetadata { - hash: node.child.compute_hash().finalize(), - path: path.concat(node.prefix.clone()), + hash: child.compute_hash().finalize(), + path: child_path, parent_path: path.clone(), + previous, }]); } } + Node::Extension(node) => { + let child_path = path.concat(node.prefix.clone()); + if !node.child.is_valid() { + return Ok((0, vec![])); + } + let (validity, previous) = match node + .child + .get_node_checked(trie_state, child_path.clone()) + .inspect_err(|_| error!("Malformed data when doing get child of a branch node"))? + { + Some((validity, previous)) => (validity, Some(previous)), + None => (false, None), + }; + if validity { + return Ok((0, vec![])); + } + missing_children_count += 1; + + paths.extend(vec![RequestMetadata { + hash: node.child.compute_hash().finalize(), + path: child_path, + parent_path: path.clone(), + previous, + }]); + } _ => {} } Ok((missing_children_count, paths)) diff --git a/crates/networking/p2p/sync/storage_healing.rs b/crates/networking/p2p/sync/storage_healing.rs index bdda8cb0b3..143d1d4c8f 100644 --- a/crates/networking/p2p/sync/storage_healing.rs +++ b/crates/networking/p2p/sync/storage_healing.rs @@ -16,11 +16,11 @@ use bytes::Bytes; use ethrex_common::{H256, types::AccountState}; use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode, error::RLPDecodeError}; use ethrex_storage::{Store, error::StoreError}; -use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Node, NodeHash}; +use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Node}; use rand::random; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::{ - collections::{HashMap, VecDeque}, + collections::{HashMap, HashSet, VecDeque}, sync::atomic::Ordering, time::Instant, }; @@ -157,7 +157,7 @@ pub async fn heal_storage_trie( Result>>, > = JoinSet::new(); - let mut nodes_to_write: HashMap)>> = HashMap::new(); + let mut nodes_to_write: HashMap> = HashMap::new(); let mut db_joinset = tokio::task::JoinSet::new(); // channel to send the tasks to the peers @@ -175,7 +175,7 @@ pub async fn heal_storage_trie( .store(state.empty_count as u64, Ordering::Relaxed); state.last_update = Instant::now(); debug!( - "We are storage healing. Snap Peers {}. Inflight tasks {}. Download Queue {}. Maximum length {}. Leafs Healed {}. Global Leafs Healed {global_leafs_healed}. Roots Healed {}. Good Download Percentage {}. Empty count {}. Disconnected Count {}.", + "We are storage healing. Snap Peers {}. Inflight tasks {}. Download Queue {}. Maximum length {}. Leafs Healed {}. Global Leafs Healed {global_leafs_healed}. Roots Healed {}. Good Downloads {}. Good Download Percentage {}. Empty count {}. 
Disconnected Count {}.", peers .peer_table .peer_count_by_capabilities(&SUPPORTED_SNAP_CAPABILITIES) @@ -186,6 +186,7 @@ pub async fn heal_storage_trie( state.maximum_length_seen, state.leafs_healed, state.roots_healed, + state.succesful_downloads, state.succesful_downloads as f64 / (state.succesful_downloads as f64 + state.failed_downloads as f64), state.empty_count, @@ -201,15 +202,27 @@ pub async fn heal_storage_trie( let is_stale = current_unix_time() > state.staleness_timestamp; if nodes_to_write.values().map(Vec::len).sum::() > 100_000 || is_done || is_stale { - let to_write = nodes_to_write.drain().collect(); + let to_write: Vec<_> = nodes_to_write.drain().collect(); let store = state.store.clone(); - if db_joinset.len() > 3 { + if db_joinset.len() > 0 { db_joinset.join_next().await; } db_joinset.spawn_blocking(|| { spawned_rt::tasks::block_on(async move { + let mut encoded_to_write = vec![]; + for (hashed_account, nodes) in to_write { + let mut account_nodes = vec![]; + for (path, node) in nodes { + for i in 0..path.len() { + account_nodes.push((path.slice(0, i), vec![])); + } + account_nodes.push((path, node.encode_to_vec())); + } + encoded_to_write.push((hashed_account, account_nodes)); + } + store - .write_storage_trie_nodes_batch(to_write) + .write_storage_trie_nodes_batch(encoded_to_write) .await .expect("db write failed"); }) @@ -277,6 +290,7 @@ pub async fn heal_storage_trie( &mut state.maximum_length_seen, &mut nodes_to_write, ) + .await .expect("We shouldn't be getting store errors"); // TODO: if we have a stor error we should stop } Err(RequestStorageTrieNodes::SendMessageError(id, _err)) => { @@ -456,7 +470,7 @@ async fn zip_requeue_node_responses_score_peer( } #[allow(clippy::too_many_arguments)] -fn process_node_responses( +async fn process_node_responses( node_processing_queue: &mut Vec, download_queue: &mut VecDeque, store: Store, @@ -465,7 +479,7 @@ fn process_node_responses( global_leafs_healed: &mut u64, roots_healed: &mut usize, maximum_length_seen: &mut usize, - to_write: &mut HashMap)>>, + to_write: &mut HashMap>, ) -> Result<(), StoreError> { while let Some(node_response) = node_processing_queue.pop() { trace!("We are processing node response {:?}", node_response); @@ -486,9 +500,11 @@ fn process_node_responses( if missing_children_count == 0 { // We flush to the database this node - commit_node(&node_response, membatch, roots_healed, to_write).inspect_err(|err| { - error!("{err} in commit node while committing {node_response:?}") - })?; + commit_node(&store, &node_response, membatch, roots_healed, to_write) + .await + .inspect_err(|err| { + error!("{err} in commit node while committing {node_response:?}") + })?; } else { let key = ( node_response.node_request.acc_path.clone(), @@ -532,12 +548,24 @@ fn get_initial_downloads( if account.storage_root == *EMPTY_TRIE_HASH { return None; } - if store - .contains_storage_node(*acc_path, account.storage_root) - .expect("We should be able to open the store") + let trie = store + .open_direct_storage_trie(*acc_path, account.storage_root) + .expect("We should be able to open the store"); + + let previous = match trie + .root + .get_node_checked(trie.db(), Nibbles::default()) + .expect("To be able to read the store") { - return None; - } + Some((validity, previous)) => { + if validity { + return None; + } else { + Some(previous) + } + } + None => None, + }; Some(NodeRequest { acc_path: Nibbles::from_bytes(&acc_path.0), storage_path: Nibbles::default(), // We need to be careful, the root parent is a special case @@ 
-552,12 +580,28 @@ fn get_initial_downloads( .accounts_with_storage_root .par_iter() .filter_map(|(acc_path, storage_root)| { - if store - .contains_storage_node(*acc_path, *storage_root) - .expect("We should be able to open the store") - { + if *storage_root == *EMPTY_TRIE_HASH { return None; } + let trie = store + .open_direct_storage_trie(*acc_path, *storage_root) + .expect("We should be able to open the store"); + + let previous = match trie + .root + .get_node_checked(trie.db(), Nibbles::default()) + .expect("To be able to read the store") + { + Some((validity, previous)) => { + if validity { + return None; + } else { + Some(previous) + } + } + None => None, + }; + Some(NodeRequest { acc_path: Nibbles::from_bytes(&acc_path.0), storage_path: Nibbles::default(), // We need to be careful, the root parent is a special case @@ -580,7 +624,7 @@ pub fn determine_missing_children( let mut count = 0; let node = node_response.node.clone(); let trie = store - .open_storage_trie( + .open_direct_storage_trie( H256::from_slice(&node_response.node_request.acc_path.to_bytes()), *EMPTY_TRIE_HASH, ) @@ -588,70 +632,84 @@ pub fn determine_missing_children( error!("Malformed data when opening the storage trie in determine missing children") })?; let trie_state = trie.db(); + match &node { Node::Branch(node) => { for (index, child) in node.choices.iter().enumerate() { - if child.is_valid() - && child - .get_node(trie_state) - .inspect_err(|_| { - error!("Malformed data when doing get child of a branch node") - })? - .is_none() - { - count += 1; - - paths.extend(vec![NodeRequest { - acc_path: node_response.node_request.acc_path.clone(), - storage_path: node_response - .node_request - .storage_path - .append_new(index as u8), - parent: node_response.node_request.storage_path.clone(), - hash: child.compute_hash().finalize(), - }]); + let child_path = node_response + .node_request + .storage_path + .append_new(index as u8); + if !child.is_valid() { + continue; } - } - } - Node::Extension(node) => { - if node.child.is_valid() - && node - .child - .get_node(trie_state) + let (validity, previous) = match child + .get_node_checked(trie_state, child_path.clone()) .inspect_err(|_| { - error!("Malformed data when doing get child of an extension node") - })? - .is_none() - { + error!("Malformed data when doing get child of a branch node") + })? { + Some((validity, previous)) => (validity, Some(previous)), + None => (false, None), + }; + if validity { + continue; + } count += 1; paths.extend(vec![NodeRequest { acc_path: node_response.node_request.acc_path.clone(), - storage_path: node_response - .node_request - .storage_path - .concat(node.prefix.clone()), + storage_path: child_path, parent: node_response.node_request.storage_path.clone(), - hash: node.child.compute_hash().finalize(), + hash: child.compute_hash().finalize(), }]); } } + Node::Extension(node) => { + let child_path = node_response + .node_request + .storage_path + .concat(node.prefix.clone()); + if !node.child.is_valid() { + return Ok((vec![], 0)); + } + let (validity, previous) = match node + .child + .get_node_checked(trie_state, child_path.clone()) + .inspect_err(|_| error!("Malformed data when doing get child of a branch node"))? 
+ { + Some((validity, previous)) => (validity, Some(previous)), + None => (false, None), + }; + if validity { + return Ok((vec![], 0)); + } + count += 1; + + paths.extend(vec![NodeRequest { + acc_path: node_response.node_request.acc_path.clone(), + storage_path: child_path, + parent: node_response.node_request.storage_path.clone(), + hash: node.child.compute_hash().finalize(), + }]); + } _ => {} } Ok((paths, count)) } -fn commit_node( +async fn commit_node( + store: &Store, node: &NodeResponse, membatch: &mut Membatch, roots_healed: &mut usize, - to_write: &mut HashMap)>>, + to_write: &mut HashMap>, ) -> Result<(), StoreError> { let hashed_account = H256::from_slice(&node.node_request.acc_path.to_bytes()); + to_write .entry(hashed_account) .or_default() - .push((node.node.compute_hash(), node.node.encode_to_vec())); + .push((node.node_request.storage_path.clone(), node.node.clone())); // Special case, we have just commited the root, we stop if node.node_request.storage_path == node.node_request.parent { @@ -674,12 +732,14 @@ fn commit_node( parent_entry.missing_children_count -= 1; if parent_entry.missing_children_count == 0 { - commit_node( + Box::pin(commit_node( + store, &parent_entry.node_response, membatch, roots_healed, to_write, - )?; + )) + .await?; } else { membatch.insert(parent_key, parent_entry); } diff --git a/crates/networking/rpc/eth/account.rs b/crates/networking/rpc/eth/account.rs index 6cbf145319..54c4ef1c98 100644 --- a/crates/networking/rpc/eth/account.rs +++ b/crates/networking/rpc/eth/account.rs @@ -218,44 +218,34 @@ impl RpcHandler for GetProofRequest { let Some(block_number) = self.block.resolve_block_number(storage).await? else { return Ok(Value::Null); }; + let Some(header) = storage.get_block_header(block_number)? else { + return Ok(Value::Null); + }; // Create account proof let Some(account_proof) = storage - .get_account_proof(block_number, &self.address) + .get_account_proof(header.state_root, self.address, &self.storage_keys) .await? else { return Err(RpcErr::Internal("Could not get account proof".to_owned())); }; - let account = storage - .get_account_state(block_number, self.address) - .await?; - // Create storage proofs for all provided storage keys - let mut storage_proofs = Vec::new(); - for storage_key in self.storage_keys.iter() { - let value = storage - .get_storage_at(block_number, self.address, *storage_key) - .await? - .unwrap_or_default(); - let proof = if let Some(account) = &account { - storage.get_storage_proof(self.address, account.storage_root, storage_key)? 
- } else { - Vec::new() - }; - let storage_proof = StorageProof { - key: storage_key.into_uint(), - proof, - value, - }; - storage_proofs.push(storage_proof); - } - let account = account.unwrap_or_default(); + let storage_proof = account_proof + .storage_proof + .into_iter() + .map(|sp| StorageProof { + key: sp.key.into_uint(), + value: sp.value, + proof: sp.proof, + }) + .collect(); + let account = account_proof.account; let account_proof = AccountProof { - account_proof, + account_proof: account_proof.proof, address: self.address, balance: account.balance, code_hash: account.code_hash, nonce: account.nonce, storage_hash: account.storage_root, - storage_proof: storage_proofs, + storage_proof, }; serde_json::to_value(account_proof).map_err(|error| RpcErr::Internal(error.to_string())) } diff --git a/crates/storage/api.rs b/crates/storage/api.rs index 051289c73b..22cefb8f3e 100644 --- a/crates/storage/api.rs +++ b/crates/storage/api.rs @@ -7,7 +7,7 @@ use std::{fmt::Debug, panic::RefUnwindSafe}; use crate::UpdateBatch; use crate::{error::StoreError, store::STATE_TRIE_SEGMENTS}; -use ethrex_trie::{Nibbles, NodeHash, Trie}; +use ethrex_trie::{Nibbles, Trie}; // We need async_trait because the stabilized feature lacks support for object safety // (i.e. dyn StoreEngine) @@ -220,6 +220,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { &self, hashed_address: H256, storage_root: H256, + state_root: H256, ) -> Result; /// Obtain a state trie from the given state root @@ -227,6 +228,20 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { /// Used for internal store operations fn open_state_trie(&self, state_root: H256) -> Result; + /// Obtain a storage trie from the given address and storage_root + /// Doesn't check if the account is stored + /// Used for internal store operations + fn open_direct_storage_trie( + &self, + hashed_address: H256, + storage_root: H256, + ) -> Result; + + /// Obtain a state trie from the given state root + /// Doesn't check if the state root is valid + /// Used for internal store operations + fn open_direct_state_trie(&self, state_root: H256) -> Result; + /// Obtain a state trie locked for reads from the given state root /// Doesn't check if the state root is valid /// Used for internal store operations @@ -241,8 +256,9 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { &self, hashed_address: H256, storage_root: H256, + state_root: H256, ) -> Result { - self.open_storage_trie(hashed_address, storage_root) + self.open_storage_trie(hashed_address, storage_root, state_root) } async fn forkchoice_update( @@ -339,7 +355,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { async fn write_storage_trie_nodes_batch( &self, - storage_trie_nodes: Vec<(H256, Vec<(NodeHash, Vec)>)>, + storage_trie_nodes: Vec<(H256, Vec<(Nibbles, Vec)>)>, ) -> Result<(), StoreError>; async fn write_account_code_batch( diff --git a/crates/storage/error.rs b/crates/storage/error.rs index 97597b9fb4..5a87cb2114 100644 --- a/crates/storage/error.rs +++ b/crates/storage/error.rs @@ -36,4 +36,6 @@ pub enum StoreError { IncompatibleChainConfig, #[error("Failed to convert index: {0}")] TryInto(#[from] std::num::TryFromIntError), + #[error("Update batch contains no blocks")] + UpdateBatchNoBlocks, } diff --git a/crates/storage/lib.rs b/crates/storage/lib.rs index 82bfcba267..ac843e4760 100644 --- a/crates/storage/lib.rs +++ b/crates/storage/lib.rs @@ -12,3 +12,4 @@ pub use store::{ AccountUpdatesList, EngineType, MAX_SNAPSHOT_READS, STATE_TRIE_SEGMENTS, 
Store, UpdateBatch, hash_address, hash_key, }; +pub use trie_db::layering::apply_prefix; diff --git a/crates/storage/store.rs b/crates/storage/store.rs index 8fa690c4b0..b98260ab4c 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -1,8 +1,8 @@ -use crate::api::StoreEngine; use crate::error::StoreError; use crate::store_db::in_memory::Store as InMemoryStore; #[cfg(feature = "rocksdb")] use crate::store_db::rocksdb::Store as RocksDBStore; +use crate::{api::StoreEngine, trie_db::layering::apply_prefix}; use bytes::Bytes; use ethereum_types::{Address, H256, U256}; @@ -16,7 +16,7 @@ use ethrex_common::{ }; use ethrex_rlp::decode::RLPDecode; use ethrex_rlp::encode::RLPEncode; -use ethrex_trie::{Nibbles, NodeHash, Trie, TrieLogger, TrieNode, TrieWitness}; +use ethrex_trie::{Nibbles, NodeRLP, Trie, TrieLogger, TrieNode, TrieWitness}; use sha3::{Digest as _, Keccak256}; use std::sync::Arc; use std::{ @@ -38,7 +38,7 @@ pub struct Store { pub latest_block_header: Arc>, } -pub type StorageTrieNodes = Vec<(H256, Vec<(NodeHash, Vec)>)>; +pub type StorageTrieNodes = Vec<(H256, Vec<(Nibbles, Vec)>)>; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum EngineType { @@ -60,11 +60,11 @@ pub struct UpdateBatch { pub code_updates: Vec<(H256, Bytes)>, } -type StorageUpdates = Vec<(H256, Vec<(NodeHash, Vec)>)>; +type StorageUpdates = Vec<(H256, Vec<(Nibbles, Vec)>)>; pub struct AccountUpdatesList { pub state_trie_hash: H256, - pub state_updates: Vec<(NodeHash, Vec)>, + pub state_updates: Vec<(Nibbles, Vec)>, pub storage_updates: StorageUpdates, pub code_updates: Vec<(H256, Bytes)>, } @@ -383,6 +383,7 @@ impl Store { ) -> Result { let mut ret_storage_updates = Vec::new(); let mut code_updates = Vec::new(); + let state_root = state_trie.hash_no_commit(); for update in account_updates { let hashed_address = hash_address(&update.address); if update.removed { @@ -410,6 +411,7 @@ impl Store { let mut storage_trie = self.engine.open_storage_trie( H256::from_slice(&hashed_address), account_state.storage_root, + state_root, )?; for (storage_key, storage_value) in &update.added_storage { let hashed_key = hash_key(storage_key); @@ -444,6 +446,7 @@ impl Store { account_updates: &[AccountUpdate], mut storage_tries: HashMap, ) -> Result<(Trie, HashMap), StoreError> { + let state_root = state_trie.hash_no_commit(); for update in account_updates.iter() { let hashed_address = hash_address(&update.address); if update.removed { @@ -473,6 +476,7 @@ impl Store { let trie = self.engine.open_storage_trie( H256::from_slice(&hashed_address), account_state.storage_root, + state_root, )?; vacant.insert(TrieLogger::open_trie(trie)) } @@ -500,6 +504,7 @@ impl Store { &self, genesis_accounts: BTreeMap, ) -> Result { + let mut nodes = HashMap::new(); let mut genesis_state_trie = self.engine.open_state_trie(*EMPTY_TRIE_HASH)?; for (address, account) in genesis_accounts { let hashed_address = hash_address(&address); @@ -507,16 +512,19 @@ impl Store { let code_hash = code_hash(&account.code); self.add_account_code(code_hash, account.code).await?; // Store the account's storage in a clean storage trie and compute its root - let mut storage_trie = self - .engine - .open_storage_trie(H256::from_slice(&hashed_address), *EMPTY_TRIE_HASH)?; + let mut storage_trie = self.engine.open_storage_trie( + H256::from_slice(&hashed_address), + *EMPTY_TRIE_HASH, + *EMPTY_TRIE_HASH, + )?; for (storage_key, storage_value) in account.storage { if !storage_value.is_zero() { let hashed_key = hash_key(&H256(storage_key.to_big_endian())); 
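                    // Slots are keyed by the keccak hash of the big-endian slot key and
                    // store RLP-encoded values, matching the layout used at runtime.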
                    storage_trie.insert(hashed_key, storage_value.encode_to_vec())?;
                }
            }
-            let storage_root = storage_trie.hash()?;
+            let (storage_root, new_nodes) = storage_trie.collect_changes_since_last_hash();
+            nodes.insert(H256::from_slice(&hashed_address), new_nodes);
             // Add account to trie
             let account_state = AccountState {
                 nonce: account.nonce,
@@ -526,7 +534,20 @@ impl Store {
             };
             genesis_state_trie.insert(hashed_address, account_state.encode_to_vec())?;
         }
-        genesis_state_trie.hash().map_err(StoreError::Trie)
+        let (state_root, state_nodes) = genesis_state_trie.collect_changes_since_last_hash();
+
+        genesis_state_trie.db().put_batch(
+            nodes
+                .into_iter()
+                .flat_map(|(account_hash, nodes)| {
+                    nodes
+                        .into_iter()
+                        .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node))
+                })
+                .chain(state_nodes)
+                .collect(),
+        )?;
+        Ok(state_root)
     }
 
     pub async fn add_receipt(
@@ -823,6 +844,9 @@ impl Store {
         block_hash: BlockHash,
         address: Address,
     ) -> Result<Option<Trie>, StoreError> {
+        let Some(header) = self.get_block_header_by_hash(block_hash)? else {
+            return Ok(None);
+        };
         // Fetch Account from state_trie
         let Some(state_trie) = self.state_trie(block_hash)? else {
             return Ok(None);
         };
@@ -837,6 +861,7 @@ impl Store {
         Ok(Some(self.engine.open_storage_trie(
             H256::from_slice(&hashed_address),
             storage_root,
+            header.state_root,
         )?))
     }
 
@@ -851,57 +876,71 @@ impl Store {
         let Some(state_trie) = self.state_trie(block_hash)? else {
             return Ok(None);
         };
-        self.get_account_state_from_trie(&state_trie, address)
-    }
-
-    pub fn get_account_state_by_hash(
-        &self,
-        block_hash: BlockHash,
-        address: Address,
-    ) -> Result<Option<AccountState>, StoreError> {
-        let Some(state_trie) = self.state_trie(block_hash)? else {
-            return Ok(None);
-        };
-        self.get_account_state_from_trie(&state_trie, address)
-    }
-
-    pub fn get_account_state_from_trie(
-        &self,
-        state_trie: &Trie,
-        address: Address,
-    ) -> Result<Option<AccountState>, StoreError> {
-        let hashed_address = hash_address(&address);
-        let Some(encoded_state) = state_trie.get(&hashed_address)? else {
-            return Ok(None);
-        };
-        Ok(Some(AccountState::decode(&encoded_state)?))
+        get_account_state_from_trie(&state_trie, address)
     }
 
+    /// Constructs a merkle proof for the given account address against a given state.
+    /// If storage_keys are provided, also constructs the storage proofs for those keys.
+    ///
+    /// Returns `None` if the state trie is missing, otherwise returns the proof.
     pub async fn get_account_proof(
        &self,
-        block_number: BlockNumber,
-        address: &Address,
-    ) -> Result<Option<Vec<Vec<u8>>>, StoreError> {
-        let Some(block_hash) = self.get_canonical_block_hash(block_number).await? else {
-            return Ok(None);
-        };
-        let Some(state_trie) = self.state_trie(block_hash)? else {
-            return Ok(None);
-        };
-        Ok(Some(state_trie.get_proof(&hash_address(address))).transpose()?)
-    }
-
-    /// Constructs a merkle proof for the given storage_key in a storage_trie with a known root
-    pub fn get_storage_proof(
-        &self,
+        state_root: H256,
         address: Address,
-        storage_root: H256,
-        storage_key: &H256,
-    ) -> Result<Vec<Vec<u8>>, StoreError> {
-        let trie = self
-            .engine
-            .open_storage_trie(hash_address_fixed(&address), storage_root)?;
-        Ok(trie.get_proof(&hash_key(storage_key))?)
+        storage_keys: &[H256],
+    ) -> Result<Option<AccountProof>, StoreError> {
+        // TODO: check state root
+        // let Some(state_trie) = self.open_state_trie(state_root)? else {
+        //     return Ok(None);
+        // };
+        let state_trie = self.open_state_trie(state_root)?;
+        let hashed_address = hash_address_fixed(&address);
+        let address_path = hashed_address.0.to_vec();
+        let proof = state_trie.get_proof(&address_path)?;
+        let account_opt = state_trie
+            .get(&address_path)?
+            .map(|encoded_state| AccountState::decode(&encoded_state))
+            .transpose()?;
+
+        let mut storage_proof = Vec::with_capacity(storage_keys.len());
+
+        if let Some(account) = &account_opt {
+            let storage_trie = self.engine.open_storage_trie(
+                hashed_address,
+                account.storage_root,
+                state_trie.hash_no_commit(),
+            )?;
+
+            for key in storage_keys {
+                let hashed_key = hash_key(key);
+                let proof = storage_trie.get_proof(&hashed_key)?;
+                let value = storage_trie
+                    .get(&hashed_key)?
+                    .map(|rlp| U256::decode(&rlp).map_err(StoreError::RLPDecode))
+                    .transpose()?
+                    .unwrap_or_default();
+
+                let slot_proof = StorageSlotProof {
+                    proof,
+                    key: *key,
+                    value,
+                };
+                storage_proof.push(slot_proof);
+            }
+        } else {
+            storage_proof.extend(storage_keys.iter().map(|key| StorageSlotProof {
+                proof: Vec::new(),
+                key: *key,
+                value: U256::zero(),
+            }));
+        }
+        let account = account_opt.unwrap_or_default();
+        let account_proof = AccountProof {
+            proof,
+            account,
+            storage_proof,
+        };
+        Ok(Some(account_proof))
     }
 
     // Returns an iterator across all accounts in the state trie given by the state_root
@@ -942,7 +981,7 @@ impl Store {
         let storage_root = AccountState::decode(&account_rlp)?.storage_root;
         let mut iter = self
             .engine
-            .open_locked_storage_trie(hashed_address, storage_root)?
+            .open_locked_storage_trie(hashed_address, storage_root, state_root)?
             .into_iter();
         iter.advance(starting_slot.0.to_vec())?;
         Ok(Some(iter.content().map_while(|(path, value)| {
@@ -986,9 +1025,9 @@ impl Store {
             return Ok(None);
         };
         let storage_root = AccountState::decode(&account_rlp)?.storage_root;
-        let storage_trie = self
-            .engine
-            .open_storage_trie(hashed_address, storage_root)?;
+        let storage_trie =
+            self.engine
+                .open_storage_trie(hashed_address, storage_root, state_root)?;
         let mut proof = storage_trie.get_proof(&starting_hash.as_bytes().to_vec())?;
         if let Some(last_hash) = last_hash {
             proof.extend_from_slice(&storage_trie.get_proof(&last_hash.as_bytes().to_vec())?);
@@ -1030,9 +1069,11 @@ impl Store {
         let Ok(hashed_address) = account_path.clone().try_into().map(H256) else {
             return Ok(vec![]);
         };
-        let storage_trie = self
-            .engine
-            .open_storage_trie(hashed_address, account_state.storage_root)?;
+        let storage_trie = self.engine.open_storage_trie(
+            hashed_address,
+            account_state.storage_root,
+            state_root,
+        )?;
         // Fetch storage trie nodes
         let mut nodes = vec![];
         let mut bytes_used = 0;
@@ -1073,14 +1114,24 @@ impl Store {
         self.engine.open_locked_state_trie(state_root)
     }
 
+    pub fn open_direct_storage_trie(&self, addr: H256, root: H256) -> Result<Trie, StoreError> {
+        self.engine.open_direct_storage_trie(addr, root)
+    }
+
+    pub fn open_direct_state_trie(&self, root: H256) -> Result<Trie, StoreError> {
+        self.engine.open_direct_state_trie(root)
+    }
+
     /// Obtain a storage trie from the given address and storage_root.
     /// Doesn't check if the account is stored
     pub fn open_storage_trie(
         &self,
         account_hash: H256,
         storage_root: H256,
+        state_root: H256,
     ) -> Result<Trie, StoreError> {
-        self.engine.open_storage_trie(account_hash, storage_root)
+        self.engine
+            .open_storage_trie(account_hash, storage_root, state_root)
     }
 
     /// Obtain a read-locked storage trie from the given address and storage_root.
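For contrast, a hedged sketch of the two opener families `Store` now exposes (the `open_both` helper and the `eyre` error plumbing are assumptions, not code from this patch): the plain openers consult the in-memory trie layers for the given `state_root` before hitting disk, while the `open_direct_*` variants read the backing store as-is.

```rust
use ethrex_common::H256;
use ethrex_storage::Store;
use ethrex_trie::Trie;

// Sketch only: `open_state_trie` resolves nodes through the layered cache
// keyed by `state_root` before falling back to disk; `open_direct_state_trie`
// bypasses any uncommitted layers and reads the stored nodes directly.
fn open_both(store: &Store, state_root: H256) -> eyre::Result<(Trie, Trie)> {
    let layered = store.open_state_trie(state_root)?;
    let direct = store.open_direct_state_trie(state_root)?;
    Ok((layered, direct))
}
```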
@@ -1089,33 +1140,20 @@ impl Store {
         &self,
         account_hash: H256,
         storage_root: H256,
+        state_root: H256,
     ) -> Result<Trie, StoreError> {
         self.engine
-            .open_locked_storage_trie(account_hash, storage_root)
+            .open_locked_storage_trie(account_hash, storage_root, state_root)
     }
 
-    /// Returns true if the given node is part of the state trie's internal storage
-    pub fn contains_state_node(&self, node_hash: H256) -> Result<bool, StoreError> {
-        // Root is irrelevant, we only care about the internal state
-        Ok(self
-            .open_state_trie(*EMPTY_TRIE_HASH)?
-            .db()
-            .get(node_hash.into())?
-            .is_some())
-    }
-
-    /// Returns true if the given node is part of the given storage trie's internal storage
-    pub fn contains_storage_node(
-        &self,
-        hashed_address: H256,
-        node_hash: H256,
-    ) -> Result<bool, StoreError> {
-        // Root is irrelevant, we only care about the internal state
-        Ok(self
-            .open_storage_trie(hashed_address, *EMPTY_TRIE_HASH)?
-            .db()
-            .get(node_hash.into())?
-            .is_some())
+    pub fn has_state_root(&self, state_root: H256) -> Result<bool, StoreError> {
+        let trie = self.engine.open_state_trie(state_root)?;
+        // NOTE: we hash the stored root node ourselves because opening the trie
+        // doesn't verify that the given state root is actually present.
+        let Some(root) = trie.db().get(Nibbles::default())? else {
+            return Ok(false);
+        };
+        let root_hash = ethrex_trie::Node::decode(&root)?.compute_hash().finalize();
+        Ok(state_root == root_hash)
     }
 
     /// Sets the hash of the last header downloaded during a snap sync
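`has_state_root` above relies on an invariant of the new layout: every trie's root node is stored at the empty path. The same check, sketched against any open trie (`trie_matches_root` and the `eyre` error type are illustrative assumptions):

```rust
use ethrex_common::H256;
use ethrex_rlp::decode::RLPDecode;
use ethrex_trie::{Nibbles, Node, Trie};

// Path-addressed node storage keeps the root at the empty path, so checking
// whether a whole state is present reduces to: read the node stored at
// Nibbles::default(), re-hash it, and compare against the expected root.
fn trie_matches_root(trie: &Trie, expected_root: H256) -> eyre::Result<bool> {
    let Some(encoded) = trie.db().get(Nibbles::default())? else {
        return Ok(false);
    };
    let root_hash = Node::decode(&encoded)?.compute_hash().finalize();
    Ok(root_hash == expected_root)
}
```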
@@ -1269,6 +1307,29 @@ impl Store {
     }
 }
 
+pub struct AccountProof {
+    pub proof: Vec<NodeRLP>,
+    pub account: AccountState,
+    pub storage_proof: Vec<StorageSlotProof>,
+}
+
+pub struct StorageSlotProof {
+    pub proof: Vec<NodeRLP>,
+    pub key: H256,
+    pub value: U256,
+}
+
+fn get_account_state_from_trie(
+    state_trie: &Trie,
+    address: Address,
+) -> Result<Option<AccountState>, StoreError> {
+    let hashed_address = hash_address(&address);
+    let Some(encoded_state) = state_trie.get(&hashed_address)? else {
+        return Ok(None);
+    };
+    Ok(Some(AccountState::decode(&encoded_state)?))
+}
+
 pub struct AncestorIterator {
     store: Store,
     next_hash: BlockHash,
@@ -1408,7 +1469,9 @@ mod tests {
             })
             .collect();
         slots.sort_by_key(|a| a.0);
-        let mut trie = store.open_storage_trie(address, *EMPTY_TRIE_HASH).unwrap();
+        let mut trie = store
+            .open_storage_trie(address, *EMPTY_TRIE_HASH, *EMPTY_TRIE_HASH)
+            .unwrap();
         for (slot, value) in &slots {
             trie.insert(slot.0.to_vec(), value.encode_to_vec()).unwrap();
         }
diff --git a/crates/storage/store_db/in_memory.rs b/crates/storage/store_db/in_memory.rs
index d64a4479d9..f3c3109c4f 100644
--- a/crates/storage/store_db/in_memory.rs
+++ b/crates/storage/store_db/in_memory.rs
@@ -4,13 +4,13 @@ use ethereum_types::H256;
 use ethrex_common::types::{
     Block, BlockBody, BlockHash, BlockHeader, BlockNumber, ChainConfig, Index, Receipt,
 };
-use ethrex_trie::{InMemoryTrieDB, Nibbles, NodeHash, Trie};
+use ethrex_trie::{InMemoryTrieDB, Nibbles, Trie, db::nibbles_to_fixed_size};
 use std::{
     collections::{BTreeMap, HashMap},
     fmt::Debug,
     sync::{Arc, Mutex, MutexGuard},
 };
 
-pub type NodeMap = Arc<Mutex<BTreeMap<NodeHash, Vec<u8>>>>;
+pub type NodeMap = Arc<Mutex<BTreeMap<[u8; 33], Vec<u8>>>>;
 
 #[derive(Default, Clone)]
 pub struct Store(Arc<Mutex<StoreInner>>);
@@ -29,7 +29,7 @@ pub struct StoreInner {
     receipts: HashMap<BlockHash, HashMap<Index, Receipt>>,
     pub state_trie_nodes: NodeMap,
     // A storage trie for each hashed account address
-    pub storage_trie_nodes: HashMap<H256, NodeMap>,
+    pub storage_trie_nodes: HashMap<Nibbles, NodeMap>,
     pending_blocks: HashMap<BlockHash, Block>,
     // Stores invalid blocks and their latest valid ancestor
     invalid_ancestors: HashMap<BlockHash, BlockHash>,
@@ -73,6 +73,14 @@ impl Store {
 
 #[async_trait::async_trait]
 impl StoreEngine for Store {
+    fn open_direct_storage_trie(&self, _: H256, _: H256) -> Result<Trie, StoreError> {
+        todo!()
+    }
+
+    fn open_direct_state_trie(&self, _: H256) -> Result<Trie, StoreError> {
+        todo!()
+    }
+
     async fn apply_updates(&self, update_batch: UpdateBatch) -> Result<(), StoreError> {
         let mut store = self.inner()?;
         {
@@ -82,7 +90,7 @@ impl StoreEngine for Store {
                 .lock()
                 .map_err(|_| StoreError::LockError)?;
             for (node_hash, node_data) in update_batch.account_updates {
-                state_trie_store.insert(node_hash, node_data);
+                state_trie_store.insert(nibbles_to_fixed_size(node_hash), node_data);
             }
         }
 
@@ -94,12 +102,12 @@ impl StoreEngine for Store {
         for (hashed_address, nodes) in update_batch.storage_updates {
             let mut addr_store = store
                 .storage_trie_nodes
-                .entry(hashed_address)
+                .entry(Nibbles::from_bytes(&hashed_address.0))
                 .or_default()
                 .lock()
                 .map_err(|_| StoreError::LockError)?;
             for (node_hash, node_data) in nodes {
-                addr_store.insert(node_hash, node_data);
+                addr_store.insert(nibbles_to_fixed_size(node_hash), node_data);
             }
         }
 
@@ -386,9 +394,13 @@ impl StoreEngine for Store {
         &self,
         hashed_address: H256,
         storage_root: H256,
+        state_root: H256,
     ) -> Result<Trie, StoreError> {
         let mut store = self.inner()?;
-        let trie_backend = store.storage_trie_nodes.entry(hashed_address).or_default();
+        let trie_backend = store
+            .storage_trie_nodes
+            .entry(Nibbles::from_bytes(&hashed_address.0))
+            .or_default();
         let db = Box::new(InMemoryTrieDB::new(trie_backend.clone()));
         Ok(Trie::open(db, storage_root))
     }
@@ -586,19 +598,19 @@ impl StoreEngine for Store {
 
     async fn write_storage_trie_nodes_batch(
         &self,
-        storage_trie_nodes: Vec<(H256, Vec<(NodeHash, Vec<u8>)>)>,
+        storage_trie_nodes: Vec<(H256, Vec<(Nibbles, Vec<u8>)>)>,
     ) -> Result<(), StoreError> {
         let mut store = self.inner()?;
         for (hashed_address, nodes) in storage_trie_nodes {
             let mut addr_store = store
                 .storage_trie_nodes
-                .entry(hashed_address)
+                .entry(Nibbles::from_bytes(&hashed_address.0))
                 .or_default()
                 .lock()
                .map_err(|_| StoreError::LockError)?;
            for (node_hash, node_data) in nodes {
-                addr_store.insert(node_hash, node_data);
+                addr_store.insert(nibbles_to_fixed_size(node_hash), node_data);
            }
        }
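The in-memory engine now derives two key shapes: an account's node map is selected by the nibble expansion of its hashed address, and nodes inside a map are addressed by the 33-byte fixed-size encoding from `ethrex_trie::db`. A small sketch (the `example_keys` helper is illustrative):

```rust
use ethereum_types::H256;
use ethrex_trie::{Nibbles, db::nibbles_to_fixed_size};

// The outer HashMap picks an account's storage trie by nibble-expanded
// account hash; the inner BTreeMap addresses nodes by fixed-size path key.
fn example_keys(hashed_address: H256, path: Nibbles) -> (Nibbles, [u8; 33]) {
    let account_key = Nibbles::from_bytes(&hashed_address.0);
    let node_key = nibbles_to_fixed_size(path);
    (account_key, node_key)
}
```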
diff --git a/crates/storage/store_db/rocksdb.rs b/crates/storage/store_db/rocksdb.rs
index 1c08d4ed03..c4efcc899f 100644
--- a/crates/storage/store_db/rocksdb.rs
+++ b/crates/storage/store_db/rocksdb.rs
@@ -1,4 +1,10 @@
-use crate::{rlp::AccountCodeHashRLP, trie_db::rocksdb_locked::RocksDBLockedTrieDB};
+use crate::{
+    rlp::AccountCodeHashRLP,
+    trie_db::{
+        layering::{TrieLayerCache, TrieWrapper, apply_prefix},
+        rocksdb_locked::RocksDBLockedTrieDB,
+    },
+};
 use bytes::Bytes;
 use ethrex_common::{
     H256,
     types::{
         Transaction,
     },
 };
@@ -7,12 +13,16 @@
-use ethrex_trie::{Nibbles, NodeHash, Trie};
+use ethrex_trie::{Nibbles, Trie};
 use rocksdb::{
     BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamilyDescriptor, MultiThreaded,
-    OptimisticTransactionDB, Options, WriteBatchWithTransaction,
+    OptimisticTransactionDB, Options, WriteBatch, WriteBatchWithTransaction,
+};
+use std::{
+    collections::HashSet,
+    path::Path,
+    sync::{Arc, RwLock},
 };
-use std::{collections::HashSet, path::Path, sync::Arc};
 use tracing::info;
 
 use crate::{
@@ -76,20 +86,10 @@ const CF_CHAIN_DATA: &str = "chain_data";
 /// - [`Vec<u8>`] = `BlockHashRLP::from(block_hash).bytes().clone()`
 const CF_SNAP_STATE: &str = "snap_state";
 
-/// State trie nodes column family: [`NodeHash`] => [`Vec<u8>`]
-/// - [`NodeHash`] = `node_hash.as_ref()`
-/// - [`Vec<u8>`] = `node_data`
-const CF_STATE_TRIE_NODES: &str = "state_trie_nodes";
-
-/// Storage tries nodes column family: [`Vec<u8>`] => [`Vec<u8>`]
-/// - [`Vec<u8>`] = Composite key
-/// ```rust,no_run
-/// // let mut key = Vec::with_capacity(64);
-/// // key.extend_from_slice(address_hash.as_bytes());
-/// // key.extend_from_slice(node_hash.as_ref());
-/// ```
+/// Trie nodes column family (state and storage tries): [`Nibbles`] => [`Vec<u8>`]
+/// - [`Nibbles`] = `node_path.as_ref()`
 /// - [`Vec<u8>`] = `node_data`
-const CF_STORAGE_TRIES_NODES: &str = "storage_tries_nodes";
+const CF_TRIE_NODES: &str = "trie_nodes";
 
 /// Pending blocks column family: [`Vec<u8>`] => [`Vec<u8>`]
 /// - [`Vec<u8>`] = `BlockHashRLP::from(block.hash()).bytes().clone()`
@@ -104,6 +104,7 @@ const CF_INVALID_ANCESTORS: &str = "invalid_ancestors";
 #[derive(Debug)]
 pub struct Store {
     db: Arc<OptimisticTransactionDB<MultiThreaded>>,
+    trie_cache: Arc<RwLock<TrieLayerCache>>,
 }
 
 impl Store {
@@ -159,8 +160,7 @@ impl Store {
             CF_TRANSACTION_LOCATIONS,
             CF_CHAIN_DATA,
             CF_SNAP_STATE,
-            CF_STATE_TRIE_NODES,
-            CF_STORAGE_TRIES_NODES,
+            CF_TRIE_NODES,
             CF_PENDING_BLOCKS,
             CF_INVALID_ANCESTORS,
         ];
@@ -229,7 +229,7 @@ impl Store {
                     block_opts.set_cache_index_and_filter_blocks(true);
                     cf_opts.set_block_based_table_factory(&block_opts);
                 }
-                CF_STATE_TRIE_NODES | CF_STORAGE_TRIES_NODES => {
+                CF_TRIE_NODES => {
                     cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
                     cf_opts.set_write_buffer_size(512 * 1024 * 1024); // 512MB
                     cf_opts.set_max_write_buffer_number(6);
@@ -299,7 +299,10 @@ impl Store {
             }
         }
 
-        Ok(Self { db: Arc::new(db) })
+        Ok(Self {
+            db: Arc::new(db),
+            trie_cache: Default::default(),
+        })
     }
 
     // Helper method to get column family handle
@@ -365,7 +368,6 @@ impl Store {
         batch_ops: Vec<(String, Vec<u8>, Vec<u8>)>,
     ) -> Result<(), StoreError> {
         let db = self.db.clone();
-
         tokio::task::spawn_blocking(move || {
             let mut batch = WriteBatchWithTransaction::default();
 
@@ -438,11 +440,30 @@ impl StoreEngine for Store {
     async fn apply_updates(&self, update_batch: UpdateBatch) -> Result<(), StoreError> {
         let db = self.db.clone();
+        let trie_cache = self.trie_cache.clone();
+        let parent_state_root = self
+            .get_block_header_by_hash(
+                update_batch
+                    .blocks
+                    .first()
+                    .ok_or(StoreError::UpdateBatchNoBlocks)?
+                    .header
+                    .parent_hash,
+            )?
+            .map(|header| header.state_root)
+            .unwrap_or_default();
+        let last_state_root = update_batch
+            .blocks
+            .last()
+            .ok_or(StoreError::UpdateBatchNoBlocks)?
+            .header
+            .state_root;
 
         tokio::task::spawn_blocking(move || {
+            let _span = tracing::trace_span!("Block DB update").entered();
+
             let [
-                cf_state,
-                cf_storage,
+                cf_trie_nodes,
                 cf_receipts,
                 cf_codes,
                 cf_block_numbers,
@@ -452,8 +473,7 @@ impl StoreEngine for Store {
             ] = open_cfs(
                 &db,
                 [
-                    CF_STATE_TRIE_NODES,
-                    CF_STORAGE_TRIES_NODES,
+                    CF_TRIE_NODES,
                     CF_RECEIPTS,
                     CF_ACCOUNT_CODES,
                     CF_BLOCK_NUMBERS,
@@ -466,19 +486,31 @@ impl StoreEngine for Store {
            )?;
 
             let _span = tracing::trace_span!("Block DB update").entered();
             let mut batch = WriteBatchWithTransaction::default();
 
-            for (node_hash, node_data) in update_batch.account_updates {
-                batch.put_cf(&cf_state, node_hash.as_ref(), node_data);
-            }
-
-            for (address_hash, storage_updates) in update_batch.storage_updates {
-                for (node_hash, node_data) in storage_updates {
-                    // Key: address_hash + node_hash
-                    let mut key = Vec::with_capacity(64);
-                    key.extend_from_slice(address_hash.as_bytes());
-                    key.extend_from_slice(node_hash.as_ref());
-                    batch.put_cf(&cf_storage, key, node_data);
+            let mut trie = trie_cache.write().map_err(|_| StoreError::LockError)?;
+            if let Some(root) = trie.get_commitable(parent_state_root) {
+                let nodes = trie.commit(root).unwrap_or_default();
+                for (key, value) in nodes {
+                    if value.is_empty() {
+                        batch.delete_cf(&cf_trie_nodes, key);
+                    } else {
+                        batch.put_cf(&cf_trie_nodes, key, value);
+                    }
                 }
             }
+            trie.put_batch(
+                parent_state_root,
+                last_state_root,
+                update_batch
+                    .storage_updates
+                    .into_iter()
+                    .flat_map(|(account_hash, nodes)| {
+                        nodes
+                            .into_iter()
+                            .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node))
+                    })
+                    .chain(update_batch.account_updates)
+                    .collect(),
+            );
 
             for block in update_batch.blocks {
                 let block_number = block.header.number;
@@ -1068,40 +1100,80 @@ impl StoreEngine for Store {
         &self,
         hashed_address: H256,
         storage_root: H256,
+        state_root: H256,
+    ) -> Result<Trie, StoreError> {
+        let db = Box::new(RocksDBTrieDB::new(self.db.clone(), CF_TRIE_NODES, None)?);
+        let wrap_db = Box::new(TrieWrapper {
+            state_root,
+            inner: self.trie_cache.clone(),
+            db,
+            prefix: Some(hashed_address),
+        });
+        Ok(Trie::open(wrap_db, storage_root))
+    }
+
+    fn open_state_trie(&self, state_root: H256) -> Result<Trie, StoreError> {
+        let db = Box::new(RocksDBTrieDB::new(self.db.clone(), CF_TRIE_NODES, None)?);
+        let wrap_db = Box::new(TrieWrapper {
+            state_root,
+            inner: self.trie_cache.clone(),
+            db,
+            prefix: None,
+        });
+        Ok(Trie::open(wrap_db, state_root))
+    }
+
+    fn open_direct_storage_trie(
+        &self,
+        hashed_address: H256,
+        storage_root: H256,
     ) -> Result<Trie, StoreError> {
         let db = Box::new(RocksDBTrieDB::new(
             self.db.clone(),
-            CF_STORAGE_TRIES_NODES,
+            CF_TRIE_NODES,
             Some(hashed_address),
         )?);
         Ok(Trie::open(db, storage_root))
     }
 
-    fn open_state_trie(&self, state_root: H256) -> Result<Trie, StoreError> {
-        let db = Box::new(RocksDBTrieDB::new(
-            self.db.clone(),
-            CF_STATE_TRIE_NODES,
-            None,
-        )?);
+    fn open_direct_state_trie(&self, state_root: H256) -> Result<Trie, StoreError> {
+        let db = Box::new(RocksDBTrieDB::new(self.db.clone(), CF_TRIE_NODES, None)?);
         Ok(Trie::open(db, state_root))
     }
 
     fn open_locked_state_trie(&self, state_root: H256) -> Result<Trie, StoreError> {
-        let db = RocksDBLockedTrieDB::new(self.db.clone(), CF_STATE_TRIE_NODES, None)?;
-        Ok(Trie::open(Box::new(db), state_root))
+        let db = Box::new(RocksDBLockedTrieDB::new(
+            self.db.clone(),
+            CF_TRIE_NODES,
+            None,
+        )?);
+        let wrap_db = Box::new(TrieWrapper {
+            state_root,
+            inner: self.trie_cache.clone(),
+            db,
+            prefix: None,
+        });
+        Ok(Trie::open(wrap_db, state_root))
     }
 
     fn open_locked_storage_trie(
         &self,
         hashed_address: H256,
         storage_root: H256,
+        state_root: H256,
     ) -> Result<Trie, StoreError> {
-        let db = RocksDBLockedTrieDB::new(
+        let db = Box::new(RocksDBLockedTrieDB::new(
             self.db.clone(),
-            CF_STORAGE_TRIES_NODES,
-            Some(hashed_address),
-        )?;
-        Ok(Trie::open(Box::new(db), storage_root))
+            CF_TRIE_NODES,
+            None,
+        )?);
+        let wrap_db = Box::new(TrieWrapper {
+            state_root,
+            inner: self.trie_cache.clone(),
+            db,
+            prefix: Some(hashed_address),
+        });
+        Ok(Trie::open(wrap_db, storage_root))
     }
 
     async fn forkchoice_update(
@@ -1358,21 +1430,31 @@ impl StoreEngine for Store {
 
     async fn write_storage_trie_nodes_batch(
         &self,
-        storage_trie_nodes: Vec<(H256, Vec<(NodeHash, Vec<u8>)>)>,
+        storage_trie_nodes: Vec<(H256, Vec<(Nibbles, Vec<u8>)>)>,
     ) -> Result<(), StoreError> {
-        let mut batch_ops = Vec::new();
+        let db = self.db.clone();
+        tokio::task::spawn_blocking(move || {
+            let mut batch = WriteBatchWithTransaction::default();
+            let cf = db.cf_handle(CF_TRIE_NODES).ok_or_else(|| {
+                StoreError::Custom(format!("Column family not found: {CF_TRIE_NODES}"))
+            })?;
 
-        for (address_hash, nodes) in storage_trie_nodes {
-            for (node_hash, node_data) in nodes {
-                // Create composite key: address_hash + node_hash
-                let mut key = Vec::with_capacity(64);
-                key.extend_from_slice(address_hash.as_bytes());
-                key.extend_from_slice(node_hash.as_ref());
-                batch_ops.push((CF_STORAGE_TRIES_NODES.to_string(), key, node_data));
+            for (address_hash, nodes) in storage_trie_nodes {
+                for (node_hash, node_data) in nodes {
+                    let key = apply_prefix(Some(address_hash), node_hash);
+                    if node_data.is_empty() {
+                        batch.delete_cf(&cf, key.as_ref());
+                    } else {
+                        batch.put_cf(&cf, key.as_ref(), node_data);
+                    }
+                }
             }
-        }
 
-        self.write_batch_async(batch_ops).await
+            db.write(batch)
+                .map_err(|e| StoreError::Custom(format!("RocksDB batch write error: {}", e)))
+        })
+        .await
+        .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
     }
 
     async fn write_account_code_batch(
diff --git a/crates/storage/trie_db/layering.rs b/crates/storage/trie_db/layering.rs
new file mode 100644
index 0000000000..a97a8a17d5
--- /dev/null
+++ b/crates/storage/trie_db/layering.rs
@@ -0,0 +1,136 @@
+use ethrex_common::H256;
+use ethrex_rlp::decode::RLPDecode;
+use std::{collections::HashMap, sync::Arc, sync::RwLock};
+
+use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Node, TrieDB, TrieError};
+
+// TODO: make this configurable or use safe hash for this
+const COMMIT_THRESHOLD: usize = 10;
+
+#[derive(Debug)]
+struct TrieLayer {
+    nodes: HashMap<Vec<u8>, Vec<u8>>,
+    parent: H256,
+    id: usize,
+}
+
+#[derive(Debug, Default)]
+pub struct TrieLayerCache {
+    /// Monotonically increasing ID for layers, starting at 1.
+    /// TODO: this implementation panics on overflow
+    last_id: usize,
+    layers: HashMap<H256, TrieLayer>,
+}
+
+impl TrieLayerCache {
+    pub fn get(&self, mut state_root: H256, key: Nibbles) -> Option<Vec<u8>> {
+        // Walk the layer chain from the queried state root towards its ancestors.
+        while let Some(layer) = self.layers.get(&state_root) {
+            if let Some(value) = layer.nodes.get(key.as_ref()) {
+                return Some(value.clone());
+            }
+            state_root = layer.parent;
+        }
+        None
+    }
+
+    pub fn get_commitable(&mut self, mut state_root: H256) -> Option<H256> {
+        // Once more than COMMIT_THRESHOLD layers are stacked, return the
+        // ancestor root whose layers should be flushed to disk.
+        let mut counter = 0;
+        while let Some(layer) = self.layers.get(&state_root) {
+            state_root = layer.parent;
+            counter += 1;
+            if counter > COMMIT_THRESHOLD {
+                return Some(state_root);
+            }
+        }
+        None
+    }
+
+    pub fn put_batch(
+        &mut self,
+        parent: H256,
+        state_root: H256,
+        key_values: Vec<(Nibbles, Vec<u8>)>,
+    ) {
+        self.layers
+            .entry(state_root)
+            .or_insert_with(|| {
+                self.last_id += 1;
+                TrieLayer {
+                    nodes: HashMap::new(),
+                    parent,
+                    id: self.last_id,
+                }
+            })
+            .nodes
+            .extend(
+                key_values
+                    .into_iter()
+                    .map(|(path, node)| (path.as_ref().to_vec(), node)),
+            );
+    }
+
+    pub fn commit(&mut self, state_root: H256) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
+        let mut layer = self.layers.remove(&state_root)?;
+        // ensure parents are committed
+        let parent_nodes = self.commit(layer.parent);
+        // older layers are useless
+        self.layers.retain(|_, item| item.id > layer.id);
+        Some(
+            parent_nodes
+                .unwrap_or_default()
+                .into_iter()
+                .chain(layer.nodes.drain())
+                .collect(),
+        )
+    }
+}
+
+pub struct TrieWrapper {
+    pub state_root: H256,
+    pub inner: Arc<RwLock<TrieLayerCache>>,
+    pub db: Box<dyn TrieDB>,
+    pub prefix: Option<H256>,
+}
+
+pub fn apply_prefix(prefix: Option<H256>, path: Nibbles) -> Nibbles {
+    // Apply a prefix with an invalid nibble (17) as a separator, to
+    // differentiate between a state trie value and a storage trie root.
+    match prefix {
+        Some(prefix) => Nibbles::from_bytes(prefix.as_bytes())
+            .append_new(17)
+            .concat(path),
+        None => path,
+    }
+}
+
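+// Illustration (not part of the API): for an account hash 0xaa…aa and node
+// path [0x1, 0x2], the composite key produced by `apply_prefix` is
+//   nibbles(0xaa…aa) ++ [17] ++ [0x1, 0x2]
+// State-trie keys are stored without a prefix; since valid path nibbles are
+// 0..=15, the separator 17 keeps the two keyspaces disjoint.
+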
+impl TrieDB for TrieWrapper {
+    fn get(&self, key: Nibbles) -> Result<Option<Vec<u8>>, TrieError> {
+        let key = apply_prefix(self.prefix, key);
+        if let Some(value) = self
+            .inner
+            .read()
+            .map_err(|_| TrieError::LockError)?
+            .get(self.state_root, key.clone())
+        {
+            return Ok(Some(value));
+        }
+        self.db.get(key)
+    }
+
+    fn put_batch(&self, key_values: Vec<(Nibbles, Vec<u8>)>) -> Result<(), TrieError> {
+        // The new root is expected to be the last non-deleted entry in the
+        // batch; empty RLP values mark deletions.
+        let last_pair = key_values.iter().rev().find(|(_path, rlp)| !rlp.is_empty());
+        let new_state_root = match last_pair {
+            Some((_, noderlp)) => {
+                let root_node = Node::decode(noderlp)?;
+                root_node.compute_hash().finalize()
+            }
+            None => *EMPTY_TRIE_HASH,
+        };
+        let mut inner = self.inner.write().map_err(|_| TrieError::LockError)?;
+        inner.put_batch(
+            self.state_root,
+            new_state_root,
+            key_values
+                .into_iter()
+                .map(move |(path, node)| (apply_prefix(self.prefix, path), node))
+                .collect(),
+        );
+        Ok(())
+    }
+}
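Taken together, `TrieLayerCache` acts as a stack of copy-on-write overlays keyed by state root. A hedged sketch of its lifecycle, written as if from inside this crate (the function name, roots, and values are placeholders):

```rust
use crate::trie_db::layering::TrieLayerCache;
use ethrex_common::H256;
use ethrex_trie::Nibbles;

fn layer_roundtrip() {
    let mut cache = TrieLayerCache::default();
    let parent = H256::zero();
    let child = H256::repeat_byte(1);

    // A diff layer is keyed by the child state root and remembers its parent.
    cache.put_batch(parent, child, vec![(Nibbles::default(), vec![0xAA])]);

    // Lookups walk the parent chain starting from the queried root.
    assert_eq!(cache.get(child, Nibbles::default()), Some(vec![0xAA]));
    assert_eq!(cache.get(parent, Nibbles::default()), None);

    // commit() flattens the layer together with all of its ancestors into one
    // batch of (key, value) pairs ready for the trie-nodes column family.
    let flattened = cache.commit(child).unwrap_or_default();
    assert_eq!(flattened.len(), 1);
}
```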
diff --git a/crates/storage/trie_db/mod.rs b/crates/storage/trie_db/mod.rs
index 2a8abfb007..b6510a1fb3 100644
--- a/crates/storage/trie_db/mod.rs
+++ b/crates/storage/trie_db/mod.rs
@@ -2,3 +2,5 @@ pub mod rocksdb;
 #[cfg(feature = "rocksdb")]
 pub mod rocksdb_locked;
+
+pub mod layering;
diff --git a/crates/storage/trie_db/rocksdb.rs b/crates/storage/trie_db/rocksdb.rs
index 90858ac117..cb93c96be5 100644
--- a/crates/storage/trie_db/rocksdb.rs
+++ b/crates/storage/trie_db/rocksdb.rs
@@ -1,8 +1,10 @@
 use ethrex_common::H256;
-use ethrex_trie::{NodeHash, TrieDB, error::TrieError};
+use ethrex_trie::{Nibbles, TrieDB, error::TrieError};
 use rocksdb::{MultiThreaded, OptimisticTransactionDB};
 use std::sync::Arc;
 
+use crate::trie_db::layering::apply_prefix;
+
 /// RocksDB implementation for the TrieDB trait, with get and put operations.
 pub struct RocksDBTrieDB {
     /// RocksDB database
@@ -40,39 +42,36 @@ impl RocksDBTrieDB {
             .ok_or_else(|| TrieError::DbError(anyhow::anyhow!("Column family not found")))
     }
 
-    fn make_key(&self, node_hash: NodeHash) -> Vec<u8> {
-        match &self.address_prefix {
-            Some(address) => {
-                // For storage tries, prefix with address
-                let mut key = address.as_bytes().to_vec();
-                key.extend_from_slice(node_hash.as_ref());
-                key
-            }
-            None => {
-                // For state trie, use node hash directly
-                node_hash.as_ref().to_vec()
-            }
-        }
+    fn make_key(&self, node_hash: Nibbles) -> Vec<u8> {
+        apply_prefix(self.address_prefix, node_hash)
+            .as_ref()
+            .to_vec()
     }
 }
 
 impl TrieDB for RocksDBTrieDB {
-    fn get(&self, key: NodeHash) -> Result<Option<Vec<u8>>, TrieError> {
+    fn get(&self, key: Nibbles) -> Result<Option<Vec<u8>>, TrieError> {
         let cf = self.cf_handle()?;
         let db_key = self.make_key(key);
-        self.db
-            .get_cf(&cf, db_key)
-            .map_err(|e| TrieError::DbError(anyhow::anyhow!("RocksDB get error: {}", e)))
+        let res = self
+            .db
+            .get_cf(&cf, &db_key)
+            .map_err(|e| TrieError::DbError(anyhow::anyhow!("RocksDB get error: {}", e)))?;
+        Ok(res)
     }
 
-    fn put_batch(&self, key_values: Vec<(NodeHash, Vec<u8>)>) -> Result<(), TrieError> {
+    fn put_batch(&self, key_values: Vec<(Nibbles, Vec<u8>)>) -> Result<(), TrieError> {
         let cf = self.cf_handle()?;
         let mut batch = rocksdb::WriteBatchWithTransaction::default();
 
         for (key, value) in key_values {
             let db_key = self.make_key(key);
-            batch.put_cf(&cf, db_key, value);
+            if value.is_empty() {
+                batch.delete_cf(&cf, db_key);
+            } else {
+                batch.put_cf(&cf, db_key, value);
+            }
         }
 
         self.db
@@ -84,7 +83,7 @@ impl TrieDB for RocksDBTrieDB {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use ethrex_trie::NodeHash;
+    use ethrex_trie::Nibbles;
     use rocksdb::{ColumnFamilyDescriptor, MultiThreaded, Options};
     use tempfile::TempDir;
 
@@ -111,12 +110,12 @@ mod tests {
         let trie_db = RocksDBTrieDB::new(db, "test_cf", None).unwrap();
 
         // Test data
-        let node_hash = NodeHash::from(H256::from([1u8; 32]));
+        let node_hash = Nibbles::from_hex(vec![1]);
         let node_data = vec![1, 2, 3, 4, 5];
 
         // Test put_batch
         trie_db
-            .put_batch(vec![(node_hash, node_data.clone())])
+            .put_batch(vec![(node_hash.clone(), node_data.clone())])
             .unwrap();
 
         // Test get
@@ -124,7 +123,7 @@ mod tests {
         assert_eq!(retrieved_data, node_data);
 
         // Test get nonexistent
-        let nonexistent_hash = NodeHash::from(H256::from([2u8; 32]));
+        let nonexistent_hash = Nibbles::from_hex(vec![2]);
         assert!(trie_db.get(nonexistent_hash).unwrap().is_none());
     }
 
@@ -152,12 +151,12 @@ mod tests {
         let trie_db = RocksDBTrieDB::new(db, "test_cf", Some(address)).unwrap();
 
         // Test data
-        let node_hash = NodeHash::from(H256::from([1u8; 32]));
+        let node_hash = Nibbles::from_hex(vec![1]);
         let node_data = vec![1, 2, 3, 4, 5];
 
         // Test put_batch
         trie_db
-            .put_batch(vec![(node_hash, node_data.clone())])
+            .put_batch(vec![(node_hash.clone(), node_data.clone())])
             .unwrap();
 
         // Test get
@@ -189,9 +188,9 @@ mod tests {
 
         // Test data
         let batch_data = vec![
-            (NodeHash::from(H256::from([1u8; 32])), vec![1, 2, 3]),
-            (NodeHash::from(H256::from([2u8; 32])), vec![4, 5, 6]),
-            (NodeHash::from(H256::from([3u8; 32])), vec![7, 8, 9]),
+            (Nibbles::from_hex(vec![1]), vec![1, 2, 3]),
+            (Nibbles::from_hex(vec![2]), vec![4, 5, 6]),
+            (Nibbles::from_hex(vec![3]), vec![7, 8, 9]),
         ];
 
         // Test batch put
diff --git a/crates/storage/trie_db/rocksdb_locked.rs b/crates/storage/trie_db/rocksdb_locked.rs
index a843ba4605..ed3f8f8e78 100644
--- a/crates/storage/trie_db/rocksdb_locked.rs
+++ b/crates/storage/trie_db/rocksdb_locked.rs
@@ -1,8 +1,10 @@
 use ethrex_common::H256;
-use ethrex_trie::{NodeHash, TrieDB, error::TrieError};
+use ethrex_trie::{Nibbles, TrieDB, error::TrieError};
 use rocksdb::{MultiThreaded, OptimisticTransactionDB, SnapshotWithThreadMode};
 use std::sync::Arc;
 
+use crate::trie_db::layering::apply_prefix;
+
 /// RocksDB locked implementation for the TrieDB trait, read-only with consistent snapshot.
 pub struct RocksDBLockedTrieDB {
     /// RocksDB database
@@ -40,19 +42,10 @@ impl RocksDBLockedTrieDB {
         })
     }
 
-    fn make_key(&self, node_hash: NodeHash) -> Vec<u8> {
-        match &self.address_prefix {
-            Some(address) => {
-                // For storage tries, prefix with address
-                let mut key = address.as_bytes().to_vec();
-                key.extend_from_slice(node_hash.as_ref());
-                key
-            }
-            None => {
-                // For state trie, use node hash directly
-                node_hash.as_ref().to_vec()
-            }
-        }
+    fn make_key(&self, node_hash: Nibbles) -> Vec<u8> {
+        apply_prefix(self.address_prefix, node_hash)
+            .as_ref()
+            .to_vec()
     }
 }
 
@@ -69,7 +62,7 @@ impl Drop for RocksDBLockedTrieDB {
 }
 
 impl TrieDB for RocksDBLockedTrieDB {
-    fn get(&self, key: NodeHash) -> Result<Option<Vec<u8>>, TrieError> {
+    fn get(&self, key: Nibbles) -> Result<Option<Vec<u8>>, TrieError> {
         let db_key = self.make_key(key);
 
         self.snapshot
@@ -77,7 +70,7 @@ impl TrieDB for RocksDBLockedTrieDB {
             .map_err(|e| TrieError::DbError(anyhow::anyhow!("RocksDB snapshot get error: {}", e)))
     }
 
-    fn put_batch(&self, _key_values: Vec<(NodeHash, Vec<u8>)>) -> Result<(), TrieError> {
+    fn put_batch(&self, _key_values: Vec<(Nibbles, Vec<u8>)>) -> Result<(), TrieError> {
         Err(TrieError::DbError(anyhow::anyhow!(
             "LockedTrie is read-only"
         )))
diff --git a/tooling/archive_sync/src/main.rs b/tooling/archive_sync/src/main.rs
index 6c3ca4ea3b..76079cc5a8 100644
--- a/tooling/archive_sync/src/main.rs
+++ b/tooling/archive_sync/src/main.rs
@@ -172,7 +172,7 @@ async fn process_dump_storage(
     hashed_address: H256,
     storage_root: H256,
 ) -> eyre::Result<()> {
-    let mut trie = store.open_storage_trie(hashed_address, *EMPTY_TRIE_HASH)?;
+    let mut trie = store.open_storage_trie(hashed_address, *EMPTY_TRIE_HASH, todo!())?;
     for (key, val) in dump_storage {
         // The key we receive is the preimage of the one stored in the trie
         trie.insert(keccak(key.0).0.to_vec(), val.encode_to_vec())?;
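Finally, a usage sketch of the reworked proof API from the store changes earlier in this diff (the `print_proof` helper, its bindings, and the `eyre` error type are assumptions, not part of the patch):

```rust
use ethrex_common::{Address, H256};
use ethrex_storage::Store;

// get_account_proof now takes the state root directly (instead of a block
// number) and returns the account proof plus per-key storage proofs in one
// call; None means the state trie for that root is unavailable.
async fn print_proof(store: &Store, state_root: H256, address: Address) -> eyre::Result<()> {
    let keys = [H256::zero()];
    if let Some(proof) = store.get_account_proof(state_root, address, &keys).await? {
        println!(
            "account proof nodes: {}, storage proofs: {}",
            proof.proof.len(),
            proof.storage_proof.len()
        );
    }
    Ok(())
}
```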