Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/consensus/beefy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ homepage = "https://substrate.io"

[dependencies]
array-bytes = "4.1"
async-channel = "1.8.0"
async-trait = "0.1.57"
codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] }
fnv = "1.0.6"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
//! Helper for handling (i.e. answering) BEEFY justifications requests from a remote peer.

use codec::Decode;
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use futures::{channel::oneshot, StreamExt};
use log::{debug, trace};
use sc_client_api::BlockBackend;
use sc_network::{
Expand Down Expand Up @@ -102,11 +99,11 @@ impl<B: Block> IncomingRequest<B> {
///
/// Takes care of decoding and handling of invalid encoded requests.
pub(crate) struct IncomingRequestReceiver {
raw: mpsc::Receiver<netconfig::IncomingRequest>,
raw: async_channel::Receiver<netconfig::IncomingRequest>,
}

impl IncomingRequestReceiver {
pub fn new(inner: mpsc::Receiver<netconfig::IncomingRequest>) -> Self {
pub fn new(inner: async_channel::Receiver<netconfig::IncomingRequest>) -> Self {
Self { raw: inner }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ pub(crate) mod outgoing_requests_engine;

pub use incoming_requests_handler::BeefyJustifsRequestHandler;

use futures::channel::mpsc;
use std::time::Duration;

use codec::{Decode, Encode, Error as CodecError};
Expand Down Expand Up @@ -54,7 +53,7 @@ pub(crate) fn on_demand_justifications_protocol_config<Hash: AsRef<[u8]>>(
) -> (IncomingRequestReceiver, RequestResponseConfig) {
let name = justifications_protocol_name(genesis_hash, fork_id);
let fallback_names = vec![];
let (tx, rx) = mpsc::channel(JUSTIF_CHANNEL_SIZE);
let (tx, rx) = async_channel::bounded(JUSTIF_CHANNEL_SIZE);
let rx = IncomingRequestReceiver::new(rx);
let cfg = RequestResponseConfig {
name,
Expand Down
1 change: 1 addition & 0 deletions client/network/bitswap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]
prost-build = "0.11"

[dependencies]
async-channel = "1.8.0"
cid = "0.8.6"
futures = "0.3.21"
libp2p-identity = { version = "0.1.2", features = ["peerid"] }
Expand Down
8 changes: 4 additions & 4 deletions client/network/bitswap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//! CID is expected to reference 256-bit Blake2b transaction hash.

use cid::{self, Version};
use futures::{channel::mpsc, StreamExt};
use futures::StreamExt;
use libp2p_identity::PeerId;
use log::{debug, error, trace};
use prost::Message;
Expand Down Expand Up @@ -93,13 +93,13 @@ impl Prefix {
/// Bitswap request handler
pub struct BitswapRequestHandler<B> {
client: Arc<dyn BlockBackend<B> + Send + Sync>,
request_receiver: mpsc::Receiver<IncomingRequest>,
request_receiver: async_channel::Receiver<IncomingRequest>,
}

impl<B: BlockT> BitswapRequestHandler<B> {
/// Create a new [`BitswapRequestHandler`].
pub fn new(client: Arc<dyn BlockBackend<B> + Send + Sync>) -> (Self, ProtocolConfig) {
let (tx, request_receiver) = mpsc::channel(MAX_REQUEST_QUEUE);
let (tx, request_receiver) = async_channel::bounded(MAX_REQUEST_QUEUE);

let config = ProtocolConfig {
name: ProtocolName::from(PROTOCOL_NAME),
Expand Down Expand Up @@ -289,7 +289,7 @@ pub enum BitswapError {
#[cfg(test)]
mod tests {
use super::*;
use futures::{channel::oneshot, SinkExt};
use futures::channel::oneshot;
use sc_block_builder::BlockBuilderProvider;
use schema::bitswap::{
message::{wantlist::Entry, Wantlist},
Expand Down
1 change: 1 addition & 0 deletions client/network/light/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]
prost-build = "0.11"

[dependencies]
async-channel = "1.8.0"
array-bytes = "4.1"
codec = { package = "parity-scale-codec", version = "3.2.2", features = [
"derive",
Expand Down
12 changes: 7 additions & 5 deletions client/network/light/src/light_client_requests/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

use crate::schema;
use codec::{self, Decode, Encode};
use futures::{channel::mpsc, prelude::*};
use futures::prelude::*;
use libp2p_identity::PeerId;
use log::{debug, trace};
use prost::Message;
Expand All @@ -43,9 +43,13 @@ use std::{marker::PhantomData, sync::Arc};

const LOG_TARGET: &str = "light-client-request-handler";

/// Incoming requests bounded queue size. For now due to lack of data on light client request
/// handling in production systems, this value is chosen to match the block request limit.
const MAX_LIGHT_REQUEST_QUEUE: usize = 20;

/// Handler for incoming light client requests from a remote peer.
pub struct LightClientRequestHandler<B, Client> {
request_receiver: mpsc::Receiver<IncomingRequest>,
request_receiver: async_channel::Receiver<IncomingRequest>,
/// Blockchain client.
client: Arc<Client>,
_block: PhantomData<B>,
Expand All @@ -62,9 +66,7 @@ where
fork_id: Option<&str>,
client: Arc<Client>,
) -> (Self, ProtocolConfig) {
// For now due to lack of data on light client request handling in production systems, this
// value is chosen to match the block request limit.
let (tx, request_receiver) = mpsc::channel(20);
let (tx, request_receiver) = async_channel::bounded(MAX_LIGHT_REQUEST_QUEUE);

let mut protocol_config = super::generate_protocol_config(
protocol_id,
Expand Down
36 changes: 18 additions & 18 deletions client/network/src/request_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@

use crate::{types::ProtocolName, ReputationChange};

use futures::{
channel::{mpsc, oneshot},
prelude::*,
};
use futures::{channel::oneshot, prelude::*};
use libp2p::{
core::{Endpoint, Multiaddr},
request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
Expand Down Expand Up @@ -126,7 +123,7 @@ pub struct ProtocolConfig {
/// other peers. If this is `Some` but the channel is closed, then the local node will
/// advertise support for this protocol, but any incoming request will lead to an error being
/// sent back.
pub inbound_queue: Option<mpsc::Sender<IncomingRequest>>,
pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
}

/// A single request received by a peer on a request-response protocol.
Expand Down Expand Up @@ -259,8 +256,10 @@ pub struct RequestResponsesBehaviour {
///
/// Contains the underlying libp2p request-response [`Behaviour`], plus an optional
/// "response builder" used to build responses for incoming requests.
protocols:
HashMap<ProtocolName, (Behaviour<GenericCodec>, Option<mpsc::Sender<IncomingRequest>>)>,
protocols: HashMap<
ProtocolName,
(Behaviour<GenericCodec>, Option<async_channel::Sender<IncomingRequest>>),
>,

/// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply.
pending_requests:
Expand Down Expand Up @@ -295,7 +294,10 @@ struct MessageRequest {
request: Vec<u8>,
channel: ResponseChannel<Result<Vec<u8>, ()>>,
protocol: ProtocolName,
resp_builder: Option<futures::channel::mpsc::Sender<IncomingRequest>>,
// A builder used for building responses for incoming requests. Note that we use
// `async_channel` and not `mpsc` on purpose, because `mpsc::channel` allocates an extra
// message slot for every cloned `Sender` and this breaks a back-pressure mechanism.
resp_builder: Option<async_channel::Sender<IncomingRequest>>,
// Once we get incoming request we save all params, create an async call to Peerset
// to get the reputation of the peer.
get_peer_reputation: Pin<Box<dyn Future<Output = Result<i32, ()>> + Send>>,
Expand Down Expand Up @@ -618,10 +620,12 @@ impl NetworkBehaviour for RequestResponsesBehaviour {

// Submit the request to the "response builder" passed by the user at
// initialization.
if let Some(mut resp_builder) = resp_builder {
if let Some(resp_builder) = resp_builder {
// If the response builder is too busy, silently drop `tx`. This
// will be reported by the corresponding request-response [`Behaviour`]
// through an `InboundFailure::Omission` event.
// Note that we use `async_channel::bounded` and not `mpsc::channel`
// because the latter allocates an extra slot for every cloned sender.
let _ = resp_builder.try_send(IncomingRequest {
peer,
payload: request,
Expand Down Expand Up @@ -1036,11 +1040,7 @@ impl Codec for GenericCodec {
mod tests {
use super::*;

use futures::{
channel::{mpsc, oneshot},
executor::LocalPool,
task::Spawn,
};
use futures::{channel::oneshot, executor::LocalPool, task::Spawn};
use libp2p::{
core::{
transport::{MemoryTransport, Transport},
Expand Down Expand Up @@ -1112,7 +1112,7 @@ mod tests {
// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
let mut swarms = (0..2)
.map(|_| {
let (tx, mut rx) = mpsc::channel::<IncomingRequest>(64);
let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);

pool.spawner()
.spawn_obj(
Expand Down Expand Up @@ -1215,7 +1215,7 @@ mod tests {
// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
let mut swarms = (0..2)
.map(|_| {
let (tx, mut rx) = mpsc::channel::<IncomingRequest>(64);
let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);

pool.spawner()
.spawn_obj(
Expand Down Expand Up @@ -1353,8 +1353,8 @@ mod tests {
};

let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2, peerset) = {
let (tx_1, rx_1) = mpsc::channel(64);
let (tx_2, rx_2) = mpsc::channel(64);
let (tx_1, rx_1) = async_channel::bounded(64);
let (tx_2, rx_2) = async_channel::bounded(64);

let protocol_configs = vec![
ProtocolConfig {
Expand Down
1 change: 1 addition & 0 deletions client/network/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ prost-build = "0.11"

[dependencies]
array-bytes = "4.1"
async-channel = "1.8.0"
async-trait = "0.1.58"
codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] }
futures = "0.3.21"
Expand Down
9 changes: 3 additions & 6 deletions client/network/sync/src/block_request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ use crate::{
};

use codec::{Decode, Encode};
use futures::{
channel::{mpsc, oneshot},
stream::StreamExt,
};
use futures::{channel::oneshot, stream::StreamExt};
use libp2p::PeerId;
use log::debug;
use lru::LruCache;
Expand Down Expand Up @@ -136,7 +133,7 @@ enum SeenRequestsValue {
/// Handler for incoming block requests from a remote peer.
pub struct BlockRequestHandler<B: BlockT, Client> {
client: Arc<Client>,
request_receiver: mpsc::Receiver<IncomingRequest>,
request_receiver: async_channel::Receiver<IncomingRequest>,
/// Maps from request to number of times we have seen this request.
///
/// This is used to check if a peer is spamming us with the same request.
Expand All @@ -157,7 +154,7 @@ where
) -> (Self, ProtocolConfig) {
// Reserve enough request slots for one request per peer when we are at the maximum
// number of peers.
let (tx, request_receiver) = mpsc::channel(num_peer_hint);
let (tx, request_receiver) = async_channel::bounded(num_peer_hint);

let mut protocol_config = generate_protocol_config(
protocol_id,
Expand Down
9 changes: 3 additions & 6 deletions client/network/sync/src/state_request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
use crate::schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse};

use codec::{Decode, Encode};
use futures::{
channel::{mpsc, oneshot},
stream::StreamExt,
};
use futures::{channel::oneshot, stream::StreamExt};
use libp2p::PeerId;
use log::{debug, trace};
use lru::LruCache;
Expand Down Expand Up @@ -114,7 +111,7 @@ enum SeenRequestsValue {
/// Handler for incoming block requests from a remote peer.
pub struct StateRequestHandler<B: BlockT, Client> {
client: Arc<Client>,
request_receiver: mpsc::Receiver<IncomingRequest>,
request_receiver: async_channel::Receiver<IncomingRequest>,
/// Maps from request to number of times we have seen this request.
///
/// This is used to check if a peer is spamming us with the same request.
Expand All @@ -135,7 +132,7 @@ where
) -> (Self, ProtocolConfig) {
// Reserve enough request slots for one request per peer when we are at the maximum
// number of peers.
let (tx, request_receiver) = mpsc::channel(num_peer_hint);
let (tx, request_receiver) = async_channel::bounded(num_peer_hint);

let mut protocol_config = generate_protocol_config(
protocol_id,
Expand Down
12 changes: 6 additions & 6 deletions client/network/sync/src/warp_request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
//! Helper for handling (i.e. answering) grandpa warp sync requests from a remote peer.

use codec::Decode;
use futures::{
channel::{mpsc, oneshot},
stream::StreamExt,
};
use futures::{channel::oneshot, stream::StreamExt};
use log::debug;

use sc_network::{
Expand All @@ -36,6 +33,9 @@ use std::{sync::Arc, time::Duration};

const MAX_RESPONSE_SIZE: u64 = 16 * 1024 * 1024;

/// Incoming warp requests bounded queue size.
const MAX_WARP_REQUEST_QUEUE: usize = 20;

/// Generates a [`RequestResponseConfig`] for the grandpa warp sync request protocol, refusing
/// incoming requests.
pub fn generate_request_response_config<Hash: AsRef<[u8]>>(
Expand Down Expand Up @@ -72,7 +72,7 @@ fn generate_legacy_protocol_name(protocol_id: ProtocolId) -> String {
/// Handler for incoming grandpa warp sync requests from a remote peer.
pub struct RequestHandler<TBlock: BlockT> {
backend: Arc<dyn WarpSyncProvider<TBlock>>,
request_receiver: mpsc::Receiver<IncomingRequest>,
request_receiver: async_channel::Receiver<IncomingRequest>,
}

impl<TBlock: BlockT> RequestHandler<TBlock> {
Expand All @@ -83,7 +83,7 @@ impl<TBlock: BlockT> RequestHandler<TBlock> {
fork_id: Option<&str>,
backend: Arc<dyn WarpSyncProvider<TBlock>>,
) -> (Self, RequestResponseConfig) {
let (tx, request_receiver) = mpsc::channel(20);
let (tx, request_receiver) = async_channel::bounded(MAX_WARP_REQUEST_QUEUE);

let mut request_response_config =
generate_request_response_config(protocol_id, genesis_hash, fork_id);
Expand Down