Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
149 changes: 117 additions & 32 deletions core/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl<'a> state_db::MetaDb for StateMetaDb<'a> {
/// Block database
pub struct BlockchainDb<Block: BlockT> {
db: Arc<KeyValueDB>,
meta: RwLock<Meta<NumberFor<Block>, Block::Hash>>,
meta: Arc<RwLock<Meta<NumberFor<Block>, Block::Hash>>>,
leaves: RwLock<LeafSet<Block::Hash, NumberFor<Block>>>,
}

Expand All @@ -159,7 +159,7 @@ impl<Block: BlockT> BlockchainDb<Block> {
Ok(BlockchainDb {
db,
leaves: RwLock::new(leaves),
meta: RwLock::new(meta),
meta: Arc::new(RwLock::new(meta)),
})
}

Expand Down Expand Up @@ -368,6 +368,7 @@ impl state_machine::Storage<Blake2Hasher> for DbGenesisStorage {

/// Changes-trie storage backed by the key-value database.
pub struct DbChangesTrieStorage<Block: BlockT> {
	db: Arc<KeyValueDB>,
	// Shared blockchain meta (finalized number etc.), used by `root` to decide
	// whether a block can be resolved by canonical number (finalized blocks)
	// or must be found by walking back from an anchor block.
	meta: Arc<RwLock<Meta<NumberFor<Block>, Block::Hash>>>,
	// `None` on archive nodes: changes tries are then never pruned.
	min_blocks_to_keep: Option<u64>,
	_phantom: ::std::marker::PhantomData<Block>,
}
Expand All @@ -381,7 +382,7 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> {
}

/// Prune obsolete changes tries.
pub fn prune(&self, config: Option<ChangesTrieConfiguration>, tx: &mut DBTransaction, block: NumberFor<Block>) {
pub fn prune(&self, config: Option<ChangesTrieConfiguration>, tx: &mut DBTransaction, block_hash: Block::Hash, block_num: NumberFor<Block>) {
// never prune on archive nodes
let min_blocks_to_keep = match self.min_blocks_to_keep {
Some(min_blocks_to_keep) => min_blocks_to_keep,
Expand All @@ -399,23 +400,44 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> {
&config,
&*self,
min_blocks_to_keep,
block.as_(),
&state_machine::ChangesTrieAnchorBlockId {
hash: utils::convert_hash(&block_hash),
number: block_num.as_(),
},
|node| tx.delete(columns::CHANGES_TRIE, node.as_ref()));
}
}

impl<Block: BlockT> state_machine::ChangesTrieRootsStorage<Blake2Hasher> for DbChangesTrieStorage<Block> {
fn root(&self, block: u64) -> Result<Option<H256>, String> {
Ok(read_db::<Block>(&*self.db, columns::HASH_LOOKUP, columns::HEADER, BlockId::Number(As::sa(block)))
.map_err(|err| format!("{}", err))
.and_then(|header| match header {
Some(header) => Block::Header::decode(&mut &header[..])
.ok_or_else(|| format!("Failed to parse header of block {}", block))
.map(Some),
None => Ok(None)
})?
.and_then(|header| header.digest().log(DigestItem::as_changes_trie_root)
.map(|root| H256::from_slice(root.as_ref()))))
/// Resolve the changes-trie root of the block at height `block` on the
/// fork identified by `anchor`.
fn root(&self, anchor: &state_machine::ChangesTrieAnchorBlockId<H256>, block: u64) -> Result<Option<H256>, String> {
	// API requirement: the queried block must not be above the anchor.
	assert!(block <= anchor.number, "API requirement");

	// We need the hash of the block to resolve its changes-trie root.
	let block_id = if block <= self.meta.read().finalized_number.as_() {
		// If the block is finalized, we can just read the canonical hash.
		BlockId::Number(As::sa(block))
	} else {
		// If the block is not finalized yet, find its hash by following
		// parent links back from the anchor down to the requested height,
		// so that the answer comes from the anchor's own fork.
		let mut current_num = anchor.number;
		let mut current_hash: Block::Hash = ::utils::convert_hash(&anchor.hash);
		while current_num != block {
			let current_header: Block::Header = ::utils::require_header::<Block>(
				&*self.db, columns::HASH_LOOKUP, columns::HEADER, BlockId::Hash(current_hash)
			).map_err(|e| e.to_string())?;

			current_hash = *current_header.parent_hash();
			current_num = current_num - 1;
		}

		BlockId::Hash(current_hash)
	};

	// The changes-trie root, if any, is recorded as a digest item in the header.
	Ok(::utils::require_header::<Block>(&*self.db, columns::HASH_LOOKUP, columns::HEADER, block_id)
		.map_err(|e| e.to_string())?
		.digest().log(DigestItem::as_changes_trie_root)
		.map(|root| H256::from_slice(root.as_ref())))
}
}

Expand Down Expand Up @@ -461,6 +483,7 @@ impl<Block: BlockT> Backend<Block> {
fn from_kvdb(db: Arc<KeyValueDB>, pruning: PruningMode, canonicalization_delay: u64) -> Result<Self, client::error::Error> {
let is_archive_pruning = pruning.is_archive();
let blockchain = BlockchainDb::new(db.clone())?;
let meta = blockchain.meta.clone();
let map_e = |e: state_db::Error<io::Error>| ::client::error::Error::from(format!("State database error: {:?}", e));
let state_db: StateDb<Block::Hash, H256> = StateDb::new(pruning, &StateMetaDb(&*db)).map_err(map_e)?;
let storage_db = StorageDb {
Expand All @@ -469,6 +492,7 @@ impl<Block: BlockT> Backend<Block> {
};
let changes_tries_storage = DbChangesTrieStorage {
db,
meta,
min_blocks_to_keep: if is_archive_pruning { None } else { Some(MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR) },
_phantom: Default::default(),
};
Expand Down Expand Up @@ -544,7 +568,7 @@ impl<Block: BlockT> Backend<Block> {
let changes_trie_config: Option<ChangesTrieConfiguration> = self.state_at(BlockId::Hash(parent_hash))?
.storage(well_known_keys::CHANGES_TRIE_CONFIG)?
.and_then(|v| Decode::decode(&mut &*v));
self.changes_tries_storage.prune(changes_trie_config, transaction, f_num);
self.changes_tries_storage.prune(changes_trie_config, transaction, f_hash, f_num);
}

Ok(())
Expand Down Expand Up @@ -1157,10 +1181,13 @@ mod tests {
#[test]
fn changes_trie_storage_works() {
let backend = Backend::<Block>::new_test(1000, 100);
backend.changes_tries_storage.meta.write().finalized_number = 1000;


let check_changes = |backend: &Backend<Block>, block: u64, changes: Vec<(Vec<u8>, Vec<u8>)>| {
let (changes_root, mut changes_trie_update) = prepare_changes(changes);
assert_eq!(backend.changes_tries_storage.root(block), Ok(Some(changes_root)));
let anchor = state_machine::ChangesTrieAnchorBlockId { hash: Default::default(), number: block };
assert_eq!(backend.changes_tries_storage.root(&anchor, block), Ok(Some(changes_root)));

for (key, (val, _)) in changes_trie_update.drain() {
assert_eq!(backend.changes_trie_storage().unwrap().get(&key), Ok(Some(val)));
Expand All @@ -1184,9 +1211,66 @@ mod tests {
check_changes(&backend, 2, changes2);
}

#[test]
fn changes_trie_storage_works_with_forks() {
	let backend = Backend::<Block>::new_test(1000, 100);

	// common chain: block0 -> block1 -> block2
	let changes0 = vec![(b"k0".to_vec(), b"v0".to_vec())];
	let changes1 = vec![(b"k1".to_vec(), b"v1".to_vec())];
	let changes2 = vec![(b"k2".to_vec(), b"v2".to_vec())];
	let block0 = insert_header(&backend, 0, Default::default(), changes0.clone(), Default::default());
	let block1 = insert_header(&backend, 1, block0, changes1.clone(), Default::default());
	let block2 = insert_header(&backend, 2, block1, changes2.clone(), Default::default());

	// branch1 forks off block2: block2_1_0 -> block2_1_1
	let changes2_1_0 = vec![(b"k3".to_vec(), b"v3".to_vec())];
	let changes2_1_1 = vec![(b"k4".to_vec(), b"v4".to_vec())];
	let block2_1_0 = insert_header(&backend, 3, block2, changes2_1_0.clone(), Default::default());
	let block2_1_1 = insert_header(&backend, 4, block2_1_0, changes2_1_1.clone(), Default::default());

	// branch2 also forks off block2: block2_2_0 -> block2_2_1
	let changes2_2_0 = vec![(b"k5".to_vec(), b"v5".to_vec())];
	let changes2_2_1 = vec![(b"k6".to_vec(), b"v6".to_vec())];
	let block2_2_0 = insert_header(&backend, 3, block2, changes2_2_0.clone(), Default::default());
	let block2_2_1 = insert_header(&backend, 4, block2_2_0, changes2_2_1.clone(), Default::default());

	// finalize block1
	backend.changes_tries_storage.meta.write().finalized_number = 1;

	// branch1: when asking for finalized block hash
	let (changes1_root, _) = prepare_changes(changes1);
	let anchor = state_machine::ChangesTrieAnchorBlockId { hash: block2_1_1, number: 4 };
	assert_eq!(backend.changes_tries_storage.root(&anchor, 1), Ok(Some(changes1_root)));

	// branch2: when asking for finalized block hash
	let anchor = state_machine::ChangesTrieAnchorBlockId { hash: block2_2_1, number: 4 };
	assert_eq!(backend.changes_tries_storage.root(&anchor, 1), Ok(Some(changes1_root)));

	// branch1: when asking for non-finalized block hash
	let (changes2_1_0_root, _) = prepare_changes(changes2_1_0);
	let anchor = state_machine::ChangesTrieAnchorBlockId { hash: block2_1_1, number: 4 };
	assert_eq!(backend.changes_tries_storage.root(&anchor, 3), Ok(Some(changes2_1_0_root)));

	// branch2: when asking for non-finalized block hash
	let (changes2_2_0_root, _) = prepare_changes(changes2_2_0);
	let anchor = state_machine::ChangesTrieAnchorBlockId { hash: block2_2_1, number: 4 };
	assert_eq!(backend.changes_tries_storage.root(&anchor, 3), Ok(Some(changes2_2_0_root)));

	// finalize the first block of branch2
	backend.changes_tries_storage.meta.write().finalized_number = 3;

	// branch2: when asking for finalized block of this branch
	// (the anchor from the previous assert, block2_2_1, is still in scope)
	assert_eq!(backend.changes_tries_storage.root(&anchor, 3), Ok(Some(changes2_2_0_root)));

	// branch1: when asking for finalized block of the other branch
	// => the branch2 root is returned — incorrect for branch1, but this is
	// expected, because the queried number is at/below the finalized number,
	// so the canonical hash is used and the anchor's (abandoned) fork is
	// ignored (it forked before the finalized header)
	let anchor = state_machine::ChangesTrieAnchorBlockId { hash: block2_1_1, number: 4 };
	assert_eq!(backend.changes_tries_storage.root(&anchor, 3), Ok(Some(changes2_2_0_root)));
}

#[test]
fn changes_tries_are_pruned_on_finalization() {
let mut backend = Backend::<Block>::new_test(1000, 100);
backend.changes_tries_storage.meta.write().finalized_number = 1000;
backend.changes_tries_storage.min_blocks_to_keep = Some(8);
let config = ChangesTrieConfiguration {
digest_interval: 2,
Expand All @@ -1209,26 +1293,27 @@ mod tests {
let _ = insert_header(&backend, 12, block11, vec![(b"key_at_12".to_vec(), b"val_at_12".to_vec())], Default::default());

// check that roots of all tries are in the columns::CHANGES_TRIE
let anchor = state_machine::ChangesTrieAnchorBlockId { hash: Default::default(), number: 100 };
fn read_changes_trie_root(backend: &Backend<Block>, num: u64) -> H256 {
backend.blockchain().header(BlockId::Number(num)).unwrap().unwrap().digest().logs().iter()
.find(|i| i.as_changes_trie_root().is_some()).unwrap().as_changes_trie_root().unwrap().clone()
}
let root1 = read_changes_trie_root(&backend, 1); assert_eq!(backend.changes_tries_storage.root(1).unwrap(), Some(root1));
let root2 = read_changes_trie_root(&backend, 2); assert_eq!(backend.changes_tries_storage.root(2).unwrap(), Some(root2));
let root3 = read_changes_trie_root(&backend, 3); assert_eq!(backend.changes_tries_storage.root(3).unwrap(), Some(root3));
let root4 = read_changes_trie_root(&backend, 4); assert_eq!(backend.changes_tries_storage.root(4).unwrap(), Some(root4));
let root5 = read_changes_trie_root(&backend, 5); assert_eq!(backend.changes_tries_storage.root(5).unwrap(), Some(root5));
let root6 = read_changes_trie_root(&backend, 6); assert_eq!(backend.changes_tries_storage.root(6).unwrap(), Some(root6));
let root7 = read_changes_trie_root(&backend, 7); assert_eq!(backend.changes_tries_storage.root(7).unwrap(), Some(root7));
let root8 = read_changes_trie_root(&backend, 8); assert_eq!(backend.changes_tries_storage.root(8).unwrap(), Some(root8));
let root9 = read_changes_trie_root(&backend, 9); assert_eq!(backend.changes_tries_storage.root(9).unwrap(), Some(root9));
let root10 = read_changes_trie_root(&backend, 10); assert_eq!(backend.changes_tries_storage.root(10).unwrap(), Some(root10));
let root11 = read_changes_trie_root(&backend, 11); assert_eq!(backend.changes_tries_storage.root(11).unwrap(), Some(root11));
let root12 = read_changes_trie_root(&backend, 12); assert_eq!(backend.changes_tries_storage.root(12).unwrap(), Some(root12));
let root1 = read_changes_trie_root(&backend, 1); assert_eq!(backend.changes_tries_storage.root(&anchor, 1).unwrap(), Some(root1));
let root2 = read_changes_trie_root(&backend, 2); assert_eq!(backend.changes_tries_storage.root(&anchor, 2).unwrap(), Some(root2));
let root3 = read_changes_trie_root(&backend, 3); assert_eq!(backend.changes_tries_storage.root(&anchor, 3).unwrap(), Some(root3));
let root4 = read_changes_trie_root(&backend, 4); assert_eq!(backend.changes_tries_storage.root(&anchor, 4).unwrap(), Some(root4));
let root5 = read_changes_trie_root(&backend, 5); assert_eq!(backend.changes_tries_storage.root(&anchor, 5).unwrap(), Some(root5));
let root6 = read_changes_trie_root(&backend, 6); assert_eq!(backend.changes_tries_storage.root(&anchor, 6).unwrap(), Some(root6));
let root7 = read_changes_trie_root(&backend, 7); assert_eq!(backend.changes_tries_storage.root(&anchor, 7).unwrap(), Some(root7));
let root8 = read_changes_trie_root(&backend, 8); assert_eq!(backend.changes_tries_storage.root(&anchor, 8).unwrap(), Some(root8));
let root9 = read_changes_trie_root(&backend, 9); assert_eq!(backend.changes_tries_storage.root(&anchor, 9).unwrap(), Some(root9));
let root10 = read_changes_trie_root(&backend, 10); assert_eq!(backend.changes_tries_storage.root(&anchor, 10).unwrap(), Some(root10));
let root11 = read_changes_trie_root(&backend, 11); assert_eq!(backend.changes_tries_storage.root(&anchor, 11).unwrap(), Some(root11));
let root12 = read_changes_trie_root(&backend, 12); assert_eq!(backend.changes_tries_storage.root(&anchor, 12).unwrap(), Some(root12));

// now simulate finalization of block#12, causing prune of tries at #1..#4
let mut tx = DBTransaction::new();
backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, 12);
backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, Default::default(), 12);
backend.storage.db.write(tx).unwrap();
assert!(backend.changes_tries_storage.get(&root1).unwrap().is_none());
assert!(backend.changes_tries_storage.get(&root2).unwrap().is_none());
Expand All @@ -1241,7 +1326,7 @@ mod tests {

// now simulate finalization of block#16, causing prune of tries at #5..#8
let mut tx = DBTransaction::new();
backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, 16);
backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, Default::default(), 16);
backend.storage.db.write(tx).unwrap();
assert!(backend.changes_tries_storage.get(&root5).unwrap().is_none());
assert!(backend.changes_tries_storage.get(&root6).unwrap().is_none());
Expand All @@ -1252,7 +1337,7 @@ mod tests {
// => no changes tries are pruned, because we never prune in archive mode
backend.changes_tries_storage.min_blocks_to_keep = None;
let mut tx = DBTransaction::new();
backend.changes_tries_storage.prune(Some(config), &mut tx, 20);
backend.changes_tries_storage.prune(Some(config), &mut tx, Default::default(), 20);
backend.storage.db.write(tx).unwrap();
assert!(backend.changes_tries_storage.get(&root9).unwrap().is_some());
assert!(backend.changes_tries_storage.get(&root10).unwrap().is_some());
Expand Down
19 changes: 19 additions & 0 deletions core/client/db/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,17 @@ pub fn read_header<Block: BlockT>(
}
}

/// Required header from the database.
pub fn require_header<Block: BlockT>(
db: &KeyValueDB,
col_index: Option<u32>,
col: Option<u32>,
id: BlockId<Block>,
) -> client::error::Result<Block::Header> {
read_header(db, col_index, col, id)
.and_then(|header| header.ok_or_else(|| client::error::ErrorKind::UnknownBlock(format!("{}", id)).into()))
}

/// Read meta from the database.
pub fn read_meta<Block>(db: &KeyValueDB, col_meta: Option<u32>, col_header: Option<u32>) -> Result<
Meta<<<Block as BlockT>::Header as HeaderT>::Number, Block::Hash>,
Expand Down Expand Up @@ -234,3 +245,11 @@ pub fn read_meta<Block>(db: &KeyValueDB, col_meta: Option<u32>, col_header: Opti
genesis_hash,
})
}

/// Converts one hash type into another by copying the common byte prefix.
///
/// If `src` is shorter than `H1`, the trailing bytes of the result keep their
/// `Default` value (normally zero); if `src` is longer, its extra bytes are
/// ignored.
pub(crate) fn convert_hash<H1: Default + AsMut<[u8]>, H2: AsRef<[u8]>>(src: &H2) -> H1 {
	let mut dest = H1::default();
	let len = ::std::cmp::min(dest.as_mut().len(), src.as_ref().len());
	// Slice BOTH sides to `len`: `copy_from_slice` panics unless the two
	// slices have exactly equal length, so copying into the full `dest`
	// buffer would panic whenever the two hash types differ in size —
	// which is the very case the `min` above is meant to handle.
	dest.as_mut()[..len].copy_from_slice(&src.as_ref()[..len]);
	dest
}
22 changes: 18 additions & 4 deletions core/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use primitives::storage::well_known_keys;
use codec::{Encode, Decode};
use state_machine::{
Backend as StateBackend, CodeExecutor,
ExecutionStrategy, ExecutionManager, prove_read,
key_changes, key_changes_proof, OverlayedChanges
ExecutionStrategy, ExecutionManager, ChangesTrieAnchorBlockId,
prove_read, key_changes, key_changes_proof, OverlayedChanges
};

use backend::{self, BlockImportOperation};
Expand Down Expand Up @@ -404,7 +404,10 @@ impl<B, E, Block> Client<B, E, Block> where
config,
storage,
self.require_block_number_from_id(&BlockId::Hash(first))?.as_(),
self.require_block_number_from_id(&BlockId::Hash(last))?.as_(),
&ChangesTrieAnchorBlockId {
hash: convert_hash(&last),
number: self.require_block_number_from_id(&BlockId::Hash(last))?.as_(),
},
self.backend.blockchain().info()?.best_number.as_(),
key)
.map_err(|err| error::ErrorKind::ChangesTrieAccessFailed(err).into())
Expand Down Expand Up @@ -437,7 +440,10 @@ impl<B, E, Block> Client<B, E, Block> where
config,
storage,
self.require_block_number_from_id(&BlockId::Hash(first))?.as_(),
self.require_block_number_from_id(&BlockId::Hash(last))?.as_(),
&ChangesTrieAnchorBlockId {
hash: convert_hash(&last),
number: self.require_block_number_from_id(&BlockId::Hash(last))?.as_(),
},
max_number.as_(),
key)
.map_err(|err| error::ErrorKind::ChangesTrieAccessFailed(err).into())
Expand Down Expand Up @@ -1198,6 +1204,14 @@ impl<B, E, Block> api::Miscellaneous<Block> for Client<B, E, Block> where
}
}

/// Converts one hash type into another by copying the common byte prefix.
///
/// If `src` is shorter than `H1`, the trailing bytes of the result keep their
/// `Default` value (normally zero); if `src` is longer, its extra bytes are
/// ignored.
pub(crate) fn convert_hash<H1: Default + AsMut<[u8]>, H2: AsRef<[u8]>>(src: &H2) -> H1 {
	let mut dest = H1::default();
	let len = ::std::cmp::min(dest.as_mut().len(), src.as_ref().len());
	// Slice BOTH sides to `len`: `copy_from_slice` panics unless the two
	// slices have exactly equal length, so copying into the full `dest`
	// buffer would panic whenever the two hash types differ in size —
	// which is the very case the `min` above is meant to handle.
	dest.as_mut()[..len].copy_from_slice(&src.as_ref()[..len]);
	dest
}

#[cfg(test)]
pub(crate) mod tests {
use std::collections::HashMap;
Expand Down
24 changes: 12 additions & 12 deletions core/client/src/light/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ use hash_db::Hasher;
use heapsize::HeapSizeOf;
use primitives::ChangesTrieConfiguration;
use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberFor};
use state_machine::{CodeExecutor, ChangesTrieRootsStorage, read_proof_check,
key_changes_proof_check};
use state_machine::{CodeExecutor, ChangesTrieRootsStorage, ChangesTrieAnchorBlockId,
read_proof_check, key_changes_proof_check};

use call_executor::CallResult;
use cht;
use client::convert_hash;
use error::{Error as ClientError, ErrorKind as ClientErrorKind, Result as ClientResult};
use light::call_executor::check_execution_proof;

Expand Down Expand Up @@ -192,9 +193,8 @@ impl<E, Block, H> FetchChecker<Block> for LightDataChecker<E, H>
request: &RemoteReadRequest<Block::Header>,
remote_proof: Vec<Vec<u8>>
) -> ClientResult<Option<Vec<u8>>> {
let mut root: H::Out = Default::default();
root.as_mut().copy_from_slice(request.header.state_root().as_ref());
read_proof_check::<H>(root, remote_proof, &request.key).map_err(Into::into)
read_proof_check::<H>(convert_hash(request.header.state_root()), remote_proof, &request.key)
.map_err(Into::into)
}

fn check_execution_proof(
Expand Down Expand Up @@ -229,7 +229,10 @@ impl<E, Block, H> FetchChecker<Block> for LightDataChecker<E, H>
},
remote_proof,
first_number,
request.last_block.0.as_(),
&ChangesTrieAnchorBlockId {
hash: convert_hash(&request.last_block.1),
number: request.last_block.0.as_(),
},
remote_max.as_(),
&request.key)
.map(|pairs| pairs.into_iter().map(|(b, x)| (As::sa(b), x)).collect())
Expand All @@ -248,15 +251,12 @@ impl<'a, H, Hash> ChangesTrieRootsStorage<H> for RootsStorage<'a, Hash>
H: Hasher,
Hash: 'a + Send + Sync + Clone + AsRef<[u8]>,
{
fn root(&self, block: u64) -> Result<Option<H::Out>, String> {
fn root(&self, _anchor: &ChangesTrieAnchorBlockId<H::Out>, block: u64) -> Result<Option<H::Out>, String> {
// we can't ask for roots from parallel forks here => ignore anchor
Ok(block.checked_sub(self.first)
.and_then(|index| self.roots.get(index as usize))
.cloned()
.map(|root| {
let mut hasher_root: H::Out = Default::default();
hasher_root.as_mut().copy_from_slice(root.as_ref());
hasher_root
}))
.map(|root| convert_hash(&root)))
}
}

Expand Down
Loading