Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
0402ecc
Adds try_state to elections pallet
gpestana Mar 27, 2023
af0e6a5
Finishes elections try-state hook; starts EPM hook impl
gpestana Mar 28, 2023
788ddf4
Adds scaffolding for EPM try_state checks
gpestana Mar 28, 2023
b721463
implements checks for EPM
gpestana Mar 30, 2023
3103262
Update frame/election-provider-multi-phase/src/lib.rs
gpestana Apr 20, 2023
6cbf9ab
Update frame/elections-phragmen/src/lib.rs
gpestana Apr 20, 2023
a483470
chore(sc-cli): improve runner and signals (#13688)
yjhmelody Mar 26, 2023
8077b19
add a new ci job to fuzz sp-arithmetic (#13673)
Mar 27, 2023
19e9edc
sc-slots: Forward block size limit (#13716)
bkchr Mar 27, 2023
322d31b
FRAME: Fix the Referenda confirming alarm (#13704)
gavofyork Mar 27, 2023
c04f812
Update pallet licenses to Apache-2.0 (#13467)
aaronbassett Mar 27, 2023
f0bcd0d
Reward pool migration fix (#13715)
Mar 27, 2023
f94fce0
contracts: Add host function tracing (#13648)
pgherveou Mar 27, 2023
67e1cab
Change license of node-template and FRAME examples to MIT-0 (#13465)
aaronbassett Mar 27, 2023
08e037a
updating labels descriptions (#13557)
the-right-joyce Mar 27, 2023
9522b75
CI: Investigate why `RUSTFLAGS` differs on `master` and PRs (#13686)
rcny Mar 28, 2023
998d768
Remove HeaderBackend requirement from AuthorityDiscovery and NetworkW…
skunert Mar 28, 2023
f34444b
CI: rephrase `RUSTFLAGS` (#13735)
rcny Mar 28, 2023
f799177
contracts: proper event link in docs (#13729)
kziemianek Mar 28, 2023
9bc5215
Support stable rust for compiling the runtime (#13580)
bkchr Mar 29, 2023
6618041
provide a default value for RELENG_SCRIPTS_BRANCH (#13743)
Mar 29, 2023
42ee907
roll out new debian 11 ci image (#13744)
Mar 29, 2023
2482f20
Swap 'base58' with 'bs58' (#13739)
davxy Mar 29, 2023
204296a
proc-macro: check for non-args runtime calls added (#13742)
michalkucharczyk Mar 29, 2023
08a35e1
Clean up after debian 11 rollout (#13762)
Mar 29, 2023
17a5d9e
[Fix] Bump tuple element number in frame-support. (#13760)
ruseinov Mar 29, 2023
57dddd2
Generic keystore internals (#13749)
davxy Mar 29, 2023
8929804
Application Crypto cleanup (#13746)
davxy Mar 30, 2023
cb3869d
Attempt to relieve pressure on `mpsc_network_worker` (#13725)
altonen Mar 30, 2023
daf303f
[Enhancement] Throw an error when there are too many pallets (#13763)
ruseinov Mar 30, 2023
6e4e5f5
Build wasm for mvp cpu (#13758)
athei Mar 30, 2023
2fbf914
BEEFY: gossip finality proofs (#13727)
acatangiu Mar 30, 2023
6c1b1ad
Fix nomiantion pools doc render (#13748)
kianenigma Mar 30, 2023
e070908
sp-runtime-interface-test: Fix flaky test (#13770)
bkchr Mar 30, 2023
83d9516
bump zombienet version (#13772)
pepoviola Mar 30, 2023
7756bb1
[Contracts] Overflowing bounded `DeletionQueue` allows DoS against co…
pgherveou Mar 31, 2023
668d61b
Refactor: extract most aura logic out to standalone module, make use …
rphmeier Mar 31, 2023
c9a1ff0
contracts: make test work with debugger (#13776)
pgherveou Mar 31, 2023
19fe15a
add claim_commission weight (#13774)
Apr 1, 2023
76adcec
FRAME: Minor fix for failsafe. (#13741)
gavofyork Apr 1, 2023
81ea064
Adjustments to RPC-query docstrings. (#13698)
DamianStraszak Apr 3, 2023
e1b12eb
Force upgrade snow to 0.9.2 (#13806)
bkchr Apr 3, 2023
a066028
Scheduler pre block limit note (#13231)
muharem Apr 3, 2023
6d38833
Disable `sign-ext` WASM feature when building runtimes (#13804)
koute Apr 3, 2023
71eaef9
refactor: inconsistent BalanceConversion fn (#13610)
wischli Apr 4, 2023
fae245d
Remove deprecated batch verification (#13799)
bkchr Apr 4, 2023
d5ddea6
Uniform pallet warnings (#13798)
ggwpez Apr 4, 2023
cc3f56a
Update documentation for uniques (This PR renames classes and instanc…
simonsso Apr 4, 2023
ee40229
Implement #[pallet::composite_enum] (#13722)
KiChjang Apr 4, 2023
f6f7ced
Expose WASM extensions in executor semantics (#13811)
s0me0ne-unkn0wn Apr 4, 2023
6c45413
Deprecate V1 Weights (#13699)
ggwpez Apr 4, 2023
32f601b
update links to ink! (#13819)
agryaznov Apr 5, 2023
d1e309f
Move registration of `ReadRuntimeVersionExt` to `ExecutionExtension` …
bkchr Apr 5, 2023
5b508e9
Mention `on_idle` round-robin logic to `trait Hooks` cargo doc (#13797)
gpestana Apr 5, 2023
a930258
ProofRecorder: Implement transactional support (#13769)
bkchr Apr 5, 2023
a98e3c2
remove duplicate sync option documentation (#13828)
liamaharon Apr 6, 2023
74c0cf6
Add HoldReason to the NIS pallet (#13823)
KiChjang Apr 6, 2023
3c139bc
Fix `try-runtime follow-chain`, try-runtime upgrade tuple tests, cli …
liamaharon Apr 6, 2023
37479ad
Use proper weights in the `pallet-template` (#13817)
AlexD10S Apr 6, 2023
359d86d
Addresses PR review comments
gpestana Apr 23, 2023
c6633c8
Merge branch 'master' into gpestana/try_state_elections
gpestana Apr 23, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Attempt to relieve pressure on mpsc_network_worker (#13725)
* Attempt to relieve pressure on `mpsc_network_worker`

`SyncingEngine` interacting with `NetworkWorker` can put a lot of strain
on the channel if the number of inbound connections is high. This is
because `SyncingEngine` is notified of each inbound substream which it
then can either accept or reject and this causes a lot of message
exchange on the already busy channel.

Use a direct channel pair between `Protocol` and `SyncingEngine`
to exchange notification events. It is a temporary change to alleviate
the problems caused by syncing being an independent protocol and the
fix will be removed once `NotificationService` is implemented.

* Apply review comments

* fixes

* trigger ci

* Fix tests

Verify that both peers have a connection now that the validation goes
through `SyncingEngine`. Depending on how the tasks are scheduled,
one of them might not have the peer registered in `SyncingEngine` at which
point the test won't make any progress because block announcement received
from an unknown peer is discarded.

Move polling of `ChainSync` at the end of the function so that if a block
announcement causes a block request to be sent, that can be sent in the
same call to `SyncingEngine::poll()`.

---------

Co-authored-by: parity-processbot <>
  • Loading branch information
altonen authored and gpestana committed Apr 20, 2023
commit cb3869d35e749c412b9555bdf890a68aecd0c3c7
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
//! See the documentation of [`Params`].

pub use crate::{
protocol::NotificationsSink,
request_responses::{
IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig,
},
Expand All @@ -31,7 +32,12 @@ pub use crate::{
use codec::Encode;
use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId};
use prometheus_endpoint::Registry;
pub use sc_network_common::{role::Role, sync::warp::WarpSyncProvider, ExHashT};
pub use sc_network_common::{
role::{Role, Roles},
sync::warp::WarpSyncProvider,
ExHashT,
};
use sc_utils::mpsc::TracingUnboundedSender;
use zeroize::Zeroize;

use sp_runtime::traits::Block as BlockT;
Expand Down Expand Up @@ -714,6 +720,9 @@ pub struct Params<Block: BlockT> {
/// Block announce protocol configuration
pub block_announce_config: NonDefaultSetConfig,

/// TX channel for direct communication with `SyncingEngine` and `Protocol`.
pub tx: TracingUnboundedSender<crate::event::SyncEvent<Block>>,

/// Request response protocol configurations
pub request_response_protocol_configs: Vec<RequestResponseConfig>,
}
Expand Down
47 changes: 45 additions & 2 deletions client/network/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
//! Network event types. These are are not the part of the protocol, but rather
//! events that happen on the network like DHT get/put results received.

use crate::types::ProtocolName;
use crate::{types::ProtocolName, NotificationsSink};

use bytes::Bytes;
use futures::channel::oneshot;
use libp2p::{core::PeerId, kad::record::Key};

use sc_network_common::role::ObservedRole;
use sc_network_common::{role::ObservedRole, sync::message::BlockAnnouncesHandshake};
use sp_runtime::traits::Block as BlockT;

/// Events generated by DHT as a response to get_value and put_value requests.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -90,3 +92,44 @@ pub enum Event {
messages: Vec<(ProtocolName, Bytes)>,
},
}

/// Event sent to `SyncingEngine`
// TODO: remove once `NotificationService` is implemented.
pub enum SyncEvent<B: BlockT> {
/// Opened a substream with the given node with the given notifications protocol.
///
/// The protocol is always one of the notification protocols that have been registered.
NotificationStreamOpened {
/// Node we opened the substream with.
remote: PeerId,
/// Received handshake.
received_handshake: BlockAnnouncesHandshake<B>,
/// Notification sink.
sink: NotificationsSink,
/// Channel for reporting accept/reject of the substream.
tx: oneshot::Sender<bool>,
},

/// Closed a substream with the given node. Always matches a corresponding previous
/// `NotificationStreamOpened` message.
NotificationStreamClosed {
/// Node we closed the substream with.
remote: PeerId,
},

/// Notification sink was replaced.
NotificationSinkReplaced {
/// Node we closed the substream with.
remote: PeerId,
/// Notification sink.
sink: NotificationsSink,
},

/// Received one or more messages from the given node using the given protocol.
NotificationsReceived {
/// Node we received the message from.
remote: PeerId,
/// Concerned protocol and associated message.
messages: Vec<Bytes>,
},
}
6 changes: 3 additions & 3 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ pub mod request_responses;
pub mod types;
pub mod utils;

pub use event::{DhtEvent, Event};
pub use event::{DhtEvent, Event, SyncEvent};
#[doc(inline)]
pub use libp2p::{multiaddr, Multiaddr, PeerId};
pub use request_responses::{IfDisconnected, RequestFailure, RequestResponseConfig};
Expand All @@ -278,8 +278,8 @@ pub use service::{
NetworkStatusProvider, NetworkSyncForkRequest, NotificationSender as NotificationSenderT,
NotificationSenderError, NotificationSenderReady,
},
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, OutboundFailure,
PublicKey,
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, NotificationsSink,
OutboundFailure, PublicKey,
};
pub use types::ProtocolName;

Expand Down
108 changes: 90 additions & 18 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{

use bytes::Bytes;
use codec::{DecodeAll, Encode};
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use libp2p::{
core::connection::ConnectionId,
swarm::{
Expand All @@ -35,11 +36,14 @@ use libp2p::{
use log::{debug, error, warn};

use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::Block as BlockT;

use std::{
collections::{HashMap, HashSet, VecDeque},
future::Future,
iter,
pin::Pin,
task::Poll,
};

Expand Down Expand Up @@ -68,6 +72,9 @@ mod rep {
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}

type PendingSyncSubstreamValidation =
Pin<Box<dyn Future<Output = Result<(PeerId, Roles), PeerId>> + Send>>;

// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT> {
/// Pending list of messages to return from `poll` as a priority.
Expand All @@ -87,6 +94,8 @@ pub struct Protocol<B: BlockT> {
bad_handshake_substreams: HashSet<(PeerId, sc_peerset::SetId)>,
/// Connected peers.
peers: HashMap<PeerId, Roles>,
sync_substream_validations: FuturesUnordered<PendingSyncSubstreamValidation>,
tx: TracingUnboundedSender<crate::event::SyncEvent<B>>,
_marker: std::marker::PhantomData<B>,
}

Expand All @@ -96,6 +105,7 @@ impl<B: BlockT> Protocol<B> {
roles: Roles,
network_config: &config::NetworkConfiguration,
block_announces_protocol: config::NonDefaultSetConfig,
tx: TracingUnboundedSender<crate::event::SyncEvent<B>>,
) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> {
let mut known_addresses = Vec::new();

Expand Down Expand Up @@ -179,6 +189,8 @@ impl<B: BlockT> Protocol<B> {
.collect(),
bad_handshake_substreams: Default::default(),
peers: HashMap::new(),
sync_substream_validations: FuturesUnordered::new(),
tx,
// TODO: remove when `BlockAnnouncesHandshake` is moved away from `Protocol`
_marker: Default::default(),
};
Expand Down Expand Up @@ -418,6 +430,23 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
return Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }),
};

while let Poll::Ready(Some(validation_result)) =
self.sync_substream_validations.poll_next_unpin(cx)
{
match validation_result {
Ok((peer, roles)) => {
self.peers.insert(peer, roles);
},
Err(peer) => {
log::debug!(
target: "sub-libp2p",
"`SyncingEngine` rejected stream"
);
self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC);
},
}
}

let outcome = match event {
NotificationsOut::CustomProtocolOpen {
peer_id,
Expand All @@ -440,16 +469,29 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
best_hash: handshake.best_hash,
genesis_hash: handshake.genesis_hash,
};
self.peers.insert(peer_id, roles);

CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
protocol: self.notification_protocols[usize::from(set_id)].clone(),
negotiated_fallback,
received_handshake: handshake.encode(),
roles,
notifications_sink,
}
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(
crate::SyncEvent::NotificationStreamOpened {
remote: peer_id,
received_handshake: handshake,
sink: notifications_sink,
tx,
},
);
self.sync_substream_validations.push(Box::pin(async move {
match rx.await {
Ok(accepted) =>
if accepted {
Ok((peer_id, roles))
} else {
Err(peer_id)
},
Err(_) => Err(peer_id),
}
}));

CustomMessageOutcome::None
},
Ok(msg) => {
debug!(
Expand All @@ -469,15 +511,27 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
let roles = handshake.roles;
self.peers.insert(peer_id, roles);

CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
protocol: self.notification_protocols[usize::from(set_id)]
.clone(),
negotiated_fallback,
received_handshake,
roles,
notifications_sink,
}
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(
crate::SyncEvent::NotificationStreamOpened {
remote: peer_id,
received_handshake: handshake,
sink: notifications_sink,
tx,
},
);
self.sync_substream_validations.push(Box::pin(async move {
match rx.await {
Ok(accepted) =>
if accepted {
Ok((peer_id, roles))
} else {
Err(peer_id)
},
Err(_) => Err(peer_id),
}
}));
CustomMessageOutcome::None
},
Err(err2) => {
log::debug!(
Expand Down Expand Up @@ -535,6 +589,12 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
NotificationsOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } =>
if self.bad_handshake_substreams.contains(&(peer_id, set_id)) {
CustomMessageOutcome::None
} else if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationSinkReplaced {
remote: peer_id,
sink: notifications_sink,
});
CustomMessageOutcome::None
} else {
CustomMessageOutcome::NotificationStreamReplaced {
remote: peer_id,
Expand All @@ -548,6 +608,12 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
// handshake. The outer layers have never received an opening event about this
// substream, and consequently shouldn't receive a closing event either.
CustomMessageOutcome::None
} else if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationStreamClosed {
remote: peer_id,
});
self.peers.remove(&peer_id);
CustomMessageOutcome::None
} else {
CustomMessageOutcome::NotificationStreamClosed {
remote: peer_id,
Expand All @@ -558,6 +624,12 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
NotificationsOut::Notification { peer_id, set_id, message } => {
if self.bad_handshake_substreams.contains(&(peer_id, set_id)) {
CustomMessageOutcome::None
} else if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationsReceived {
remote: peer_id,
messages: vec![message.freeze()],
});
CustomMessageOutcome::None
} else {
let protocol_name = self.notification_protocols[usize::from(set_id)].clone();
CustomMessageOutcome::NotificationsReceived {
Expand Down
6 changes: 4 additions & 2 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{
network_state::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
},
protocol::{self, NotificationsSink, NotifsHandlerError, Protocol, Ready},
protocol::{self, NotifsHandlerError, Protocol, Ready},
request_responses::{IfDisconnected, RequestFailure},
service::{
signature::{Signature, SigningError},
Expand Down Expand Up @@ -91,6 +91,7 @@ use std::{

pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey};
pub use protocol::NotificationsSink;

mod metrics;
mod out_events;
Expand Down Expand Up @@ -146,7 +147,7 @@ where
/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
/// for the network processing to advance. From it, you can extract a `NetworkService` using
/// `worker.service()`. The `NetworkService` can be shared through the codebase.
pub fn new<Block: BlockT>(mut params: Params<Block>) -> Result<Self, Error> {
pub fn new(mut params: Params<B>) -> Result<Self, Error> {
// Private and public keys configuration.
let local_identity = params.network_config.node_key.clone().into_keypair()?;
let local_public = local_identity.public();
Expand Down Expand Up @@ -227,6 +228,7 @@ where
From::from(&params.role),
&params.network_config,
params.block_announce_config,
params.tx,
)?;

// List of multiaddresses that we know in the network.
Expand Down
Loading