Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit a08c79b

Browse files
committed
Transactional updates in the state-db
1 parent 4fa9fb6 commit a08c79b

File tree

7 files changed

+436
-212
lines changed

7 files changed

+436
-212
lines changed

core/client/db/src/lib.rs

Lines changed: 158 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -625,119 +625,22 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> {
625625
};
626626

627627
trace!(target: "db", "Canonicalize block #{} ({:?})", new_canonical, hash);
628-
let commit = self.storage.state_db.canonicalize_block(&hash);
628+
let commit = self.storage.state_db.canonicalize_block(&hash)
629+
.map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?;
629630
apply_state_commit(transaction, commit);
630631
};
631632

632633
Ok(())
633634
}
634635

635-
// write stuff to a transaction after a new block is finalized.
636-
// this canonicalizes finalized blocks. Fails if called with a block which
637-
// was not a child of the last finalized block.
638-
fn note_finalized(
639-
&self,
640-
transaction: &mut DBTransaction,
641-
f_header: &Block::Header,
642-
f_hash: Block::Hash,
643-
) -> Result<(), client::error::Error> where
644-
Block: BlockT<Hash=H256>,
645-
{
646-
let f_num = f_header.number().clone();
647-
648-
if f_num.as_() > self.storage.state_db.best_canonical() {
649-
let parent_hash = f_header.parent_hash().clone();
650-
651-
let lookup_key = utils::number_and_hash_to_lookup_key(f_num, f_hash.clone());
652-
transaction.put(columns::META, meta_keys::FINALIZED_BLOCK, &lookup_key);
653-
654-
let commit = self.storage.state_db.canonicalize_block(&f_hash);
655-
apply_state_commit(transaction, commit);
656-
657-
// read config from genesis, since it is readonly atm
658-
use client::backend::Backend;
659-
let changes_trie_config: Option<ChangesTrieConfiguration> = self.state_at(BlockId::Hash(parent_hash))?
660-
.storage(well_known_keys::CHANGES_TRIE_CONFIG)?
661-
.and_then(|v| Decode::decode(&mut &*v));
662-
self.changes_tries_storage.prune(changes_trie_config, transaction, f_hash, f_num);
663-
}
664-
665-
Ok(())
666-
}
667-
}
668-
669-
fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitSet<H256>) {
670-
for (key, val) in commit.data.inserted.into_iter() {
671-
transaction.put(columns::STATE, &key[..], &val);
672-
}
673-
for key in commit.data.deleted.into_iter() {
674-
transaction.delete(columns::STATE, &key[..]);
675-
}
676-
for (key, val) in commit.meta.inserted.into_iter() {
677-
transaction.put(columns::STATE_META, &key[..], &val);
678-
}
679-
for key in commit.meta.deleted.into_iter() {
680-
transaction.delete(columns::STATE_META, &key[..]);
681-
}
682-
}
683-
684-
impl<Block> client::backend::AuxStore for Backend<Block> where Block: BlockT<Hash=H256> {
685-
fn insert_aux<
686-
'a,
687-
'b: 'a,
688-
'c: 'a,
689-
I: IntoIterator<Item=&'a(&'c [u8], &'c [u8])>,
690-
D: IntoIterator<Item=&'a &'b [u8]>,
691-
>(&self, insert: I, delete: D) -> client::error::Result<()> {
692-
let mut transaction = DBTransaction::new();
693-
for (k, v) in insert {
694-
transaction.put(columns::AUX, k, v);
695-
}
696-
for k in delete {
697-
transaction.delete(columns::AUX, k);
698-
}
699-
self.storage.db.write(transaction).map_err(db_err)?;
700-
Ok(())
701-
}
702-
703-
fn get_aux(&self, key: &[u8]) -> Result<Option<Vec<u8>>, client::error::Error> {
704-
Ok(self.storage.db.get(columns::AUX, key).map(|r| r.map(|v| v.to_vec())).map_err(db_err)?)
705-
}
706-
}
707-
708-
impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> where Block: BlockT<Hash=H256> {
709-
type BlockImportOperation = BlockImportOperation<Block, Blake2Hasher>;
710-
type Blockchain = BlockchainDb<Block>;
711-
type State = CachingState<Blake2Hasher, DbState, Block>;
712-
type ChangesTrieStorage = DbChangesTrieStorage<Block>;
713-
714-
fn begin_operation(&self) -> Result<Self::BlockImportOperation, client::error::Error> {
715-
let old_state = self.state_at(BlockId::Hash(Default::default()))?;
716-
Ok(BlockImportOperation {
717-
pending_block: None,
718-
old_state,
719-
db_updates: MemoryDB::default(),
720-
storage_updates: Default::default(),
721-
changes_trie_updates: MemoryDB::default(),
722-
aux_ops: Vec::new(),
723-
finalized_blocks: Vec::new(),
724-
})
725-
}
726-
727-
fn begin_state_operation(&self, operation: &mut Self::BlockImportOperation, block: BlockId<Block>) -> Result<(), client::error::Error> {
728-
operation.old_state = self.state_at(block)?;
729-
Ok(())
730-
}
731-
732-
fn commit_operation(&self, mut operation: Self::BlockImportOperation)
636+
fn try_commit_operation(&self, mut operation: BlockImportOperation<Block, Blake2Hasher>)
733637
-> Result<(), client::error::Error>
734638
{
735639
let mut transaction = DBTransaction::new();
736640
operation.apply_aux(&mut transaction);
737641

642+
let mut meta_updates = Vec::new();
738643
if !operation.finalized_blocks.is_empty() {
739-
let mut meta_updates = Vec::new();
740-
741644
let mut last_finalized_hash = self.blockchain.meta.read().finalized_hash;
742645
for (block, justification) in operation.finalized_blocks {
743646
let block_hash = self.blockchain.expect_block_hash_from_id(&block)?;
@@ -752,10 +655,6 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
752655
)?);
753656
last_finalized_hash = block_hash;
754657
}
755-
756-
for (hash, number, is_best, is_finalized) in meta_updates {
757-
self.blockchain.update_meta(hash, number, is_best, is_finalized);
758-
}
759658
}
760659

761660
if let Some(pending_block) = operation.pending_block {
@@ -851,23 +750,28 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
851750
let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset)
852751
.map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?;
853752
apply_state_commit(&mut transaction, commit);
854-
self.changes_tries_storage.commit(&mut transaction, operation.changes_trie_updates);
855753

856754
let finalized = match pending_block.leaf_state {
857755
NewBlockState::Final => true,
858756
_ => false,
859757
};
860758

759+
let header = &pending_block.header;
760+
let is_best = pending_block.leaf_state.is_best();
761+
let changes_trie_updates = operation.changes_trie_updates;
762+
763+
764+
self.changes_tries_storage.commit(&mut transaction, changes_trie_updates);
765+
861766
if finalized {
862767
// TODO: ensure best chain contains this block.
863-
self.ensure_sequential_finalization(&pending_block.header, None)?;
864-
self.note_finalized(&mut transaction, &pending_block.header, hash)?;
768+
self.ensure_sequential_finalization(header, None)?;
769+
self.note_finalized(&mut transaction, header, hash)?;
865770
} else {
866771
// canonicalize blocks which are old enough, regardless of finality.
867-
self.force_delayed_canonicalize(&mut transaction, hash, *pending_block.header.number())?
772+
self.force_delayed_canonicalize(&mut transaction, hash, *header.number())?
868773
}
869774

870-
let is_best = pending_block.leaf_state.is_best();
871775
debug!(target: "db", "DB Commit {:?} ({}), best = {}", hash, number, is_best);
872776

873777
{
@@ -886,10 +790,14 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
886790
drop(leaves);
887791
}
888792

793+
for (hash, number, is_best, is_finalized) in meta_updates {
794+
self.blockchain.update_meta(hash, number, is_best, is_finalized);
795+
}
796+
889797
self.blockchain.update_meta(
890798
hash.clone(),
891799
number.clone(),
892-
pending_block.leaf_state.is_best(),
800+
is_best,
893801
finalized,
894802
);
895803

@@ -902,25 +810,155 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
902810
Some(number),
903811
|| is_best
904812
);
813+
} else {
814+
// No pending block, just write the transaction and apply meta changes
815+
self.storage.db.write(transaction).map_err(db_err)?;
816+
for (hash, number, is_best, is_finalized) in meta_updates {
817+
self.blockchain.update_meta(hash, number, is_best, is_finalized);
818+
}
905819
}
906820
Ok(())
907821
}
908822

823+
824+
// write stuff to a transaction after a new block is finalized.
825+
// this canonicalizes finalized blocks. Fails if called with a block which
826+
// was not a child of the last finalized block.
827+
fn note_finalized(
828+
&self,
829+
transaction: &mut DBTransaction,
830+
f_header: &Block::Header,
831+
f_hash: Block::Hash,
832+
) -> Result<(), client::error::Error> where
833+
Block: BlockT<Hash=H256>,
834+
{
835+
let f_num = f_header.number().clone();
836+
837+
if f_num.as_() > self.storage.state_db.best_canonical() {
838+
let parent_hash = f_header.parent_hash().clone();
839+
840+
let lookup_key = utils::number_and_hash_to_lookup_key(f_num, f_hash.clone());
841+
transaction.put(columns::META, meta_keys::FINALIZED_BLOCK, &lookup_key);
842+
843+
let commit = self.storage.state_db.canonicalize_block(&f_hash)
844+
.map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?;
845+
apply_state_commit(transaction, commit);
846+
847+
// read config from genesis, since it is readonly atm
848+
use client::backend::Backend;
849+
let changes_trie_config: Option<ChangesTrieConfiguration> = self.state_at(BlockId::Hash(parent_hash))?
850+
.storage(well_known_keys::CHANGES_TRIE_CONFIG)?
851+
.and_then(|v| Decode::decode(&mut &*v));
852+
self.changes_tries_storage.prune(changes_trie_config, transaction, f_hash, f_num);
853+
}
854+
855+
Ok(())
856+
}
857+
}
858+
859+
fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitSet<H256>) {
860+
for (key, val) in commit.data.inserted.into_iter() {
861+
transaction.put(columns::STATE, &key[..], &val);
862+
}
863+
for key in commit.data.deleted.into_iter() {
864+
transaction.delete(columns::STATE, &key[..]);
865+
}
866+
for (key, val) in commit.meta.inserted.into_iter() {
867+
transaction.put(columns::STATE_META, &key[..], &val);
868+
}
869+
for key in commit.meta.deleted.into_iter() {
870+
transaction.delete(columns::STATE_META, &key[..]);
871+
}
872+
}
873+
874+
impl<Block> client::backend::AuxStore for Backend<Block> where Block: BlockT<Hash=H256> {
875+
fn insert_aux<
876+
'a,
877+
'b: 'a,
878+
'c: 'a,
879+
I: IntoIterator<Item=&'a(&'c [u8], &'c [u8])>,
880+
D: IntoIterator<Item=&'a &'b [u8]>,
881+
>(&self, insert: I, delete: D) -> client::error::Result<()> {
882+
let mut transaction = DBTransaction::new();
883+
for (k, v) in insert {
884+
transaction.put(columns::AUX, k, v);
885+
}
886+
for k in delete {
887+
transaction.delete(columns::AUX, k);
888+
}
889+
self.storage.db.write(transaction).map_err(db_err)?;
890+
Ok(())
891+
}
892+
893+
fn get_aux(&self, key: &[u8]) -> Result<Option<Vec<u8>>, client::error::Error> {
894+
Ok(self.storage.db.get(columns::AUX, key).map(|r| r.map(|v| v.to_vec())).map_err(db_err)?)
895+
}
896+
}
897+
898+
impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> where Block: BlockT<Hash=H256> {
899+
type BlockImportOperation = BlockImportOperation<Block, Blake2Hasher>;
900+
type Blockchain = BlockchainDb<Block>;
901+
type State = CachingState<Blake2Hasher, DbState, Block>;
902+
type ChangesTrieStorage = DbChangesTrieStorage<Block>;
903+
904+
fn begin_operation(&self) -> Result<Self::BlockImportOperation, client::error::Error> {
905+
let old_state = self.state_at(BlockId::Hash(Default::default()))?;
906+
Ok(BlockImportOperation {
907+
pending_block: None,
908+
old_state,
909+
db_updates: MemoryDB::default(),
910+
storage_updates: Default::default(),
911+
changes_trie_updates: MemoryDB::default(),
912+
aux_ops: Vec::new(),
913+
finalized_blocks: Vec::new(),
914+
})
915+
}
916+
917+
fn begin_state_operation(&self, operation: &mut Self::BlockImportOperation, block: BlockId<Block>) -> Result<(), client::error::Error> {
918+
operation.old_state = self.state_at(block)?;
919+
Ok(())
920+
}
921+
922+
fn commit_operation(&self, operation: Self::BlockImportOperation)
923+
-> Result<(), client::error::Error>
924+
{
925+
match self.try_commit_operation(operation) {
926+
Ok(_) => {
927+
self.storage.state_db.apply_pending();
928+
Ok(())
929+
},
930+
e @ Err(_) => {
931+
self.storage.state_db.revert_pending();
932+
e
933+
}
934+
}
935+
}
936+
909937
fn finalize_block(&self, block: BlockId<Block>, justification: Option<Justification>)
910938
-> Result<(), client::error::Error>
911939
{
912940
let mut transaction = DBTransaction::new();
913941
let hash = self.blockchain.expect_block_hash_from_id(&block)?;
914942
let header = self.blockchain.expect_header(block)?;
915-
let (hash, number, is_best, is_finalized) = self.finalize_block_with_transaction(
916-
&mut transaction,
917-
&hash,
918-
&header,
919-
None,
920-
justification,
921-
)?;
922-
self.storage.db.write(transaction).map_err(db_err)?;
923-
self.blockchain.update_meta(hash, number, is_best, is_finalized);
943+
let commit = || {
944+
let (hash, number, is_best, is_finalized) = self.finalize_block_with_transaction(
945+
&mut transaction,
946+
&hash,
947+
&header,
948+
None,
949+
justification,
950+
)?;
951+
self.storage.db.write(transaction).map_err(db_err)?;
952+
self.blockchain.update_meta(hash, number, is_best, is_finalized);
953+
Ok(())
954+
};
955+
match commit() {
956+
Ok(()) => self.storage.state_db.apply_pending(),
957+
e @ Err(_) => {
958+
self.storage.state_db.revert_pending();
959+
return e;
960+
}
961+
}
924962
Ok(())
925963
}
926964

0 commit comments

Comments
 (0)