Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
54873ea
network bridge skeleton
rphmeier Jun 17, 2020
5eea3bf
move some primitives around and add debug impls
rphmeier Jun 17, 2020
27456b5
protocol registration glue & abstract network interface
rphmeier Jun 17, 2020
3bf17fa
add send_msgs to subsystemctx
rphmeier Jun 18, 2020
e05abd2
select logic
rphmeier Jun 18, 2020
ba005c9
transform different events into actions and handle
rphmeier Jun 18, 2020
7572bba
implement remaining network bridge state machine
rphmeier Jun 18, 2020
24a91c4
start test skeleton
rphmeier Jun 22, 2020
064a00c
make network methods asynchronous
rphmeier Jun 22, 2020
c116061
extract subsystem out to subsystem crate
rphmeier Jun 24, 2020
e6162de
port over overseer to subsystem context trait
rphmeier Jun 24, 2020
844a9d1
fix minimal example
rphmeier Jun 24, 2020
cae1561
fix overseer doc test
rphmeier Jun 24, 2020
2c9be73
update network-bridge crate
rphmeier Jun 24, 2020
534535f
write a subsystem test-helpers crate
rphmeier Jun 25, 2020
8ac6269
write a network test helper for network-bridge
rphmeier Jun 25, 2020
51ff607
set up (broken) view test
rphmeier Jun 25, 2020
960029b
Revamp network to be more async-friendly and not require Sync
rphmeier Jun 25, 2020
eb52f9a
fix spacing
rphmeier Jun 25, 2020
f6526c4
fix test compilation
rphmeier Jun 25, 2020
0f5a1b1
insert side-channel for actions
rphmeier Jun 25, 2020
8f75746
Add some more message types to AllMessages
rphmeier Jun 25, 2020
940216b
introduce a test harness
rphmeier Jun 25, 2020
859bd8b
add some tests
rphmeier Jun 26, 2020
c504d12
Merge branch 'master' into rh-network-bridge
rphmeier Jun 26, 2020
4343f5c
ensure service compiles and passes tests
rphmeier Jun 26, 2020
98c4c54
fix typo
rphmeier Jun 26, 2020
d194838
fix service-new compilation
rphmeier Jun 26, 2020
d468a2b
Subsystem test helpers send messages synchronously
rphmeier Jun 27, 2020
b676adc
remove smelly action inspector
rphmeier Jun 27, 2020
7d44a62
remove superfluous let binding
rphmeier Jun 27, 2020
989e8b5
fix warnings
rphmeier Jun 27, 2020
64792f7
Update node/network/bridge/src/lib.rs
rphmeier Jun 29, 2020
50b51dd
fix compilation
rphmeier Jun 30, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
insert side-channel for actions
  • Loading branch information
rphmeier committed Jun 25, 2020
commit 0f5a1b109e85cf4041da163c468ab8c299801c5b
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/network/bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys
[dev-dependencies]
parking_lot = "0.10.0"
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
assert_matches = "1.3.0"
95 changes: 83 additions & 12 deletions node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const MALFORMED_VIEW_COST: ReputationChange
= ReputationChange::new(-500, "Malformed view");

/// Messages received on the network.
#[derive(Encode, Decode)]
#[derive(Debug, Encode, Decode, Clone)]
pub enum WireMessage {
/// A message from a peer on a specific protocol.
#[codec(index = "1")]
Expand Down Expand Up @@ -178,7 +178,7 @@ impl<N: Network, C> Subsystem<C> for NetworkBridge<N>
fn start(&mut self, ctx: C) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run_network`.
SpawnedSubsystem(run_network(self.0.clone(), ctx).map(|_| ()).boxed())
SpawnedSubsystem(run_network(self.0.clone(), ctx, |_| ()).map(|_| ()).boxed())
}
}

Expand All @@ -187,6 +187,7 @@ struct PeerData {
view: View,
}

#[derive(Debug)]
enum Action {
RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages),
SendMessage(Vec<PeerId>, ProtocolId, Vec<u8>),
Expand Down Expand Up @@ -227,7 +228,7 @@ fn action_from_overseer_message(
fn action_from_network_message(event: Option<NetworkEvent>) -> Option<Action> {
match event {
None => {
log::warn!("Shutting down Network Bridge: underlying event stream concluded");
log::info!("Shutting down Network Bridge: underlying event stream concluded");
Some(Action::Abort)
}
Some(NetworkEvent::Dht(_)) => None,
Expand Down Expand Up @@ -303,6 +304,7 @@ async fn update_view(
async fn run_network<N: Network>(
mut net: N,
mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>,
action_inspector: impl Fn(&Action), // side-channel for tests to inspect internals
) -> SubsystemResult<()> {
let mut event_stream = net.event_stream().fuse();

Expand Down Expand Up @@ -330,6 +332,8 @@ async fn run_network<N: Network>(
}
};

action_inspector(&action);

match action {
Action::RegisterEventProducer(protocol_id, event_producer) => {
// insert only if none present.
Expand Down Expand Up @@ -505,6 +509,7 @@ mod tests {

use std::sync::Arc;
use parking_lot::Mutex;
use assert_matches::assert_matches;

// The subsystem's view of the network - only supports a single call to `event_stream`.
#[derive(Clone)]
Expand All @@ -520,6 +525,47 @@ mod tests {
net_tx: mpsc::UnboundedSender<NetworkEvent>,
}

// a record of an action internal to the network.
#[derive(Debug)]
enum InternalActionRecord {
RegisterEventProducer(ProtocolId),
SendMessage(Vec<PeerId>, ProtocolId, Vec<u8>),
ReportPeer(PeerId, ReputationChange),
StartWork(Hash),
StopWork(Hash),

PeerConnected(PeerId, ObservedRole),
PeerDisconnected(PeerId),
PeerMessages(PeerId, Vec<WireMessage>),

Abort,
}

impl<'a> From<&'a Action> for InternalActionRecord {
fn from(action: &'a Action) -> Self {
match *action {
Action::RegisterEventProducer(protocol, _)
=> InternalActionRecord::RegisterEventProducer(protocol),
Action::SendMessage(ref peers, protocol, ref message)
=> InternalActionRecord::SendMessage(peers.clone(), protocol, message.clone()),
Action::ReportPeer(ref peer, rep)
=> InternalActionRecord::ReportPeer(peer.clone(), rep.clone()),

Action::StartWork(hash) => InternalActionRecord::StartWork(hash),
Action::StopWork(hash) => InternalActionRecord::StopWork(hash),

Action::PeerConnected(ref peer, ref role)
=> InternalActionRecord::PeerConnected(peer.clone(), role.clone()),
Action::PeerDisconnected(ref peer)
=> InternalActionRecord::PeerDisconnected(peer.clone()),
Action::PeerMessages(ref peer, ref messages)
=> InternalActionRecord::PeerMessages(peer.clone(), messages.clone()),

Action::Abort => InternalActionRecord::Abort,
}
}
}

fn new_test_network() -> (
TestNetwork,
TestNetworkHandle,
Expand Down Expand Up @@ -597,6 +643,8 @@ mod tests {
}
}

// network actions are sensitive to ordering of `PeerId`s within a `HashMap`, so
// we need to use this to prevent fragile reliance on peer ordering.
fn network_actions_contains(actions: &[NetworkAction], action: &NetworkAction) -> bool {
actions.iter().find(|&x| x == action).is_some()
}
Expand All @@ -606,20 +654,38 @@ mod tests {
let pool = ThreadPool::new().unwrap();

let (network, mut network_handle) = new_test_network();
let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool.clone());
pool.spawn_ok(
run_network(network, context)
.map_err(|_| panic!("subsystem execution failed"))
.map(|_| ())
);

executor::block_on(async move {
let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool);
let (action_tx, mut action_rx) = mpsc::unbounded::<InternalActionRecord>();

let network_bridge = run_network(
network,
context,
move |action| { let _ = action_tx.unbounded_send(action.into()); },
)
.map_err(|_| panic!("subsystem execution failed"))
.map(|_| ());

let test_fut = async move {
let peer_a = PeerId::random();
let peer_b = PeerId::random();

network_handle.connect_peer(peer_a.clone(), ObservedRole::Full);
network_handle.connect_peer(peer_b.clone(), ObservedRole::Full);

assert_matches!(
action_rx.next().await.unwrap(),
InternalActionRecord::PeerConnected(p, ObservedRole::Full) => {
assert_eq!(p, peer_a);
}
);

assert_matches!(
action_rx.next().await.unwrap(),
InternalActionRecord::PeerConnected(p, ObservedRole::Full) => {
assert_eq!(p, peer_b);
}
);

let hash_a = Hash::from([1; 32]);

virtual_overseer.send(FromOverseer::Signal(OverseerSignal::StartWork(hash_a)));
Expand All @@ -635,7 +701,12 @@ mod tests {
&actions,
&NetworkAction::WriteNotification(peer_b, wire_message.clone()),
));
});
};

futures::pin_mut!(test_fut);
futures::pin_mut!(network_bridge);

executor::block_on(future::select(test_fut, network_bridge));
}


Expand Down