Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
//! See the documentation of [`Params`].

pub use crate::{
protocol::NotificationsSink,
request_responses::{
IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig,
},
Expand All @@ -31,7 +32,12 @@ pub use crate::{
use codec::Encode;
use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId};
use prometheus_endpoint::Registry;
pub use sc_network_common::{role::Role, sync::warp::WarpSyncProvider, ExHashT};
pub use sc_network_common::{
role::{Role, Roles},
sync::warp::WarpSyncProvider,
ExHashT,
};
use sc_utils::mpsc::TracingUnboundedSender;
use zeroize::Zeroize;

use sp_runtime::traits::Block as BlockT;
Expand Down Expand Up @@ -714,6 +720,9 @@ pub struct Params<Block: BlockT> {
/// Block announce protocol configuration
pub block_announce_config: NonDefaultSetConfig,

/// TX channel for direct communication with `SyncingEngine` and `Protocol`.
pub tx: TracingUnboundedSender<crate::event::SyncEvent<Block>>,

/// Request response protocol configurations
pub request_response_protocol_configs: Vec<RequestResponseConfig>,
}
Expand Down
47 changes: 45 additions & 2 deletions client/network/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
//! Network event types. These are are not the part of the protocol, but rather
//! events that happen on the network like DHT get/put results received.

use crate::types::ProtocolName;
use crate::{types::ProtocolName, NotificationsSink};

use bytes::Bytes;
use futures::channel::oneshot;
use libp2p::{core::PeerId, kad::record::Key};

use sc_network_common::role::ObservedRole;
use sc_network_common::{role::ObservedRole, sync::message::BlockAnnouncesHandshake};
use sp_runtime::traits::Block as BlockT;

/// Events generated by DHT as a response to get_value and put_value requests.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -90,3 +92,44 @@ pub enum Event {
messages: Vec<(ProtocolName, Bytes)>,
},
}

/// Event sent to `SyncingEngine`
// TODO: remove once `NotificationService` is implemented.
pub enum SyncEvent<B: BlockT> {
/// Opened a substream with the given node with the given notifications protocol.
///
/// The protocol is always one of the notification protocols that have been registered.
NotificationStreamOpened {
/// Node we opened the substream with.
remote: PeerId,
/// Received handshake.
received_handshake: BlockAnnouncesHandshake<B>,
/// Notification sink.
sink: NotificationsSink,
/// Channel for reporting accept/reject of the substream.
tx: oneshot::Sender<bool>,
},

/// Closed a substream with the given node. Always matches a corresponding previous
/// `NotificationStreamOpened` message.
NotificationStreamClosed {
/// Node we closed the substream with.
remote: PeerId,
},

/// Notification sink was replaced.
NotificationSinkReplaced {
/// Node we closed the substream with.
remote: PeerId,
/// Notification sink.
sink: NotificationsSink,
},

/// Received one or more messages from the given node using the given protocol.
NotificationsReceived {
/// Node we received the message from.
remote: PeerId,
/// Concerned protocol and associated message.
messages: Vec<Bytes>,
},
}
6 changes: 3 additions & 3 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ pub mod request_responses;
pub mod types;
pub mod utils;

pub use event::{DhtEvent, Event};
pub use event::{DhtEvent, Event, SyncEvent};
#[doc(inline)]
pub use libp2p::{multiaddr, Multiaddr, PeerId};
pub use request_responses::{IfDisconnected, RequestFailure, RequestResponseConfig};
Expand All @@ -278,8 +278,8 @@ pub use service::{
NetworkStatusProvider, NetworkSyncForkRequest, NotificationSender as NotificationSenderT,
NotificationSenderError, NotificationSenderReady,
},
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, OutboundFailure,
PublicKey,
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, NotificationsSink,
OutboundFailure, PublicKey,
};
pub use types::ProtocolName;

Expand Down
108 changes: 90 additions & 18 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{

use bytes::Bytes;
use codec::{DecodeAll, Encode};
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use libp2p::{
core::connection::ConnectionId,
swarm::{
Expand All @@ -35,11 +36,14 @@ use libp2p::{
use log::{debug, error, warn};

use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::Block as BlockT;

use std::{
collections::{HashMap, HashSet, VecDeque},
future::Future,
iter,
pin::Pin,
task::Poll,
};

Expand Down Expand Up @@ -68,6 +72,9 @@ mod rep {
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}

type PendingSyncSubstreamValidation =
Pin<Box<dyn Future<Output = Result<(PeerId, Roles), PeerId>> + Send>>;

// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT> {
/// Pending list of messages to return from `poll` as a priority.
Expand All @@ -87,6 +94,8 @@ pub struct Protocol<B: BlockT> {
bad_handshake_substreams: HashSet<(PeerId, sc_peerset::SetId)>,
/// Connected peers.
peers: HashMap<PeerId, Roles>,
sync_substream_validations: FuturesUnordered<PendingSyncSubstreamValidation>,
tx: TracingUnboundedSender<crate::event::SyncEvent<B>>,
_marker: std::marker::PhantomData<B>,
}

Expand All @@ -96,6 +105,7 @@ impl<B: BlockT> Protocol<B> {
roles: Roles,
network_config: &config::NetworkConfiguration,
block_announces_protocol: config::NonDefaultSetConfig,
tx: TracingUnboundedSender<crate::event::SyncEvent<B>>,
) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> {
let mut known_addresses = Vec::new();

Expand Down Expand Up @@ -179,6 +189,8 @@ impl<B: BlockT> Protocol<B> {
.collect(),
bad_handshake_substreams: Default::default(),
peers: HashMap::new(),
sync_substream_validations: FuturesUnordered::new(),
tx,
// TODO: remove when `BlockAnnouncesHandshake` is moved away from `Protocol`
_marker: Default::default(),
};
Expand Down Expand Up @@ -418,6 +430,23 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
return Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }),
};

while let Poll::Ready(Some(validation_result)) =
self.sync_substream_validations.poll_next_unpin(cx)
{
match validation_result {
Ok((peer, roles)) => {
self.peers.insert(peer, roles);
},
Err(peer) => {
log::debug!(
target: "sub-libp2p",
"`SyncingEngine` rejected stream"
);
self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC);
},
}
}

let outcome = match event {
NotificationsOut::CustomProtocolOpen {
peer_id,
Expand All @@ -440,16 +469,29 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
best_hash: handshake.best_hash,
genesis_hash: handshake.genesis_hash,
};
self.peers.insert(peer_id, roles);

CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
protocol: self.notification_protocols[usize::from(set_id)].clone(),
negotiated_fallback,
received_handshake: handshake.encode(),
roles,
notifications_sink,
}
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(
crate::SyncEvent::NotificationStreamOpened {
remote: peer_id,
received_handshake: handshake,
sink: notifications_sink,
tx,
},
);
self.sync_substream_validations.push(Box::pin(async move {
match rx.await {
Ok(accepted) =>
if accepted {
Ok((peer_id, roles))
} else {
Err(peer_id)
},
Err(_) => Err(peer_id),
}
}));

CustomMessageOutcome::None
},
Ok(msg) => {
debug!(
Expand All @@ -469,15 +511,27 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
let roles = handshake.roles;
self.peers.insert(peer_id, roles);

CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
protocol: self.notification_protocols[usize::from(set_id)]
.clone(),
negotiated_fallback,
received_handshake,
roles,
notifications_sink,
}
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(
crate::SyncEvent::NotificationStreamOpened {
remote: peer_id,
received_handshake: handshake,
sink: notifications_sink,
tx,
},
);
self.sync_substream_validations.push(Box::pin(async move {
match rx.await {
Ok(accepted) =>
if accepted {
Ok((peer_id, roles))
} else {
Err(peer_id)
},
Err(_) => Err(peer_id),
}
}));
CustomMessageOutcome::None
},
Err(err2) => {
log::debug!(
Expand Down Expand Up @@ -535,6 +589,12 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
NotificationsOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } =>
if self.bad_handshake_substreams.contains(&(peer_id, set_id)) {
CustomMessageOutcome::None
} else if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationSinkReplaced {
remote: peer_id,
sink: notifications_sink,
});
CustomMessageOutcome::None
} else {
CustomMessageOutcome::NotificationStreamReplaced {
remote: peer_id,
Expand All @@ -548,6 +608,12 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
// handshake. The outer layers have never received an opening event about this
// substream, and consequently shouldn't receive a closing event either.
CustomMessageOutcome::None
} else if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationStreamClosed {
remote: peer_id,
});
self.peers.remove(&peer_id);
CustomMessageOutcome::None
} else {
CustomMessageOutcome::NotificationStreamClosed {
remote: peer_id,
Expand All @@ -558,6 +624,12 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
NotificationsOut::Notification { peer_id, set_id, message } => {
if self.bad_handshake_substreams.contains(&(peer_id, set_id)) {
CustomMessageOutcome::None
} else if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationsReceived {
remote: peer_id,
messages: vec![message.freeze()],
});
CustomMessageOutcome::None
} else {
let protocol_name = self.notification_protocols[usize::from(set_id)].clone();
CustomMessageOutcome::NotificationsReceived {
Expand Down
6 changes: 4 additions & 2 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{
network_state::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
},
protocol::{self, NotificationsSink, NotifsHandlerError, Protocol, Ready},
protocol::{self, NotifsHandlerError, Protocol, Ready},
request_responses::{IfDisconnected, RequestFailure},
service::{
signature::{Signature, SigningError},
Expand Down Expand Up @@ -91,6 +91,7 @@ use std::{

pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey};
pub use protocol::NotificationsSink;

mod metrics;
mod out_events;
Expand Down Expand Up @@ -146,7 +147,7 @@ where
/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
/// for the network processing to advance. From it, you can extract a `NetworkService` using
/// `worker.service()`. The `NetworkService` can be shared through the codebase.
pub fn new<Block: BlockT>(mut params: Params<Block>) -> Result<Self, Error> {
pub fn new(mut params: Params<B>) -> Result<Self, Error> {
// Private and public keys configuration.
let local_identity = params.network_config.node_key.clone().into_keypair()?;
let local_public = local_identity.public();
Expand Down Expand Up @@ -227,6 +228,7 @@ where
From::from(&params.role),
&params.network_config,
params.block_announce_config,
params.tx,
)?;

// List of multiaddresses that we know in the network.
Expand Down
Loading