Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
store trie nodes in kvdb
  • Loading branch information
rphmeier committed May 11, 2018
commit 17bb75f43e7e2c89b6b23feb8a667d7fd58d7842
161 changes: 98 additions & 63 deletions substrate/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! Client backend that uses RocksDB database as storage. State is still kept in memory.

extern crate substrate_client as client;
extern crate ethereum_types;
extern crate kvdb_rocksdb;
extern crate kvdb;
extern crate hashdb;
Expand Down Expand Up @@ -52,8 +53,6 @@ use runtime_support::Hashable;
use state_machine::backend::Backend as StateBackend;
use state_machine::CodeExecutor;

const STATE_HISTORY: block::Number = 64;

/// Database settings.
pub struct DatabaseSettings {
/// Cache size in bytes. If `None` default is used.
Expand Down Expand Up @@ -280,13 +279,15 @@ impl client::backend::BlockImportOperation for BlockImportOperation {
Ok(())
}

fn set_storage<I: Iterator<Item=(Vec<u8>, Option<Vec<u8>>)>>(&mut self, changes: I) -> Result<(), client::error::Error> {
self.pending_state.commit(changes);
fn update_storage(&mut self, update: MemoryDB) -> Result<(), client::error::Error> {
self.pending_state.commit(update);
Ok(())
}

fn reset_storage<I: Iterator<Item=(Vec<u8>, Vec<u8>)>>(&mut self, iter: I) -> Result<(), client::error::Error> {
self.pending_state.commit(iter.into_iter().map(|(k, v)| (k, Some(v))));
// TODO: wipe out existing trie.
let (_, update) = self.pending_state.storage_root(iter.into_iter().map(|(k, v)| (k, Some(v))));
self.pending_state.commit(update);
Ok(())
}
}
Expand All @@ -296,8 +297,8 @@ struct Ephemeral<'a> {
overlay: &'a mut MemoryDB,
}

impl HashDB for Ephemeral {
fn keys(&self) -> HashMap<H256, i32> {
impl<'a> HashDB for Ephemeral<'a> {
fn keys(&self) -> HashMap<TrieH256, i32> {
self.overlay.keys() // TODO: iterate backing
}

Expand Down Expand Up @@ -339,13 +340,16 @@ impl HashDB for Ephemeral {
}
}

/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
pub struct DbState {
db: Arc<KeyValueDB>,
root: TrieH256,
updates: MemoryDB,
}

impl state_machine::Backend for DbState {
type Error = TrieError;
type Error = client::error::Error;
type Transaction = MemoryDB;

fn storage(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
let mut read_overlay = MemoryDB::default();
Expand All @@ -354,31 +358,14 @@ impl state_machine::Backend for DbState {
overlay: &mut read_overlay,
};

TrieDB::new(&eph, &self.root)?.get(key).map(|x| x.map(|val| val.to_vec()))
}

fn commit<I>(&mut self, changes: I)
where I: IntoIterator<Item=(Vec<u8>, Option<Vec<u8>>)>
{
let mut write_overlay = MemoryDB::default();
let eph = Ephemeral {
backing: &*self.db,
overlay: &mut write_overlay,
};

let mut trie = TrieDBMut::from_existing(&mut eph, &mut self.root).expect("prior state root to exist"); // TODO: handle gracefully
for (key, change) in changes {
let result = match change {
Some(val) => trie.insert(&key, &val),
None => trie.remove(&key),
};
let map_e = |e: Box<TrieError>| ::client::error::Error::from(format!("Trie lookup error: {}", e));

if let Err(e) = result {
warn!("Failed to write to trie: {}", e);
}
}
TrieDB::new(&eph, &self.root).map_err(map_e)?
.get(key).map(|x| x.map(|val| val.to_vec())).map_err(map_e)
}

trie.commit();
fn commit(&mut self, transaction: MemoryDB) {
self.updates = transaction;
}

fn pairs(&self) -> Vec<(Vec<u8>, Vec<u8>)> {
Expand All @@ -388,21 +375,58 @@ impl state_machine::Backend for DbState {
overlay: &mut read_overlay,
};

TrieDB::new(&eph, &self.root)?.get(key).map(|x| x.map(|val| val.to_vec()))
let collect_all = || -> Result<_, Box<TrieError>> {
let trie = TrieDB::new(&eph, &self.root)?;
let mut v = Vec::new();
for x in trie.iter()? {
let (key, value) = x?;
v.push((key.to_vec(), value.to_vec()));
}

Ok(v)
};

match collect_all() {
Ok(v) => v,
Err(e) => {
debug!("Error extracting trie values: {}", e);
Vec::new()
}
}
}

fn storage_root<I>(&self, delta: I) -> [u8; 32]
fn storage_root<I>(&self, delta: I) -> ([u8; 32], MemoryDB)
where I: IntoIterator<Item=(Vec<u8>, Option<Vec<u8>>)>
{
self.mem.storage_root(delta)
let mut write_overlay = MemoryDB::default();
let mut root = self.root;
{
let mut eph = Ephemeral {
backing: &*self.db,
overlay: &mut write_overlay,
};

let mut trie = TrieDBMut::from_existing(&mut eph, &mut root).expect("prior state root to exist"); // TODO: handle gracefully
for (key, change) in delta {
let result = match change {
Some(val) => trie.insert(&key, &val),
None => trie.remove(&key), // TODO: archive mode
};

if let Err(e) = result {
warn!("Failed to write to trie: {}", e);
}
}
}

(root.0.into(), write_overlay)
}
}

/// In-memory backend. Keeps all states and blocks in memory. Useful for testing.
pub struct Backend {
db: Arc<KeyValueDB>,
blockchain: BlockchainDb,
old_states: RwLock<HashMap<BlockKey, state_machine::backend::InMemory>>,
}

impl Backend {
Expand All @@ -427,20 +451,9 @@ impl Backend {
fn from_kvdb(db: Arc<KeyValueDB>) -> Result<Backend, client::error::Error> {
let blockchain = BlockchainDb::new(db.clone())?;

//load latest state
let mut state = state_machine::backend::InMemory::new();
let mut old_states = HashMap::new();

{
let iter = db.iter(columns::STATE).map(|(k, v)| (k.to_vec(), Some(v.to_vec())));
state.commit(iter);
old_states.insert(number_to_db_key(blockchain.meta.read().best_number), state);
}

Ok(Backend {
db,
blockchain,
old_states: RwLock::new(old_states)
})
}
}
Expand All @@ -458,7 +471,7 @@ impl client::backend::Backend for Backend {
})
}

fn commit_operation(&self, operation: Self::BlockImportOperation) -> Result<(), client::error::Error> {
fn commit_operation(&self, mut operation: Self::BlockImportOperation) -> Result<(), client::error::Error> {
let mut transaction = DBTransaction::new();
if let Some(pending_block) = operation.pending_block {
let hash: block::HeaderHash = pending_block.header.blake2_256().into();
Expand All @@ -475,17 +488,14 @@ impl client::backend::Backend for Backend {
if pending_block.is_best {
transaction.put(columns::META, meta::BEST_BLOCK, &key);
}
for (key, val) in operation.pending_state.changes.into_iter() {
match val {
Some(v) => { transaction.put(columns::STATE, &key, &v); },
None => { transaction.delete(columns::STATE, &key); },
for (key, (val, rc)) in operation.pending_state.updates.drain() {
if rc > 0 {
transaction.put(columns::STATE, &key.0[..], &val);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stray newline

} else {
transaction.delete(columns::STATE, &key.0[..]);
}
}
let mut states = self.old_states.write();
states.insert(key, operation.pending_state.mem);
if number >= STATE_HISTORY {
states.remove(&number_to_db_key(number - STATE_HISTORY));
}
debug!("DB Commit {:?} ({})", hash, number);
self.db.write(transaction).map_err(db_err)?;
self.blockchain.update_meta(hash, number, pending_block.is_best);
Expand All @@ -498,11 +508,31 @@ impl client::backend::Backend for Backend {
}

fn state_at(&self, block: BlockId) -> Result<Self::State, client::error::Error> {
if let Some(state) = self.blockchain.id(block)?.and_then(|id| self.old_states.read().get(&id).cloned()) {
Ok(DbState { mem: state, changes: Vec::new() })
} else {
Err(client::error::ErrorKind::UnknownBlock(block).into())
use client::blockchain::Backend as BcBackend;

// special case for genesis initialization
match block {
BlockId::Hash(h) if h == Default::default() => {
let mut root = TrieH256::default();
let mut db = MemoryDB::default();
TrieDBMut::new(&mut db, &mut root);

return Ok(DbState {
db: self.db.clone(),
updates: Default::default(),
root,
})
}
_ => {}
}

self.blockchain.header(block).and_then(|maybe_hdr| maybe_hdr.map(|hdr| {
DbState {
db: self.db.clone(),
updates: Default::default(),
root: hdr.state_root.0.into(),
}
}).ok_or_else(|| client::error::ErrorKind::UnknownBlock(block).into()))
}
}

Expand All @@ -516,11 +546,17 @@ mod tests {
#[test]
fn block_hash_inserted_correctly() {
let db = Backend::new_test();
for i in 1..10 {
for i in 0..10 {
assert!(db.blockchain().hash(i).unwrap().is_none());

{
let mut op = db.begin_operation(BlockId::Number(i - 1)).unwrap();
let id = if i == 0 {
BlockId::Hash(Default::default())
} else {
BlockId::Number(i - 1)
};

let mut op = db.begin_operation(id).unwrap();
let header = block::Header {
number: i,
parent_hash: Default::default(),
Expand All @@ -535,7 +571,6 @@ mod tests {
None,
true,
).unwrap();
op.set_storage(vec![].into_iter()).unwrap();
db.commit_operation(op).unwrap();
}

Expand Down
4 changes: 2 additions & 2 deletions substrate/client/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

//! Polkadot Client data backend

use state_machine;
use state_machine::backend::Backend as StateBackend;
use error;
use primitives::block::{self, Id as BlockId};
Expand All @@ -32,7 +31,7 @@ pub trait BlockImportOperation {
/// Append block data to the transaction.
fn set_block_data(&mut self, header: block::Header, body: Option<block::Body>, justification: Option<primitives::bft::Justification>, is_new_best: bool) -> error::Result<()>;
/// Inject storage data into the database.
fn storage_update(&mut self, update: <Self::State as StateBackend>::Transaction) -> error::Result<()>;
fn update_storage(&mut self, update: <Self::State as StateBackend>::Transaction) -> error::Result<()>;
/// Inject storage data into the database replacing any existing data.
fn reset_storage<I: Iterator<Item=(Vec<u8>, Vec<u8>)>>(&mut self, iter: I) -> error::Result<()>;
}
Expand All @@ -47,6 +46,7 @@ pub trait Backend {
type State: StateBackend;

/// Begin a new block insertion transaction with given parent block id.
/// When constructing the genesis, this is called with all-zero hash.
fn begin_operation(&self, block: BlockId) -> error::Result<Self::BlockImportOperation>;
/// Commit block insertion.
fn commit_operation(&self, transaction: Self::BlockImportOperation) -> error::Result<()>;
Expand Down
4 changes: 2 additions & 2 deletions substrate/client/src/block_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<B, E> BlockBuilder<B, E> where
/// can be validly executed (by executing it); if it is invalid, it'll be returned along with
/// the error. Otherwise, it will return a mutable reference to self (in order to chain).
pub fn push(&mut self, tx: Extrinsic) -> error::Result<()> {
let output = state_machine::execute(&self.state, &mut self.changes, &self.executor, "execute_transaction",
let (output, _) = state_machine::execute(&self.state, &mut self.changes, &self.executor, "execute_transaction",
&vec![].and(&self.header).and(&tx))?;
self.header = Header::decode(&mut &output[..]).expect("Header came straight out of runtime so must be valid");
self.transactions.push(tx);
Expand All @@ -79,7 +79,7 @@ impl<B, E> BlockBuilder<B, E> where
/// Consume the builder to return a valid `Block` containing all pushed transactions.
pub fn bake(mut self) -> error::Result<Block> {
self.header.extrinsics_root = ordered_trie_root(self.transactions.iter().map(Slicable::encode)).0.into();
let output = state_machine::execute(&self.state, &mut self.changes, &self.executor, "finalise_block",
let (output, _) = state_machine::execute(&self.state, &mut self.changes, &self.executor, "finalise_block",
&self.header.encode())?;
self.header = Header::decode(&mut &output[..]).expect("Header came straight out of runtime so must be valid");
Ok(Block {
Expand Down
4 changes: 2 additions & 2 deletions substrate/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl<B, E> Client<B, E> where
/// No changes are made.
pub fn call(&self, id: &BlockId, method: &str, call_data: &[u8]) -> error::Result<CallResult> {
let mut changes = OverlayedChanges::default();
let return_data = state_machine::execute(
let (return_data, _) = state_machine::execute(
&self.state_at(id)?,
&mut changes,
&self.executor,
Expand All @@ -253,7 +253,7 @@ impl<B, E> Client<B, E> where
overlay: &mut OverlayedChanges,
f: F
) -> error::Result<T> {
Ok(runtime_io::with_externalities(&mut Ext { backend: &self.state_at(id)?, overlay }, f))
Ok(runtime_io::with_externalities(&mut Ext::new(overlay, &self.state_at(id)?), f))
}

/// Create a new block, built on the head of the chain.
Expand Down
4 changes: 2 additions & 2 deletions substrate/client/src/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ impl backend::BlockImportOperation for BlockImportOperation {
Ok(())
}

fn set_storage<I: Iterator<Item=(Vec<u8>, Option<Vec<u8>>)>>(&mut self, changes: I) -> error::Result<()> {
self.pending_state.commit(changes);
fn update_storage(&mut self, update: <Self::State as StateBackend>::Transaction) -> error::Result<()> {
self.pending_state.commit(update);
Ok(())
}

Expand Down