Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
31cb1ad
feat bitfield distribution
drahnr Jul 7, 2020
ddb6228
feat bitfield distribution part 2
drahnr Jul 7, 2020
6ac2c62
pair programming with rustc & cargo
drahnr Jul 7, 2020
4b100df
lets go
drahnr Jul 7, 2020
a3bd8c0
move bitfield-distribution to the node/network folder
drahnr Jul 8, 2020
7909d47
shape shifting
drahnr Jul 8, 2020
e3c8248
lunchtime
drahnr Jul 8, 2020
86f4fdb
ignore the two fn recursion for now
drahnr Jul 8, 2020
3cf0878
step by step
drahnr Jul 9, 2020
6a6d7bc
triplesteps
drahnr Jul 9, 2020
1aa1842
bandaid commit
drahnr Jul 10, 2020
ad0555b
unordered futures magic
drahnr Jul 10, 2020
7966769
chore
drahnr Jul 10, 2020
e8c901a
reword markdown
drahnr Jul 10, 2020
e33dafe
clarify
drahnr Jul 10, 2020
6594dee
lacks abortable processing impl details
drahnr Jul 14, 2020
4c6f9b4
slimify
drahnr Jul 14, 2020
af2849b
fix: warnings and avoid ctx.clone() improve comments
drahnr Jul 15, 2020
977102b
review comments
drahnr Jul 20, 2020
92431f3
fix details
drahnr Jul 20, 2020
23c050e
make sure outgoing messages are tracked
drahnr Jul 20, 2020
a431601
fix name
drahnr Jul 20, 2020
a5077ee
Merge remote-tracking branch 'origin/master' into bernhard-bitfield-d…
drahnr Jul 20, 2020
be2533e
fix subsystem
drahnr Jul 20, 2020
64b6ec1
partial test impl
drahnr Jul 20, 2020
52d9800
relax context bounds
drahnr Jul 21, 2020
df8fe1d
test
drahnr Jul 21, 2020
38661f7
X
drahnr Jul 21, 2020
f1f7cab
X
drahnr Jul 21, 2020
f94b8c5
initial test
drahnr Jul 21, 2020
7a4bd5e
fix relay_message not tracked when origin is self
drahnr Jul 21, 2020
b299ad3
fix/guide: grammar
drahnr Jul 21, 2020
09766c1
work around missing Eq+PartialEq
drahnr Jul 21, 2020
df7a6f0
fix: add missing message to provisioner
drahnr Jul 21, 2020
2398681
unify per_job to job_data
drahnr Jul 21, 2020
8ed8aba
fix/review: part one
drahnr Jul 22, 2020
65e8edb
fix/review: more grumbles
drahnr Jul 22, 2020
087784a
fix/review: track incoming messages per peer
drahnr Jul 22, 2020
d91bcbd
fix/review: extract fn, avoid nested matches
drahnr Jul 22, 2020
c957bf9
fix/review: more tests, simplify test
drahnr Jul 22, 2020
36a6672
fix/review: extend tests to cover more cases
drahnr Jul 22, 2020
5b7c963
chore/rename: Tracker -> ProtocolState
drahnr Jul 22, 2020
794c6c3
chore check and comment rewording
drahnr Jul 22, 2020
f3a718e
feat test: invalid peer message
drahnr Jul 22, 2020
e33e943
remove ignored test cases and unused macros
drahnr Jul 22, 2020
b8d3941
Merge remote-tracking branch 'origin/master' into bernhard-bitfield-d…
drahnr Jul 23, 2020
99f574e
fix master merge fallout + warnings
drahnr Jul 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: warnings and avoid ctx.clone() improve comments
  • Loading branch information
drahnr committed Jul 15, 2020
commit af2849b247473a5d7ef3f347a54c7a3451b8b1c9
122 changes: 64 additions & 58 deletions node/network/bitfield-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! The bitfield distribution subsystem spreading @todo .
//! The bitfield distribution
//!
//! In case this node is a validator, gossips its own signed availability bitfield
//! for a particular relay parent.
//! Independently of that, gossips on received messages from peers to other interested peers.

use codec::{Decode, Encode};
use futures::{
channel::oneshot,
future::{abortable, AbortHandle, Abortable},
stream::{FuturesUnordered, Stream, StreamExt},
Future, FutureExt,
FutureExt,
};

use node_primitives::{ProtocolId, View};
Expand All @@ -30,14 +32,10 @@ use polkadot_node_subsystem::messages::*;
use polkadot_node_subsystem::{
FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
};
use polkadot_primitives::parachain::SignedAvailabilityBitfield;
use polkadot_primitives::parachain::{SigningContext, ValidatorId};
use polkadot_primitives::Hash;
use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield};
use polkadot_primitives::v0::{SigningContext, ValidatorId};
use sc_network::ReputationChange;
use std::{
collections::{HashMap, HashSet},
pin::Pin,
};
use std::collections::{HashMap, HashSet};

const COST_SIGNATURE_INVALID: ReputationChange =
ReputationChange::new(-100, "Bitfield signature invalid");
Expand All @@ -49,45 +47,48 @@ const COST_MESSAGE_NOT_DECODABLE: ReputationChange =
ReputationChange::new(-100, "Not intersted in that parent hash");


/// Data used to track information of peers and relay parents the
/// overseer ordered us to work on.
#[derive(Default, Clone)]
struct Tracker {
// track all active peers and their views
// to determine what is relevant to them
/// track all active peers and their views
/// to determine what is relevant to them.
peer_views: HashMap<PeerId, View>,

// our current view
/// Our current view.
view: View,

// set of validators for a particular relay_parent
/// Additional data particular to a relay parent.
per_relay_parent: HashMap<Hash, PerRelayParentData>,
}

/// Data for each relay parent
/// Data for a particular relay parent.
#[derive(Debug, Clone, Default)]
struct PerRelayParentData {
// set of validators which already sent a message
validator_bitset_received: HashSet<ValidatorId>,

// signing context for a particular relay_parent
/// Signing context for a particular relay parent.
signing_context: SigningContext<Hash>,

// set of validators for a particular relay_parent
/// Set of validators for a particular relay parent.
validator_set: Vec<ValidatorId>,

// set of validators for a particular relay_parent and the number of messages
// received authored by them
/// Set of validators for a particular relay parent for which we
/// received a valid `BitfieldGossipMessage` and gossiped it to
/// interested peers.
one_per_validator: HashSet<ValidatorId>,
}

fn network_update_message(n: NetworkBridgeEvent) -> AllMessages {
AllMessages::BitfieldDistribution(BitfieldDistributionMessage::NetworkBridgeUpdate(n))
}

/// The bitfield distribution subsystem.
pub struct BitfieldDistribution;

impl BitfieldDistribution {
/// The protocol identifier for bitfield distribution.
const PROTOCOL_ID: ProtocolId = *b"bitd";

/// Start processing work as passed on from the Overseer.
async fn run<Context>(mut ctx: Context) -> SubsystemResult<()>
where
Context: SubsystemContext<Message = BitfieldDistributionMessage> + Clone,
Expand All @@ -98,20 +99,16 @@ impl BitfieldDistribution {
))
.await?;

// set of active heads the overseer told us to work on with the connected
// tasks abort handles
// work: process incoming messages from the overseer and process accordingly.
let mut tracker = Tracker::default();
loop {
{
let message = {
let mut ctx = ctx.clone();
ctx.recv().await?
};
let message = ctx.recv().await?;
match message {
FromOverseer::Communication { msg } => {
// we signed this bitfield
// another subsystem created this signed availability bitfield messages
match msg {
// Distribute a bitfield via gossip to other validators.
// distribute a bitfield via gossip to other validators.
BitfieldDistributionMessage::DistributeBitfield(
hash,
signed_availability,
Expand All @@ -120,31 +117,36 @@ impl BitfieldDistribution {
relay_parent: hash,
signed_availability,
};
// @todo do we also make sure to not send it twice if we are the source
// @todo are we subject to sending something multiple times?
// @todo this also apply to ourself?
distribute(ctx.clone(), &mut tracker, msg).await?;
distribute(&mut ctx, &mut tracker, msg).await?;
}
BitfieldDistributionMessage::NetworkBridgeUpdate(event) => {
handle_network_msg(ctx.clone(), &mut tracker, event).await;
// a network message was received
if let Err(e) = handle_network_msg(&mut ctx, &mut tracker, event).await {
log::warn!("Failed to handle incomming network messages: {:?}", e);
}
}
}
}
FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)) => {
// query basic system parameters once
// @todo assumption: these cannot change within a session
let (validator_set, signing_context) =
query_basics(ctx.clone(), relay_parent).await?;
query_basics(&mut ctx, relay_parent).await?;

let _ = tracker.per_relay_parent.insert(
relay_parent,
PerRelayParentData {
signing_context,
validator_set: validator_set,
validator_set,
..Default::default()
},
);
// futurama.insert(relay_parent.clone(), Box::pin(future.map(|_| ())));
// active_jobs.insert(relay_parent.clone(), abort_handle);
}
FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => {
// @todo assumption: it is good enough to prevent addition work from being
// scheduled, the individual futures are supposedly completed quickly
let _ = tracker.per_relay_parent.remove(&relay_parent);
}
FromOverseer::Signal(OverseerSignal::Conclude) => {
Expand All @@ -153,15 +155,13 @@ impl BitfieldDistribution {
}
}
}
// active_jobs
// .retain(|_, future| future.poll().is_pending());
}
}
}

/// modify the reputiation, good or bad
/// Modify the reputation of a peer based on their behaviour.
async fn modify_reputiation<Context>(
mut ctx: Context,
ctx: &mut Context,
peerid: PeerId,
rep: ReputationChange,
) -> SubsystemResult<()>
Expand All @@ -174,16 +174,17 @@ where
.await
}

/// Distribute a checked message, either originated by us or gossiped on from other peers.
/// Distribute a given valid bitfield message.
///
/// Can be originated by another subsystem or received via network from another peer.
async fn distribute<Context>(
mut ctx: Context,
ctx: &mut Context,
tracker: &mut Tracker,
message: BitfieldGossipMessage,
) -> SubsystemResult<()>
where
Context: SubsystemContext<Message = BitfieldDistributionMessage> + Clone,
{

let BitfieldGossipMessage {
relay_parent,
signed_availability,
Expand All @@ -192,11 +193,17 @@ where
let interested_peers = tracker
.peer_views
.iter()
.filter(|(_peerid, view)| view.contains(&relay_parent))
.map(|(peerid, _)| peerid.clone())
.filter_map(|(peerid, view)| {
if view.contains(&relay_parent) {
Some(peerid.clone())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should also have a check that the peer hasn't yet seen anything by that validator, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(a malicious validator could issue 2+ bitfields and introduce them at separate points of the network. if we lie on that boundary, the peer may send us 1 and we would send them the other). I think we need to track if the peer has already sent us a bitfield per validator so we can punish them if they ever send another. And then we should track if we've sent a bitfield per validator.

We don't do equivocation reporting or anything like that for bitfields, though.

} else {
None
}
})
.collect::<Vec<PeerId>>();

let message = BitfieldGossipMessage {
relay_parent: relay_parent,
relay_parent,
signed_availability,
};
let bytes = message.encode();
Expand All @@ -211,9 +218,9 @@ where
Ok(())
}

/// Handle an incoming message from a peer
/// Handle an incoming message from a peer.
async fn process_incoming_peer_message<Context>(
ctx: Context,
ctx: &mut Context,
tracker: &mut Tracker,
peerid: PeerId,
message: BitfieldGossipMessage,
Expand All @@ -228,7 +235,7 @@ where

// Ignore anything the overseer did not tell this subsystem to work on
let mut job_data = tracker.per_relay_parent.get_mut(&message.relay_parent);
let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
job_data
} else {
return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await;
Expand All @@ -251,10 +258,10 @@ where
let one_per_validator = &mut (job_data.one_per_validator);
// only distribute a message of a validator once
if one_per_validator.contains(validator) {
return Ok(())
return Ok(());
}
one_per_validator.insert(validator.clone());
} else {
} else {
return modify_reputiation(ctx, peerid, COST_SIGNATURE_INVALID).await;
}

Expand All @@ -264,7 +271,7 @@ where
Ok(())
}

/// A gossiped or gossipable signed availability bitfield for a particular relay hash
/// A gossiped or gossipable signed availability bitfield for a particular relay parent.
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
pub struct BitfieldGossipMessage {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a bit more than an implementation detail; it should probably be mentioned in the guide, and possibly moved to messages.rs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other networking subsystems have a WireMessage type which serves this rough purpose. However, it is not exposed publicly beyond the crate.

relay_parent: Hash,
Expand All @@ -273,14 +280,13 @@ pub struct BitfieldGossipMessage {

/// Deal with network bridge updates and track what needs to be tracked
async fn handle_network_msg<Context>(
ctx: Context,
ctx: &mut Context,
tracker: &mut Tracker,
bridge_message: NetworkBridgeEvent,
) -> SubsystemResult<()>
where
Context: SubsystemContext<Message = BitfieldDistributionMessage> + Clone,
{
let ego = &((*tracker).view);
match bridge_message {
NetworkBridgeEvent::PeerConnected(peerid, _role) => {
// insert if none already present
Expand Down Expand Up @@ -308,7 +314,7 @@ where
NetworkBridgeEvent::PeerMessage(remote, bytes) => {
log::info!("Got a peer message from {:?}", &remote);
if let Ok(gossiped_bitfield) = BitfieldGossipMessage::decode(&mut (bytes.as_slice())) {
process_incoming_peer_message(ctx, tracker, remote, gossiped_bitfield).await;
process_incoming_peer_message(ctx, tracker, remote, gossiped_bitfield).await?;
} else {
return modify_reputiation(ctx, remote, COST_MESSAGE_NOT_DECODABLE).await;
}
Expand All @@ -328,7 +334,7 @@ where

/// query the validator set and signing context
async fn query_basics<Context>(
mut ctx: Context,
ctx: &mut Context,
relay_parent: Hash,
) -> SubsystemResult<(Vec<ValidatorId>, SigningContext)>
where
Expand Down