Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 158 additions & 120 deletions core/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,119 +625,22 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> {
};

trace!(target: "db", "Canonicalize block #{} ({:?})", new_canonical, hash);
let commit = self.storage.state_db.canonicalize_block(&hash);
let commit = self.storage.state_db.canonicalize_block(&hash)
.map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?;
apply_state_commit(transaction, commit);
};

Ok(())
}

// write stuff to a transaction after a new block is finalized.
// this canonicalizes finalized blocks. Fails if called with a block which
// was not a child of the last finalized block.
fn note_finalized(
&self,
transaction: &mut DBTransaction,
f_header: &Block::Header,
f_hash: Block::Hash,
) -> Result<(), client::error::Error> where
Block: BlockT<Hash=H256>,
{
let f_num = f_header.number().clone();

if f_num.as_() > self.storage.state_db.best_canonical() {
let parent_hash = f_header.parent_hash().clone();

let lookup_key = utils::number_and_hash_to_lookup_key(f_num, f_hash.clone());
transaction.put(columns::META, meta_keys::FINALIZED_BLOCK, &lookup_key);

let commit = self.storage.state_db.canonicalize_block(&f_hash);
apply_state_commit(transaction, commit);

// read config from genesis, since it is readonly atm
use client::backend::Backend;
let changes_trie_config: Option<ChangesTrieConfiguration> = self.state_at(BlockId::Hash(parent_hash))?
.storage(well_known_keys::CHANGES_TRIE_CONFIG)?
.and_then(|v| Decode::decode(&mut &*v));
self.changes_tries_storage.prune(changes_trie_config, transaction, f_hash, f_num);
}

Ok(())
}
}

fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitSet<H256>) {
for (key, val) in commit.data.inserted.into_iter() {
transaction.put(columns::STATE, &key[..], &val);
}
for key in commit.data.deleted.into_iter() {
transaction.delete(columns::STATE, &key[..]);
}
for (key, val) in commit.meta.inserted.into_iter() {
transaction.put(columns::STATE_META, &key[..], &val);
}
for key in commit.meta.deleted.into_iter() {
transaction.delete(columns::STATE_META, &key[..]);
}
}

impl<Block> client::backend::AuxStore for Backend<Block> where Block: BlockT<Hash=H256> {
fn insert_aux<
'a,
'b: 'a,
'c: 'a,
I: IntoIterator<Item=&'a(&'c [u8], &'c [u8])>,
D: IntoIterator<Item=&'a &'b [u8]>,
>(&self, insert: I, delete: D) -> client::error::Result<()> {
let mut transaction = DBTransaction::new();
for (k, v) in insert {
transaction.put(columns::AUX, k, v);
}
for k in delete {
transaction.delete(columns::AUX, k);
}
self.storage.db.write(transaction).map_err(db_err)?;
Ok(())
}

fn get_aux(&self, key: &[u8]) -> Result<Option<Vec<u8>>, client::error::Error> {
Ok(self.storage.db.get(columns::AUX, key).map(|r| r.map(|v| v.to_vec())).map_err(db_err)?)
}
}

impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> where Block: BlockT<Hash=H256> {
type BlockImportOperation = BlockImportOperation<Block, Blake2Hasher>;
type Blockchain = BlockchainDb<Block>;
type State = CachingState<Blake2Hasher, DbState, Block>;
type ChangesTrieStorage = DbChangesTrieStorage<Block>;

fn begin_operation(&self) -> Result<Self::BlockImportOperation, client::error::Error> {
let old_state = self.state_at(BlockId::Hash(Default::default()))?;
Ok(BlockImportOperation {
pending_block: None,
old_state,
db_updates: MemoryDB::default(),
storage_updates: Default::default(),
changes_trie_updates: MemoryDB::default(),
aux_ops: Vec::new(),
finalized_blocks: Vec::new(),
})
}

fn begin_state_operation(&self, operation: &mut Self::BlockImportOperation, block: BlockId<Block>) -> Result<(), client::error::Error> {
operation.old_state = self.state_at(block)?;
Ok(())
}

fn commit_operation(&self, mut operation: Self::BlockImportOperation)
fn try_commit_operation(&self, mut operation: BlockImportOperation<Block, Blake2Hasher>)
-> Result<(), client::error::Error>
{
let mut transaction = DBTransaction::new();
operation.apply_aux(&mut transaction);

let mut meta_updates = Vec::new();
if !operation.finalized_blocks.is_empty() {
let mut meta_updates = Vec::new();

let mut last_finalized_hash = self.blockchain.meta.read().finalized_hash;
for (block, justification) in operation.finalized_blocks {
let block_hash = self.blockchain.expect_block_hash_from_id(&block)?;
Expand All @@ -752,10 +655,6 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
)?);
last_finalized_hash = block_hash;
}

for (hash, number, is_best, is_finalized) in meta_updates {
self.blockchain.update_meta(hash, number, is_best, is_finalized);
}
}

if let Some(pending_block) = operation.pending_block {
Expand Down Expand Up @@ -851,23 +750,28 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset)
.map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?;
apply_state_commit(&mut transaction, commit);
self.changes_tries_storage.commit(&mut transaction, operation.changes_trie_updates);

let finalized = match pending_block.leaf_state {
NewBlockState::Final => true,
_ => false,
};

let header = &pending_block.header;
let is_best = pending_block.leaf_state.is_best();
let changes_trie_updates = operation.changes_trie_updates;


self.changes_tries_storage.commit(&mut transaction, changes_trie_updates);

if finalized {
// TODO: ensure best chain contains this block.
self.ensure_sequential_finalization(&pending_block.header, None)?;
self.note_finalized(&mut transaction, &pending_block.header, hash)?;
self.ensure_sequential_finalization(header, None)?;
self.note_finalized(&mut transaction, header, hash)?;
} else {
// canonicalize blocks which are old enough, regardless of finality.
self.force_delayed_canonicalize(&mut transaction, hash, *pending_block.header.number())?
self.force_delayed_canonicalize(&mut transaction, hash, *header.number())?
}

let is_best = pending_block.leaf_state.is_best();
debug!(target: "db", "DB Commit {:?} ({}), best = {}", hash, number, is_best);

{
Expand All @@ -886,10 +790,14 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
drop(leaves);
}

for (hash, number, is_best, is_finalized) in meta_updates {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious - why do we need to replace one loop (inside if !operation.finalized_blocks.is_empty() {) with two (inside operation.pending_block.is_some() and operation.pending_block.is_none())? Does it fix something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. In case case finalized_blocks are specified metadata for these should be set before the pending_block
  2. All metadata should be updated after the transaction is committed.

self.blockchain.update_meta(hash, number, is_best, is_finalized);
}

self.blockchain.update_meta(
hash.clone(),
number.clone(),
pending_block.leaf_state.is_best(),
is_best,
finalized,
);

Expand All @@ -902,25 +810,155 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
Some(number),
|| is_best
);
} else {
// No pending block, just write the transaction and apply meta changes
self.storage.db.write(transaction).map_err(db_err)?;
for (hash, number, is_best, is_finalized) in meta_updates {
self.blockchain.update_meta(hash, number, is_best, is_finalized);
}
}
Ok(())
}


// write stuff to a transaction after a new block is finalized.
// this canonicalizes finalized blocks. Fails if called with a block which
// was not a child of the last finalized block.
fn note_finalized(
&self,
transaction: &mut DBTransaction,
f_header: &Block::Header,
f_hash: Block::Hash,
) -> Result<(), client::error::Error> where
Block: BlockT<Hash=H256>,
{
let f_num = f_header.number().clone();

if f_num.as_() > self.storage.state_db.best_canonical() {
let parent_hash = f_header.parent_hash().clone();

let lookup_key = utils::number_and_hash_to_lookup_key(f_num, f_hash.clone());
transaction.put(columns::META, meta_keys::FINALIZED_BLOCK, &lookup_key);

let commit = self.storage.state_db.canonicalize_block(&f_hash)
.map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?;
apply_state_commit(transaction, commit);

// read config from genesis, since it is readonly atm
use client::backend::Backend;
let changes_trie_config: Option<ChangesTrieConfiguration> = self.state_at(BlockId::Hash(parent_hash))?
.storage(well_known_keys::CHANGES_TRIE_CONFIG)?
.and_then(|v| Decode::decode(&mut &*v));
self.changes_tries_storage.prune(changes_trie_config, transaction, f_hash, f_num);
}

Ok(())
}
}

fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitSet<H256>) {
for (key, val) in commit.data.inserted.into_iter() {
transaction.put(columns::STATE, &key[..], &val);
}
for key in commit.data.deleted.into_iter() {
transaction.delete(columns::STATE, &key[..]);
}
for (key, val) in commit.meta.inserted.into_iter() {
transaction.put(columns::STATE_META, &key[..], &val);
}
for key in commit.meta.deleted.into_iter() {
transaction.delete(columns::STATE_META, &key[..]);
}
}

impl<Block> client::backend::AuxStore for Backend<Block> where Block: BlockT<Hash=H256> {
fn insert_aux<
'a,
'b: 'a,
'c: 'a,
I: IntoIterator<Item=&'a(&'c [u8], &'c [u8])>,
D: IntoIterator<Item=&'a &'b [u8]>,
>(&self, insert: I, delete: D) -> client::error::Result<()> {
let mut transaction = DBTransaction::new();
for (k, v) in insert {
transaction.put(columns::AUX, k, v);
}
for k in delete {
transaction.delete(columns::AUX, k);
}
self.storage.db.write(transaction).map_err(db_err)?;
Ok(())
}

fn get_aux(&self, key: &[u8]) -> Result<Option<Vec<u8>>, client::error::Error> {
Ok(self.storage.db.get(columns::AUX, key).map(|r| r.map(|v| v.to_vec())).map_err(db_err)?)
}
}

impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> where Block: BlockT<Hash=H256> {
type BlockImportOperation = BlockImportOperation<Block, Blake2Hasher>;
type Blockchain = BlockchainDb<Block>;
type State = CachingState<Blake2Hasher, DbState, Block>;
type ChangesTrieStorage = DbChangesTrieStorage<Block>;

fn begin_operation(&self) -> Result<Self::BlockImportOperation, client::error::Error> {
let old_state = self.state_at(BlockId::Hash(Default::default()))?;
Ok(BlockImportOperation {
pending_block: None,
old_state,
db_updates: MemoryDB::default(),
storage_updates: Default::default(),
changes_trie_updates: MemoryDB::default(),
aux_ops: Vec::new(),
finalized_blocks: Vec::new(),
})
}

fn begin_state_operation(&self, operation: &mut Self::BlockImportOperation, block: BlockId<Block>) -> Result<(), client::error::Error> {
operation.old_state = self.state_at(block)?;
Ok(())
}

fn commit_operation(&self, operation: Self::BlockImportOperation)
-> Result<(), client::error::Error>
{
match self.try_commit_operation(operation) {
Ok(_) => {
self.storage.state_db.apply_pending();
Ok(())
},
e @ Err(_) => {
self.storage.state_db.revert_pending();
e
}
}
}

fn finalize_block(&self, block: BlockId<Block>, justification: Option<Justification>)
-> Result<(), client::error::Error>
{
let mut transaction = DBTransaction::new();
let hash = self.blockchain.expect_block_hash_from_id(&block)?;
let header = self.blockchain.expect_header(block)?;
let (hash, number, is_best, is_finalized) = self.finalize_block_with_transaction(
&mut transaction,
&hash,
&header,
None,
justification,
)?;
self.storage.db.write(transaction).map_err(db_err)?;
self.blockchain.update_meta(hash, number, is_best, is_finalized);
let commit = || {
let (hash, number, is_best, is_finalized) = self.finalize_block_with_transaction(
&mut transaction,
&hash,
&header,
None,
justification,
)?;
self.storage.db.write(transaction).map_err(db_err)?;
self.blockchain.update_meta(hash, number, is_best, is_finalized);
Ok(())
};
match commit() {
Ok(()) => self.storage.state_db.apply_pending(),
e @ Err(_) => {
self.storage.state_db.revert_pending();
return e;
}
}
Ok(())
}

Expand Down
Loading