Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
31cb1ad
feat bitfield distribution
drahnr Jul 7, 2020
ddb6228
feat bitfield distribution part 2
drahnr Jul 7, 2020
6ac2c62
pair programming with rustc & cargo
drahnr Jul 7, 2020
4b100df
lets go
drahnr Jul 7, 2020
a3bd8c0
move bitfield-distribution to the node/network folder
drahnr Jul 8, 2020
7909d47
shape shifting
drahnr Jul 8, 2020
e3c8248
lunchtime
drahnr Jul 8, 2020
86f4fdb
ignore the two fn recursion for now
drahnr Jul 8, 2020
3cf0878
step by step
drahnr Jul 9, 2020
6a6d7bc
triplesteps
drahnr Jul 9, 2020
1aa1842
bandaid commit
drahnr Jul 10, 2020
ad0555b
unordered futures magic
drahnr Jul 10, 2020
7966769
chore
drahnr Jul 10, 2020
e8c901a
reword markdown
drahnr Jul 10, 2020
e33dafe
clarify
drahnr Jul 10, 2020
6594dee
lacks abortable processing impl details
drahnr Jul 14, 2020
4c6f9b4
slimify
drahnr Jul 14, 2020
af2849b
fix: warnings and avoid ctx.clone() improve comments
drahnr Jul 15, 2020
977102b
review comments
drahnr Jul 20, 2020
92431f3
fix details
drahnr Jul 20, 2020
23c050e
make sure outgoing messages are tracked
drahnr Jul 20, 2020
a431601
fix name
drahnr Jul 20, 2020
a5077ee
Merge remote-tracking branch 'origin/master' into bernhard-bitfield-d…
drahnr Jul 20, 2020
be2533e
fix subsystem
drahnr Jul 20, 2020
64b6ec1
partial test impl
drahnr Jul 20, 2020
52d9800
relax context bounds
drahnr Jul 21, 2020
df8fe1d
test
drahnr Jul 21, 2020
38661f7
X
drahnr Jul 21, 2020
f1f7cab
X
drahnr Jul 21, 2020
f94b8c5
initial test
drahnr Jul 21, 2020
7a4bd5e
fix relay_message not tracked when origin is self
drahnr Jul 21, 2020
b299ad3
fix/guide: grammar
drahnr Jul 21, 2020
09766c1
work around missing Eq+PartialEq
drahnr Jul 21, 2020
df7a6f0
fix: add missing message to provisioner
drahnr Jul 21, 2020
2398681
unify per_job to job_data
drahnr Jul 21, 2020
8ed8aba
fix/review: part one
drahnr Jul 22, 2020
65e8edb
fix/review: more grumbles
drahnr Jul 22, 2020
087784a
fix/review: track incoming messages per peer
drahnr Jul 22, 2020
d91bcbd
fix/review: extract fn, avoid nested matches
drahnr Jul 22, 2020
c957bf9
fix/review: more tests, simplify test
drahnr Jul 22, 2020
36a6672
fix/review: extend tests to cover more cases
drahnr Jul 22, 2020
5b7c963
chore/rename: Tracker -> ProtocolState
drahnr Jul 22, 2020
794c6c3
chore check and comment rewording
drahnr Jul 22, 2020
f3a718e
feat test: invalid peer message
drahnr Jul 22, 2020
e33e943
remove ignored test cases and unused macros
drahnr Jul 22, 2020
b8d3941
Merge remote-tracking branch 'origin/master' into bernhard-bitfield-d…
drahnr Jul 23, 2020
99f574e
fix master merge fallout + warnings
drahnr Jul 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
review comments
  • Loading branch information
drahnr committed Jul 20, 2020
commit 977102b1630c06c59e270a6e48c18404621c4c20
152 changes: 106 additions & 46 deletions node/network/bitfield-distribution/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// Polkadot is free software: you can rerelay_message it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// Polkadot is relay_messaged in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
Expand Down Expand Up @@ -47,6 +47,17 @@ const COST_MESSAGE_NOT_DECODABLE: ReputationChange =
ReputationChange::new(-100, "Not intersted in that parent hash");


/// Checked signed availability bitfield that is distributed
/// to other peers.
#[derive(Encode, Decode, Debug, Clone)]
pub struct BitfieldGossipMessage {
/// The relay parent this message is relative to.
pub relay_parent: Hash,
/// The actual signed availability bitfield.
pub signed_availability: SignedAvailabilityBitfield,
}


/// Data used to track information of peers and relay parents the
/// overseer ordered us to work on.
#[derive(Default, Clone)]
Expand All @@ -72,9 +83,23 @@ struct PerRelayParentData {
validator_set: Vec<ValidatorId>,

/// Set of validators for a particular relay parent for which we
/// received a valid `BitfieldGossipMessage` and gossiped it to
/// interested peers.
one_per_validator: HashSet<ValidatorId>,
/// received a valid `BitfieldGossipMessage`.
/// Also serves as the list of known messages for peers connecting
/// after bitfield gossips were already received.
one_per_validator: HashMap<ValidatorId, BitfieldGossipMessage>,

/// which messages of which validators were already sent
message_sent_to_peer: HashMap<PeerId, HashSet<ValidatorId>>,
}

impl PerRelayParentData {
fn peer_already_was_sent_message_signed_by_validator(&self, peer: &PeerId, validator: &ValidatorId) -> bool {
if let Some(set) = self.message_sent_to_peer.get(peer) {
!set.contains(validator)
} else {
false
}
}
}

fn network_update_message(n: NetworkBridgeEvent) -> AllMessages {
Expand Down Expand Up @@ -108,7 +133,7 @@ impl BitfieldDistribution {
FromOverseer::Communication { msg } => {
// another subsystem created this signed availability bitfield messages
match msg {
// distribute a bitfield via gossip to other validators.
// relay_message a bitfield via gossip to other validators
BitfieldDistributionMessage::DistributeBitfield(
hash,
signed_availability,
Expand All @@ -119,12 +144,12 @@ impl BitfieldDistribution {
};
// @todo are we subject to sending something multiple times?
// @todo this also apply to ourself?
distribute(&mut ctx, &mut tracker, msg).await?;
relay_message(&mut ctx, &mut tracker, msg).await?;
}
BitfieldDistributionMessage::NetworkBridgeUpdate(event) => {
// a network message was received
if let Err(e) = handle_network_msg(&mut ctx, &mut tracker, event).await {
log::warn!("Failed to handle incomming network messages: {:?}", e);
log::warn!(target: "bitd", "Failed to handle incomming network messages: {:?}", e);
}
}
}
Expand All @@ -145,7 +170,7 @@ impl BitfieldDistribution {
);
}
FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => {
// @todo assumption: it is good enough to prevent addition work from being
// @todo assumption: it is good enough to prevent additional work from being
// scheduled, the individual futures are supposedly completed quickly
let _ = tracker.per_relay_parent.remove(&relay_parent);
}
Expand All @@ -159,8 +184,8 @@ impl BitfieldDistribution {
}
}

/// Modify the reputiation of peer based on their behaviour.
async fn modify_reputiation<Context>(
/// Modify the reputation of peer based on their behaviour.
async fn modify_reputation<Context>(
ctx: &mut Context,
peerid: PeerId,
rep: ReputationChange,
Expand All @@ -177,36 +202,32 @@ where
/// Distribute a given valid bitfield message.
///
/// Can be originated by another subsystem or received via network from another peer.
async fn distribute<Context>(
async fn relay_message<Context>(
ctx: &mut Context,
tracker: &mut Tracker,
message: BitfieldGossipMessage,
) -> SubsystemResult<()>
where
Context: SubsystemContext<Message = BitfieldDistributionMessage> + Clone,
{
let BitfieldGossipMessage {
relay_parent,
signed_availability,
} = message;
// concurrently pass on the bitfield distribution to all interested peers
let interested_peers = tracker
.peer_views
.iter()
.filter(|(peerid, view)| {
// @todo
true
})
.filter_map(|(peerid, view)| {
if view.contains(&relay_parent) {
if view.contains(&message.relay_parent) {
Some(peerid.clone())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should also have a check that the peer hasn't yet seen anything by that validator, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(a malicious validator could issue 2+ bitfields and introduce them at separate points of the network. if we lie on that boundary, the peer may send us 1 and we would send them the other). I think we need to track if the peer has already sent us a bitfield per validator so we can punish them if they ever send another. And then we should track if we've sent a bitfield per validator.

We don't do equivocation reporting or anything like that for bitfields, though.

} else {
None
}
})
.collect::<Vec<PeerId>>();

let message = BitfieldGossipMessage {
relay_parent,
signed_availability,
};
let bytes = message.encode();
let bytes = Encode::encode(&message);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendMessage(
interested_peers,
Expand All @@ -218,32 +239,33 @@ where
Ok(())
}


/// Handle an incoming message from a peer.
async fn process_incoming_peer_message<Context>(
ctx: &mut Context,
tracker: &mut Tracker,
peerid: PeerId,
origin: PeerId,
message: BitfieldGossipMessage,
) -> SubsystemResult<()>
where
Context: SubsystemContext<Message = BitfieldDistributionMessage> + Clone,
{
// we don't care about this, not part of our view
if !tracker.view.contains(&message.relay_parent) {
return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await;
return modify_reputation(ctx, origin, COST_NOT_INTERESTED).await;
}

// Ignore anything the overseer did not tell this subsystem to work on
let mut job_data = tracker.per_relay_parent.get_mut(&message.relay_parent);
let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
job_data
} else {
return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await;
return modify_reputation(ctx, origin, COST_NOT_INTERESTED).await;
};

let validator_set = &job_data.validator_set;
if validator_set.len() == 0 {
return modify_reputiation(ctx, peerid, COST_MISSING_PEER_SESSION_KEY).await;
return modify_reputation(ctx, origin, COST_MISSING_PEER_SESSION_KEY).await;
}

// check all validators that could have signed this message
Expand All @@ -256,28 +278,32 @@ where
.is_ok()
}) {
let one_per_validator = &mut (job_data.one_per_validator);
// only distribute a message of a validator once
if one_per_validator.contains(validator) {
// only relay_message a message of a validator once
if one_per_validator.get(validator).is_some() {
return Ok(());
}
one_per_validator.insert(validator.clone());
one_per_validator.insert(validator.clone(), message.clone());


// track which messages that peer already received
let message_sent_to_peer = &mut (job_data.message_sent_to_peer);
message_sent_to_peer
.entry(origin)
.or_insert_with(|| {
HashSet::with_capacity(16)
})
.insert(validator.clone());

} else {
return modify_reputiation(ctx, peerid, COST_SIGNATURE_INVALID).await;
return modify_reputation(ctx, origin, COST_SIGNATURE_INVALID).await;
}

// passed all conditions, distribute!
distribute(ctx, tracker, message).await?;
// passed all conditions, distribute to peers!
relay_message(ctx, tracker, message).await?;

Ok(())
}

/// A gossiped or gossipable signed availability bitfield for a particular relay parent.
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
pub struct BitfieldGossipMessage {
relay_parent: Hash,
signed_availability: SignedAvailabilityBitfield,
}

/// Deal with network bridge updates and track what needs to be tracked
async fn handle_network_msg<Context>(
ctx: &mut Context,
Expand All @@ -297,32 +323,66 @@ where
tracker.peer_views.remove(&peerid);
}
NetworkBridgeEvent::PeerViewChange(peerid, view) => {
tracker
.peer_views
.entry(peerid)
.and_modify(|val| *val = view);
catch_up_messages(ctx, tracker, peerid, view).await?;
}
NetworkBridgeEvent::OurViewChange(view) => {
let old_view = std::mem::replace(&mut (tracker.view), view);

for new in tracker.view.difference(&old_view) {
if !tracker.per_relay_parent.contains_key(&new) {
log::warn!("Our view contains {} but the overseer never told use we should work on this", &new);
log::warn!(target: "bitd", "Our view contains {} but the overseer never told use we should work on this", &new);
}
}
}
NetworkBridgeEvent::PeerMessage(remote, bytes) => {
log::info!("Got a peer message from {:?}", &remote);
if let Ok(gossiped_bitfield) = BitfieldGossipMessage::decode(&mut (bytes.as_slice())) {
log::trace!(target: "bitd", "Received bitfield gossip from peer {:?}", &remote);
process_incoming_peer_message(ctx, tracker, remote, gossiped_bitfield).await?;
} else {
return modify_reputiation(ctx, remote, COST_MESSAGE_NOT_DECODABLE).await;
return modify_reputation(ctx, remote, COST_MESSAGE_NOT_DECODABLE).await;
}
}
}
Ok(())
}

// Send the difference between two views which were not sent
// to that particular peer.
async fn catch_up_messages<Context>(
ctx: &mut Context,
tracker: &mut Tracker,
origin: PeerId,
view: View,
) -> SubsystemResult<()>
where
Context: SubsystemContext<Message = BitfieldDistributionMessage> + Clone,
{
use std::collections::hash_map::Entry;

match tracker
.peer_views
.entry(origin) {
Entry::Occupied(ref mut occupied) => {
let current = occupied.get_mut();

// send all messages we've seen before for this peer
for new_relay_parent_interest in (*current).difference(&view) {
if let Some(data) = tracker.per_relay_parent.get(new_relay_parent_interest) {
// if !data.peer_ {
// } @todo
todo!("XXX");
}
}

*current = view;
},
Entry::Vacant(vacant) => { let _ = vacant.insert(view.clone()); } ,
}

Ok(())
}


impl<C> Subsystem<C> for BitfieldDistribution
where
C: SubsystemContext<Message = BitfieldDistributionMessage> + Clone + Sync + Send,
Expand Down
3 changes: 2 additions & 1 deletion node/subsystem/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use polkadot_primitives::v1::{
use polkadot_node_primitives::{
MisbehaviorReport, SignedFullStatement, View, ProtocolId, ValidationResult,
};

use std::sync::Arc;

pub use sc_network::{ObservedRole, ReputationChange, PeerId};
Expand Down Expand Up @@ -398,6 +397,8 @@ pub enum AllMessages {
AvailabilityDistribution(AvailabilityDistributionMessage),
/// Message for the bitfield distribution subsystem.
BitfieldDistribution(BitfieldDistributionMessage),
/// Message for the block authorship provisioning subsystem.
BlockAuthorshipProvisioning(BlockAuthorshipProvisioningMessage),
/// Message for the Provisioner subsystem.
Provisioner(ProvisionerMessage),
/// Message for the PoV Distribution subsystem.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,24 @@ Input:
Output:

- `NetworkBridge::RegisterEventProducer(ProtocolId)` in order to register ourselfs as an event provide for the protocol.
- `NetworkBridge::SendMessage([PeerId], ProtocolId, Bytes)`
- `NetworkBridge::ReportPeer(PeerId, cost_or_benefit)` improve or penalize the reputiation of peer based on the message we received relative to our state.
- `BitfieldDistributionMessage::DistributeBitfield(relay_parent, SignedAvailabilityBitfield)` gossip a verified incoming bitfield on to interested subsystems within this validator node.
- `NetworkBridge::SendMessage([PeerId], ProtocolId, Bytes)` gossip a verified incoming bitfield on to interested subsystems within this validator node.
- `NetworkBridge::ReportPeer(PeerId, cost_or_benefit)` improve or penalize the reputation of peers based on the messages that are received relative to the current view.
- `BlockAuthorshipProvisioning::Bitfield(relay_parent, SignedAvailabilityBitfield)` pass
on the bitfield to the other submodules via the overseer.

## Functionality

This is implemented as a gossip system. Register a [network bridge](../utility/network-bridge.md) event producer on startup.

It is necessary to track peer connection, view change, and disconnection events, in order to maintain an index of which peers are interested in which relay parent bitfields.
Before gossiping incoming bitfields on, they must be checked to be signed by one of the validators


Before gossiping incoming bitfields, they must be checked to be signed by one of the validators
of the validator set relevant to the current relay parent.
Only accept bitfields relevant to our current view and only distribute bitfields to other peers when relevant to their most recent view.
Accept and distribute only one bitfield per validator.


When receiving a bitfield either from the network or from a `DistributeBitfield` message, forward it along to the block authorship (provisioning) subsystem for potential inclusion in a block.

Peers connecting after a set of valid bitfield gossip messages was received, those messages must be cached and sent upon connection of new peers or re-connecting peers.