Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Apply review comments
  • Loading branch information
altonen committed Mar 29, 2023
commit a3180f3948d0c72743fbb9d14812a43c6b20035c
6 changes: 3 additions & 3 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ pub use crate::{
};

use codec::Encode;
use futures::channel::oneshot;
use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId};
use prometheus_endpoint::Registry;
pub use sc_network_common::{
Expand All @@ -39,6 +38,7 @@ pub use sc_network_common::{
ExHashT,
};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::Block as BlockT;
use zeroize::Zeroize;

use std::{
Expand Down Expand Up @@ -695,7 +695,7 @@ impl NetworkConfiguration {
}

/// Network initialization parameters.
pub struct Params<Client> {
pub struct Params<Client, Block: BlockT> {
/// Assigned role for our node (full, light, ...).
pub role: Role,

Expand All @@ -722,7 +722,7 @@ pub struct Params<Client> {
pub block_announce_config: NonDefaultSetConfig,

/// TX channel for direct communication with `SyncingEngine` and `Protocol`.
pub tx: TracingUnboundedSender<crate::event::SyncEvent>,
pub tx: TracingUnboundedSender<crate::event::SyncEvent<Block>>,

/// Request response protocol configurations
pub request_response_protocol_configs: Vec<RequestResponseConfig>,
Expand Down
10 changes: 6 additions & 4 deletions client/network/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use bytes::Bytes;
use futures::channel::oneshot;
use libp2p::{core::PeerId, kad::record::Key};

use sc_network_common::role::ObservedRole;
use sc_network_common::{role::ObservedRole, sync::message::BlockAnnouncesHandshake};
use sp_runtime::traits::Block as BlockT;

/// Events generated by DHT as a response to get_value and put_value requests.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -92,16 +93,17 @@ pub enum Event {
},
}

// TODO: move to sc-network
pub enum SyncEvent {
/// Event sent to `SyncingEngine`
// TODO: remove once `NotificationService` is implemented.
pub enum SyncEvent<B: BlockT> {
/// Opened a substream with the given node with the given notifications protocol.
///
/// The protocol is always one of the notification protocols that have been registered.
NotificationStreamOpened {
/// Node we opened the substream with.
remote: PeerId,
/// Received handshake.
received_handshake: Vec<u8>,
received_handshake: BlockAnnouncesHandshake<B>,
/// Notification sink.
sink: NotificationsSink,
/// Channel for reporting accept/reject of the substream.
Expand Down
16 changes: 11 additions & 5 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{

use bytes::Bytes;
use codec::{DecodeAll, Encode};
use futures::{channel::oneshot, stream::FuturesUnordered, FutureExt, StreamExt};
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use libp2p::{
core::connection::ConnectionId,
swarm::{
Expand Down Expand Up @@ -95,7 +95,7 @@ pub struct Protocol<B: BlockT> {
/// Connected peers.
peers: HashMap<PeerId, Roles>,
sync_substream_validations: FuturesUnordered<PendingSyncSubstreamValidation>,
tx: TracingUnboundedSender<crate::event::SyncEvent>,
tx: TracingUnboundedSender<crate::event::SyncEvent<B>>,
_marker: std::marker::PhantomData<B>,
}

Expand All @@ -105,7 +105,7 @@ impl<B: BlockT> Protocol<B> {
roles: Roles,
network_config: &config::NetworkConfiguration,
block_announces_protocol: config::NonDefaultSetConfig,
tx: TracingUnboundedSender<crate::event::SyncEvent>,
tx: TracingUnboundedSender<crate::event::SyncEvent<B>>,
) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> {
let mut known_addresses = Vec::new();

Expand Down Expand Up @@ -463,12 +463,18 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
match <Message<B> as DecodeAll>::decode_all(&mut &received_handshake[..]) {
Ok(GenericMessage::Status(handshake)) => {
let roles = handshake.roles;
let handshake = BlockAnnouncesHandshake::<B> {
roles: handshake.roles,
best_number: handshake.best_number,
best_hash: handshake.best_hash,
genesis_hash: handshake.genesis_hash,
};

let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(
crate::SyncEvent::NotificationStreamOpened {
remote: peer_id,
received_handshake,
received_handshake: handshake,
sink: notifications_sink,
tx,
},
Expand Down Expand Up @@ -509,7 +515,7 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
let _ = self.tx.unbounded_send(
crate::SyncEvent::NotificationStreamOpened {
remote: peer_id,
received_handshake,
received_handshake: handshake,
sink: notifications_sink,
tx,
},
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ where
/// for the network processing to advance. From it, you can extract a `NetworkService` using
/// `worker.service()`. The `NetworkService` can be shared through the codebase.
pub fn new<Client: HeaderBackend<B> + 'static>(
mut params: Params<Client>,
mut params: Params<Client, B>,
) -> Result<Self, Error> {
// Private and public keys configuration.
let local_identity = params.network_config.node_key.clone().into_keypair()?;
Expand Down
52 changes: 16 additions & 36 deletions client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::{
ChainSync, ClientError, SyncingService,
};

use codec::{Decode, DecodeAll, Encode};
use futures::{channel::oneshot, FutureExt, Stream, StreamExt};
use codec::{Decode, Encode};
use futures::{FutureExt, StreamExt};
use futures_timer::Delay;
use libp2p::PeerId;
use lru::LruCache;
Expand All @@ -39,7 +39,6 @@ use sc_network::{
config::{
NetworkConfiguration, NonDefaultSetConfig, ProtocolId, SyncMode as SyncOperationMode,
},
event::Event,
utils::LruHashSet,
NotificationsSink, ProtocolName,
};
Expand All @@ -63,7 +62,6 @@ use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero};
use std::{
collections::{HashMap, HashSet},
num::NonZeroUsize,
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
Expand All @@ -79,8 +77,6 @@ const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead

mod rep {
use sc_peerset::ReputationChange as Rep;
/// We received a message that failed to decode.
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
/// Peer has different genesis.
pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
/// Peer send us a block announcement that failed at validation.
Expand Down Expand Up @@ -187,7 +183,7 @@ pub struct SyncingEngine<B: BlockT, Client> {
service_rx: TracingUnboundedReceiver<ToServiceCommand<B>>,

/// Channel for receiving inbound connections from `Protocol`.
rx: sc_utils::mpsc::TracingUnboundedReceiver<sc_network::SyncEvent>,
rx: sc_utils::mpsc::TracingUnboundedReceiver<sc_network::SyncEvent<B>>,

/// Assigned roles.
roles: Roles,
Expand Down Expand Up @@ -259,7 +255,7 @@ where
block_request_protocol_name: ProtocolName,
state_request_protocol_name: ProtocolName,
warp_sync_protocol_name: Option<ProtocolName>,
rx: sc_utils::mpsc::TracingUnboundedReceiver<sc_network::SyncEvent>,
rx: sc_utils::mpsc::TracingUnboundedReceiver<sc_network::SyncEvent<B>>,
) -> Result<(Self, SyncingService<B>, NonDefaultSetConfig), ClientError> {
let mode = match network_config.sync_mode {
SyncOperationMode::Full => SyncMode::Full,
Expand Down Expand Up @@ -678,33 +674,17 @@ where
received_handshake,
sink,
tx,
} => {
match <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(
&mut &received_handshake[..],
) {
Ok(decoded_handshake) => {
match self.on_sync_peer_connected(remote, decoded_handshake, sink) {
Ok(()) => {
let _ = tx.send(true);
},
Err(()) => {
log::debug!(
target: "sync",
"Failed to register peer {remote:?}: {received_handshake:?}",
);
let _ = tx.send(false);
},
}
},
Err(_) => {
let _ = tx.send(false);
log::debug!(
target: "sync",
"failed to decode handshake but it was decoded correctly by `Protocol`",
);
debug_assert!(false);
},
}
} => match self.on_sync_peer_connected(remote, &received_handshake, sink) {
Ok(()) => {
let _ = tx.send(true);
},
Err(()) => {
log::debug!(
target: "sync",
"Failed to register peer {remote:?}: {received_handshake:?}",
);
let _ = tx.send(false);
},
},
sc_network::SyncEvent::NotificationStreamClosed { remote } => {
if self.on_sync_peer_disconnected(remote).is_err() {
Expand Down Expand Up @@ -780,7 +760,7 @@ where
pub fn on_sync_peer_connected(
&mut self,
who: PeerId,
status: BlockAnnouncesHandshake<B>,
status: &BlockAnnouncesHandshake<B>,
sink: NotificationsSink,
) -> Result<(), ()> {
log::trace!(target: "sync", "New peer {} {:?}", who, status);
Expand Down
4 changes: 2 additions & 2 deletions client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ use sc_network::{
},
request_responses::ProtocolConfig as RequestResponseConfig,
types::ProtocolName,
Multiaddr, NetworkBlock, NetworkEventStream, NetworkService, NetworkStateInfo,
NetworkSyncForkRequest, NetworkWorker,
Multiaddr, NetworkBlock, NetworkService, NetworkStateInfo, NetworkSyncForkRequest,
NetworkWorker,
};
use sc_network_common::{
role::Roles,
Expand Down
6 changes: 2 additions & 4 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ use sc_client_db::{Backend, DatabaseSettings};
use sc_consensus::import_queue::ImportQueue;
use sc_executor::RuntimeVersionOf;
use sc_keystore::LocalKeystore;
use sc_network::{
config::SyncMode, NetworkEventStream, NetworkService, NetworkStateInfo, NetworkStatusProvider,
};
use sc_network::{config::SyncMode, NetworkService, NetworkStateInfo, NetworkStatusProvider};
use sc_network_bitswap::BitswapRequestHandler;
use sc_network_common::{role::Roles, sync::warp::WarpSyncParams};
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
Expand Down Expand Up @@ -906,7 +904,7 @@ where
)?;

spawn_handle.spawn("network-transactions-handler", Some("networking"), tx_handler.run());
spawn_handle.spawn(
spawn_handle.spawn_blocking(
"chain-sync-network-service-provider",
Some("networking"),
chain_sync_network_provider.run(network.clone()),
Expand Down