Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions substrate/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

use std::sync::Arc;
use futures::sync::mpsc;
use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use primitives::{self, block, AuthorityId};
use primitives::block::Id as BlockId;
use primitives::block::{Id as BlockId, HeaderHash};
use primitives::storage::{StorageKey, StorageData};
use runtime_support::Hashable;
use codec::Slicable;
Expand All @@ -45,6 +45,8 @@ pub struct Client<B, E> {
backend: Arc<B>,
executor: E,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<BlockImportNotification>>>,
import_lock: Mutex<()>,
importing_block: RwLock<Option<HeaderHash>>, // holds the block hash currently being imported. TODO: replace this with block queue
}

/// A source of blockchain events.
Expand Down Expand Up @@ -183,6 +185,8 @@ impl<B, E> Client<B, E> where
backend,
executor,
import_notification_sinks: Mutex::new(Vec::new()),
import_lock: Mutex::new(()),
importing_block: RwLock::new(None),
})
}

Expand Down Expand Up @@ -283,13 +287,32 @@ impl<B, E> Client<B, E> where
header: JustifiedHeader,
body: Option<block::Body>,
) -> error::Result<ImportResult> {
// TODO: import lock
// TODO: import justification.
let (header, justification) = header.into_inner();
match self.backend.blockchain().status(BlockId::Hash(header.parent_hash))? {
blockchain::BlockStatus::InChain => (),
blockchain::BlockStatus::InChain => {},
blockchain::BlockStatus::Unknown => return Ok(ImportResult::UnknownParent),
}
let hash: block::HeaderHash = header.blake2_256().into();

let _import_lock = self.import_lock.lock();
*self.importing_block.write() = Some(hash);
let result = self.execute_and_import_block(origin, hash, header, justification, body);
*self.importing_block.write() = None;
result
}

fn execute_and_import_block(
&self,
origin: BlockOrigin,
hash: HeaderHash,
header: block::Header,
justification: bft::Justification,
body: Option<block::Body>,
) -> error::Result<ImportResult> {
match self.backend.blockchain().status(BlockId::Hash(hash))? {
blockchain::BlockStatus::InChain => return Ok(ImportResult::AlreadyInChain),
blockchain::BlockStatus::Unknown => {},
}

let mut transaction = self.backend.begin_operation(BlockId::Hash(header.parent_hash))?;
let storage_update = match transaction.state()? {
Expand All @@ -308,14 +331,12 @@ impl<B, E> Client<B, E> where
};

let is_new_best = header.number == self.backend.blockchain().info()?.best_number + 1;
let hash: block::HeaderHash = header.blake2_256().into();
trace!("Imported {}, (#{}), best={}, origin={:?}", hash, header.number, is_new_best, origin);
transaction.set_block_data(header.clone(), body, Some(justification.uncheck().into()), is_new_best)?;
if let Some(storage_update) = storage_update {
transaction.update_storage(storage_update)?;
}
self.backend.commit_operation(transaction)?;

if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own || origin == BlockOrigin::ConsensusBroadcast {
let notification = BlockImportNotification {
hash: hash,
Expand All @@ -325,7 +346,6 @@ impl<B, E> Client<B, E> where
};
self.import_notification_sinks.lock().retain(|sink| !sink.unbounded_send(notification.clone()).is_err());
}

Ok(ImportResult::Queued)
}

Expand All @@ -342,6 +362,11 @@ impl<B, E> Client<B, E> where
/// Get block status.
pub fn block_status(&self, id: &BlockId) -> error::Result<BlockStatus> {
// TODO: more efficient implementation
if let BlockId::Hash(ref h) = id {
if self.importing_block.read().as_ref().map_or(false, |importing| h == importing) {
return Ok(BlockStatus::Queued);
}
}
match self.backend.blockchain().header(*id).map_err(|e| error::Error::from_blockchain(Box::new(e)))?.is_some() {
true => Ok(BlockStatus::InChain),
false => Ok(BlockStatus::Unknown),
Expand Down
111 changes: 78 additions & 33 deletions substrate/network/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ use std::time::{Instant, Duration};
use io::SyncIo;
use protocol::Protocol;
use network::PeerId;
use primitives::{Hash, block::HeaderHash, block::Id as BlockId, block::Header};
use primitives::{Hash, block::Id as BlockId, block::Header};
use message::{self, Message};
use runtime_support::Hashable;

// TODO: Add additional spam/DoS attack protection.
const MESSAGE_LIFETIME_SECONDS: u64 = 600;
const MESSAGE_LIFETIME: Duration = Duration::from_secs(600);

struct CandidateRequest {
id: message::RequestId,
Expand All @@ -48,20 +48,18 @@ pub struct Consensus {
bft_message_sink: Option<(mpsc::UnboundedSender<message::LocalizedBftMessage>, Hash)>,
messages: Vec<(Hash, Instant, message::Message)>,
message_hashes: HashSet<Hash>,
last_block_hash: HeaderHash,
}

impl Consensus {
/// Create a new instance.
pub fn new(best_hash: HeaderHash) -> Consensus {
pub fn new() -> Consensus {
Consensus {
peers: HashMap::new(),
our_candidate: None,
statement_sink: None,
bft_message_sink: None,
messages: Default::default(),
message_hashes: Default::default(),
last_block_hash: best_hash,
}
}

Expand Down Expand Up @@ -283,37 +281,25 @@ impl Consensus {
self.peers.remove(&peer_id);
}

pub fn collect_garbage(&mut self, best_hash_and_header: Option<(HeaderHash, &Header)>) {
pub fn collect_garbage(&mut self, best_header: Option<&Header>) {
let hashes = &mut self.message_hashes;
let last_block_hash = &mut self.last_block_hash;
let before = self.messages.len();
let (best_hash, best_header) = best_hash_and_header.map(|(h, header)| (Some(h), Some(header))).unwrap_or((None, None));
if best_header.as_ref().map_or(false, |header| header.parent_hash != *last_block_hash) {
trace!(target:"sync", "Clearing conensus message cache");
self.messages.clear();
hashes.clear();
} else {
let expiration = Duration::from_secs(MESSAGE_LIFETIME_SECONDS);
let now = Instant::now();
if let Some(hash) = best_hash {
*last_block_hash = hash;
}
self.messages.retain(|&(ref hash, timestamp, ref message)| {
timestamp < now + expiration ||
best_header.map_or(true, |header| {
if match *message {
Message::BftMessage(ref msg) => msg.parent_hash != header.parent_hash,
Message::Statement(ref msg) => msg.parent_hash != header.parent_hash,
_ => true,
} {
hashes.remove(hash);
true
} else {
false
}
let now = Instant::now();
self.messages.retain(|&(ref hash, timestamp, ref message)| {
if timestamp >= now - MESSAGE_LIFETIME &&
best_header.map_or(true, |header|
match *message {
Message::BftMessage(ref msg) => msg.parent_hash != header.parent_hash,
Message::Statement(ref msg) => msg.parent_hash != header.parent_hash,
_ => true,
})
});
}
{
true
} else {
hashes.remove(hash);
false
}
});
if self.messages.len() != before {
trace!(target:"sync", "Cleaned up {} stale messages", before - self.messages.len());
}
Expand All @@ -322,3 +308,62 @@ impl Consensus {
}
}
}


// TODO(review): move this into a `#[cfg(test)] mod tests` submodule, per PR feedback.
/// Exercises `Consensus::collect_garbage` for its three pruning rules:
/// no-op when nothing matches, pruning messages whose parent hash matches the
/// new best header's parent, and pruning messages older than `MESSAGE_LIFETIME`.
#[test]
fn collects_garbage() {
	use primitives::bft::Justification;
	use primitives::block::HeaderHash;

	let prev_hash = HeaderHash::random();
	let best_hash = HeaderHash::random();
	let mut consensus = Consensus::new();
	let now = Instant::now();
	let m1_hash = Hash::random();
	let m2_hash = Hash::random();
	// m1 builds on `prev_hash`, m2 on `best_hash`; only m1 should be pruned
	// once a best header with `parent_hash == prev_hash` is observed.
	let m1 = Message::BftMessage(message::LocalizedBftMessage {
		parent_hash: prev_hash,
		message: message::BftMessage::Auxiliary(Justification {
			round_number: 0,
			hash: Default::default(),
			signatures: Default::default(),
		}),
	});
	let m2 = Message::BftMessage(message::LocalizedBftMessage {
		parent_hash: best_hash,
		message: message::BftMessage::Auxiliary(Justification {
			round_number: 0,
			hash: Default::default(),
			signatures: Default::default(),
		}),
	});
	consensus.messages.push((m1_hash, now, m1));
	consensus.messages.push((m2_hash, now, m2.clone()));
	consensus.message_hashes.insert(m1_hash);
	consensus.message_hashes.insert(m2_hash);

	// No best header and nothing expired: nothing to collect.
	consensus.collect_garbage(None);
	assert_eq!(consensus.messages.len(), 2);
	assert_eq!(consensus.message_hashes.len(), 2);

	// A header whose parent matches neither message: nothing should be cleared.
	let mut header = Header::from_block_number(0);
	consensus.collect_garbage(Some(&header));
	assert_eq!(consensus.messages.len(), 2);
	assert_eq!(consensus.message_hashes.len(), 2);

	// Header whose parent hash matches m1: m1 is pruned, m2 survives.
	header.parent_hash = prev_hash;
	consensus.collect_garbage(Some(&header));
	assert_eq!(consensus.messages.len(), 1);
	assert_eq!(consensus.message_hashes.len(), 1);
	assert!(consensus.message_hashes.contains(&m2_hash));

	// Back-date the remaining message past MESSAGE_LIFETIME: it must expire.
	consensus.messages.clear();
	consensus.messages.push((m2_hash, now - MESSAGE_LIFETIME, m2));
	consensus.collect_garbage(None);
	assert!(consensus.messages.is_empty());
	assert!(consensus.message_hashes.is_empty());
}
5 changes: 2 additions & 3 deletions substrate/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,14 @@ impl Protocol {
/// Create a new instance.
pub fn new(config: ProtocolConfig, chain: Arc<Client>, on_demand: Option<Arc<OnDemandService>>, transaction_pool: Arc<TransactionPool>) -> error::Result<Protocol> {
let info = chain.info()?;
let best_hash = info.chain.best_hash;
let sync = ChainSync::new(config.roles, &info);
let protocol = Protocol {
config: config,
chain: chain,
on_demand: on_demand,
genesis_hash: info.chain.genesis_hash,
sync: RwLock::new(sync),
consensus: Mutex::new(Consensus::new(best_hash)),
consensus: Mutex::new(Consensus::new()),
peers: RwLock::new(HashMap::new()),
handshaking_peers: RwLock::new(HashMap::new()),
transaction_pool: transaction_pool,
Expand Down Expand Up @@ -528,7 +527,7 @@ impl Protocol {
}
}

self.consensus.lock().collect_garbage(Some((hash, &header)));
self.consensus.lock().collect_garbage(Some(&header));
}

fn on_remote_call_request(&self, io: &mut SyncIo, peer_id: PeerId, request: message::RemoteCallRequest) {
Expand Down