diff --git a/Cargo.lock b/Cargo.lock index 6a92f8e35f055..ccecc302c2eed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -244,9 +244,9 @@ dependencies = [ [[package]] name = "async-io" -version = "1.2.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40a0b2bb8ae20fede194e779150fe283f65a4a08461b496de546ec366b174ad9" +checksum = "9315f8f07556761c3e48fec2e6b276004acf426e6dc068b2c2251854d65ee0fd" dependencies = [ "concurrent-queue", "fastrand", @@ -306,15 +306,15 @@ checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" [[package]] name = "async-tls" -version = "0.10.0" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d85a97c4a0ecce878efd3f945f119c78a646d8975340bca0398f9bb05c30cc52" +checksum = "dd0d8b6fc362bebff7502479fb5e9aed00c8cc3abc5af755536e73a128f0cb88" dependencies = [ "futures-core", "futures-io", - "rustls", + "rustls 0.19.0", "webpki", - "webpki-roots 0.20.0", + "webpki-roots", ] [[package]] @@ -2364,7 +2364,7 @@ dependencies = [ "futures-util", "hyper 0.13.9", "log", - "rustls", + "rustls 0.18.1", "rustls-native-certs", "tokio 0.2.23", "tokio-rustls", @@ -2414,6 +2414,22 @@ dependencies = [ "libc", ] +[[package]] +name = "if-watch" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d7c5e361e6b05c882b4847dd98992534cebc6fcde7f4bc98225bcf10fd6d0d" +dependencies = [ + "async-io", + "futures 0.3.8", + "futures-lite", + "if-addrs", + "ipnet", + "libc", + "log", + "winapi 0.3.9", +] + [[package]] name = "impl-codec" version = "0.4.2" @@ -2780,9 +2796,9 @@ checksum = "3576a87f2ba00f6f106fdfcd16db1d698d648a26ad8e0573cad8537c3c362d2a" [[package]] name = "libc" -version = "0.2.80" +version = "0.2.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d58d1b70b004888f764dfbf6a26a3b0342a1632d33968e4a179d8011c760614" +checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb" [[package]] name = "libloading" @@ -2802,9 +2818,9 @@ checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a" [[package]] name = "libp2p" -version = "0.31.2" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "724846a3194368fefcac7ebdab12e01b8ac382e3efe399ddbd28851ab34f396f" +checksum = "fac71e0cd4ba56b06464c3669bdfe893dd6c14f05f7ed1ba0965b1bc5933ee71" dependencies = [ "atomic", "bytes 0.5.6", @@ -2993,24 +3009,23 @@ dependencies = [ [[package]] name = "libp2p-mdns" -version = "0.25.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4458ec36b5ab2662fb4d5c8bb9b6e1591da0ab6efe8881c7a7670ef033bc8937" +checksum = "7b934ee03a361f317df7d75defa4177b285534c58f49d5e6e240278e13ef3f65" dependencies = [ - "async-std", + "async-io", "data-encoding", "dns-parser", - "either", "futures 0.3.8", + "if-watch", "lazy_static", "libp2p-core", "libp2p-swarm", "log", - "net2", "rand 0.7.3", "smallvec 1.5.0", + "socket2", "void", - "wasm-timer", ] [[package]] @@ -3101,9 +3116,9 @@ dependencies = [ [[package]] name = "libp2p-request-response" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e952dcc9d2d7e7e45ae8bfcff255723091bd43e3e9a7741a0af8a17fe55b3ed" +checksum = "bd96c3580fe59a9379ac7906c2f61c7f5ad3b7515362af0e72153a7cc9a45550" dependencies = [ "async-trait", "bytes 0.5.6", @@ -3179,9 +3194,9 @@ dependencies = [ [[package]] name = "libp2p-websocket" -version = "0.26.0" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5736e2fccdcea6e728bbaf903bddc113be223313ce2c756ad9fe43b5a2b0f06" +checksum = "046031ad8ade16f2f0547350e4b2cea36c78cb10426e9c0d9eab35fa9943b969" dependencies = [ "async-tls", "either", @@ -3189,12 +3204,12 @@ dependencies = [ "libp2p-core", "log", "quicksink", - "rustls", + "rustls 0.19.0", "rw-stream-sink", "soketto", "url 2.2.0", "webpki", - "webpki-roots 0.21.0", + "webpki-roots", ] [[package]] @@ -6258,6 +6273,19 @@ dependencies = [ "webpki", ] +[[package]] +name = "rustls" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "064fd21ff87c6e87ed4506e68beb42459caa4a0e2eb144932e6776768556980b" +dependencies = [ + "base64 0.13.0", + "log", + "ring", + "sct", + "webpki", +] + [[package]] name = "rustls-native-certs" version = "0.4.0" @@ -6265,7 +6293,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "629d439a7672da82dd955498445e496ee2096fe2117b9f796558a43fdb9e59b8" dependencies = [ "openssl-probe", - "rustls", + "rustls 0.18.1", "schannel", "security-framework", ] @@ -7892,11 +7920,11 @@ dependencies = [ [[package]] name = "socket2" -version = "0.3.16" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fd8b795c389288baa5f355489c65e71fd48a02104600d15c4cfbc561e9e429d" +checksum = "2c29947abdee2a218277abeca306f25789c938e500ea5a9d4b12a5a504466902" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "libc", "redox_syscall", "winapi 0.3.9", @@ -9399,7 +9427,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e12831b255bcfa39dc0436b01e19fea231a37db570686c06ee72c423479f889a" dependencies = [ "futures-core", - "rustls", + "rustls 0.18.1", "tokio 0.2.23", "webpki", ] @@ -10255,15 +10283,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "webpki-roots" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f20dea7535251981a9670857150d571846545088359b28e4951d350bdaf179f" -dependencies = [ - "webpki", -] - [[package]] name = "webpki-roots" version = "0.21.0" diff --git a/bin/node/browser-testing/Cargo.toml b/bin/node/browser-testing/Cargo.toml index f1cad30aede17..53f83b8fcaa72 100644 --- a/bin/node/browser-testing/Cargo.toml +++ b/bin/node/browser-testing/Cargo.toml @@ -8,7 +8,7 @@ license = "Apache-2.0" [dependencies] futures-timer = "3.0.2" -libp2p = { version = "0.31.1", default-features = false } +libp2p = { version = "0.32.0", default-features = false } jsonrpc-core = "15.0.0" serde = "1.0.106" serde_json = "1.0.48" diff --git a/client/authority-discovery/Cargo.toml b/client/authority-discovery/Cargo.toml index 4cd2dae1388a1..90f8f229b4da9 100644 --- a/client/authority-discovery/Cargo.toml +++ b/client/authority-discovery/Cargo.toml @@ -23,7 +23,7 @@ derive_more = "0.99.2" either = "1.5.3" futures = "0.3.4" futures-timer = "3.0.1" -libp2p = { version = "0.31.2", default-features = false, features = ["kad"] } +libp2p = { version = "0.32.0", default-features = false, features = ["kad"] } log = "0.4.8" prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0"} prost = "0.6.1" diff --git a/client/cli/Cargo.toml b/client/cli/Cargo.toml index f323f1940b181..ece015cc4bc36 100644 --- a/client/cli/Cargo.toml +++ b/client/cli/Cargo.toml @@ -19,7 +19,7 @@ regex = "1.4.2" tokio = { version = "0.2.21", features = [ "signal", "rt-core", "rt-threaded", "blocking" ] } futures = "0.3.4" fdlimit = "0.2.1" -libp2p = "0.31.2" +libp2p = "0.32.0" parity-scale-codec = "1.3.0" hex = "0.4.2" rand = "0.7.3" diff --git a/client/network-gossip/Cargo.toml b/client/network-gossip/Cargo.toml index bbbb83f206166..b1ae4aa31c934 100644 --- a/client/network-gossip/Cargo.toml +++ b/client/network-gossip/Cargo.toml @@ -17,7 +17,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] futures = "0.3.4" futures-timer = "3.0.1" -libp2p = { version = "0.31.2", default-features = false } +libp2p = { version = "0.32.0", default-features = false } log = "0.4.8" lru = "0.6.1" sc-network = { version = "0.8.0", path = "../network" } diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index 0b8d3da928f5e..6c40b08ed848a 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -64,13 +64,13 @@ wasm-timer = "0.2" zeroize = "1.0.0" [dependencies.libp2p] -version = "0.31.2" +version = "0.32.0" default-features = false -features = ["identify", "kad", "mdns-async-std", "mplex", "noise", "ping", "request-response", "tcp-async-std", "websocket", "yamux"] +features = ["identify", "kad", "mdns", "mplex", "noise", "ping", "request-response", "tcp-async-std", "websocket", "yamux"] [dev-dependencies] assert_matches = "1.3" -libp2p = { version = "0.31.2", default-features = false } +libp2p = { version = "0.32.0", default-features = false } quickcheck = "0.9.0" rand = "0.7.2" sp-keyring = { version = "2.0.0", path = "../../primitives/keyring" } diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index b2914a5e0a72b..8b9e321ca599b 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -88,7 +88,7 @@ pub enum BehaviourOut { protocol: Cow<'static, str>, /// If `Ok`, contains the time elapsed between when we received the request and when we /// sent back the response. If `Err`, the error that happened. - result: Result, + result: Result, ResponseFailure>, }, /// A request initiated using [`Behaviour::send_request`] has succeeded or failed. @@ -417,7 +417,7 @@ impl NetworkBehaviourEventProcess { diff --git a/client/network/src/discovery.rs b/client/network/src/discovery.rs index e65d557a7bdbc..b2517efb6607e 100644 --- a/client/network/src/discovery.rs +++ b/client/network/src/discovery.rs @@ -59,8 +59,6 @@ use libp2p::kad::handler::KademliaHandlerProto; use libp2p::kad::QueryId; use libp2p::kad::record::{self, store::{MemoryStore, RecordStore}}; #[cfg(not(target_os = "unknown"))] -use libp2p::swarm::toggle::Toggle; -#[cfg(not(target_os = "unknown"))] use libp2p::mdns::{Mdns, MdnsEvent}; use libp2p::multiaddr::Protocol; use log::{debug, info, trace, warn}; @@ -206,15 +204,9 @@ impl DiscoveryConfig { discovery_only_if_under_num, #[cfg(not(target_os = "unknown"))] mdns: if enable_mdns { - match Mdns::new() { - Ok(mdns) => Some(mdns).into(), - Err(err) => { - warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err); - None.into() - } - } + MdnsWrapper::Instantiating(Mdns::new().boxed()) } else { - None.into() + MdnsWrapper::Disabled }, allow_non_globals_in_dht, known_external_addresses: LruHashSet::new( @@ -234,7 +226,7 @@ pub struct DiscoveryBehaviour { kademlias: HashMap>, /// Discovers nodes on the local network. #[cfg(not(target_os = "unknown"))] - mdns: Toggle, + mdns: MdnsWrapper, /// Stream that fires when we need to perform the next random Kademlia query. next_kad_random_query: Delay, /// After `next_kad_random_query` triggers, the next one triggers after this duration. @@ -785,6 +777,48 @@ fn protocol_name_from_protocol_id(id: &ProtocolId) -> Vec { v } +/// [`Mdns::new`] returns a future. Instead of forcing [`DiscoveryConfig::finish`] and all its +/// callers to be async, lazily instantiate [`Mdns`]. +#[cfg(not(target_os = "unknown"))] +enum MdnsWrapper { + Instantiating(futures::future::BoxFuture<'static, std::io::Result>), + Ready(Mdns), + Disabled, +} + +#[cfg(not(target_os = "unknown"))] +impl MdnsWrapper { + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + match self { + MdnsWrapper::Instantiating(_) => Vec::new(), + MdnsWrapper::Ready(mdns) => mdns.addresses_of_peer(peer_id), + MdnsWrapper::Disabled => Vec::new(), + } + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + params: &mut impl PollParameters, + ) -> Poll> { + loop { + match self { + MdnsWrapper::Instantiating(fut) => { + *self = match futures::ready!(fut.as_mut().poll(cx)) { + Ok(mdns) => MdnsWrapper::Ready(mdns), + Err(err) => { + warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err); + MdnsWrapper::Disabled + }, + } + } + MdnsWrapper::Ready(mdns) => return mdns.poll(cx, params), + MdnsWrapper::Disabled => return Poll::Pending, + } + } + } +} + #[cfg(test)] mod tests { use crate::config::ProtocolId; diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index 69a2ffda1c89a..a410ae0dff559 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -48,9 +48,10 @@ use libp2p::{ PollParameters, ProtocolsHandler, }, }; +use lru::LruCache; use std::{ borrow::Cow, collections::{hash_map::Entry, HashMap}, convert::TryFrom as _, io, iter, - pin::Pin, task::{Context, Poll}, time::Duration, + pin::Pin, task::{Context, Poll}, time::{Duration, Instant}, }; pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId}; @@ -128,7 +129,10 @@ pub enum Event { protocol: Cow<'static, str>, /// If `Ok`, contains the time elapsed between when we received the request and when we /// sent back the response. If `Err`, the error that happened. - result: Result, + /// + /// Note: Given that response time is tracked on a best-effort basis only, `Ok(time)` can be + /// `None`. + result: Result, ResponseFailure>, }, /// A request initiated using [`RequestResponsesBehaviour::send_request`] has succeeded or @@ -154,21 +158,19 @@ pub struct RequestResponsesBehaviour { /// Whenever an incoming request arrives, a `Future` is added to this list and will yield the /// response to send back to the remote. pending_responses: stream::FuturesUnordered< - Pin + Send>> + Pin> + Send>> >, + + /// Whenever an incoming request arrives, the arrival [`Instant`] is recorded here. + pending_responses_arrival_time: LruCache, } /// Generated by the response builder and waiting to be processed. -enum RequestProcessingOutcome { - Response { - protocol: Cow<'static, str>, - inner_channel: ResponseChannel, ()>>, - response: Vec, - }, - Busy { - peer: PeerId, - protocol: Cow<'static, str>, - }, +struct RequestProcessingOutcome { + request_id: RequestId, + protocol: Cow<'static, str>, + inner_channel: ResponseChannel, ()>>, + response: Vec, } impl RequestResponsesBehaviour { @@ -201,7 +203,8 @@ impl RequestResponsesBehaviour { Ok(Self { protocols, - pending_responses: stream::FuturesUnordered::new(), + pending_responses: Default::default(), + pending_responses_arrival_time: LruCache::new(1_000), }) } @@ -347,22 +350,31 @@ impl NetworkBehaviour for RequestResponsesBehaviour { > { 'poll_all: loop { // Poll to see if any response is ready to be sent back. - while let Poll::Ready(Some(result)) = self.pending_responses.poll_next_unpin(cx) { - match result { - RequestProcessingOutcome::Response { - protocol, inner_channel, response - } => { - if let Some((protocol, _)) = self.protocols.get_mut(&*protocol) { - protocol.send_response(inner_channel, Ok(response)); - } - } - RequestProcessingOutcome::Busy { peer, protocol } => { - let out = Event::InboundRequest { - peer, - protocol, - result: Err(ResponseFailure::Busy), - }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); + while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) { + let RequestProcessingOutcome { + request_id, + protocol: protocol_name, + inner_channel, + response + } = match outcome { + Some(outcome) => outcome, + // The response builder was too busy and thus the request was dropped. This is + // later on reported as a `InboundFailure::Omission`. + None => continue, + }; + + if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) { + if let Err(_) = protocol.send_response(inner_channel, Ok(response)) { + // Note: In case this happened due to a timeout, the corresponding + // `RequestResponse` behaviour will emit an `InboundFailure::Timeout` event. + self.pending_responses_arrival_time.pop(&request_id); + log::debug!( + target: "sub-libp2p", + "Failed to send response for {:?} on protocol {:?} due to a \ + timeout or due to the connection to the peer being closed. \ + Dropping response", + request_id, protocol_name, + ); } } } @@ -409,15 +421,21 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Received a request from a remote. RequestResponseEvent::Message { peer, - message: RequestResponseMessage::Request { request, channel, .. }, + message: RequestResponseMessage::Request { request_id, request, channel, .. }, } => { + self.pending_responses_arrival_time.put( + request_id.clone(), + Instant::now(), + ); + let (tx, rx) = oneshot::channel(); // Submit the request to the "response builder" passed by the user at // initialization. if let Some(resp_builder) = resp_builder { - // If the response builder is too busy, silently drop `tx`. - // This will be reported as a `Busy` error. + // If the response builder is too busy, silently drop `tx`. This + // will be reported by the corresponding `RequestResponse` through + // an `InboundFailure::Omission` event. let _ = resp_builder.try_send(IncomingRequest { peer: peer.clone(), payload: request, @@ -428,13 +446,14 @@ impl NetworkBehaviour for RequestResponsesBehaviour { let protocol = protocol.clone(); self.pending_responses.push(Box::pin(async move { // The `tx` created above can be dropped if we are not capable of - // processing this request, which is reflected as a "Busy" error. + // processing this request, which is reflected as a + // `InboundFailure::Omission` event. if let Ok(response) = rx.await { - RequestProcessingOutcome::Response { - protocol, inner_channel: channel, response - } + Some(RequestProcessingOutcome { + request_id, protocol, inner_channel: channel, response + }) } else { - RequestProcessingOutcome::Busy { peer, protocol } + None } })); @@ -445,11 +464,10 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Received a response from a remote to one of our requests. RequestResponseEvent::Message { - message: - RequestResponseMessage::Response { - request_id, - response, - }, + message: RequestResponseMessage::Response { + request_id, + response, + }, .. } => { let out = Event::RequestFinished { @@ -472,8 +490,10 @@ impl NetworkBehaviour for RequestResponsesBehaviour { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); } - // Remote has tried to send a request but failed. - RequestResponseEvent::InboundFailure { peer, error, .. } => { + // An inbound request failed, either while reading the request or due to failing + // to send a response. + RequestResponseEvent::InboundFailure { request_id, peer, error, .. } => { + self.pending_responses_arrival_time.pop(&request_id); let out = Event::InboundRequest { peer, protocol: protocol.clone(), @@ -481,6 +501,24 @@ impl NetworkBehaviour for RequestResponsesBehaviour { }; return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); } + RequestResponseEvent::ResponseSent { request_id, peer } => { + let arrival_time = self.pending_responses_arrival_time.pop(&request_id) + .map(|t| t.elapsed()); + if arrival_time.is_none() { + log::debug!( + "Expected to find arrival time for sent response. Is the LRU \ + cache size set too small?", + ); + } + + let out = Event::InboundRequest { + peer, + protocol: protocol.clone(), + result: Ok(arrival_time), + }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); + + } }; } } @@ -520,8 +558,6 @@ pub enum RequestFailure { /// Error when processing a request sent by a remote. #[derive(Debug, derive_more::Display, derive_more::Error)] pub enum ResponseFailure { - /// Internal response builder is too busy to process this request. - Busy, /// Problem on the network. #[display(fmt = "Problem on the network")] Network(#[error(ignore)] InboundFailure), @@ -655,7 +691,10 @@ impl RequestResponseCodec for GenericCodec { #[cfg(test)] mod tests { - use futures::{channel::mpsc, prelude::*}; + use futures::channel::mpsc; + use futures::executor::LocalPool; + use futures::prelude::*; + use futures::task::Spawn; use libp2p::identity::Keypair; use libp2p::Multiaddr; use libp2p::core::upgrade; @@ -666,7 +705,8 @@ mod tests { #[test] fn basic_request_response_works() { - let protocol_name = "/test/req-rep/1"; + let protocol_name = "/test/req-resp/1"; + let mut pool = LocalPool::new(); // Build swarms whose behaviour is `RequestResponsesBehaviour`. let mut swarms = (0..2) @@ -694,12 +734,12 @@ mod tests { inbound_queue: Some(tx), })).unwrap(); - async_std::task::spawn(async move { + pool.spawner().spawn_obj(async move { while let Some(rq) = rx.next().await { assert_eq!(rq.payload, b"this is a request"); let _ = rq.pending_response.send(b"this is a response".to_vec()); } - }); + }.boxed().into()).unwrap(); b }; @@ -719,26 +759,24 @@ mod tests { Swarm::dial_addr(&mut swarms[0].0, dial_addr).unwrap(); } - // Running `swarm[0]` in the background until a `InboundRequest` event happens, - // which is a hint about the test having ended. - async_std::task::spawn({ + // Running `swarm[0]` in the background. + pool.spawner().spawn_obj({ let (mut swarm, _) = swarms.remove(0); async move { loop { match swarm.next_event().await { SwarmEvent::Behaviour(super::Event::InboundRequest { result, .. }) => { - assert!(result.is_ok()); - break + result.unwrap(); }, _ => {} } } - } - }); + }.boxed().into() + }).unwrap(); // Remove and run the remaining swarm. let (mut swarm, _) = swarms.remove(0); - async_std::task::block_on(async move { + pool.run_until(async move { let mut sent_request_id = None; loop { @@ -769,7 +807,8 @@ mod tests { #[test] fn max_response_size_exceeded() { - let protocol_name = "/test/req-rep/1"; + let protocol_name = "/test/req-resp/1"; + let mut pool = LocalPool::new(); // Build swarms whose behaviour is `RequestResponsesBehaviour`. let mut swarms = (0..2) @@ -797,12 +836,12 @@ mod tests { inbound_queue: Some(tx), })).unwrap(); - async_std::task::spawn(async move { + pool.spawner().spawn_obj(async move { while let Some(rq) = rx.next().await { assert_eq!(rq.payload, b"this is a request"); let _ = rq.pending_response.send(b"this response exceeds the limit".to_vec()); } - }); + }.boxed().into()).unwrap(); b }; @@ -824,7 +863,7 @@ mod tests { // Running `swarm[0]` in the background until a `InboundRequest` event happens, // which is a hint about the test having ended. - async_std::task::spawn({ + pool.spawner().spawn_obj({ let (mut swarm, _) = swarms.remove(0); async move { loop { @@ -836,12 +875,12 @@ mod tests { _ => {} } } - } - }); + }.boxed().into() + }).unwrap(); // Remove and run the remaining swarm. let (mut swarm, _) = swarms.remove(0); - async_std::task::block_on(async move { + pool.run_until(async move { let mut sent_request_id = None; loop { diff --git a/client/network/src/service.rs b/client/network/src/service.rs index c59aeb412298f..3a368088e5392 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -1373,19 +1373,21 @@ impl Future for NetworkWorker { Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. })) => { if let Some(metrics) = this.metrics.as_ref() { match result { - Ok(serve_time) => { + Ok(Some(serve_time)) => { metrics.requests_in_success_total .with_label_values(&[&protocol]) .observe(serve_time.as_secs_f64()); } + // Response time tracking is happening on a best-effort basis. Ignore + // the event in case response time could not be provided. + Ok(None) => {}, Err(err) => { let reason = match err { - ResponseFailure::Busy => "busy", ResponseFailure::Network(InboundFailure::Timeout) => "timeout", ResponseFailure::Network(InboundFailure::UnsupportedProtocols) => "unsupported", - ResponseFailure::Network(InboundFailure::ConnectionClosed) => - "connection-closed", + ResponseFailure::Network(InboundFailure::ResponseOmission) => + "busy-omitted", }; metrics.requests_in_failure_total diff --git a/client/network/test/Cargo.toml b/client/network/test/Cargo.toml index 9640ca9ae8ccf..fea6ead4f7079 100644 --- a/client/network/test/Cargo.toml +++ b/client/network/test/Cargo.toml @@ -19,7 +19,7 @@ parking_lot = "0.10.0" futures = "0.3.4" futures-timer = "3.0.1" rand = "0.7.2" -libp2p = { version = "0.31.2", default-features = false } +libp2p = { version = "0.32.0", default-features = false } sp-consensus = { version = "0.8.0", path = "../../../primitives/consensus/common" } sc-consensus = { version = "0.8.0", path = "../../../client/consensus/common" } sc-client-api = { version = "2.0.0", path = "../../api" } diff --git a/client/peerset/Cargo.toml b/client/peerset/Cargo.toml index d3f782bb94514..1b74a8099b69a 100644 --- a/client/peerset/Cargo.toml +++ b/client/peerset/Cargo.toml @@ -16,7 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] futures = "0.3.4" -libp2p = { version = "0.31.2", default-features = false } +libp2p = { version = "0.32.0", default-features = false } sp-utils = { version = "2.0.0", path = "../../primitives/utils"} log = "0.4.8" serde_json = "1.0.41" diff --git a/client/telemetry/Cargo.toml b/client/telemetry/Cargo.toml index 58f2a06629361..aa585fb8f4d82 100644 --- a/client/telemetry/Cargo.toml +++ b/client/telemetry/Cargo.toml @@ -19,7 +19,7 @@ parking_lot = "0.10.0" futures = "0.3.4" futures-timer = "3.0.1" wasm-timer = "0.2.5" -libp2p = { version = "0.31.2", default-features = false, features = ["dns", "tcp-async-std", "wasm-ext", "websocket"] } +libp2p = { version = "0.32.0", default-features = false, features = ["dns", "tcp-async-std", "wasm-ext", "websocket"] } log = "0.4.8" pin-project = "0.4.6" rand = "0.7.2" diff --git a/primitives/consensus/common/Cargo.toml b/primitives/consensus/common/Cargo.toml index 375e976ce5b71..cf9804f86f260 100644 --- a/primitives/consensus/common/Cargo.toml +++ b/primitives/consensus/common/Cargo.toml @@ -16,7 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] thiserror = "1.0.21" -libp2p = { version = "0.31.2", default-features = false } +libp2p = { version = "0.32.0", default-features = false } log = "0.4.8" sp-core = { path= "../../core", version = "2.0.0"} sp-inherents = { version = "2.0.0", path = "../../inherents" }