diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs
index 2e298b6563e7c..af07213bb0abb 100644
--- a/substrate/client/src/client.rs
+++ b/substrate/client/src/client.rs
@@ -18,9 +18,9 @@
 
 use std::sync::Arc;
 use futures::sync::mpsc;
-use parking_lot::Mutex;
+use parking_lot::{Mutex, RwLock};
 use primitives::{self, block, AuthorityId};
-use primitives::block::Id as BlockId;
+use primitives::block::{Id as BlockId, HeaderHash};
 use primitives::storage::{StorageKey, StorageData};
 use runtime_support::Hashable;
 use codec::Slicable;
@@ -45,6 +45,8 @@ pub struct Client<B, E> {
 	backend: Arc<B>,
 	executor: E,
 	import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<BlockImportNotification>>>,
+	import_lock: Mutex<()>,
+	importing_block: RwLock<Option<block::HeaderHash>>, // holds the block hash currently being imported. TODO: replace this with block queue
 }
 
 /// A source of blockchain events.
@@ -183,6 +185,8 @@ impl<B, E> Client<B, E> where
 			backend,
 			executor,
 			import_notification_sinks: Mutex::new(Vec::new()),
+			import_lock: Mutex::new(()),
+			importing_block: RwLock::new(None),
 		})
 	}
 
@@ -283,13 +287,32 @@ impl<B, E> Client<B, E> where
 		header: JustifiedHeader,
 		body: Option<block::Body>,
 	) -> error::Result<ImportResult> {
-		// TODO: import lock
-		// TODO: import justification.
 		let (header, justification) = header.into_inner();
 		match self.backend.blockchain().status(BlockId::Hash(header.parent_hash))? {
-			blockchain::BlockStatus::InChain => (),
+			blockchain::BlockStatus::InChain => {},
 			blockchain::BlockStatus::Unknown => return Ok(ImportResult::UnknownParent),
 		}
+		let hash: block::HeaderHash = header.blake2_256().into();
+
+		let _import_lock = self.import_lock.lock();
+		*self.importing_block.write() = Some(hash);
+		let result = self.execute_and_import_block(origin, hash, header, justification, body);
+		*self.importing_block.write() = None;
+		result
+	}
+
+	fn execute_and_import_block(
+		&self,
+		origin: BlockOrigin,
+		hash: HeaderHash,
+		header: block::Header,
+		justification: bft::Justification,
+		body: Option<block::Body>,
+	) -> error::Result<ImportResult> {
+		match self.backend.blockchain().status(BlockId::Hash(hash))? {
+			blockchain::BlockStatus::InChain => return Ok(ImportResult::AlreadyInChain),
+			blockchain::BlockStatus::Unknown => {},
+		}
 
 		let mut transaction = self.backend.begin_operation(BlockId::Hash(header.parent_hash))?;
 		let storage_update = match transaction.state()? {
@@ -308,14 +331,12 @@ impl<B, E> Client<B, E> where
 		};
 
 		let is_new_best = header.number == self.backend.blockchain().info()?.best_number + 1;
-		let hash: block::HeaderHash = header.blake2_256().into();
 		trace!("Imported {}, (#{}), best={}, origin={:?}", hash, header.number, is_new_best, origin);
 		transaction.set_block_data(header.clone(), body, Some(justification.uncheck().into()), is_new_best)?;
 		if let Some(storage_update) = storage_update {
 			transaction.update_storage(storage_update)?;
 		}
 		self.backend.commit_operation(transaction)?;
-
 		if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own || origin == BlockOrigin::ConsensusBroadcast {
 			let notification = BlockImportNotification {
 				hash: hash,
@@ -325,7 +346,6 @@ impl<B, E> Client<B, E> where
 			};
 			self.import_notification_sinks.lock().retain(|sink| !sink.unbounded_send(notification.clone()).is_err());
 		}
-
 		Ok(ImportResult::Queued)
 	}
 
@@ -342,6 +362,11 @@ impl<B, E> Client<B, E> where
 	/// Get block status.
 	pub fn block_status(&self, id: &BlockId) -> error::Result<BlockStatus> {
 		// TODO: more efficient implementation
+		if let BlockId::Hash(ref h) = id {
+			if self.importing_block.read().as_ref().map_or(false, |importing| h == importing) {
+				return Ok(BlockStatus::Queued);
+			}
+		}
 		match self.backend.blockchain().header(*id).map_err(|e| error::Error::from_blockchain(Box::new(e)))?.is_some() {
 			true => Ok(BlockStatus::InChain),
 			false => Ok(BlockStatus::Unknown),
diff --git a/substrate/network/src/consensus.rs b/substrate/network/src/consensus.rs
index 5c4689384deb3..1d170ba9cb39f 100644
--- a/substrate/network/src/consensus.rs
+++ b/substrate/network/src/consensus.rs
@@ -22,12 +22,12 @@ use std::time::{Instant, Duration};
 use io::SyncIo;
 use protocol::Protocol;
 use network::PeerId;
-use primitives::{Hash, block::HeaderHash, block::Id as BlockId, block::Header};
+use primitives::{Hash, block::Id as BlockId, block::Header};
 use message::{self, Message};
 use runtime_support::Hashable;
 
 // TODO: Add additional spam/DoS attack protection.
-const MESSAGE_LIFETIME_SECONDS: u64 = 600;
+const MESSAGE_LIFETIME: Duration = Duration::from_secs(600);
 
 struct CandidateRequest {
 	id: message::RequestId,
@@ -48,12 +48,11 @@ pub struct Consensus {
 	bft_message_sink: Option<(mpsc::UnboundedSender<message::LocalizedBftMessage>, Hash)>,
 	messages: Vec<(Hash, Instant, message::Message)>,
 	message_hashes: HashSet<Hash>,
-	last_block_hash: HeaderHash,
 }
 
 impl Consensus {
 	/// Create a new instance.
-	pub fn new(best_hash: HeaderHash) -> Consensus {
+	pub fn new() -> Consensus {
 		Consensus {
 			peers: HashMap::new(),
 			our_candidate: None,
@@ -61,7 +60,6 @@ impl Consensus {
 			bft_message_sink: None,
 			messages: Default::default(),
 			message_hashes: Default::default(),
-			last_block_hash: best_hash,
 		}
 	}
 
@@ -283,37 +281,25 @@ impl Consensus {
 		self.peers.remove(&peer_id);
 	}
 
-	pub fn collect_garbage(&mut self, best_hash_and_header: Option<(HeaderHash, &Header)>) {
+	pub fn collect_garbage(&mut self, best_header: Option<&Header>) {
 		let hashes = &mut self.message_hashes;
-		let last_block_hash = &mut self.last_block_hash;
 		let before = self.messages.len();
-		let (best_hash, best_header) = best_hash_and_header.map(|(h, header)| (Some(h), Some(header))).unwrap_or((None, None));
-		if best_header.as_ref().map_or(false, |header| header.parent_hash != *last_block_hash) {
-			trace!(target:"sync", "Clearing conensus message cache");
-			self.messages.clear();
-			hashes.clear();
-		} else {
-			let expiration = Duration::from_secs(MESSAGE_LIFETIME_SECONDS);
-			let now = Instant::now();
-			if let Some(hash) = best_hash {
-				*last_block_hash = hash;
-			}
-			self.messages.retain(|&(ref hash, timestamp, ref message)| {
-				timestamp < now + expiration ||
-				best_header.map_or(true, |header| {
-					if match *message {
-						Message::BftMessage(ref msg) => msg.parent_hash != header.parent_hash,
-						Message::Statement(ref msg) => msg.parent_hash != header.parent_hash,
-						_ => true,
-					} {
-						hashes.remove(hash);
-						true
-					} else {
-						false
-					}
+		let now = Instant::now();
+		self.messages.retain(|&(ref hash, timestamp, ref message)| {
+			if timestamp >= now - MESSAGE_LIFETIME &&
+				best_header.map_or(true, |header|
+					match *message {
+						Message::BftMessage(ref msg) => msg.parent_hash != header.parent_hash,
+						Message::Statement(ref msg) => msg.parent_hash != header.parent_hash,
+						_ => true,
 					})
-			});
-		}
+			{
+				true
+			} else {
+				hashes.remove(hash);
+				false
+			}
+		});
 		if self.messages.len() != before {
 			trace!(target:"sync", "Cleaned up {} stale messages", before - self.messages.len());
 		}
@@ -322,3 +308,68 @@ impl Consensus {
 		}
 	}
 }
+
+#[cfg(test)]
+mod tests {
+	use primitives::Hash;
+	use primitives::bft::Justification;
+	use primitives::block::{HeaderHash, Header};
+	use std::time::Instant;
+	use message::{self, Message};
+	use super::{Consensus, MESSAGE_LIFETIME};
+
+	#[test]
+	fn collects_garbage() {
+		let prev_hash = HeaderHash::random();
+		let best_hash = HeaderHash::random();
+		let mut consensus = Consensus::new();
+		let now = Instant::now();
+		let m1_hash = Hash::random();
+		let m2_hash = Hash::random();
+		let m1 = Message::BftMessage(message::LocalizedBftMessage {
+			parent_hash: prev_hash,
+			message: message::BftMessage::Auxiliary(Justification {
+				round_number: 0,
+				hash: Default::default(),
+				signatures: Default::default(),
+			}),
+		});
+		let m2 = Message::BftMessage(message::LocalizedBftMessage {
+			parent_hash: best_hash,
+			message: message::BftMessage::Auxiliary(Justification {
+				round_number: 0,
+				hash: Default::default(),
+				signatures: Default::default(),
+			}),
+		});
+		consensus.messages.push((m1_hash, now, m1));
+		consensus.messages.push((m2_hash, now, m2.clone()));
+		consensus.message_hashes.insert(m1_hash);
+		consensus.message_hashes.insert(m2_hash);
+
+		// nothing to collect
+		consensus.collect_garbage(None);
+		assert_eq!(consensus.messages.len(), 2);
+		assert_eq!(consensus.message_hashes.len(), 2);
+
+		// random header, nothing should be cleared
+		let mut header = Header::from_block_number(0);
+		consensus.collect_garbage(Some(&header));
+		assert_eq!(consensus.messages.len(), 2);
+		assert_eq!(consensus.message_hashes.len(), 2);
+
+		// header that matches one of the messages
+		header.parent_hash = prev_hash;
+		consensus.collect_garbage(Some(&header));
+		assert_eq!(consensus.messages.len(), 1);
+		assert_eq!(consensus.message_hashes.len(), 1);
+		assert!(consensus.message_hashes.contains(&m2_hash));
+
+		// make timestamp expired
+		consensus.messages.clear();
+		consensus.messages.push((m2_hash, now - MESSAGE_LIFETIME, m2));
+		consensus.collect_garbage(None);
+		assert!(consensus.messages.is_empty());
+		assert!(consensus.message_hashes.is_empty());
+	}
+}
diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs
index 8e9b3ace68902..52e9e554842a9 100644
--- a/substrate/network/src/protocol.rs
+++ b/substrate/network/src/protocol.rs
@@ -116,7 +116,6 @@ impl Protocol {
 	/// Create a new instance.
 	pub fn new(config: ProtocolConfig, chain: Arc<Client>, on_demand: Option<Arc<OnDemandService>>, transaction_pool: Arc<TransactionPool>) -> error::Result<Protocol> {
 		let info = chain.info()?;
-		let best_hash = info.chain.best_hash;
 		let sync = ChainSync::new(config.roles, &info);
 		let protocol = Protocol {
 			config: config,
@@ -124,7 +123,7 @@ impl Protocol {
 			on_demand: on_demand,
 			genesis_hash: info.chain.genesis_hash,
 			sync: RwLock::new(sync),
-			consensus: Mutex::new(Consensus::new(best_hash)),
+			consensus: Mutex::new(Consensus::new()),
 			peers: RwLock::new(HashMap::new()),
 			handshaking_peers: RwLock::new(HashMap::new()),
 			transaction_pool: transaction_pool,
@@ -528,7 +527,7 @@ impl Protocol {
 			}
 		}
 
-		self.consensus.lock().collect_garbage(Some((hash, &header)));
+		self.consensus.lock().collect_garbage(Some(&header));
 	}
 
 	fn on_remote_call_request(&self, io: &mut SyncIo, peer_id: PeerId, request: message::RemoteCallRequest) {