This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
34 commits
54873ea
network bridge skeleton
rphmeier Jun 17, 2020
5eea3bf
move some primitives around and add debug impls
rphmeier Jun 17, 2020
27456b5
protocol registration glue & abstract network interface
rphmeier Jun 17, 2020
3bf17fa
add send_msgs to subsystemctx
rphmeier Jun 18, 2020
e05abd2
select logic
rphmeier Jun 18, 2020
ba005c9
transform different events into actions and handle
rphmeier Jun 18, 2020
7572bba
implement remaining network bridge state machine
rphmeier Jun 18, 2020
24a91c4
start test skeleton
rphmeier Jun 22, 2020
064a00c
make network methods asynchronous
rphmeier Jun 22, 2020
c116061
extract subsystem out to subsystem crate
rphmeier Jun 24, 2020
e6162de
port over overseer to subsystem context trait
rphmeier Jun 24, 2020
844a9d1
fix minimal example
rphmeier Jun 24, 2020
cae1561
fix overseer doc test
rphmeier Jun 24, 2020
2c9be73
update network-bridge crate
rphmeier Jun 24, 2020
534535f
write a subsystem test-helpers crate
rphmeier Jun 25, 2020
8ac6269
write a network test helper for network-bridge
rphmeier Jun 25, 2020
51ff607
set up (broken) view test
rphmeier Jun 25, 2020
960029b
Revamp network to be more async-friendly and not require Sync
rphmeier Jun 25, 2020
eb52f9a
fix spacing
rphmeier Jun 25, 2020
f6526c4
fix test compilation
rphmeier Jun 25, 2020
0f5a1b1
insert side-channel for actions
rphmeier Jun 25, 2020
8f75746
Add some more message types to AllMessages
rphmeier Jun 25, 2020
940216b
introduce a test harness
rphmeier Jun 25, 2020
859bd8b
add some tests
rphmeier Jun 26, 2020
c504d12
Merge branch 'master' into rh-network-bridge
rphmeier Jun 26, 2020
4343f5c
ensure service compiles and passes tests
rphmeier Jun 26, 2020
98c4c54
fix typo
rphmeier Jun 26, 2020
d194838
fix service-new compilation
rphmeier Jun 26, 2020
d468a2b
Subsystem test helpers send messages synchronously
rphmeier Jun 27, 2020
b676adc
remove smelly action inspector
rphmeier Jun 27, 2020
7d44a62
remove superfluous let binding
rphmeier Jun 27, 2020
989e8b5
fix warnings
rphmeier Jun 27, 2020
64792f7
Update node/network/bridge/src/lib.rs
rphmeier Jun 29, 2020
50b51dd
fix compilation
rphmeier Jun 30, 2020
transform different events into actions and handle
rphmeier committed Jun 18, 2020
commit ba005c9ca71d0fc8ab60763d97dfbfbe0b998bfd
168 changes: 137 additions & 31 deletions node/network/bridge/src/lib.rs
@@ -21,12 +21,14 @@ use futures::prelude::*;
 use futures::stream::BoxStream;
 
 use sc_network::{
-	ObservedRole, ReputationChange, PeerId, config::ProtocolId as SubstrateProtocolId,
+	ObservedRole, ReputationChange, PeerId,
 	Event as NetworkEvent,
 };
 use sp_runtime::ConsensusEngineId;
 
-use messages::{NetworkBridgeEvent, NetworkBridgeMessage, FromOverseer, OverseerSignal};
+use messages::{
+	NetworkBridgeEvent, NetworkBridgeMessage, FromOverseer, OverseerSignal, AllMessages,
+};
 use overseer::{Subsystem, SubsystemContext, SpawnedSubsystem};
 use node_primitives::{ProtocolId, View};
 use polkadot_primitives::{Block, Hash};
@@ -43,7 +45,7 @@ pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/2";
 
 /// Messages received on the network.
 #[derive(Encode, Decode)]
-pub enum Message {
+pub enum WireMessage {
 	/// A message from a peer on a specific protocol.
 	#[codec(index = "1")]
 	ProtocolMessage(ProtocolId, Vec<u8>),
@@ -110,47 +112,151 @@ struct PeerData {
 	view: View,
 }
 
+enum Action {
+	RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages),
+	SendMessage(Vec<PeerId>, ProtocolId, Vec<u8>),
+	ReportPeer(PeerId, ReputationChange),
+	StartWork(Hash),
+	StopWork(Hash),
+
+	PeerConnected(PeerId, ObservedRole),
+	PeerDisconnected(PeerId),
+	PeerMalformedMessage(PeerId),
+	PeerMessages(PeerId, Vec<WireMessage>),
+	PeerViewChange(PeerId, View),
+
+	Abort,
+}
+
+fn action_from_overseer_message(
+	res: overseer::SubsystemResult<FromOverseer<NetworkBridgeMessage>>,
+) -> Action {
+	match res {
+		Ok(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)))
+			=> Action::StartWork(relay_parent),
+		Ok(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)))
+			=> Action::StopWork(relay_parent),
+		Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort,
+		Ok(FromOverseer::Communication { msg }) => match msg {
+			NetworkBridgeMessage::RegisterEventProducer(protocol_id, message_producer)
+				=> Action::RegisterEventProducer(protocol_id, message_producer),
+			NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep),
+			NetworkBridgeMessage::SendMessage(peers, protocol, message)
+				=> Action::SendMessage(peers, protocol, message),
+		},
+		Err(e) => {
+			log::warn!("Shutting down Network Bridge due to error {:?}", e);
+			Action::Abort
+		}
+	}
+}
+
+fn action_from_network_message(event: Option<NetworkEvent>) -> Option<Action> {
+	match event {
+		None => {
+			log::warn!("Shutting down Network Bridge: underlying event stream concluded");
+			Some(Action::Abort)
+		}
+		Some(NetworkEvent::Dht(_)) => None,
+		Some(NetworkEvent::NotificationStreamOpened { remote, engine_id, role }) => {
+			if engine_id == POLKADOT_ENGINE_ID {
+				Some(Action::PeerConnected(remote, role))
+			} else {
+				None
+			}
+		}
+		Some(NetworkEvent::NotificationStreamClosed { remote, engine_id }) => {
+			if engine_id == POLKADOT_ENGINE_ID {
+				Some(Action::PeerDisconnected(remote))
+			} else {
+				None
+			}
+		}
+		Some(NetworkEvent::NotificationsReceived { remote, messages }) => {
+			let v: Result<Vec<_>, _> = messages.iter()
+				.filter(|(engine_id, _)| engine_id == &POLKADOT_ENGINE_ID)
+				.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
+				.collect();
+
+			match v {
+				Err(_) => Some(Action::PeerMalformedMessage(remote)),
+				Ok(v) => if v.is_empty() {
+					None
+				} else {
+					Some(Action::PeerMessages(remote, v))
+				}
+			}
+		}
+	}
+}
+
 async fn run_network(net: impl Network, mut ctx: SubsystemContext<NetworkBridgeMessage>) {
 	let mut event_stream = net.event_stream().fuse();
 	let mut local_view = Vec::with_capacity(MAX_VIEW_HEADS);
 
-	// TODO [now]
-	// let peers = HashMap::new();
-	// let event_listeners = HashMap::new();
+	//let mut peers = HashMap::new();
+	let mut event_producers = HashMap::new();
 
 	loop {
 		let subsystem_next = ctx.recv().fuse();
 		let mut net_event_next = event_stream.next().fuse();
 		futures::pin_mut!(subsystem_next);
 
-		futures::select! {
-			subsystem_msg = subsystem_next => match subsystem_msg {
-				Ok(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent))) => {
-					// TODO [now]: update local view and send view update to peers.
-				}
-				Ok(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent))) => {
-					// TODO [now]: update local view and send view update to peers.
+		let action = futures::select! {
+			subsystem_msg = subsystem_next => Some(action_from_overseer_message(subsystem_msg)),
+			net_event = net_event_next => action_from_network_message(net_event),
+		};
+
+		let action = match action {
+			None => continue,
+			Some(a) => a,
+		};
+
+		match action {
+			Action::RegisterEventProducer(protocol_id, event_producer) => {
+				// insert only if none present.
+				event_producers.entry(protocol_id).or_insert(event_producer);
+			}
+			Action::SendMessage(peers, protocol, message) => {
+				let message = WireMessage::ProtocolMessage(protocol, message).encode();
+
+				for peer in peers.iter().skip(1).cloned() {
Contributor

What is the reason for sending a message to all but the first peer, only to then send the same message to the first one?

Contributor Author — @rphmeier, Jun 25, 2020

It prevents a clone if we are sending only to one peer, and these messages can be pretty large in practical situations.
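For readers following along, here is a minimal stand-alone sketch of the trick under discussion: clone the encoded buffer for every peer except one, then move the buffer itself into the remaining send. `send_to`, `broadcast`, and the peer names are hypothetical stand-ins for illustration, not the bridge's real API.

```rust
// Hypothetical stand-in for `net.write_notification`, which takes the
// message buffer by value.
fn send_to(peer: &str, bytes: Vec<u8>) {
    println!("sending {} bytes to {}", bytes.len(), peer);
}

fn broadcast(peers: &[&str], message: Vec<u8>) {
    // Clone the buffer for every peer except the first...
    for peer in peers.iter().skip(1) {
        send_to(peer, message.clone());
    }
    // ...then move the original buffer into the send for the first peer.
    // With exactly one peer this performs no clone at all.
    if let Some(peer) = peers.first() {
        send_to(peer, message);
    }
}

fn main() {
    broadcast(&["alice", "bob", "carol"], vec![0u8; 10 * 1024]);
}
```

A plain `for peer in peers` loop would instead clone once per peer and drop the original buffer unused.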

Contributor

I was curious whether this loop unrolling would be more performant than the optimizations rustc and LLVM do already.

I wrote a small benchmark (https://github.com/mxinden/clone-trick/blob/master/benches/bench.rs) distributing a message of 1 KB, 10 KB, 100 KB and 1 MB among 20 peers.

[benchmark chart omitted]

The manual loop unrolling does have a performance impact, but a small one. While the simple implementation takes 3.9 us to distribute a 10 KB message to 20 peers, the optimized implementation takes 3.6 us (on an i7-8550U).

I would expect that if write_notification accepted some type that allows sharing of the underlying allocation (e.g. Bytes, or Arc<Vec<u8>> like we do for NotificationsReceived), the entire memory allocation overhead would vanish.

@rphmeier in case you want to keep the optimization in here (e.g. for the impact it has in case there is only a single peer and thus no clone needed), would you mind adding a comment describing why this is not a single for loop?
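The allocation-sharing suggestion can be sketched with the `bytes` crate, where cloning a `Bytes` handle bumps a reference count rather than copying the payload. This is only an illustration of the idea; `write_notification` did not actually take `Bytes` at the time.

```rust
use bytes::Bytes;

// Hypothetical sender whose message type shares the underlying allocation.
fn send_to(peer: &str, message: Bytes) {
    println!("sending {} shared bytes to {}", message.len(), peer);
}

fn main() {
    // One allocation for the encoded message...
    let message = Bytes::from(vec![0u8; 10 * 1024]);

    // ...and each per-peer clone is a cheap reference-count increment,
    // so no skip(1)/first() trick is needed to avoid copies.
    for peer in ["alice", "bob", "carol"] {
        send_to(peer, message.clone());
    }
}
```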

Contributor Author — @rphmeier, Jun 26, 2020

It's a memory optimization, not a performance one. Computers are pretty fast, but if I'm sending a 10 MiB message to 20 peers I'd rather keep memory usage down by 5% while this is buffered. This would be solved by having the lower-level APIs take Bytes, BTW, and the comment I added yesterday (but didn't push) notes that.

Contributor Author — @rphmeier, Jun 26, 2020

It would also be addressed by the lower-level APIs accepting a set of peers to send a message to. I don't view this optimization as premature. PoVs are likely to be in the range of 1-10 MiB and we gossip them to all our peers (because peer-set management and validator authentication still aren't in a state where we can do something more targeted).

The underlying network and discovery deficiencies are the forest for the trees on this optimization - get those working, and we won't need to use tricks like this. Unfortunately, rustc and LLVM are not smart enough to do that for us...

Contributor

> but if I'm sending a 10 MiB message to 20 peers I'd rather keep memory usage down by 5% while this is buffered

Are you suggesting that this optimization keeps the resident set size in memory low? How does it do that?

Say we send a message to 20 peers. With the un-optimized implementation the message would be cloned 20 times, thus for a short amount of time the message would exist 21 times in memory. But this 21st (useless) copy of the message is dropped right at the end of the function scope, thus this optimization only reduces the total resident set size for the duration of the function scope, which I would guess is negligible.

Contributor Author — @rphmeier, Jun 26, 2020

Well, every little bit counts. Given that we're awaiting here before dropping the last clone, there's no guarantee how long that will take.

+					net.write_notification(peer, message.clone());
 				}
-				Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
-				Ok(FromOverseer::Communication { msg }) => match msg {
-					NetworkBridgeMessage::RegisterEventProducer(protocol_id, message_producer) => {
-						// TODO [now]: add event producer.
-					}
-					NetworkBridgeMessage::ReportPeer(peer, rep) => {
-						// TODO [now]: report a peer to network service.
-					}
-					NetworkBridgeMessage::SendMessage(peers, protocol, message) => {
-						// TODO [now]: Send the message to all peers with `write_notification`.
-					}
-				},
-				Err(e) => {
-					// TODO [now]: log error.
-					return;
+
+				if let Some(peer) = peers.first() {
+					net.write_notification(peer.clone(), message);
 				}
 			}
+			Action::ReportPeer(peer, rep) => {
+				net.report_peer(peer, rep)
+			}
+			Action::StartWork(relay_parent) => {
+				local_view.push(relay_parent);
+				// TODO [now]: send view change.
+			}
+			Action::StopWork(relay_parent) => {
+				local_view.retain(|h| h != &relay_parent)
+				// TODO [now]: send view change.
+			}
+
+			Action::PeerConnected(peer, role) => {
+
+			}
+			Action::PeerDisconnected(peer) => {
+
+			},
+			Action::PeerMalformedMessage(peer) => {
+
+			},
-			net_event = net_event_next => {
-				// TODO [now]: Update peer tracker, filter out anything not to do with this
-				// engine, and transform all updates to be sent to the overseer.
-			}
+			Action::PeerMessages(peer, messages) => {
+
+			},
+			Action::PeerViewChange(peer, new_view) => {
+
+			},
+
+			Action::Abort => return,
 		}
 	}
 }
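Stepping back from the diff, the commit's core shape is: normalize events from two asynchronous sources (overseer messages and network events) into one flat `Action` enum, then drive all state transitions from a single `match`. Below is a minimal, self-contained sketch of that pattern using `futures` channels and simplified stand-in types; it illustrates the structure only and is not the real subsystem API.

```rust
use futures::{channel::mpsc, select, stream::StreamExt};

// Simplified stand-ins for the bridge's two event sources.
enum OverseerMsg { Start(u64), Stop(u64), Conclude }
enum NetEvent { Connected(String), Disconnected(String) }

// The flat action type that every event is normalized into.
enum Action {
    StartWork(u64),
    StopWork(u64),
    PeerConnected(String),
    PeerDisconnected(String),
    Abort,
}

fn action_from_overseer(msg: Option<OverseerMsg>) -> Action {
    match msg {
        Some(OverseerMsg::Start(h)) => Action::StartWork(h),
        Some(OverseerMsg::Stop(h)) => Action::StopWork(h),
        // A conclude signal or a closed channel both abort.
        Some(OverseerMsg::Conclude) | None => Action::Abort,
    }
}

fn action_from_net(event: Option<NetEvent>) -> Option<Action> {
    match event {
        Some(NetEvent::Connected(p)) => Some(Action::PeerConnected(p)),
        Some(NetEvent::Disconnected(p)) => Some(Action::PeerDisconnected(p)),
        None => Some(Action::Abort),
    }
}

async fn run(mut overseer: mpsc::Receiver<OverseerMsg>, mut net: mpsc::Receiver<NetEvent>) {
    let mut view: Vec<u64> = Vec::new();
    loop {
        // Normalize whichever source fires first into an Action...
        let action = select! {
            msg = overseer.next() => Some(action_from_overseer(msg)),
            ev = net.next() => action_from_net(ev),
        };
        // ...then handle all state transitions in one place.
        match action {
            Some(Action::StartWork(h)) => view.push(h),
            Some(Action::StopWork(h)) => view.retain(|x| x != &h),
            Some(Action::PeerConnected(p)) => println!("peer connected: {}", p),
            Some(Action::PeerDisconnected(p)) => println!("peer disconnected: {}", p),
            Some(Action::Abort) => return,
            None => continue,
        }
    }
}

fn main() {
    let (mut otx, orx) = mpsc::channel(8);
    let (_ntx, nrx) = mpsc::channel::<NetEvent>(8);
    otx.try_send(OverseerMsg::Start(1)).unwrap();
    otx.try_send(OverseerMsg::Conclude).unwrap();
    futures::executor::block_on(run(orx, nrx));
}
```

Keeping the event-to-action translation in pure functions, as the commit does with `action_from_overseer_message` and `action_from_network_message`, makes each translation testable in isolation and keeps the event loop itself a simple select-then-match.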