161 changes: 128 additions & 33 deletions core/client/db/src/lib.rs
@@ -64,7 +64,7 @@ use hash_db::Hasher;
use kvdb::{KeyValueDB, DBTransaction};
use trie::MemoryDB;
use parking_lot::RwLock;
use primitives::{H256, AuthorityId, Blake2Hasher, ChangesTrieConfiguration};
use primitives::{H256, AuthorityId, Blake2Hasher, ChangesTrieConfiguration, convert_hash};
use primitives::storage::well_known_keys;
use runtime_primitives::{generic::BlockId, Justification, StorageMap, ChildrenStorageMap};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor, Zero, Digest, DigestItem};
@@ -145,7 +145,7 @@ impl<'a> state_db::MetaDb for StateMetaDb<'a> {
/// Block database
pub struct BlockchainDb<Block: BlockT> {
db: Arc<KeyValueDB>,
meta: RwLock<Meta<NumberFor<Block>, Block::Hash>>,
meta: Arc<RwLock<Meta<NumberFor<Block>, Block::Hash>>>,
leaves: RwLock<LeafSet<Block::Hash, NumberFor<Block>>>,
}

@@ -156,7 +156,7 @@ impl<Block: BlockT> BlockchainDb<Block> {
Ok(BlockchainDb {
db,
leaves: RwLock::new(leaves),
meta: RwLock::new(meta),
meta: Arc::new(RwLock::new(meta)),
})
}

@@ -387,6 +387,7 @@ impl state_machine::Storage<Blake2Hasher> for DbGenesisStorage {

pub struct DbChangesTrieStorage<Block: BlockT> {
db: Arc<KeyValueDB>,
meta: Arc<RwLock<Meta<NumberFor<Block>, Block::Hash>>>,
min_blocks_to_keep: Option<u64>,
_phantom: ::std::marker::PhantomData<Block>,
}
@@ -400,7 +401,7 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> {
}

/// Prune obsolete changes tries.
pub fn prune(&self, config: Option<ChangesTrieConfiguration>, tx: &mut DBTransaction, block: NumberFor<Block>) {
pub fn prune(&self, config: Option<ChangesTrieConfiguration>, tx: &mut DBTransaction, block_hash: Block::Hash, block_num: NumberFor<Block>) {
// never prune on archive nodes
let min_blocks_to_keep = match self.min_blocks_to_keep {
Some(min_blocks_to_keep) => min_blocks_to_keep,
@@ -418,23 +419,54 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> {
&config,
&*self,
min_blocks_to_keep,
block.as_(),
&state_machine::ChangesTrieAnchorBlockId {
hash: convert_hash(&block_hash),
number: block_num.as_(),
},
|node| tx.delete(columns::CHANGES_TRIE, node.as_ref()));
}
}
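// --- editor's note: illustrative sketch, not part of this diff ---
// Assumed shape of the anchor type used by the new `prune` and `root` signatures.
// It is defined in the `state_machine` crate and is not shown in this diff; the fields
// below are inferred from the call sites above, so treat this as a hedged sketch rather
// than the authoritative definition.
pub struct ChangesTrieAnchorBlockIdSketch<Hash> {
    /// Hash of the anchor block; it pins the fork against which queries are resolved.
    /// For `prune` this is the just-finalized block; for `root` it is any block at or
    /// above the queried number.
    pub hash: Hash,
    /// Number of the anchor block; `root(&anchor, block)` requires `block <= number`.
    pub number: u64,
}
// --- end of sketch ---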

impl<Block: BlockT> state_machine::ChangesTrieRootsStorage<Blake2Hasher> for DbChangesTrieStorage<Block> {
fn root(&self, block: u64) -> Result<Option<H256>, String> {
Ok(read_db::<Block>(&*self.db, columns::HASH_LOOKUP, columns::HEADER, BlockId::Number(As::sa(block)))
.map_err(|err| format!("{}", err))
.and_then(|header| match header {
Some(header) => Block::Header::decode(&mut &header[..])
.ok_or_else(|| format!("Failed to parse header of block {}", block))
.map(Some),
None => Ok(None)
})?
.and_then(|header| header.digest().log(DigestItem::as_changes_trie_root)
.map(|root| H256::from_slice(root.as_ref()))))
fn root(&self, anchor: &state_machine::ChangesTrieAnchorBlockId<H256>, block: u64) -> Result<Option<H256>, String> {
// check API requirement
assert!(block <= anchor.number, "API requirement");

// we need the hash of the block to resolve its changes trie root
let block_id = if block <= self.meta.read().finalized_number.as_() {
// if the block is finalized, we can just read the canonical hash
BlockId::Number(As::sa(block))
} else {
// the block is not finalized
let mut current_num = anchor.number;
let mut current_hash: Block::Hash = convert_hash(&anchor.hash);
let maybe_anchor_header: Block::Header = ::utils::require_header::<Block>(
&*self.db, columns::HASH_LOOKUP, columns::HEADER, BlockId::Number(As::sa(current_num))
).map_err(|e| e.to_string())?;
if maybe_anchor_header.hash() == current_hash {
// if anchor is canonicalized, then the block is also canonicalized
BlockId::Number(As::sa(block))
} else {
// else (the block is not finalized and the anchor is not canonicalized):
// => find the required block hash by traversing back
// from the anchor to the block with the given number
while current_num != block {
let current_header: Block::Header = ::utils::require_header::<Block>(
&*self.db, columns::HASH_LOOKUP, columns::HEADER, BlockId::Hash(current_hash)
).map_err(|e| e.to_string())?;

current_hash = *current_header.parent_hash();
current_num = current_num - 1;
}

BlockId::Hash(current_hash)
}
};

Ok(::utils::require_header::<Block>(&*self.db, columns::HASH_LOOKUP, columns::HEADER, block_id)
.map_err(|e| e.to_string())?
.digest().log(DigestItem::as_changes_trie_root)
.map(|root| H256::from_slice(root.as_ref())))
}
}
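// --- editor's note: worked example, not part of this diff ---
// How the lookup above behaves on a fork, using the scenario from the
// `changes_trie_storage_works_with_forks` test below (all names come from that test):
//
//     block0 -> block1 -> block2 -> block2_1_0 -> block2_1_1   (branch1)
//                                \-> block2_2_0 -> block2_2_1   (branch2)
//
// With `finalized_number = 1` and `anchor = { hash: block2_1_1, number: 4 }`, the call
// `root(&anchor, 3)` takes the traversal path: block 3 is above the finalized number and
// the anchor is not the block registered for number 4 in the hash lookup (as the test's
// own comments indicate), so the loop walks parent hashes 4 -> 3, lands on block2_1_0,
// and returns the changes trie root from that header's digest, i.e. the root of
// branch1's block #3 rather than branch2's.
// --- end of note ---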

@@ -480,6 +512,7 @@ impl<Block: BlockT> Backend<Block> {
fn from_kvdb(db: Arc<KeyValueDB>, pruning: PruningMode, canonicalization_delay: u64) -> Result<Self, client::error::Error> {
let is_archive_pruning = pruning.is_archive();
let blockchain = BlockchainDb::new(db.clone())?;
let meta = blockchain.meta.clone();
let map_e = |e: state_db::Error<io::Error>| ::client::error::Error::from(format!("State database error: {:?}", e));
let state_db: StateDb<Block::Hash, H256> = StateDb::new(pruning, &StateMetaDb(&*db)).map_err(map_e)?;
let storage_db = StorageDb {
@@ -488,6 +521,7 @@ impl<Block: BlockT> Backend<Block> {
};
let changes_tries_storage = DbChangesTrieStorage {
db,
meta,
min_blocks_to_keep: if is_archive_pruning { None } else { Some(MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR) },
_phantom: Default::default(),
};
@@ -567,7 +601,7 @@ impl<Block: BlockT> Backend<Block> {
let changes_trie_config: Option<ChangesTrieConfiguration> = self.state_at(BlockId::Hash(parent_hash))?
.storage(well_known_keys::CHANGES_TRIE_CONFIG)?
.and_then(|v| Decode::decode(&mut &*v));
self.changes_tries_storage.prune(changes_trie_config, transaction, f_num);
self.changes_tries_storage.prune(changes_trie_config, transaction, f_hash, f_num);
}

Ok(())
@@ -1191,10 +1225,13 @@ mod tests {
#[test]
fn changes_trie_storage_works() {
let backend = Backend::<Block>::new_test(1000, 100);
backend.changes_tries_storage.meta.write().finalized_number = 1000;


let check_changes = |backend: &Backend<Block>, block: u64, changes: Vec<(Vec<u8>, Vec<u8>)>| {
let (changes_root, mut changes_trie_update) = prepare_changes(changes);
assert_eq!(backend.changes_tries_storage.root(block), Ok(Some(changes_root)));
let anchor = state_machine::ChangesTrieAnchorBlockId { hash: Default::default(), number: block };
assert_eq!(backend.changes_tries_storage.root(&anchor, block), Ok(Some(changes_root)));

for (key, (val, _)) in changes_trie_update.drain() {
assert_eq!(backend.changes_trie_storage().unwrap().get(&key), Ok(Some(val)));
@@ -1218,9 +1255,66 @@ mod tests {
check_changes(&backend, 2, changes2);
}

#[test]
fn changes_trie_storage_works_with_forks() {
let backend = Backend::<Block>::new_test(1000, 100);

let changes0 = vec![(b"k0".to_vec(), b"v0".to_vec())];
let changes1 = vec![(b"k1".to_vec(), b"v1".to_vec())];
let changes2 = vec![(b"k2".to_vec(), b"v2".to_vec())];
let block0 = insert_header(&backend, 0, Default::default(), changes0.clone(), Default::default());
let block1 = insert_header(&backend, 1, block0, changes1.clone(), Default::default());
let block2 = insert_header(&backend, 2, block1, changes2.clone(), Default::default());

let changes2_1_0 = vec![(b"k3".to_vec(), b"v3".to_vec())];
let changes2_1_1 = vec![(b"k4".to_vec(), b"v4".to_vec())];
let block2_1_0 = insert_header(&backend, 3, block2, changes2_1_0.clone(), Default::default());
let block2_1_1 = insert_header(&backend, 4, block2_1_0, changes2_1_1.clone(), Default::default());

let changes2_2_0 = vec![(b"k5".to_vec(), b"v5".to_vec())];
let changes2_2_1 = vec![(b"k6".to_vec(), b"v6".to_vec())];
let block2_2_0 = insert_header(&backend, 3, block2, changes2_2_0.clone(), Default::default());
let block2_2_1 = insert_header(&backend, 4, block2_2_0, changes2_2_1.clone(), Default::default());

// finalize block1
backend.changes_tries_storage.meta.write().finalized_number = 1;

// branch1: when asking for finalized block hash
let (changes1_root, _) = prepare_changes(changes1);
let anchor = state_machine::ChangesTrieAnchorBlockId { hash: block2_1_1, number: 4 };
assert_eq!(backend.changes_tries_storage.root(&anchor, 1), Ok(Some(changes1_root)));

// branch2: when asking for finalized block hash
let anchor = state_machine::ChangesTrieAnchorBlockId { hash: block2_2_1, number: 4 };
assert_eq!(backend.changes_tries_storage.root(&anchor, 1), Ok(Some(changes1_root)));

// branch1: when asking for non-finalized block hash (search by traversal)
let (changes2_1_0_root, _) = prepare_changes(changes2_1_0);
let anchor = state_machine::ChangesTrieAnchorBlockId { hash: block2_1_1, number: 4 };
assert_eq!(backend.changes_tries_storage.root(&anchor, 3), Ok(Some(changes2_1_0_root)));

// branch2: when asking for non-finalized block hash (search using canonicalized hint)
let (changes2_2_0_root, _) = prepare_changes(changes2_2_0);
let anchor = state_machine::ChangesTrieAnchorBlockId { hash: block2_2_1, number: 4 };
assert_eq!(backend.changes_tries_storage.root(&anchor, 3), Ok(Some(changes2_2_0_root)));

// finalize first block of branch2 (block2_2_0)
backend.changes_tries_storage.meta.write().finalized_number = 3;

// branch2: when asking for finalized block of this branch
assert_eq!(backend.changes_tries_storage.root(&anchor, 3), Ok(Some(changes2_2_0_root)));

// branch1: asking for a block number that is now finalized on the other branch
// => the returned root is "incorrect" for branch1 (it is the root of the canonical
// branch2 block), but this is expected, because the branch1 fork is abandoned
// (it forked before the finalized header)
let anchor = state_machine::ChangesTrieAnchorBlockId { hash: block2_1_1, number: 4 };
assert_eq!(backend.changes_tries_storage.root(&anchor, 3), Ok(Some(changes2_2_0_root)));
}

#[test]
fn changes_tries_are_pruned_on_finalization() {
let mut backend = Backend::<Block>::new_test(1000, 100);
backend.changes_tries_storage.meta.write().finalized_number = 1000;
backend.changes_tries_storage.min_blocks_to_keep = Some(8);
let config = ChangesTrieConfiguration {
digest_interval: 2,
@@ -1243,26 +1337,27 @@ mod tests {
let _ = insert_header(&backend, 12, block11, vec![(b"key_at_12".to_vec(), b"val_at_12".to_vec())], Default::default());

// check that roots of all tries are in the columns::CHANGES_TRIE
let anchor = state_machine::ChangesTrieAnchorBlockId { hash: Default::default(), number: 100 };
fn read_changes_trie_root(backend: &Backend<Block>, num: u64) -> H256 {
backend.blockchain().header(BlockId::Number(num)).unwrap().unwrap().digest().logs().iter()
.find(|i| i.as_changes_trie_root().is_some()).unwrap().as_changes_trie_root().unwrap().clone()
}
let root1 = read_changes_trie_root(&backend, 1); assert_eq!(backend.changes_tries_storage.root(1).unwrap(), Some(root1));
let root2 = read_changes_trie_root(&backend, 2); assert_eq!(backend.changes_tries_storage.root(2).unwrap(), Some(root2));
let root3 = read_changes_trie_root(&backend, 3); assert_eq!(backend.changes_tries_storage.root(3).unwrap(), Some(root3));
let root4 = read_changes_trie_root(&backend, 4); assert_eq!(backend.changes_tries_storage.root(4).unwrap(), Some(root4));
let root5 = read_changes_trie_root(&backend, 5); assert_eq!(backend.changes_tries_storage.root(5).unwrap(), Some(root5));
let root6 = read_changes_trie_root(&backend, 6); assert_eq!(backend.changes_tries_storage.root(6).unwrap(), Some(root6));
let root7 = read_changes_trie_root(&backend, 7); assert_eq!(backend.changes_tries_storage.root(7).unwrap(), Some(root7));
let root8 = read_changes_trie_root(&backend, 8); assert_eq!(backend.changes_tries_storage.root(8).unwrap(), Some(root8));
let root9 = read_changes_trie_root(&backend, 9); assert_eq!(backend.changes_tries_storage.root(9).unwrap(), Some(root9));
let root10 = read_changes_trie_root(&backend, 10); assert_eq!(backend.changes_tries_storage.root(10).unwrap(), Some(root10));
let root11 = read_changes_trie_root(&backend, 11); assert_eq!(backend.changes_tries_storage.root(11).unwrap(), Some(root11));
let root12 = read_changes_trie_root(&backend, 12); assert_eq!(backend.changes_tries_storage.root(12).unwrap(), Some(root12));
let root1 = read_changes_trie_root(&backend, 1); assert_eq!(backend.changes_tries_storage.root(&anchor, 1).unwrap(), Some(root1));
let root2 = read_changes_trie_root(&backend, 2); assert_eq!(backend.changes_tries_storage.root(&anchor, 2).unwrap(), Some(root2));
let root3 = read_changes_trie_root(&backend, 3); assert_eq!(backend.changes_tries_storage.root(&anchor, 3).unwrap(), Some(root3));
let root4 = read_changes_trie_root(&backend, 4); assert_eq!(backend.changes_tries_storage.root(&anchor, 4).unwrap(), Some(root4));
let root5 = read_changes_trie_root(&backend, 5); assert_eq!(backend.changes_tries_storage.root(&anchor, 5).unwrap(), Some(root5));
let root6 = read_changes_trie_root(&backend, 6); assert_eq!(backend.changes_tries_storage.root(&anchor, 6).unwrap(), Some(root6));
let root7 = read_changes_trie_root(&backend, 7); assert_eq!(backend.changes_tries_storage.root(&anchor, 7).unwrap(), Some(root7));
let root8 = read_changes_trie_root(&backend, 8); assert_eq!(backend.changes_tries_storage.root(&anchor, 8).unwrap(), Some(root8));
let root9 = read_changes_trie_root(&backend, 9); assert_eq!(backend.changes_tries_storage.root(&anchor, 9).unwrap(), Some(root9));
let root10 = read_changes_trie_root(&backend, 10); assert_eq!(backend.changes_tries_storage.root(&anchor, 10).unwrap(), Some(root10));
let root11 = read_changes_trie_root(&backend, 11); assert_eq!(backend.changes_tries_storage.root(&anchor, 11).unwrap(), Some(root11));
let root12 = read_changes_trie_root(&backend, 12); assert_eq!(backend.changes_tries_storage.root(&anchor, 12).unwrap(), Some(root12));

// now simulate finalization of block#12, causing prune of tries at #1..#4
let mut tx = DBTransaction::new();
backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, 12);
backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, Default::default(), 12);
backend.storage.db.write(tx).unwrap();
assert!(backend.changes_tries_storage.get(&root1).unwrap().is_none());
assert!(backend.changes_tries_storage.get(&root2).unwrap().is_none());
@@ -1275,7 +1370,7 @@

// now simulate finalization of block#16, causing prune of tries at #5..#8
let mut tx = DBTransaction::new();
backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, 16);
backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, Default::default(), 16);
backend.storage.db.write(tx).unwrap();
assert!(backend.changes_tries_storage.get(&root5).unwrap().is_none());
assert!(backend.changes_tries_storage.get(&root6).unwrap().is_none());
@@ -1286,7 +1381,7 @@
// => no changes tries are pruned, because we never prune in archive mode
backend.changes_tries_storage.min_blocks_to_keep = None;
let mut tx = DBTransaction::new();
backend.changes_tries_storage.prune(Some(config), &mut tx, 20);
backend.changes_tries_storage.prune(Some(config), &mut tx, Default::default(), 20);
backend.storage.db.write(tx).unwrap();
assert!(backend.changes_tries_storage.get(&root9).unwrap().is_some());
assert!(backend.changes_tries_storage.get(&root10).unwrap().is_some());
11 changes: 11 additions & 0 deletions core/client/db/src/utils.rs
@@ -187,6 +187,17 @@ pub fn read_header<Block: BlockT>(
}
}

/// Read a required header from the database, returning an error if it is not found.
pub fn require_header<Block: BlockT>(
db: &KeyValueDB,
col_index: Option<u32>,
col: Option<u32>,
id: BlockId<Block>,
) -> client::error::Result<Block::Header> {
read_header(db, col_index, col, id)
.and_then(|header| header.ok_or_else(|| client::error::ErrorKind::UnknownBlock(format!("{}", id)).into()))
}
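// --- editor's note: illustrative sketch, not part of this diff ---
// Hedged usage example of `require_header`: it behaves like `read_header`, but a missing
// header becomes an `UnknownBlock` error, so call sites (e.g. `DbChangesTrieStorage::root`
// in lib.rs above) can use `?` directly instead of matching on the `Option`:
//
//     let header: Block::Header = require_header::<Block>(
//         &*db, columns::HASH_LOOKUP, columns::HEADER, BlockId::Number(As::sa(42)),
//     )?;
//
// The column arguments and the block number 42 are illustrative only.
// --- end of sketch ---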

/// Read meta from the database.
pub fn read_meta<Block>(db: &KeyValueDB, col_meta: Option<u32>, col_header: Option<u32>) -> Result<
Meta<<<Block as BlockT>::Header as HeaderT>::Number, Block::Hash>,
5 changes: 2 additions & 3 deletions core/client/src/cht.rs
@@ -27,7 +27,7 @@ use hash_db;
use heapsize::HeapSizeOf;
use trie;

use primitives::H256;
use primitives::{H256, convert_hash};
use runtime_primitives::traits::{As, Header as HeaderT, SimpleArithmetic, One};
use state_machine::backend::InMemory as InMemoryState;
use state_machine::{prove_read, read_proof_check};
@@ -113,8 +113,7 @@ pub fn check_proof<Header, Hasher>(
Hasher: hash_db::Hasher,
Hasher::Out: Ord + HeapSizeOf,
{
let mut root: Hasher::Out = Default::default();
root.as_mut().copy_from_slice(local_root.as_ref());
let root: Hasher::Out = convert_hash(&local_root);
let local_cht_key = encode_cht_key(local_number);
let local_cht_value = read_proof_check::<Hasher>(root, remote_proof,
&local_cht_key).map_err(|e| ClientError::from(e))?;
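// --- editor's note: illustrative sketch, not part of this diff ---
// `primitives::convert_hash` replaces the manual Default-then-copy pattern deleted above.
// Its definition is not shown in this diff; a minimal sketch consistent with that pattern
// (generic over any two byte-oriented hash types of equal length) would be:
fn convert_hash_sketch<H1: Default + AsMut<[u8]>, H2: AsRef<[u8]>>(src: &H2) -> H1 {
    let mut dest = H1::default();
    // Copy the raw bytes across; this panics if the two hash types differ in length,
    // just like the inlined copy_from_slice it replaces.
    dest.as_mut().copy_from_slice(src.as_ref());
    dest
}
// --- end of sketch ---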
16 changes: 11 additions & 5 deletions core/client/src/client.rs
@@ -30,14 +30,14 @@ use consensus::{ImportBlock, ImportResult, BlockOrigin};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Zero, As, NumberFor, CurrentHeight, BlockNumberToHash};
use runtime_primitives::{ApplyResult, BuildStorage};
use runtime_api as api;
use primitives::{Blake2Hasher, H256, ChangesTrieConfiguration};
use primitives::{Blake2Hasher, H256, ChangesTrieConfiguration, convert_hash};
use primitives::storage::{StorageKey, StorageData};
use primitives::storage::well_known_keys;
use codec::{Encode, Decode};
use state_machine::{
Backend as StateBackend, CodeExecutor,
ExecutionStrategy, ExecutionManager, prove_read,
key_changes, key_changes_proof, OverlayedChanges
ExecutionStrategy, ExecutionManager, ChangesTrieAnchorBlockId,
prove_read, key_changes, key_changes_proof, OverlayedChanges
};

use backend::{self, BlockImportOperation};
@@ -355,7 +355,10 @@ impl<B, E, Block> Client<B, E, Block> where
config,
storage,
self.require_block_number_from_id(&BlockId::Hash(first))?.as_(),
self.require_block_number_from_id(&BlockId::Hash(last))?.as_(),
&ChangesTrieAnchorBlockId {
hash: convert_hash(&last),
number: self.require_block_number_from_id(&BlockId::Hash(last))?.as_(),
},
self.backend.blockchain().info()?.best_number.as_(),
key)
.map_err(|err| error::ErrorKind::ChangesTrieAccessFailed(err).into())
@@ -388,7 +391,10 @@ impl<B, E, Block> Client<B, E, Block> where
config,
storage,
self.require_block_number_from_id(&BlockId::Hash(first))?.as_(),
self.require_block_number_from_id(&BlockId::Hash(last))?.as_(),
&ChangesTrieAnchorBlockId {
hash: convert_hash(&last),
number: self.require_block_number_from_id(&BlockId::Hash(last))?.as_(),
},
max_number.as_(),
key)
.map_err(|err| error::ErrorKind::ChangesTrieAccessFailed(err).into())
4 changes: 2 additions & 2 deletions core/client/src/light/call_executor.rs
@@ -21,6 +21,7 @@ use std::marker::PhantomData;
use std::sync::Arc;
use futures::{IntoFuture, Future};

use primitives::convert_hash;
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
use state_machine::{Backend as StateBackend, CodeExecutor, OverlayedChanges,
@@ -136,8 +137,7 @@ pub fn check_execution_proof<Header, E, H>(

{
let local_state_root = request.header.state_root();
let mut root: H::Out = Default::default();
root.as_mut().copy_from_slice(local_state_root.as_ref());
let root: H::Out = convert_hash(&local_state_root);

let mut changes = OverlayedChanges::default();
let local_result = execution_proof_check::<H, _>(