Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
54873ea
network bridge skeleton
rphmeier Jun 17, 2020
5eea3bf
move some primitives around and add debug impls
rphmeier Jun 17, 2020
27456b5
protocol registration glue & abstract network interface
rphmeier Jun 17, 2020
3bf17fa
add send_msgs to subsystemctx
rphmeier Jun 18, 2020
e05abd2
select logic
rphmeier Jun 18, 2020
ba005c9
transform different events into actions and handle
rphmeier Jun 18, 2020
7572bba
implement remaining network bridge state machine
rphmeier Jun 18, 2020
24a91c4
start test skeleton
rphmeier Jun 22, 2020
064a00c
make network methods asynchronous
rphmeier Jun 22, 2020
c116061
extract subsystem out to subsystem crate
rphmeier Jun 24, 2020
e6162de
port over overseer to subsystem context trait
rphmeier Jun 24, 2020
844a9d1
fix minimal example
rphmeier Jun 24, 2020
cae1561
fix overseer doc test
rphmeier Jun 24, 2020
2c9be73
update network-bridge crate
rphmeier Jun 24, 2020
534535f
write a subsystem test-helpers crate
rphmeier Jun 25, 2020
8ac6269
write a network test helper for network-bridge
rphmeier Jun 25, 2020
51ff607
set up (broken) view test
rphmeier Jun 25, 2020
960029b
Revamp network to be more async-friendly and not require Sync
rphmeier Jun 25, 2020
eb52f9a
fix spacing
rphmeier Jun 25, 2020
f6526c4
fix test compilation
rphmeier Jun 25, 2020
0f5a1b1
insert side-channel for actions
rphmeier Jun 25, 2020
8f75746
Add some more message types to AllMessages
rphmeier Jun 25, 2020
940216b
introduce a test harness
rphmeier Jun 25, 2020
a20cfd5
impl ProvideInherent for InclusionInherent
coriolinus Jun 26, 2020
5c44804
reduce import churn; correct expect message
coriolinus Jun 26, 2020
6efd05a
move inclusion inherent identifier into primitives
coriolinus Jun 26, 2020
fedd931
bump kusama spec_version to placate CI
coriolinus Jun 26, 2020
d696e1b
copy sc_basic_authorship::{ProposerFactory, Proposer}
coriolinus Jun 26, 2020
9f05198
use polkadot custom proposer instead of basic-authorship one
coriolinus Jun 26, 2020
859bd8b
add some tests
rphmeier Jun 26, 2020
c504d12
Merge branch 'master' into rh-network-bridge
rphmeier Jun 26, 2020
4343f5c
ensure service compiles and passes tests
rphmeier Jun 26, 2020
98c4c54
fix typo
rphmeier Jun 26, 2020
d194838
fix service-new compilation
rphmeier Jun 26, 2020
d468a2b
Subsystem test helpers send messages synchronously
rphmeier Jun 27, 2020
b676adc
remove smelly action inspector
rphmeier Jun 27, 2020
7d44a62
remove superfluous let binding
rphmeier Jun 27, 2020
989e8b5
fix warnings
rphmeier Jun 27, 2020
dffb0ab
Merge remote-tracking branch 'origin/master' into prgn-custom-proposer
coriolinus Jun 29, 2020
da07b9e
add license header
coriolinus Jun 29, 2020
3110bf8
empty commit; maybe github will notice the one with changes
coriolinus Jun 29, 2020
64792f7
Update node/network/bridge/src/lib.rs
rphmeier Jun 29, 2020
6b88341
add sanity check to only include valid inherents
coriolinus Jun 30, 2020
2b6a7d0
stub: encapsulate block production mechanics instead of copying them
coriolinus Jun 30, 2020
01f2cd9
Merge remote-tracking branch 'origin/prgn-inclusioninherent-providein…
coriolinus Jun 30, 2020
d2af626
partial implementation of propose fn
coriolinus Jun 30, 2020
50b51dd
fix compilation
rphmeier Jun 30, 2020
fd1dea5
Merge remote-tracking branch 'origin/rh-network-bridge' into prgn-cus…
coriolinus Jun 30, 2020
2ab09a7
Merge remote-tracking branch 'origin/rh-network-bridge' into prgn-cus…
coriolinus Jun 30, 2020
ad5cbc8
clear a few more compile errors
coriolinus Jun 30, 2020
4577215
Merge remote-tracking branch 'origin/master' into prgn-custom-proposer
coriolinus Jul 1, 2020
5bcf147
finish fn propose
coriolinus Jul 1, 2020
2377744
broken: add timeout to proposal
coriolinus Jul 1, 2020
af699dd
add timeout to proposal
coriolinus Jul 1, 2020
3642dce
guide: provisioner is responsible for selecting parachain candidates
coriolinus Jul 1, 2020
75190d8
implement ProvisionerMessage::RequestInherentData & update fn propose
coriolinus Jul 1, 2020
3063303
Merge remote-tracking branch 'origin/master' into prgn-custom-proposer
coriolinus Jul 2, 2020
6e41882
impl CreateProposer::init; clean up
coriolinus Jul 2, 2020
cfec8c2
impl std::error::Error for Error
coriolinus Jul 2, 2020
6092ba0
document error-handling rationale
coriolinus Jul 2, 2020
c78dd28
cause polkadot-service-new to compile correctly
coriolinus Jul 2, 2020
d37e89e
Move potentially-blocking call from fn init -> fn propose
coriolinus Jul 3, 2020
b91be19
document struct Proposer
coriolinus Jul 3, 2020
9bae56e
extract provisioner data fetch
coriolinus Jul 3, 2020
0b0b86c
Merge branch 'master' into prgn-custom-proposer
gavofyork Jul 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
implement remaining network bridge state machine
  • Loading branch information
rphmeier committed Jun 18, 2020
commit 7572bbab43e8e35b2eb492bcfaa5bb20babe77e9
2 changes: 1 addition & 1 deletion node/messages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub enum CandidateValidationMessage {
}

/// Events from network.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum NetworkBridgeEvent {
/// A peer has connected.
PeerConnected(PeerId, ObservedRole),
Expand Down
182 changes: 153 additions & 29 deletions node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use overseer::{Subsystem, SubsystemContext, SpawnedSubsystem};
use node_primitives::{ProtocolId, View};
use polkadot_primitives::{Block, Hash};

use std::collections::HashMap;
use std::collections::hash_map::{HashMap, Entry};
use std::sync::Arc;

const MAX_VIEW_HEADS: usize = 5;
Expand All @@ -43,6 +43,13 @@ pub const POLKADOT_ENGINE_ID: ConsensusEngineId = *b"dot2";
/// The protocol name.
pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/2";

const MALFORMED_MESSAGE_COST: ReputationChange
= ReputationChange::new(-500, "Malformed Network-bridge message");
const UNKNOWN_PROTO_COST: ReputationChange
= ReputationChange::new(-50, "Message sent to unknown protocol");
const MALFORMED_VIEW_COST: ReputationChange
= ReputationChange::new(-500, "Malformed view");

/// Messages received on the network.
#[derive(Encode, Decode)]
pub enum WireMessage {
Expand All @@ -61,7 +68,7 @@ pub fn notifications_protocol_info() -> (ConsensusEngineId, std::borrow::Cow<'st
}

/// An abstraction over networking for the purposes of this subsystem.
pub trait Network: Clone + Send + 'static {
pub trait Network: Clone + Send + Sync + 'static {
/// Get a stream of all events occurring on the network. This may include events unrelated
/// to the Polkadot protocol - the user of this function should filter only for events related
/// to the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID).
Expand Down Expand Up @@ -121,9 +128,7 @@ enum Action {

PeerConnected(PeerId, ObservedRole),
PeerDisconnected(PeerId),
PeerMalformedMessage(PeerId),
PeerMessages(PeerId, Vec<WireMessage>),
PeerViewChange(PeerId, View),

Abort,
}
Expand Down Expand Up @@ -179,7 +184,7 @@ fn action_from_network_message(event: Option<NetworkEvent>) -> Option<Action> {
.collect();

match v {
Err(_) => Some(Action::PeerMalformedMessage(remote)),
Err(_) => Some(Action::ReportPeer(remote, MALFORMED_MESSAGE_COST)),
Ok(v) => if v.is_empty() {
None
} else {
Expand All @@ -190,26 +195,61 @@ fn action_from_network_message(event: Option<NetworkEvent>) -> Option<Action> {
}
}

fn construct_view(live_heads: &[Hash]) -> View {
View(live_heads.iter().rev().take(MAX_VIEW_HEADS).cloned().collect())
}

async fn dispatch_update_to_all(
update: NetworkBridgeEvent,
event_producers: impl IntoIterator<Item=&fn(NetworkBridgeEvent) -> AllMessages>,
ctx: &mut SubsystemContext<NetworkBridgeMessage>,
) -> overseer::SubsystemResult<()> {
// collect messages here to avoid the borrow lasting across await boundary.
let messages: Vec<_> = event_producers.into_iter()
.map(|producer| producer(update.clone()))
.collect();

ctx.send_msgs(messages).await
}

async fn run_network(net: impl Network, mut ctx: SubsystemContext<NetworkBridgeMessage>) {
let mut event_stream = net.event_stream().fuse();
let mut local_view = Vec::with_capacity(MAX_VIEW_HEADS);

//let mut peers = HashMap::new();
// Most recent heads are at the back.
let mut live_heads = Vec::with_capacity(MAX_VIEW_HEADS);
let mut local_view = View(Vec::new());

let mut peers = HashMap::new();
let mut event_producers = HashMap::new();

loop {
let subsystem_next = ctx.recv().fuse();
let mut net_event_next = event_stream.next().fuse();
futures::pin_mut!(subsystem_next);

let action = futures::select! {
subsystem_msg = subsystem_next => Some(action_from_overseer_message(subsystem_msg)),
net_event = net_event_next => action_from_network_message(net_event),
let action = {
let subsystem_next = ctx.recv().fuse();
let mut net_event_next = event_stream.next().fuse();
futures::pin_mut!(subsystem_next);

let action = futures::select! {
subsystem_msg = subsystem_next => Some(action_from_overseer_message(subsystem_msg)),
net_event = net_event_next => action_from_network_message(net_event),
};

match action {
Some(a) => a,
None => continue,
}
};

let action = match action {
None => continue,
Some(a) => a,
let update_view = |peers: &HashMap<PeerId, PeerData>, live_heads, local_view: &mut View| {
let new_view = construct_view(live_heads);
if *local_view == new_view { return None }
*local_view = new_view.clone();

let message = WireMessage::ViewUpdate(new_view).encode();
for peer in peers.keys().cloned() {
net.write_notification(peer, message.clone())
}

Some(NetworkBridgeEvent::OurViewChange(local_view.clone()))
};

match action {
Expand All @@ -232,28 +272,112 @@ async fn run_network(net: impl Network, mut ctx: SubsystemContext<NetworkBridgeM
net.report_peer(peer, rep)
}
Action::StartWork(relay_parent) => {
local_view.push(relay_parent);
// TODO [now]: send view change.
live_heads.push(relay_parent);
if let Some(view_update) = update_view(&peers, &live_heads, &mut local_view) {
if let Err(_) = dispatch_update_to_all(
view_update,
event_producers.values(),
&mut ctx,
).await {
log::warn!("Aborting - Failure to dispatch messages to overseer");
return
}
}
}
Action::StopWork(relay_parent) => {
local_view.retain(|h| h != &relay_parent)
// TODO [now]: send view change.
live_heads.retain(|h| h != &relay_parent);
if let Some(view_update) = update_view(&peers, &live_heads, &mut local_view) {
if let Err(_) = dispatch_update_to_all(
view_update,
event_producers.values(),
&mut ctx,
).await {
log::warn!("Aborting - Failure to dispatch messages to overseer");
return
}
}
}

Action::PeerConnected(peer, role) => {

match peers.entry(peer.clone()) {
Entry::Occupied(_) => continue,
Entry::Vacant(vacant) => {
vacant.insert(PeerData {
view: View(Vec::new()),
});

if let Err(_) = dispatch_update_to_all(
NetworkBridgeEvent::PeerConnected(peer, role),
event_producers.values(),
&mut ctx,
).await {
log::warn!("Aborting - Failure to dispatch messages to overseer");
return
}
}
}
}
Action::PeerDisconnected(peer) => {

},
Action::PeerMalformedMessage(peer) => {

if peers.remove(&peer).is_some() {
if let Err(_) = dispatch_update_to_all(
NetworkBridgeEvent::PeerDisconnected(peer),
event_producers.values(),
&mut ctx,
).await {
log::warn!("Aborting - Failure to dispatch messages to overseer");
return
}
}
},
Action::PeerMessages(peer, messages) => {
let peer_data = match peers.get_mut(&peer) {
None => continue,
Some(d) => d,
};

let mut outgoing_messages = Vec::with_capacity(messages.len());
for message in messages {
match message {
WireMessage::ViewUpdate(new_view) => {
if new_view.0.len() > MAX_VIEW_HEADS {
net.report_peer(peer.clone(), MALFORMED_VIEW_COST);
continue
}

if new_view == peer_data.view { continue }
peer_data.view = new_view;

let update = NetworkBridgeEvent::PeerViewChange(
peer.clone(),
peer_data.view.clone(),
);

outgoing_messages.extend(
event_producers.values().map(|producer| producer(update.clone()))
);
}
WireMessage::ProtocolMessage(protocol, message) => {
let message = match event_producers.get(&protocol) {
Some(producer) => Some(producer(
NetworkBridgeEvent::PeerMessage(peer.clone(), message)
)),
None => {
net.report_peer(peer.clone(), UNKNOWN_PROTO_COST);
None
}
};

if let Some(message) = message {
outgoing_messages.push(message);
}
}
}
}

},
Action::PeerViewChange(peer, new_view) => {

if let Err(_) = ctx.send_msgs(outgoing_messages).await {
log::warn!("Aborting - Failure to dispatch messages to overseer");
return
}
},

Action::Abort => return,
Expand Down