Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b448b6d
Patch libp2p
kpp May 24, 2022
bb7abd9
Build QUIC transport
kpp May 24, 2022
450d260
Pass quic_port through cli args
kpp May 24, 2022
f052690
Build quic transport from cli args
kpp May 24, 2022
4d61feb
Review fixes: ipv6 :: addr
kpp May 26, 2022
5504f4c
Merge branch 'master' into kpp-quic
kpp Jul 12, 2022
f661801
Upgrade libp2p to 0.47.0
kpp Jul 12, 2022
0b56dc1
Merge branch 'master' into kpp-quic
kpp Jul 18, 2022
81bdbdf
upgrade libp2p to 0.50.0
melekes Nov 18, 2022
21cf90c
on_swarm_event and on_connection_handler_event
melekes Nov 21, 2022
8a227e4
replace `Swarm::new` with `Swarm::with_threadpool_executor`
melekes Nov 21, 2022
8a97a52
on_swarm_event and on_connection_handler_event part 2
melekes Nov 21, 2022
e9b731b
on_swarm_event and on_connection_handler_event part 3
melekes Nov 21, 2022
d27d73a
on_swarm_event and on_connection_handler_event part 4
melekes Nov 21, 2022
672cac2
update libp2p
melekes Nov 21, 2022
41ab633
Merge branch 'master' into anton/upgrade-libp2p-to-0.50.0
melekes Nov 28, 2022
2fa1b2b
libp2p 0.50.0
melekes Nov 28, 2022
b3c33b5
rename OutboundQueryCompleted to OutboundQueryProgressed
melekes Nov 28, 2022
20437d7
remove unused var
melekes Nov 28, 2022
dbb7bec
accumulate outbound_query_records until query is finished
melekes Nov 28, 2022
bc53171
Merge branch 'anton/upgrade-libp2p-to-0.50.0' into kpp-quic
kpp Nov 29, 2022
458c7aa
Use --experimental-quic flag for cli
kpp Nov 29, 2022
46c473e
Merge branch 'master' into kpp-quic
kpp Jan 31, 2023
1360b83
Review fixes
kpp Jan 31, 2023
fcc4b51
Update client/network/src/transport.rs
kpp Apr 25, 2023
62ff75e
Merge branch 'master' into kpp-quic
kpp Jun 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge branch 'master' into kpp-quic
  • Loading branch information
kpp committed Jan 31, 2023
commit 46c473ef045bb6b446f2c1fe4171fce5e4d7ec8e
1,748 changes: 664 additions & 1,084 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion client/cli/src/params/network_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub struct NetworkParams {
#[clap(long)]
pub experimental_quic: bool,

/// Always forbid connecting to private IPv4 addresses (as specified in
/// Always forbid connecting to private IPv4/IPv6 addresses (as specified in
/// [RFC1918](https://tools.ietf.org/html/rfc1918)), unless the address was passed with
/// `--reserved-nodes` or `--bootnodes`. Enabled by default for chains marked as "live" in
/// their chain specifications.
Expand Down
4 changes: 1 addition & 3 deletions client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ fnv = "1.0.6"
futures = "0.3.21"
futures-timer = "3.0.2"
ip_network = "0.4.1"
libp2p = { version = "0.50.0", features = ["async-std", "dns", "identify", "kad", "macros", "mdns", "mplex", "noise", "ping", "quic", "tcp", "yamux", "websocket"] }
linked_hash_set = "0.1.3"
linked-hash-map = "0.5.4"
libp2p = { version = "0.50.0", features = ["dns", "identify", "kad", "macros", "mdns", "mplex", "noise", "ping", "quic", "tcp", "tokio", "yamux", "websocket"] }
log = "0.4.17"
lru = "0.8.1"
parking_lot = "0.12.1"
Expand Down
88 changes: 59 additions & 29 deletions client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use libp2p::{
GetClosestPeersError, GetRecordOk, Kademlia, KademliaBucketInserts, KademliaConfig,
KademliaEvent, QueryId, QueryResult, Quorum, Record,
},
mdns::{async_io::Behaviour as Mdns, Config as MdnsConfig, Event as MdnsEvent},
mdns::{self, tokio::Behaviour as TokioMdns},
multiaddr::Protocol,
swarm::{
behaviour::{
Expand Down Expand Up @@ -249,7 +249,7 @@ impl DiscoveryConfig {
NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
.expect("value is a constant; constant is non-zero; qed."),
),
outbound_query_records: Vec::new(),
records_to_publish: Default::default(),
}
}
}
Expand Down Expand Up @@ -287,8 +287,12 @@ pub struct DiscoveryBehaviour {
allow_non_globals_in_dht: bool,
/// A cache of discovered external addresses. Only used for logging purposes.
known_external_addresses: LruHashSet<Multiaddr>,
/// A cache of outbound query records.
outbound_query_records: Vec<(record::Key, Vec<u8>)>,
/// Records to publish per QueryId.
///
/// After finishing a Kademlia query, libp2p will return us a list of the closest peers that
/// did not return the record(in `FinishedWithNoAdditionalRecord`). We will then put the record
/// to these peers.
records_to_publish: HashMap<QueryId, Record>,
}

impl DiscoveryBehaviour {
Expand Down Expand Up @@ -544,8 +548,8 @@ impl NetworkBehaviour for DiscoveryBehaviour {
FromSwarm::ListenerClosed(e) => {
self.kademlia.on_swarm_event(FromSwarm::ListenerClosed(e));
},
FromSwarm::ListenFailure(_) => {
// NetworkBehaviour::inject_listen_failure on Kademlia<MemoryStore> does nothing.
FromSwarm::ListenFailure(e) => {
self.kademlia.on_swarm_event(FromSwarm::ListenFailure(e));
},
FromSwarm::ListenerError(e) => {
self.kademlia.on_swarm_event(FromSwarm::ListenerError(e));
Expand Down Expand Up @@ -692,33 +696,57 @@ impl NetworkBehaviour for DiscoveryBehaviour {
KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetRecord(res),
stats,
step,
id,
..
} => {
let ev = match res {
Ok(ok) =>
if let GetRecordOk::FoundRecord(r) = ok {
self.outbound_query_records
.push((r.record.key, r.record.value));
continue
} else {
debug!(
target: "sub-libp2p",
"Libp2p => Query progressed to {:?} step (last: {:?})",
step.count,
step.last,
);
if step.last {
let records =
self.outbound_query_records.drain(..).collect();
DiscoveryOut::ValueFound(
records,
stats.duration().unwrap_or_default(),
)
} else {
Ok(GetRecordOk::FoundRecord(r)) => {
debug!(
target: "sub-libp2p",
"Libp2p => Found record ({:?}) with value: {:?}",
r.record.key,
r.record.value,
);

// Let's directly finish the query, as we are only interested in a
// quorum of 1.
if let Some(kad) = self.kademlia.as_mut() {
if let Some(mut query) = kad.query_mut(&id) {
query.finish();
}
}

// Will be removed below when we receive
// `FinishedWithNoAdditionalRecord`.
self.records_to_publish.insert(id, r.record.clone());

DiscoveryOut::ValueFound(
vec![(r.record.key, r.record.value)],
stats.duration().unwrap_or_default(),
)
},
Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
cache_candidates,
}) => {
// We always need to remove the record to not leak any data!
if let Some(record) = self.records_to_publish.remove(&id) {
if cache_candidates.is_empty() {
continue
}
},

// Put the record to the `cache_candidates` that are nearest to
// the record key from our point of view of the network.
if let Some(kad) = self.kademlia.as_mut() {
kad.put_record_to(
record,
cache_candidates.into_iter().map(|v| v.1),
Quorum::One,
);
}
}

continue
},
Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
trace!(
target: "sub-libp2p",
Expand Down Expand Up @@ -927,10 +955,12 @@ mod tests {
config.finish()
};

let mut swarm = Swarm::with_threadpool_executor(
let runtime = tokio::runtime::Runtime::new().unwrap();
let mut swarm = Swarm::with_executor(
transport,
behaviour,
keypair.public().to_peer_id(),
TokioExecutor(runtime),
);
let listen_addr: Multiaddr =
format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
Expand Down
7 changes: 5 additions & 2 deletions client/network/src/protocol/notifications/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ use libp2p::{
core::{connection::ConnectionId, transport::MemoryTransport, upgrade},
identity, noise,
swarm::{
behaviour::FromSwarm, ConnectionHandler, IntoConnectionHandler, NetworkBehaviour,
behaviour::FromSwarm, ConnectionHandler, Executor, IntoConnectionHandler, NetworkBehaviour,
NetworkBehaviourAction, PollParameters, Swarm, SwarmEvent,
},
yamux, Multiaddr, PeerId, Transport,
};
use std::{
iter,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
Expand Down Expand Up @@ -103,10 +104,12 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
.collect(),
};

let mut swarm = Swarm::with_threadpool_executor(
let runtime = tokio::runtime::Runtime::new().unwrap();
let mut swarm = Swarm::with_executor(
transport,
behaviour,
keypairs[index].public().to_peer_id(),
TokioExecutor(runtime),
);
swarm.listen_on(addrs[index].clone()).unwrap();
out.push(swarm);
Expand Down
39 changes: 27 additions & 12 deletions client/network/src/request_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,21 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
)
}
},
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) =>
for (p, _) in self.protocols.values_mut() {
let handler = p.new_handler();
NetworkBehaviour::on_swarm_event(
p,
FromSwarm::DialFailure(DialFailure { peer_id, handler, error }),
);
FromSwarm::DialFailure(DialFailure { peer_id, error, handler }) =>
for (p_name, p_handler) in handler.into_iter() {
if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) {
proto.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
handler: p_handler,
error,
}));
} else {
log::error!(
target: "sub-libp2p",
"on_swarm_event/dial_failure: no request-response instance registered for protocol {:?}",
p_name,
)
}
},
FromSwarm::ListenerClosed(e) =>
for (p, _) in self.protocols.values_mut() {
Expand Down Expand Up @@ -414,9 +422,11 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
return proto.on_connection_handler_event(peer_id, connection_id, event)
}

log::warn!(target: "sub-libp2p",
"on_connection_handler_event: no request-response instance registered for protocol {:?}",
p_name)
log::warn!(
target: "sub-libp2p",
"on_connection_handler_event: no request-response instance registered for protocol {:?}",
p_name
);
}

fn poll(
Expand Down Expand Up @@ -951,8 +961,13 @@ mod tests {

let behaviour = RequestResponsesBehaviour::new(list, handle).unwrap();

let mut swarm =
Swarm::with_threadpool_executor(transport, behaviour, keypair.public().to_peer_id());
let runtime = tokio::runtime::Runtime::new().unwrap();
let mut swarm = Swarm::with_executor(
transport,
behaviour,
keypair.public().to_peer_id(),
TokioExecutor(runtime),
);
let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

swarm.listen_on(listen_addr.clone()).unwrap();
Expand Down
15 changes: 9 additions & 6 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::{
transport, ChainSyncInterface, ReputationChange,
};

use futures::{channel::oneshot, executor::ThreadPoolBuilder, prelude::*};
use futures::{channel::oneshot, prelude::*};
use libp2p::{
core::{either::EitherError, upgrade, ConnectedPoint},
identify::Info as IdentifyInfo,
Expand Down Expand Up @@ -375,17 +375,19 @@ where
}
};

let builder = if let Some(spawner) = params.executor {
let builder = {
struct SpawnImpl<F>(F);
impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
(self.0)(f)
}
}
SwarmBuilder::with_executor(transport, behaviour, local_peer_id, SpawnImpl(spawner))
} else {
let tp = ThreadPoolBuilder::new().name_prefix("libp2p-swarm-task-").create()?;
SwarmBuilder::with_executor(transport, behaviour, local_peer_id, tp)
SwarmBuilder::with_executor(
transport,
behaviour,
local_peer_id,
SpawnImpl(params.executor),
)
};
let builder = builder
.connection_limits(
Expand All @@ -399,6 +401,7 @@ where
.notify_handler_buffer_size(NonZeroUsize::new(32).expect("32 != 0; qed"))
.connection_event_buffer_size(1024)
.max_negotiating_inbound_streams(2048);

(builder.build(), bandwidth)
};

Expand Down
19 changes: 10 additions & 9 deletions client/network/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn build_quic_transport(
keypair: &identity::Keypair,
) -> Boxed<(PeerId, StreamMuxerBox)> {
let config = QuicConfig::new(&keypair);
let transport = QuicTransport::<quic::async_std::Provider>::new(config)
let transport = QuicTransport::<quic::tokio::Provider>::new(config)
.map(|(p, c), _| (p, StreamMuxerBox::new(c)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Fixed

.boxed();

Expand Down Expand Up @@ -71,11 +71,11 @@ pub fn build_transport(
) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc<BandwidthSinks>) {
// Build the base layer of the transport.
let transport = if !memory_only {
// Main transport: DNS(TCP)
let tcp_config = tcp::Config::new().nodelay(true);
let desktop_trans = tcp::async_io::Transport::new(tcp_config.clone());
let desktop_trans = websocket::WsConfig::new(desktop_trans)
.or_transport(tcp::async_io::Transport::new(tcp_config.clone()));
let dns_init = futures::executor::block_on(dns::DnsConfig::system(desktop_trans));
let tcp_trans = tcp::tokio::Transport::new(tcp_config.clone());
let dns_init = dns::TokioDnsConfig::system(tcp_trans);

EitherTransport::Left(if let Ok(dns) = dns_init {
// WS + WSS transport
//
Expand All @@ -87,10 +87,11 @@ pub fn build_transport(
.expect("same system_conf & resolver to work");
EitherTransport::Left(websocket::WsConfig::new(dns_for_wss).or_transport(dns))
} else {
let desktop_trans = tcp::async_io::Transport::new(tcp_config.clone());
let desktop_trans = websocket::WsConfig::new(desktop_trans)
.or_transport(tcp::async_io::Transport::new(tcp_config));
EitherTransport::Right(desktop_trans.map_err(dns::DnsErr::Transport))
// In case DNS can't be constructed, fallback to TCP + WS (WSS won't work)
let tcp_trans = tcp::tokio::Transport::new(tcp_config.clone());
let desktop_trans = websocket::WsConfig::new(tcp_trans)
.or_transport(tcp::tokio::Transport::new(tcp_config));
EitherTransport::Right(desktop_trans)
})
} else {
EitherTransport::Right(OptionalTransport::some(
Expand Down
1 change: 0 additions & 1 deletion client/network/transactions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ targets = ["x86_64-unknown-linux-gnu"]
array-bytes = "4.1"
codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] }
futures = "0.3.21"
hex = "0.4.0"
libp2p = "0.50.0"
log = "0.4.17"
pin-project = "1.0.12"
Expand Down
2 changes: 1 addition & 1 deletion client/telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
chrono = "0.4.19"
futures = "0.3.21"
libp2p = { version = "0.50.0", features = ["async-std", "dns", "tcp", "wasm-ext", "websocket"] }
libp2p = { version = "0.50.0", features = ["dns", "tcp", "tokio", "wasm-ext", "websocket"] }
log = "0.4.17"
parking_lot = "0.12.1"
pin-project = "1.0.12"
Expand Down
4 changes: 2 additions & 2 deletions client/telemetry/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ const CONNECT_TIMEOUT: Duration = Duration::from_secs(20);

pub(crate) fn initialize_transport() -> Result<WsTrans, io::Error> {
let transport = {
let tcp_transport = libp2p::tcp::async_io::Transport::new(libp2p::tcp::Config::new());
let inner = block_on(libp2p::dns::DnsConfig::system(tcp_transport))?;
let tcp_transport = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::new());
let inner = libp2p::dns::TokioDnsConfig::system(tcp_transport)?;
libp2p::websocket::framed::WsConfig::new(inner).and_then(|connec, _| {
let connec = connec
.with(|item| {
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.