Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
54873ea
network bridge skeleton
rphmeier Jun 17, 2020
5eea3bf
move some primitives around and add debug impls
rphmeier Jun 17, 2020
27456b5
protocol registration glue & abstract network interface
rphmeier Jun 17, 2020
3bf17fa
add send_msgs to subsystemctx
rphmeier Jun 18, 2020
e05abd2
select logic
rphmeier Jun 18, 2020
ba005c9
transform different events into actions and handle
rphmeier Jun 18, 2020
7572bba
implement remaining network bridge state machine
rphmeier Jun 18, 2020
24a91c4
start test skeleton
rphmeier Jun 22, 2020
064a00c
make network methods asynchronous
rphmeier Jun 22, 2020
c116061
extract subsystem out to subsystem crate
rphmeier Jun 24, 2020
e6162de
port over overseer to subsystem context trait
rphmeier Jun 24, 2020
844a9d1
fix minimal example
rphmeier Jun 24, 2020
cae1561
fix overseer doc test
rphmeier Jun 24, 2020
2c9be73
update network-bridge crate
rphmeier Jun 24, 2020
534535f
write a subsystem test-helpers crate
rphmeier Jun 25, 2020
8ac6269
write a network test helper for network-bridge
rphmeier Jun 25, 2020
51ff607
set up (broken) view test
rphmeier Jun 25, 2020
960029b
Revamp network to be more async-friendly and not require Sync
rphmeier Jun 25, 2020
eb52f9a
fix spacing
rphmeier Jun 25, 2020
f6526c4
fix test compilation
rphmeier Jun 25, 2020
0f5a1b1
insert side-channel for actions
rphmeier Jun 25, 2020
8f75746
Add some more message types to AllMessages
rphmeier Jun 25, 2020
940216b
introduce a test harness
rphmeier Jun 25, 2020
859bd8b
add some tests
rphmeier Jun 26, 2020
c504d12
Merge branch 'master' into rh-network-bridge
rphmeier Jun 26, 2020
4343f5c
ensure service compiles and passes tests
rphmeier Jun 26, 2020
98c4c54
fix typo
rphmeier Jun 26, 2020
d194838
fix service-new compilation
rphmeier Jun 26, 2020
d468a2b
Subsystem test helpers send messages synchronously
rphmeier Jun 27, 2020
b676adc
remove smelly action inspector
rphmeier Jun 27, 2020
7d44a62
remove superfluous let binding
rphmeier Jun 27, 2020
989e8b5
fix warnings
rphmeier Jun 27, 2020
64792f7
Update node/network/bridge/src/lib.rs
rphmeier Jun 29, 2020
50b51dd
fix compilation
rphmeier Jun 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
remove smelly action inspector
  • Loading branch information
rphmeier committed Jun 27, 2020
commit b676adcfa19c895c38f39a6c2dec9d7745fb8f49
172 changes: 42 additions & 130 deletions node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub enum NetworkAction {
}

/// An abstraction over networking for the purposes of this subsystem.
pub trait Network: Clone + Send + 'static {
pub trait Network: Send + 'static {
/// Get a stream of all events occurring on the network. This may include events unrelated
/// to the Polkadot protocol - the user of this function should filter only for events related
/// to the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID).
Expand Down Expand Up @@ -161,25 +161,33 @@ impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
}

/// The network bridge subsystem.
pub struct NetworkBridge<N>(N);
pub struct NetworkBridge<N>(Option<N>);

impl<N> NetworkBridge<N> {
/// Create a new network bridge subsystem with underlying network service.
///
/// This assumes that the network service has had the notifications protocol for the network
/// bridge already registered. See [`notifications_protocol_info`](notifications_protocol_info).
pub fn new(net_service: N) -> Self {
NetworkBridge(net_service)
NetworkBridge(Some(net_service))
}
}

impl<N: Network, C> Subsystem<C> for NetworkBridge<N>
where C: SubsystemContext<Message=NetworkBridgeMessage>
{
fn start(&mut self, ctx: C) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run_network`.
SpawnedSubsystem(run_network(self.0.clone(), ctx, |_| ()).map(|_| ()).boxed())
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(match self.0.take() {
None => async move { for _ in ctx.recv().await { } }.boxed(),
Some(net) => {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run_network`.
run_network(net, ctx).map(|_| ()).boxed()
}
})



}
}

Expand Down Expand Up @@ -307,7 +315,6 @@ async fn update_view(
async fn run_network<N: Network>(
mut net: N,
mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>,
action_inspector: impl Fn(&Action), // side-channel for tests to inspect internals
) -> SubsystemResult<()> {
let mut event_stream = net.event_stream().fuse();

Expand Down Expand Up @@ -335,8 +342,6 @@ async fn run_network<N: Network>(
}
};

action_inspector(&action);

match action {
Action::RegisterEventProducer(protocol_id, event_producer) => {
// insert only if none present.
Expand Down Expand Up @@ -530,67 +535,26 @@ mod tests {
use assert_matches::assert_matches;

use polkadot_subsystem::messages::{StatementDistributionMessage, BitfieldDistributionMessage};
use subsystem_test::{SingleItemSink, SingleItemStream};

// The subsystem's view of the network - only supports a single call to `event_stream`.
#[derive(Clone)]
struct TestNetwork {
net_events: Arc<Mutex<Option<mpsc::UnboundedReceiver<NetworkEvent>>>>,
net_events: Arc<Mutex<Option<SingleItemStream<NetworkEvent>>>>,
action_tx: mpsc::UnboundedSender<NetworkAction>,
}

// The test's view of the network. This receives updates from the subsystem in the form
// of `NetworkAction`s.
struct TestNetworkHandle {
action_rx: mpsc::UnboundedReceiver<NetworkAction>,
net_tx: mpsc::UnboundedSender<NetworkEvent>,
}

// a record of an action internal to the network.
#[derive(Debug)]
enum InternalActionRecord {
RegisterEventProducer(ProtocolId),
SendMessage(Vec<PeerId>, ProtocolId, Vec<u8>),
ReportPeer(PeerId, ReputationChange),
StartWork(Hash),
StopWork(Hash),

PeerConnected(PeerId, ObservedRole),
PeerDisconnected(PeerId),
PeerMessages(PeerId, Vec<WireMessage>),

Abort,
}

impl<'a> From<&'a Action> for InternalActionRecord {
fn from(action: &'a Action) -> Self {
match *action {
Action::RegisterEventProducer(protocol, _)
=> InternalActionRecord::RegisterEventProducer(protocol),
Action::SendMessage(ref peers, protocol, ref message)
=> InternalActionRecord::SendMessage(peers.clone(), protocol, message.clone()),
Action::ReportPeer(ref peer, rep)
=> InternalActionRecord::ReportPeer(peer.clone(), rep.clone()),

Action::StartWork(hash) => InternalActionRecord::StartWork(hash),
Action::StopWork(hash) => InternalActionRecord::StopWork(hash),

Action::PeerConnected(ref peer, ref role)
=> InternalActionRecord::PeerConnected(peer.clone(), role.clone()),
Action::PeerDisconnected(ref peer)
=> InternalActionRecord::PeerDisconnected(peer.clone()),
Action::PeerMessages(ref peer, ref messages)
=> InternalActionRecord::PeerMessages(peer.clone(), messages.clone()),

Action::Abort => InternalActionRecord::Abort,
}
}
net_tx: SingleItemSink<NetworkEvent>,
}

fn new_test_network() -> (
TestNetwork,
TestNetworkHandle,
) {
let (net_tx, net_rx) = mpsc::unbounded();
let (net_tx, net_rx) = subsystem_test::single_item_sink();
let (action_tx, action_rx) = mpsc::unbounded();

(
Expand Down Expand Up @@ -636,30 +600,30 @@ mod tests {
v
}

fn connect_peer(&self, peer: PeerId, role: ObservedRole) {
async fn connect_peer(&mut self, peer: PeerId, role: ObservedRole) {
self.send_network_event(NetworkEvent::NotificationStreamOpened {
remote: peer,
engine_id: POLKADOT_ENGINE_ID,
role,
});
}).await;
}

fn disconnect_peer(&self, peer: PeerId) {
async fn disconnect_peer(&mut self, peer: PeerId) {
self.send_network_event(NetworkEvent::NotificationStreamClosed {
remote: peer,
engine_id: POLKADOT_ENGINE_ID,
});
}).await;
}

fn peer_message(&self, peer: PeerId, message: Vec<u8>) {
async fn peer_message(&mut self, peer: PeerId, message: Vec<u8>) {
self.send_network_event(NetworkEvent::NotificationsReceived {
remote: peer,
messages: vec![(POLKADOT_ENGINE_ID, message.into())],
});
}).await;
}

fn send_network_event(&self, event: NetworkEvent) {
self.net_tx.unbounded_send(event).expect("subsystem concluded early");
async fn send_network_event(&mut self, event: NetworkEvent) {
self.net_tx.send(event).await.expect("subsystem concluded early");
}
}

Expand All @@ -672,28 +636,24 @@ mod tests {
struct TestHarness {
network_handle: TestNetworkHandle,
virtual_overseer: subsystem_test::TestSubsystemContextHandle<NetworkBridgeMessage>,
action_rx: mpsc::UnboundedReceiver<InternalActionRecord>,
}

fn test_harness<T: Future<Output=()>>(test: impl FnOnce(TestHarness) -> T) {
let pool = ThreadPool::new().unwrap();

let (network, network_handle) = new_test_network();
let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool);
let (action_tx, action_rx) = mpsc::unbounded::<InternalActionRecord>();

let network_bridge = run_network(
network,
context,
move |action| { let _ = action_tx.unbounded_send(action.into()); },
)
.map_err(|_| panic!("subsystem execution failed"))
.map(|_| ());

let test_fut = test(TestHarness {
network_handle,
virtual_overseer,
action_rx,
});

futures::pin_mut!(test_fut);
Expand All @@ -705,31 +665,17 @@ mod tests {
#[test]
fn sends_view_updates_to_peers() {
test_harness(|test_harness| async move {
let TestHarness { mut network_handle, virtual_overseer, mut action_rx } = test_harness;
let TestHarness { mut network_handle, mut virtual_overseer } = test_harness;

let peer_a = PeerId::random();
let peer_b = PeerId::random();

network_handle.connect_peer(peer_a.clone(), ObservedRole::Full);
network_handle.connect_peer(peer_b.clone(), ObservedRole::Full);

assert_matches!(
action_rx.next().await.unwrap(),
InternalActionRecord::PeerConnected(p, ObservedRole::Full) => {
assert_eq!(p, peer_a);
}
);

assert_matches!(
action_rx.next().await.unwrap(),
InternalActionRecord::PeerConnected(p, ObservedRole::Full) => {
assert_eq!(p, peer_b);
}
);
network_handle.connect_peer(peer_a.clone(), ObservedRole::Full).await;
network_handle.connect_peer(peer_b.clone(), ObservedRole::Full).await;

let hash_a = Hash::from([1; 32]);

virtual_overseer.send(FromOverseer::Signal(OverseerSignal::StartWork(hash_a)));
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::StartWork(hash_a))).await;

let actions = network_handle.next_network_actions(2).await;
let wire_message = WireMessage::ViewUpdate(View(vec![hash_a])).encode();
Expand All @@ -749,23 +695,16 @@ mod tests {
fn peer_view_updates_sent_via_overseer() {
test_harness(|test_harness| async move {
let TestHarness {
network_handle,
mut network_handle,
mut virtual_overseer,
mut action_rx,
} = test_harness;

let peer = PeerId::random();

let proto_statement = *b"abcd";
let proto_bitfield = *b"wxyz";

network_handle.connect_peer(peer.clone(), ObservedRole::Full);
assert_matches!(
action_rx.next().await.unwrap(),
InternalActionRecord::PeerConnected(p, ObservedRole::Full) => {
assert_eq!(p, peer);
}
);
network_handle.connect_peer(peer.clone(), ObservedRole::Full).await;

virtual_overseer.send(FromOverseer::Communication {
msg: NetworkBridgeMessage::RegisterEventProducer(
Expand All @@ -774,7 +713,7 @@ mod tests {
StatementDistributionMessage::NetworkBridgeUpdate(event)
)
),
});
}).await;

virtual_overseer.send(FromOverseer::Communication {
msg: NetworkBridgeMessage::RegisterEventProducer(
Expand All @@ -783,17 +722,7 @@ mod tests {
BitfieldDistributionMessage::NetworkBridgeUpdate(event)
)
),
});

assert_matches!(
action_rx.next().await.unwrap(),
InternalActionRecord::RegisterEventProducer(x) if x == proto_statement
);

assert_matches!(
action_rx.next().await.unwrap(),
InternalActionRecord::RegisterEventProducer(x) if x == proto_bitfield
);
}).await;

let view = View(vec![Hash::from([1u8; 32])]);

Expand Down Expand Up @@ -839,7 +768,7 @@ mod tests {
network_handle.peer_message(
peer.clone(),
WireMessage::ViewUpdate(view.clone()).encode(),
);
).await;

// statement distribution message comes first because handlers are ordered by
// protocol ID.
Expand Down Expand Up @@ -874,23 +803,16 @@ mod tests {
fn peer_messages_sent_via_overseer() {
test_harness(|test_harness| async move {
let TestHarness {
network_handle,
mut network_handle,
mut virtual_overseer,
mut action_rx,
} = test_harness;

let peer = PeerId::random();

let proto_statement = *b"abcd";
let proto_bitfield = *b"wxyz";

network_handle.connect_peer(peer.clone(), ObservedRole::Full);
assert_matches!(
action_rx.next().await.unwrap(),
InternalActionRecord::PeerConnected(p, ObservedRole::Full) => {
assert_eq!(p, peer);
}
);
network_handle.connect_peer(peer.clone(), ObservedRole::Full).await;

virtual_overseer.send(FromOverseer::Communication {
msg: NetworkBridgeMessage::RegisterEventProducer(
Expand All @@ -899,7 +821,7 @@ mod tests {
StatementDistributionMessage::NetworkBridgeUpdate(event)
)
),
});
}).await;

virtual_overseer.send(FromOverseer::Communication {
msg: NetworkBridgeMessage::RegisterEventProducer(
Expand All @@ -908,17 +830,7 @@ mod tests {
BitfieldDistributionMessage::NetworkBridgeUpdate(event)
)
),
});

assert_matches!(
action_rx.next().await.unwrap(),
InternalActionRecord::RegisterEventProducer(x) if x == proto_statement
);

assert_matches!(
action_rx.next().await.unwrap(),
InternalActionRecord::RegisterEventProducer(x) if x == proto_bitfield
);
}).await;

// bridge will inform about all previously-connected peers.
{
Expand Down Expand Up @@ -964,9 +876,9 @@ mod tests {
network_handle.peer_message(
peer.clone(),
WireMessage::ProtocolMessage(proto_statement, payload.clone()).encode(),
);
).await;

network_handle.disconnect_peer(peer.clone());
network_handle.disconnect_peer(peer.clone()).await;

// statement distribution message comes first because handlers are ordered by
// protocol ID, and then a disconnection event comes - indicating that the message
Expand Down
6 changes: 6 additions & 0 deletions node/subsystem/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ impl From<futures::task::SpawnError> for SubsystemError {
}
}

impl From<std::convert::Infallible> for SubsystemError {
fn from(e: std::convert::Infallible) -> Self {
match e {}
}
}

/// An asynchronous subsystem task..
///
/// In essence it's just a newtype wrapping a `BoxFuture`.
Expand Down