Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
54873ea
network bridge skeleton
rphmeier Jun 17, 2020
5eea3bf
move some primitives around and add debug impls
rphmeier Jun 17, 2020
27456b5
protocol registration glue & abstract network interface
rphmeier Jun 17, 2020
3bf17fa
add send_msgs to subsystemctx
rphmeier Jun 18, 2020
e05abd2
select logic
rphmeier Jun 18, 2020
ba005c9
transform different events into actions and handle
rphmeier Jun 18, 2020
7572bba
implement remaining network bridge state machine
rphmeier Jun 18, 2020
24a91c4
start test skeleton
rphmeier Jun 22, 2020
064a00c
make network methods asynchronous
rphmeier Jun 22, 2020
c116061
extract subsystem out to subsystem crate
rphmeier Jun 24, 2020
e6162de
port over overseer to subsystem context trait
rphmeier Jun 24, 2020
844a9d1
fix minimal example
rphmeier Jun 24, 2020
cae1561
fix overseer doc test
rphmeier Jun 24, 2020
2c9be73
update network-bridge crate
rphmeier Jun 24, 2020
534535f
write a subsystem test-helpers crate
rphmeier Jun 25, 2020
8ac6269
write a network test helper for network-bridge
rphmeier Jun 25, 2020
51ff607
set up (broken) view test
rphmeier Jun 25, 2020
960029b
Revamp network to be more async-friendly and not require Sync
rphmeier Jun 25, 2020
eb52f9a
fix spacing
rphmeier Jun 25, 2020
f6526c4
fix test compilation
rphmeier Jun 25, 2020
0f5a1b1
insert side-channel for actions
rphmeier Jun 25, 2020
8f75746
Add some more message types to AllMessages
rphmeier Jun 25, 2020
940216b
introduce a test harness
rphmeier Jun 25, 2020
859bd8b
add some tests
rphmeier Jun 26, 2020
c504d12
Merge branch 'master' into rh-network-bridge
rphmeier Jun 26, 2020
4343f5c
ensure service compiles and passes tests
rphmeier Jun 26, 2020
98c4c54
fix typo
rphmeier Jun 26, 2020
d194838
fix service-new compilation
rphmeier Jun 26, 2020
d468a2b
Subsystem test helpers send messages synchronously
rphmeier Jun 27, 2020
b676adc
remove smelly action inspector
rphmeier Jun 27, 2020
7d44a62
remove superfluous let binding
rphmeier Jun 27, 2020
989e8b5
fix warnings
rphmeier Jun 27, 2020
64792f7
Update node/network/bridge/src/lib.rs
rphmeier Jun 29, 2020
50b51dd
fix compilation
rphmeier Jun 30, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
make network methods asynchronous
  • Loading branch information
rphmeier committed Jun 22, 2020
commit 064a00c244c6c1c6a240e6998850fbe5ddc52edc
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 43 additions & 28 deletions node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

use parity_scale_codec::{Encode, Decode};
use futures::prelude::*;
use futures::future::BoxFuture;
use futures::stream::BoxStream;

use sc_network::{
Expand Down Expand Up @@ -75,23 +76,25 @@ pub trait Network: Clone + Send + Sync + 'static {
fn event_stream(&self) -> BoxStream<NetworkEvent>;

/// Report a given peer as either beneficial (+) or costly (-) according to the given scalar.
fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange);
fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) -> BoxFuture<()>;

/// Write a notification to a peer on the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID) topic.
fn write_notification(&self, who: PeerId, message: Vec<u8>);
fn write_notification(&self, who: PeerId, message: Vec<u8>) -> BoxFuture<()>;
}

impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
fn event_stream(&self) -> BoxStream<NetworkEvent> {
sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
}

fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) {
sc_network::NetworkService::report_peer(self, who, cost_benefit)
fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) -> BoxFuture<()> {
sc_network::NetworkService::report_peer(self, who, cost_benefit);
future::ready(()).boxed()
}

fn write_notification(&self, who: PeerId, message: Vec<u8>) {
sc_network::NetworkService::write_notification(self, who, POLKADOT_ENGINE_ID, message)
fn write_notification(&self, who: PeerId, message: Vec<u8>) -> BoxFuture<()> {
sc_network::NetworkService::write_notification(self, who, POLKADOT_ENGINE_ID, message);
future::ready(()).boxed()
}
}

Expand Down Expand Up @@ -212,6 +215,27 @@ async fn dispatch_update_to_all(
ctx.send_msgs(messages).await
}

/// Recompute the local view from `live_heads` and, if it changed, announce it.
///
/// The candidate view is built with `construct_view(live_heads)`. When it is
/// equal to `*local_view` nothing is sent and `None` is returned. Otherwise
/// `*local_view` is overwritten with the new view, an encoded
/// `WireMessage::ViewUpdate` is written to every peer in `peers` through
/// `net.write_notification`, and `Some(NetworkBridgeEvent::OurViewChange(..))`
/// carrying the updated view is returned so the caller can dispatch it.
async fn update_view(
    peers: &HashMap<PeerId, PeerData>,
    live_heads: &[Hash],
    net: &impl Network,
    local_view: &mut View,
) -> Option<NetworkBridgeEvent> {
    let new_view = construct_view(live_heads);
    if *local_view == new_view { return None }
    *local_view = new_view.clone();

    // `new_view` is not needed after this point, so move it into the wire
    // message instead of cloning it a second time.
    let message = WireMessage::ViewUpdate(new_view).encode();

    // One notification future per connected peer; each gets its own copy of
    // the encoded message because `write_notification` takes it by value.
    let write_all = peers.keys().cloned().map(|peer| {
        net.write_notification(peer, message.clone())
    });

    // Await all per-peer writes together rather than one after another.
    future::join_all(write_all).await;

    Some(NetworkBridgeEvent::OurViewChange(local_view.clone()))
}

async fn run_network(net: impl Network, mut ctx: SubsystemContext<NetworkBridgeMessage>) {
let mut event_stream = net.event_stream().fuse();

Expand Down Expand Up @@ -239,19 +263,6 @@ async fn run_network(net: impl Network, mut ctx: SubsystemContext<NetworkBridgeM
}
};

let update_view = |peers: &HashMap<PeerId, PeerData>, live_heads, local_view: &mut View| {
let new_view = construct_view(live_heads);
if *local_view == new_view { return None }
*local_view = new_view.clone();

let message = WireMessage::ViewUpdate(new_view).encode();
for peer in peers.keys().cloned() {
net.write_notification(peer, message.clone())
}

Some(NetworkBridgeEvent::OurViewChange(local_view.clone()))
};

match action {
Action::RegisterEventProducer(protocol_id, event_producer) => {
// insert only if none present.
Expand All @@ -261,19 +272,21 @@ async fn run_network(net: impl Network, mut ctx: SubsystemContext<NetworkBridgeM
let message = WireMessage::ProtocolMessage(protocol, message).encode();

for peer in peers.iter().skip(1).cloned() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason for sending a message to all but the first one to only then send the same message to the first one?

Copy link
Contributor Author

@rphmeier rphmeier Jun 25, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It prevents a clone if we are sending only to one peer, and these messages can be pretty large in practical situations.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was curious whether this loop unrolling would be more performant than the optimizations rustc and llvm do already.

I wrote a small benchmark https://github.com/mxinden/clone-trick/blob/master/benches/bench.rs distributing a message of 1 KB, 10 KB, 100 KB and 1 MB among 20 peers.

image

The manual loop unrolling does have a performance impact, but a small one. While the simple implementation takes 3.9 us to distribute a 10 KB message to 20 peers, the optimized implementation takes 3.6 us (on an i7-8550U).

I would expect that if write_notification accepted some type that allows sharing of the underlying allocation (e.g. Bytes or Arc<Vec<u8>>, like we do for NotificationsReceived), the entire memory allocation overhead would vanish.

@rphmeier in case you want to keep the optimization in here (e.g. for the impact it has in case there is only a single peer and thus no clone needed), would you mind adding a comment describing why this is not a single for loop?

Copy link
Contributor Author

@rphmeier rphmeier Jun 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a memory optimization, not a performance one. Computers are pretty fast, but if I'm sending a 10MiB message to 20 peers I'd rather keep memory usage down by 5% while this is buffered. This would be solved by having the lower-level APIs take Bytes, BTW, and the comment I added yesterday (but didn't push) notes that.

Copy link
Contributor Author

@rphmeier rphmeier Jun 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would also be addressed by the lower level APIs accepting a set of peers to send a message to. I don't view this optimization as premature. PoVs are likely to be in the range of 1-10 MiB and we gossip them to all our peers (because peer set management and validator authentication still isn't in a state where we can do something more targeted).

The underlying network and discovery deficiencies are the forest for the trees on this optimization - get those working, and we won't need to use tricks like this. Unfortunately, rustc and LLVM are not smart enough to do that for us...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but if I'm sending a 10MiB message to 20 peers I'd rather keep memory usage down by 5% while this is buffered

Are you suggesting that this optimization keeps the resident set size in memory low? How does it do that?

Say we send a message to 20 peers. With the un-optimized implementation the message would be cloned 20 times, thus for a short amount of time the message would exist 21 times in memory. But this 21st (useless) copy of the message is dropped right at the end of the function scope, thus this optimization only reduces the total resident set size for the duration of the function scope, which I would guess is negligible.

Copy link
Contributor Author

@rphmeier rphmeier Jun 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, every little bit counts. Given that we're awaiting here before dropping the last clone, there's no guarantee how long that will take.

net.write_notification(peer, message.clone());
net.write_notification(peer, message.clone()).await;
}

if let Some(peer) = peers.first() {
net.write_notification(peer.clone(), message);
net.write_notification(peer.clone(), message).await;
}
}
Action::ReportPeer(peer, rep) => {
net.report_peer(peer, rep)
net.report_peer(peer, rep).await
}
Action::StartWork(relay_parent) => {
live_heads.push(relay_parent);
if let Some(view_update) = update_view(&peers, &live_heads, &mut local_view) {
if let Some(view_update)
= update_view(&peers, &live_heads, &net, &mut local_view).await
{
if let Err(_) = dispatch_update_to_all(
view_update,
event_producers.values(),
Expand All @@ -286,7 +299,9 @@ async fn run_network(net: impl Network, mut ctx: SubsystemContext<NetworkBridgeM
}
Action::StopWork(relay_parent) => {
live_heads.retain(|h| h != &relay_parent);
if let Some(view_update) = update_view(&peers, &live_heads, &mut local_view) {
if let Some(view_update)
= update_view(&peers, &live_heads, &net, &mut local_view).await
{
if let Err(_) = dispatch_update_to_all(
view_update,
event_producers.values(),
Expand Down Expand Up @@ -340,7 +355,7 @@ async fn run_network(net: impl Network, mut ctx: SubsystemContext<NetworkBridgeM
match message {
WireMessage::ViewUpdate(new_view) => {
if new_view.0.len() > MAX_VIEW_HEADS {
net.report_peer(peer.clone(), MALFORMED_VIEW_COST);
net.report_peer(peer.clone(), MALFORMED_VIEW_COST).await;
continue
}

Expand All @@ -362,7 +377,7 @@ async fn run_network(net: impl Network, mut ctx: SubsystemContext<NetworkBridgeM
NetworkBridgeEvent::PeerMessage(peer.clone(), message)
)),
None => {
net.report_peer(peer.clone(), UNKNOWN_PROTO_COST);
net.report_peer(peer.clone(), UNKNOWN_PROTO_COST).await;
None
}
};
Expand Down Expand Up @@ -411,7 +426,7 @@ mod tests {
TestNetwork,
mpsc::UnboundedSender<NetworkEvent>,
mpsc::UnboundedReceiver<OutgoingEvent>,
{
) {
let (net_tx, net_rx) = mpsc::unbounded();
let (outgoing_tx, outgoing_rx) = mpsc::unbounded();

Expand All @@ -421,7 +436,7 @@ mod tests {
};

(net, net_tx, outgoing_rx)
})
}

impl TestNetwork {

Expand Down