Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fixed garbage collection logic
  • Loading branch information
arkpar committed May 31, 2018
commit 1487087ab088414b16e66acfa838eed3a438111a
111 changes: 78 additions & 33 deletions substrate/network/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ use std::time::{Instant, Duration};
use io::SyncIo;
use protocol::Protocol;
use network::PeerId;
use primitives::{Hash, block::HeaderHash, block::Id as BlockId, block::Header};
use primitives::{Hash, block::Id as BlockId, block::Header};
use message::{self, Message};
use runtime_support::Hashable;

// TODO: Add additional spam/DoS attack protection.
const MESSAGE_LIFETIME_SECONDS: u64 = 600;
const MESSAGE_LIFETIME: Duration = Duration::from_secs(600);

struct CandidateRequest {
id: message::RequestId,
Expand All @@ -48,20 +48,18 @@ pub struct Consensus {
bft_message_sink: Option<(mpsc::UnboundedSender<message::LocalizedBftMessage>, Hash)>,
messages: Vec<(Hash, Instant, message::Message)>,
message_hashes: HashSet<Hash>,
last_block_hash: HeaderHash,
}

impl Consensus {
/// Create a new instance.
pub fn new(best_hash: HeaderHash) -> Consensus {
pub fn new() -> Consensus {
Consensus {
peers: HashMap::new(),
our_candidate: None,
statement_sink: None,
bft_message_sink: None,
messages: Default::default(),
message_hashes: Default::default(),
last_block_hash: best_hash,
}
}

Expand Down Expand Up @@ -283,37 +281,25 @@ impl Consensus {
self.peers.remove(&peer_id);
}

pub fn collect_garbage(&mut self, best_hash_and_header: Option<(HeaderHash, &Header)>) {
pub fn collect_garbage(&mut self, best_header: Option<&Header>) {
let hashes = &mut self.message_hashes;
let last_block_hash = &mut self.last_block_hash;
let before = self.messages.len();
let (best_hash, best_header) = best_hash_and_header.map(|(h, header)| (Some(h), Some(header))).unwrap_or((None, None));
if best_header.as_ref().map_or(false, |header| header.parent_hash != *last_block_hash) {
trace!(target:"sync", "Clearing conensus message cache");
self.messages.clear();
hashes.clear();
} else {
let expiration = Duration::from_secs(MESSAGE_LIFETIME_SECONDS);
let now = Instant::now();
if let Some(hash) = best_hash {
*last_block_hash = hash;
}
self.messages.retain(|&(ref hash, timestamp, ref message)| {
timestamp < now + expiration ||
best_header.map_or(true, |header| {
if match *message {
Message::BftMessage(ref msg) => msg.parent_hash != header.parent_hash,
Message::Statement(ref msg) => msg.parent_hash != header.parent_hash,
_ => true,
} {
hashes.remove(hash);
true
} else {
false
}
let now = Instant::now();
self.messages.retain(|&(ref hash, timestamp, ref message)| {
if timestamp >= now - MESSAGE_LIFETIME &&
best_header.map_or(true, |header|
match *message {
Message::BftMessage(ref msg) => msg.parent_hash != header.parent_hash,
Message::Statement(ref msg) => msg.parent_hash != header.parent_hash,
_ => true,
})
});
}
{
true
} else {
hashes.remove(hash);
false
}
});
if self.messages.len() != before {
trace!(target:"sync", "Cleaned up {} stale messages", before - self.messages.len());
}
Expand All @@ -322,3 +308,62 @@ impl Consensus {
}
}
}


#[test]
fn collects_garbage() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we place this in a submodule?

use primitives::bft::Justification;
use primitives::block::HeaderHash;

let prev_hash = HeaderHash::random();
let best_hash = HeaderHash::random();
let mut consensus = Consensus::new();
let now = Instant::now();
let m1_hash = Hash::random();
let m2_hash = Hash::random();
let m1 = Message::BftMessage(message::LocalizedBftMessage {
parent_hash: prev_hash,
message: message::BftMessage::Auxiliary(Justification {
round_number: 0,
hash: Default::default(),
signatures: Default::default(),
}),
});
let m2 = Message::BftMessage(message::LocalizedBftMessage {
parent_hash: best_hash,
message: message::BftMessage::Auxiliary(Justification {
round_number: 0,
hash: Default::default(),
signatures: Default::default(),
}),
});
consensus.messages.push((m1_hash, now, m1));
consensus.messages.push((m2_hash, now, m2.clone()));
consensus.message_hashes.insert(m1_hash);
consensus.message_hashes.insert(m2_hash);

// nothing to collect
consensus.collect_garbage(None);
assert_eq!(consensus.messages.len(), 2);
assert_eq!(consensus.message_hashes.len(), 2);

// random header, nothing should be cleared
let mut header = Header::from_block_number(0);
consensus.collect_garbage(Some(&header));
assert_eq!(consensus.messages.len(), 2);
assert_eq!(consensus.message_hashes.len(), 2);

// header that matches one of the messages
header.parent_hash = prev_hash;
consensus.collect_garbage(Some(&header));
assert_eq!(consensus.messages.len(), 1);
assert_eq!(consensus.message_hashes.len(), 1);
assert!(consensus.message_hashes.contains(&m2_hash));

// make timestamp expired
consensus.messages.clear();
consensus.messages.push((m2_hash, now - MESSAGE_LIFETIME, m2));
consensus.collect_garbage(None);
assert!(consensus.messages.is_empty());
assert!(consensus.message_hashes.is_empty());
}
5 changes: 2 additions & 3 deletions substrate/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,14 @@ impl Protocol {
/// Create a new instance.
pub fn new(config: ProtocolConfig, chain: Arc<Client>, on_demand: Option<Arc<OnDemandService>>, transaction_pool: Arc<TransactionPool>) -> error::Result<Protocol> {
let info = chain.info()?;
let best_hash = info.chain.best_hash;
let sync = ChainSync::new(config.roles, &info);
let protocol = Protocol {
config: config,
chain: chain,
on_demand: on_demand,
genesis_hash: info.chain.genesis_hash,
sync: RwLock::new(sync),
consensus: Mutex::new(Consensus::new(best_hash)),
consensus: Mutex::new(Consensus::new()),
peers: RwLock::new(HashMap::new()),
handshaking_peers: RwLock::new(HashMap::new()),
transaction_pool: transaction_pool,
Expand Down Expand Up @@ -528,7 +527,7 @@ impl Protocol {
}
}

self.consensus.lock().collect_garbage(Some((hash, &header)));
self.consensus.lock().collect_garbage(Some(&header));
}

fn on_remote_call_request(&self, io: &mut SyncIo, peer_id: PeerId, request: message::RemoteCallRequest) {
Expand Down