Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion node/network/bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ polkadot-primitives = { path = "../../../primitives" }
parity-scale-codec = { version = "1.3.5", default-features = false, features = ["derive"] }
sc-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-network-protocol = { path = "../protocol" }

Expand Down
51 changes: 23 additions & 28 deletions node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use futures::stream::BoxStream;
use futures::channel::{mpsc, oneshot};

use sc_network::Event as NetworkEvent;
use sp_runtime::ConsensusEngineId;

use polkadot_subsystem::{
ActiveLeavesUpdate, FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
Expand Down Expand Up @@ -56,12 +55,8 @@ mod validator_discovery;
/// We use the same limit to compute the view sent to peers locally.
const MAX_VIEW_HEADS: usize = 5;

/// The engine ID of the validation protocol.
pub const VALIDATION_PROTOCOL_ID: ConsensusEngineId = *b"pvn1";
/// The protocol name for the validation peer-set.
pub const VALIDATION_PROTOCOL_NAME: &'static str = "/polkadot/validation/1";
/// The engine ID of the collation protocol.
pub const COLLATION_PROTOCOL_ID: ConsensusEngineId = *b"pcn1";
/// The protocol name for the collation peer-set.
pub const COLLATION_PROTOCOL_NAME: &'static str = "/polkadot/collation/1";

Expand All @@ -88,10 +83,10 @@ pub enum WireMessage<M> {

/// Information about the notifications protocol. Should be used during network configuration
/// or shortly after startup to register the protocol with the network service.
pub fn notifications_protocol_info() -> Vec<(ConsensusEngineId, std::borrow::Cow<'static, str>)> {
pub fn notifications_protocol_info() -> Vec<std::borrow::Cow<'static, str>> {
vec![
(VALIDATION_PROTOCOL_ID, VALIDATION_PROTOCOL_NAME.into()),
(COLLATION_PROTOCOL_ID, COLLATION_PROTOCOL_NAME.into()),
VALIDATION_PROTOCOL_NAME.into(),
COLLATION_PROTOCOL_NAME.into(),
]
}

Expand All @@ -108,8 +103,8 @@ pub enum NetworkAction {
pub trait Network: Send + 'static {
/// Get a stream of all events occurring on the network. This may include events unrelated
/// to the Polkadot protocol - the user of this function should filter only for events related
/// to the [`VALIDATION_PROTOCOL_ID`](VALIDATION_PROTOCOL_ID)
/// or [`COLLATION_PROTOCOL_ID`](COLLATION_PROTOCOL_ID)
/// to the [`VALIDATION_PROTOCOL_NAME`](VALIDATION_PROTOCOL_NAME)
/// or [`COLLATION_PROTOCOL_NAME`](COLLATION_PROTOCOL_NAME)
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>;

/// Get access to an underlying sink for all network actions.
Expand Down Expand Up @@ -166,12 +161,12 @@ impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
match peer_set {
PeerSet::Validation => self.0.write_notification(
peer,
VALIDATION_PROTOCOL_ID,
VALIDATION_PROTOCOL_NAME.into(),
message,
),
PeerSet::Collation => self.0.write_notification(
peer,
COLLATION_PROTOCOL_ID,
COLLATION_PROTOCOL_NAME.into(),
message,
),
}
Expand Down Expand Up @@ -304,28 +299,28 @@ fn action_from_network_message(event: Option<NetworkEvent>) -> Action {
Action::Abort
}
Some(NetworkEvent::Dht(_)) => Action::Nop,
Some(NetworkEvent::NotificationStreamOpened { remote, engine_id, role }) => {
Some(NetworkEvent::NotificationStreamOpened { remote, protocol, role }) => {
let role = role.into();
match engine_id {
x if x == VALIDATION_PROTOCOL_ID
match protocol {
x if x == VALIDATION_PROTOCOL_NAME
=> Action::PeerConnected(PeerSet::Validation, remote, role),
x if x == COLLATION_PROTOCOL_ID
x if x == COLLATION_PROTOCOL_NAME
=> Action::PeerConnected(PeerSet::Collation, remote, role),
_ => Action::Nop,
}
}
Some(NetworkEvent::NotificationStreamClosed { remote, engine_id }) => {
match engine_id {
x if x == VALIDATION_PROTOCOL_ID
Some(NetworkEvent::NotificationStreamClosed { remote, protocol }) => {
match protocol {
x if x == VALIDATION_PROTOCOL_NAME
=> Action::PeerDisconnected(PeerSet::Validation, remote),
x if x == COLLATION_PROTOCOL_ID
x if x == COLLATION_PROTOCOL_NAME
=> Action::PeerDisconnected(PeerSet::Collation, remote),
_ => Action::Nop,
}
}
Some(NetworkEvent::NotificationsReceived { remote, messages }) => {
let v_messages: Result<Vec<_>, _> = messages.iter()
.filter(|(engine_id, _)| engine_id == &VALIDATION_PROTOCOL_ID)
.filter(|(protocol, _)| protocol == &VALIDATION_PROTOCOL_NAME)
.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
.collect();

Expand All @@ -335,7 +330,7 @@ fn action_from_network_message(event: Option<NetworkEvent>) -> Action {
};

let c_messages: Result<Vec<_>, _> = messages.iter()
.filter(|(engine_id, _)| engine_id == &COLLATION_PROTOCOL_ID)
.filter(|(protocol, _)| protocol == &COLLATION_PROTOCOL_NAME)
.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
.collect();

Expand Down Expand Up @@ -827,10 +822,10 @@ mod tests {
)
}

fn peer_set_engine_id(peer_set: PeerSet) -> ConsensusEngineId {
fn peer_set_protocol(peer_set: PeerSet) -> std::borrow::Cow<'static, str> {
match peer_set {
PeerSet::Validation => VALIDATION_PROTOCOL_ID,
PeerSet::Collation => COLLATION_PROTOCOL_ID,
PeerSet::Validation => VALIDATION_PROTOCOL_NAME.into(),
PeerSet::Collation => COLLATION_PROTOCOL_NAME.into(),
}
}

Expand Down Expand Up @@ -890,22 +885,22 @@ mod tests {
async fn connect_peer(&mut self, peer: PeerId, peer_set: PeerSet, role: ObservedRole) {
self.send_network_event(NetworkEvent::NotificationStreamOpened {
remote: peer,
engine_id: peer_set_engine_id(peer_set),
protocol: peer_set_protocol(peer_set),
role: role.into(),
}).await;
}

async fn disconnect_peer(&mut self, peer: PeerId, peer_set: PeerSet) {
self.send_network_event(NetworkEvent::NotificationStreamClosed {
remote: peer,
engine_id: peer_set_engine_id(peer_set),
protocol: peer_set_protocol(peer_set),
}).await;
}

async fn peer_message(&mut self, peer: PeerId, peer_set: PeerSet, message: Vec<u8>) {
self.send_network_event(NetworkEvent::NotificationsReceived {
remote: peer,
messages: vec![(peer_set_engine_id(peer_set), message.into())],
messages: vec![(peer_set_protocol(peer_set), message.into())],
}).await;
}

Expand Down