Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5d7f6d5
set up data types and control flow for statement distribution
rphmeier Jun 27, 2020
28f12c5
add some set-like methods to View
rphmeier Jun 27, 2020
f8992a2
implement sending to peers
rphmeier Jun 27, 2020
72b339b
start fixing equivocation handling
rphmeier Jun 29, 2020
6a16cd7
Add a section to the statement distribution subsystem on equivocation…
rphmeier Jun 30, 2020
ca3b523
fix typo and amend wording
rphmeier Jun 30, 2020
8464686
implement flood protection
rphmeier Jul 1, 2020
cac9adc
have peer knowledge tracker follow when peer first learns about a candidate
rphmeier Jul 1, 2020
1be7e37
send dependents after circulating
rphmeier Jul 1, 2020
47f2df8
add another TODO
rphmeier Jul 1, 2020
cd30c21
trigger send in one more place
rphmeier Jul 1, 2020
a39ad10
refactors from review
rphmeier Jul 1, 2020
751122b
send new statements to candidate backing
rphmeier Jul 1, 2020
d413426
instantiate active head data with runtime API values
rphmeier Jul 1, 2020
7b82e1b
track our view changes and peer view changes
rphmeier Jul 1, 2020
94a69ff
apply a benefit to peers who send us statements we want
rphmeier Jul 1, 2020
a3b4b6a
remove unneeded TODO
rphmeier Jul 1, 2020
eb51e80
add some comments and improve Hash implementation
rphmeier Jul 1, 2020
b9876ae
start tests and fix `note_statement`
rphmeier Jul 2, 2020
f605ed2
test active_head seconding logic
rphmeier Jul 2, 2020
5650620
test that the per-peer tracking logic works
rphmeier Jul 2, 2020
dd00108
test per-peer knowledge tracker
rphmeier Jul 2, 2020
c93b545
test that peer view updates lead to messages being sent
rphmeier Jul 2, 2020
19ee505
test statement circulation
rphmeier Jul 2, 2020
9093736
address review comments
rphmeier Jul 2, 2020
6d18b62
have view set methods return references
rphmeier Jul 2, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
track our view changes and peer view changes
  • Loading branch information
rphmeier committed Jul 1, 2020
commit 7b82e1bdb3d950d24e701c6e032d4879fff778c0
122 changes: 93 additions & 29 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
//! validity amongst validators.

use polkadot_subsystem::{
Subsystem, SubsystemResult, SubsystemError, SubsystemContext, SpawnedSubsystem,
Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem,
FromOverseer, OverseerSignal,
};
use polkadot_subsystem::messages::{
AllMessages, NetworkBridgeMessage, NetworkBridgeEvent, StatementDistributionMessage,
PeerId, ObservedRole, ReputationChange as Rep, CandidateBackingMessage, RuntimeApiMessage,
PeerId, ReputationChange as Rep, CandidateBackingMessage, RuntimeApiMessage,
RuntimeApiRequest,
};
use node_primitives::{ProtocolId, View, SignedFullStatement};
Expand Down Expand Up @@ -57,16 +57,6 @@ const BENEFIT_VALID_STATEMENT: Rep = Rep::new(25, "Peer provided a valid stateme
/// Typically we will only keep 1, but when a validator equivocates we will need to track 2.
const VC_THRESHOLD: usize = 2;

/// The maximum amount of candidates each peer can be aware of each validator seconding at
/// any relay-parent. Short for "Validator Candidate per Peer Threshold".
///
/// This is 2 times the `VC_THRESHOLD` because it includes the candidates in
/// our state that we may have sent them, and the candidates that they may have received from
/// other peers in the meantime. Peers are unlikely to ever be aware of more than 2 candidates
/// except in the case of a targeted attack by a sophisticated adversary. Nevertheless, we
/// establish a finite bound on memory used in such a situation.
const VC_PEER_THRESHOLD: usize = 2 * VC_THRESHOLD;

/// The statement distribution subsystem.
pub struct StatementDistribution;

Expand Down Expand Up @@ -96,10 +86,6 @@ struct VcPerPeerTracker {
}

impl VcPerPeerTracker {
fn contains(&self, h: &Hash) -> bool {
self.local_observed.contains(h) || self.remote_observed.contains(h)
}

// Note that the remote should now be aware that a validator has seconded a given candidate (by hash)
// based on a message that we have sent it from our local pool.
fn note_local(&mut self, h: Hash) {
Expand Down Expand Up @@ -135,6 +121,7 @@ fn note_hash(
}

// knowledge that a peer has about goings-on in a relay parent.
#[derive(Default)]
struct PeerRelayParentKnowledge {
// candidates that the peer is aware of. This indicates that we can
// send other statements pertaining to that candidate.
Expand Down Expand Up @@ -261,7 +248,6 @@ impl PeerRelayParentKnowledge {
}

struct PeerData {
role: ObservedRole,
view: View,
view_knowledge: HashMap<Hash, PeerRelayParentKnowledge>,
}
Expand Down Expand Up @@ -564,6 +550,32 @@ async fn send_statements_about(
Ok(())
}

/// Send all statements at a given relay-parent to a peer.
///
/// Iterates every statement stored for `relay_parent` in `active_head` and
/// forwards to `peer` only those the peer is not already known to have:
/// `peer_data.send` both records the statement in the peer's knowledge
/// tracker and acts as the filter (a `Some` return means "new to this peer"
/// — assumed from the call shape; confirm against `PeerData::send`).
///
/// Returns an error only if sending to the network bridge fails.
async fn send_statements(
	peer: PeerId,
	peer_data: &mut PeerData,
	ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
	relay_parent: Hash,
	active_head: &ActiveHeadData
) -> SubsystemResult<()> {
	for statement in active_head.statements() {
		// Only forward statements the peer hasn't seen; `send` updates the
		// per-peer knowledge so we won't re-send the same fingerprint later.
		if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() {
			// Wire format: (relay-parent, signed statement), SCALE-encoded.
			let payload = WireMessage::Statement(
				relay_parent,
				statement.statement.clone(),
			).encode();

			ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage(
				vec![peer.clone()],
				PROTOCOL_V1,
				payload,
			))).await?;
		}
	}

	Ok(())
}

async fn report_peer(
ctx: &mut impl SubsystemContext,
peer: PeerId,
Expand Down Expand Up @@ -618,8 +630,8 @@ async fn handle_incoming_message<'a>(
let fingerprint = (statement.payload().to_compact(), statement.validator_index());
let max_message_count = active_head.validators.len() * 2;
match peer_data.receive(&relay_parent, &fingerprint, max_message_count) {
Err(e) => {
report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await?;
Err(rep) => {
report_peer(ctx, peer, rep).await?;
return Ok(None)
}
Ok(true) => {
Expand All @@ -639,9 +651,46 @@ async fn handle_incoming_message<'a>(

// Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation
// or unpinned to a seconded candidate. So it is safe to place it into the storage.
// TODO [now]: reward the peer if the statement was new or something we'd be interested in.
// slightly lower reward for losing races.
Ok(active_head.note_statement(statement).map(|s| (relay_parent, s)))
}

/// Update a peer's view, sending all statements newly unlocked by the change.
///
/// Replaces the peer's stored view with `new_view`, drops knowledge for
/// relay-parents that left the view, and for each relay-parent that entered
/// the view initializes fresh knowledge and sends the peer every statement
/// we hold for it (if we have an active head for that hash).
async fn update_peer_view_and_send_unlocked(
	peer: PeerId,
	peer_data: &mut PeerData,
	ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
	active_heads: &HashMap<Hash, ActiveHeadData>,
	new_view: View,
) -> SubsystemResult<()> {
	// Swap in the new view, keeping the old one to diff against.
	let old_view = std::mem::replace(&mut peer_data.view, new_view);

	// Remove entries for all relay-parents in the old view but not the new.
	for removed in old_view.difference(&peer_data.view) {
		let _ = peer_data.view_knowledge.remove(&removed);
	}

	// Add entries for all relay-parents in the new view but not the old.
	// Furthermore, send all statements we have for those relay parents.
	// NOTE: collected into a Vec first so the borrow of `peer_data.view`
	// ends before we mutate `peer_data.view_knowledge` below (the name
	// shadows the already-consumed `new_view` parameter).
	let new_view = peer_data.view.difference(&old_view).collect::<Vec<_>>();
	for new in new_view.iter().copied() {
		peer_data.view_knowledge.insert(new, Default::default());

		// No active head is possible here if the overseer hasn't told us
		// about this relay-parent yet; in that case there is nothing to send.
		if let Some(active_head) = active_heads.get(&new) {
			send_statements(
				peer.clone(),
				peer_data,
				ctx,
				new,
				active_head,
			).await?;
		}
	}

	Ok(())
}

async fn handle_network_update(
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
Expand All @@ -650,9 +699,8 @@ async fn handle_network_update(
update: NetworkBridgeEvent,
) -> SubsystemResult<()> {
match update {
NetworkBridgeEvent::PeerConnected(peer, role) => {
NetworkBridgeEvent::PeerConnected(peer, _role) => {
peers.insert(peer, PeerData {
role,
view: Default::default(),
view_knowledge: Default::default(),
});
Expand Down Expand Up @@ -691,15 +739,31 @@ async fn handle_network_update(

}
NetworkBridgeEvent::PeerViewChange(peer, view) => {
// TODO [now]
// 1. Update the view.
// 2. Send this peer all messages that we have for active heads in the view.
Ok(())
match peers.get_mut(&peer) {
Some(data) => {
update_peer_view_and_send_unlocked(
peer,
data,
ctx,
&*active_heads,
view,
).await
}
None => Ok(()),
}
}
NetworkBridgeEvent::OurViewChange(view) => {
// TODO [now]
// 1. Update our view.
// 2. Clean up everything that is not in the new view.
let old_view = std::mem::replace(our_view, view);
active_heads.retain(|head, _| our_view.contains(head));

for new in our_view.difference(&old_view) {
if !active_heads.contains_key(&new) {
log::warn!(target: "statement_distribution", "Our network bridge view update \
inconsistent with `StartWork` messages we have received from overseer. \
Contains unknown hash {}", new);
}
}

Ok(())
}
}
Expand Down Expand Up @@ -744,7 +808,7 @@ async fn run(
active_heads.entry(relay_parent)
.or_insert(ActiveHeadData::new(validators, session_index));
}
FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => {
FromOverseer::Signal(OverseerSignal::StopWork(_relay_parent)) => {
// do nothing - we will handle this when our view changes.
}
FromOverseer::Signal(OverseerSignal::Conclude) => break,
Expand Down