Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5d7f6d5
set up data types and control flow for statement distribution
rphmeier Jun 27, 2020
28f12c5
add some set-like methods to View
rphmeier Jun 27, 2020
f8992a2
implement sending to peers
rphmeier Jun 27, 2020
72b339b
start fixing equivocation handling
rphmeier Jun 29, 2020
6a16cd7
Add a section to the statement distribution subsystem on equivocation…
rphmeier Jun 30, 2020
ca3b523
fix typo and amend wording
rphmeier Jun 30, 2020
8464686
implement flood protection
rphmeier Jul 1, 2020
cac9adc
have peer knowledge tracker follow when peer first learns about a can…
rphmeier Jul 1, 2020
1be7e37
send dependents after circulating
rphmeier Jul 1, 2020
47f2df8
add another TODO
rphmeier Jul 1, 2020
cd30c21
trigger send in one more place
rphmeier Jul 1, 2020
a39ad10
refactors from review
rphmeier Jul 1, 2020
751122b
send new statements to candidate backing
rphmeier Jul 1, 2020
d413426
instantiate active head data with runtime API values
rphmeier Jul 1, 2020
7b82e1b
track our view changes and peer view changes
rphmeier Jul 1, 2020
94a69ff
apply a benefit to peers who send us statements we want
rphmeier Jul 1, 2020
a3b4b6a
remove unneeded TODO
rphmeier Jul 1, 2020
eb51e80
add some comments and improve Hash implementation
rphmeier Jul 1, 2020
b9876ae
start tests and fix `note_statement`
rphmeier Jul 2, 2020
f605ed2
test active_head seconding logic
rphmeier Jul 2, 2020
5650620
test that the per-peer tracking logic works
rphmeier Jul 2, 2020
dd00108
test per-peer knowledge tracker
rphmeier Jul 2, 2020
c93b545
test that peer view updates lead to messages being sent
rphmeier Jul 2, 2020
19ee505
test statement circulation
rphmeier Jul 2, 2020
9093736
address review comments
rphmeier Jul 2, 2020
6d18b62
have view set methods return references
rphmeier Jul 2, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
implement sending to peers
  • Loading branch information
rphmeier committed Jun 30, 2020
commit f8992a2327a994ea55229ccefaef5f3fecc6391a
191 changes: 156 additions & 35 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ use polkadot_subsystem::messages::{
use node_primitives::{ProtocolId, View, SignedFullStatement};
use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{CompactStatement, ValidatorIndex};
use parity_scale_codec::{Encode, Decode};

use futures::prelude::*;

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet, BTreeSet};
use std::collections::{HashMap, HashSet};

const PROTOCOL_V1: ProtocolId = *b"sdn1";

Expand Down Expand Up @@ -70,12 +70,60 @@ struct PeerRelayParentKnowledge {
known_statements: HashSet<(CompactStatement, ValidatorIndex)>,
}

impl PeerRelayParentKnowledge {
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint.
///
/// This returns `false` if the peer cannot accept this statement, without altering internal
/// state.
///
/// If the peer can accept the statement, this returns `true` and updates the internal state.
/// Once the knowledge has incorporated a statement, it cannot be incorporated again.
fn accept(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool {
if self.known_statements.contains(fingerprint) {
return false;
}

match fingerprint.0 {
CompactStatement::Candidate(ref h) => {
self.known_candidates.insert(h.clone());
},
CompactStatement::Valid(ref h) | CompactStatement::Invalid(ref h) => {
// The peer can only accept Valid and Invalid statements for which it is aware
// of the corresponding candidate.
if self.known_candidates.contains(h) {
return false;
}
}
}

self.known_statements.insert(fingerprint.clone());
true
}
}

struct PeerData {
role: ObservedRole,
view: View,
view_knowledge: HashMap<Hash, PeerRelayParentKnowledge>,
}

impl PeerData {
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint.
///
/// This returns `false` if the peer cannot accept this statement, without altering internal
/// state.
///
/// If the peer can accept the statement, this returns `true` and updates the internal state.
/// Once the knowledge has incorporated a statement, it cannot be incorporated again.
fn accept(
&mut self,
relay_parent: &Hash,
fingerprint: &(CompactStatement, ValidatorIndex),
) -> bool {
self.view_knowledge.get_mut(relay_parent).map_or(false, |k| k.accept(fingerprint))
}
}

// A statement stored while a relay chain head is active.
//
// These are orderable first by (Seconded, Valid, Invalid), then by the underlying hash,
Expand All @@ -92,39 +140,102 @@ impl StoredStatement {
}
}

impl PartialOrd for StoredStatement {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
impl std::borrow::Borrow<CompactStatement> for StoredStatement {
fn borrow(&self) -> &CompactStatement {
&self.compact
}
}

impl Ord for StoredStatement {
fn cmp(&self, other: &Self) -> Ordering {
let to_idx = |x: &CompactStatement| match *x {
CompactStatement::Candidate(_) => 0u8,
CompactStatement::Valid(_) => 1,
CompactStatement::Invalid(_) => 2,
};

match (&self.compact, &other.compact) {
(&CompactStatement::Candidate(ref h), &CompactStatement::Candidate(ref h2)) |
(&CompactStatement::Valid(ref h), &CompactStatement::Valid(ref h2)) |
(&CompactStatement::Invalid(ref h), &CompactStatement::Invalid(ref h2)) => {
h.cmp(h2).then(
self.statement.validator_index().cmp(&other.statement.validator_index())
)
},

(ref a, ref b) => to_idx(a).cmp(&to_idx(b)),
}
impl std::hash::Hash for StoredStatement {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.fingerprint().hash(state)
}
}

struct ActiveHeadData {
// All candidates we are aware of for this head, keyed by hash.
candidates: HashSet<Hash>,
// Stored statements for circulation to peers.
statements: BTreeSet<StoredStatement>,
// Stored seconded statements for circulation to peers.
seconded_statements: HashSet<StoredStatement>,
// Stored other statements for circulation to peers.
other_statements: HashSet<StoredStatement>,
}

impl ActiveHeadData {
/// Note the given statement. If it was not already known, returns `Some`, with a handle to the
/// statement.
fn note_statement(&mut self, statement: SignedFullStatement) -> Option<&StoredStatement> {
let compact = statement.payload().to_compact();
let stored = StoredStatement {
compact: compact.clone(),
statement,
};

match compact {
CompactStatement::Candidate(h) => {
self.candidates.insert(h);
if self.seconded_statements.insert(stored) {
// This will always return `Some` because it was just inserted.
self.seconded_statements.get(&compact)
} else {
None
}
}
CompactStatement::Valid(_) | CompactStatement::Invalid(_) => {
if self.other_statements.insert(stored) {
// This will always return `Some` because it was just inserted.
self.other_statements.get(&compact)
} else {
None
}
}
}
}

/// Get an iterator over all statements for the active head. Seconded statements come first.
fn statements(&self) -> impl Iterator<Item = &'_ StoredStatement> + '_ {
self.seconded_statements.iter().chain(self.other_statements.iter())
}
}

async fn share_message(
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
relay_parent: Hash,
statement: SignedFullStatement,
) -> SubsystemResult<()> {
if let Some(stored)
= active_heads.get_mut(&relay_parent).and_then(|d| d.note_statement(statement))
{
let fingerprint = stored.fingerprint();
let peers_to_send: Vec<_> = peers.iter_mut()
.filter_map(|(p, data)| if data.accept(&relay_parent, &fingerprint) {
Some(p.clone())
} else {
None
})
.collect();

if peers_to_send.is_empty() { return Ok(()) }

let payload = stored.statement.encode();
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage(
peers_to_send,
PROTOCOL_V1,
payload,
))).await?;
}
Ok(())
}

async fn handle_network_update(
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
update: NetworkBridgeEvent,
) -> SubsystemResult<()> {
Ok(())
}

async fn run(
Expand All @@ -144,20 +255,30 @@ async fn run(
let message = ctx.recv().await?;
match message {
FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)) => {

active_heads.entry(relay_parent).or_insert(ActiveHeadData {
candidates: HashSet::new(),
seconded_statements: HashSet::new(),
other_statements: HashSet::new(),
});
}
FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => {

// do nothing - we will handle this when our view changes.
}
FromOverseer::Signal(OverseerSignal::Conclude) => break,
FromOverseer::Communication { msg } => match msg {
StatementDistributionMessage::Share(relay_parent, statement) => {
// place into `active_heads` and circulate to all peers with
// the head in their view.
}
StatementDistributionMessage::NetworkBridgeUpdate(event) => match event {
_ => unimplemented!(),
}
StatementDistributionMessage::Share(relay_parent, statement) => share_message(
&mut peers,
&mut active_heads,
&mut ctx,
relay_parent,
statement,
).await?,
StatementDistributionMessage::NetworkBridgeUpdate(event) => handle_network_update(
&mut peers,
&mut active_heads,
&mut ctx,
event,
).await?,
}
}
}
Expand Down