Skip to content

Commit 2f924d9

Browse files
authored
refactor: remove Endpoint::conn_type (#3647)
## Description * Remove `Endpoint::conn_type` * Update `transfer.rs` example to use `Connection::paths` instead * Change return type of `Connection::paths` to have more guarantees on the return type (Send, Unpin, 'static) ## Breaking Changes <!-- Optional, if there are any breaking changes document them, including how to migrate older code. --> ## Notes & open questions <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist <!-- Remove any that are not relevant. --> - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented. - [ ] List all breaking changes in the above "Breaking Changes" section. - [ ] Open an issue or PR on any number0 repos that are affected by this breaking change. Give guidance on how the updates should be handled or do the actual updates themselves. The major ones are: - [ ] [`quic-rpc`](https://github.com/n0-computer/quic-rpc) - [ ] [`iroh-gossip`](https://github.com/n0-computer/iroh-gossip) - [ ] [`iroh-blobs`](https://github.com/n0-computer/iroh-blobs) - [ ] [`dumbpipe`](https://github.com/n0-computer/dumbpipe) - [ ] [`sendme`](https://github.com/n0-computer/sendme)
1 parent 50fdda3 commit 2f924d9

File tree

6 files changed

+38
-96
lines changed

6 files changed

+38
-96
lines changed

iroh/examples/transfer.rs

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,16 @@ use data_encoding::HEXLOWER;
1010
use indicatif::HumanBytes;
1111
use iroh::{
1212
Endpoint, EndpointAddr, EndpointId, RelayMap, RelayMode, RelayUrl, SecretKey, TransportAddr,
13+
Watcher,
1314
discovery::{
1415
dns::DnsDiscovery,
1516
pkarr::{N0_DNS_PKARR_RELAY_PROD, N0_DNS_PKARR_RELAY_STAGING, PkarrPublisher},
1617
},
1718
dns::{DnsResolver, N0_DNS_ENDPOINT_ORIGIN_PROD, N0_DNS_ENDPOINT_ORIGIN_STAGING},
18-
endpoint::ConnectionError,
19+
endpoint::{ConnectionError, PathInfoList},
1920
};
2021
use n0_error::{Result, StackResultExt, StdResultExt};
2122
use n0_future::task::AbortOnDropHandle;
22-
use n0_watcher::Watcher as _;
2323
use tokio_stream::StreamExt;
2424
use tracing::{info, warn};
2525
use url::Url;
@@ -337,7 +337,6 @@ async fn provide(endpoint: Endpoint, size: u64) -> Result<()> {
337337
}
338338
};
339339
// spawn a task to handle reading and writing off of the connection
340-
let endpoint_clone = endpoint.clone();
341340
tokio::spawn(async move {
342341
let conn = accepting.await.anyerr()?;
343342
let endpoint_id = conn.remote_id();
@@ -350,7 +349,7 @@ async fn provide(endpoint: Endpoint, size: u64) -> Result<()> {
350349
println!("[{remote}] Connected");
351350

352351
// Spawn a background task that prints connection type changes. Will be aborted on drop.
353-
let _guard = watch_conn_type(&endpoint_clone, endpoint_id);
352+
let _guard = watch_conn_type(conn.remote_id(), conn.paths());
354353

355354
// accept a bi-directional QUIC connection
356355
// use the `quinn` APIs to send and recv content
@@ -404,7 +403,7 @@ async fn fetch(endpoint: Endpoint, remote_addr: EndpointAddr) -> Result<()> {
404403
let conn = endpoint.connect(remote_addr, TRANSFER_ALPN).await?;
405404
println!("Connected to {}", remote_id);
406405
// Spawn a background task that prints connection type changes. Will be aborted on drop.
407-
let _guard = watch_conn_type(&endpoint, remote_id);
406+
let _guard = watch_conn_type(conn.remote_id(), conn.paths());
408407

409408
// Use the Quinn API to send and recv content.
410409
let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
@@ -521,14 +520,36 @@ fn parse_byte_size(s: &str) -> std::result::Result<u64, parse_size::Error> {
521520
cfg.parse_size(s)
522521
}
523522

524-
fn watch_conn_type(endpoint: &Endpoint, endpoint_id: EndpointId) -> AbortOnDropHandle<()> {
525-
let mut stream = endpoint.conn_type(endpoint_id).unwrap().stream();
523+
fn watch_conn_type(
524+
endpoint_id: EndpointId,
525+
paths_watcher: impl Watcher<Value = PathInfoList> + Send + Unpin + 'static,
526+
) -> AbortOnDropHandle<()> {
527+
let id = endpoint_id.fmt_short();
526528
let task = tokio::task::spawn(async move {
527-
while let Some(conn_type) = stream.next().await {
528-
println!(
529-
"[{}] Connection type changed to: {conn_type}",
530-
endpoint_id.fmt_short()
531-
);
529+
let mut stream = paths_watcher.stream();
530+
let mut previous = None;
531+
while let Some(paths) = stream.next().await {
532+
if let Some(path) = paths.iter().find(|p| p.is_selected()) {
533+
// We can get path updates without the selected path changing. We don't want to log again in that case.
534+
if Some(path) == previous.as_ref() {
535+
continue;
536+
}
537+
println!(
538+
"[{id}] Connection type changed to: {:?} (RTT: {:?})",
539+
path.remote_addr(),
540+
path.rtt()
541+
);
542+
previous = Some(path.clone());
543+
} else if !paths.is_empty() {
544+
println!(
545+
"[{id}] Connection type changed to: mixed ({} paths)",
546+
paths.len()
547+
);
548+
previous = None;
549+
} else {
550+
println!("[{id}] Connection type changed to none (no active transmission paths)",);
551+
previous = None;
552+
}
532553
}
533554
});
534555
AbortOnDropHandle::new(task)

iroh/src/endpoint.rs

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use tracing::{debug, instrument, trace, warn};
2525
use url::Url;
2626

2727
pub use super::magicsock::{
28-
AddEndpointAddrError, ConnectionType, DirectAddr, DirectAddrType, PathInfo,
28+
AddEndpointAddrError, DirectAddr, DirectAddrType, PathInfo,
2929
endpoint_map::{PathInfoList, Source},
3030
};
3131
#[cfg(wasm_browser)]
@@ -963,33 +963,6 @@ impl Endpoint {
963963
//
964964
// Partially they return things passed into the builder.
965965

966-
/// Returns a [`Watcher`] that reports the current connection type and any changes for
967-
/// given remote endpoint.
968-
///
969-
/// This watcher allows observing a stream of [`ConnectionType`] items by calling
970-
/// [`Watcher::stream()`]. If the underlying connection to a remote endpoint changes, it will
971-
/// yield a new item. These connection changes are when the connection switches between
972-
/// using the Relay server and a direct connection.
973-
///
974-
/// Note that this does not guarantee each connection change is yielded in the stream.
975-
/// If the connection type changes several times before this stream is polled, only the
976-
/// last recorded state is returned. This can be observed e.g. right at the start of a
977-
/// connection when the switch from a relayed to a direct connection can be so fast that
978-
/// the relayed state is never exposed.
979-
///
980-
/// If there is currently a connection with the remote endpoint, then using [`Watcher::get`]
981-
/// will immediately return either [`ConnectionType::Relay`], [`ConnectionType::Direct`]
982-
/// or [`ConnectionType::Mixed`].
983-
///
984-
/// It is possible for the connection type to be [`ConnectionType::None`] if you've
985-
/// recently connected to this endpoint id but previous methods of reaching the endpoint have
986-
/// become inaccessible.
987-
///
988-
/// Will return `None` if we do not have any address information for the given `endpoint_id`.
989-
pub fn conn_type(&self, endpoint_id: EndpointId) -> Option<n0_watcher::Direct<ConnectionType>> {
990-
self.msock.conn_type(endpoint_id)
991-
}
992-
993966
/// Returns the currently lowest latency for this endpoint.
994967
///
995968
/// Will return `None` if we do not have any address information for the given `endpoint_id`.

iroh/src/endpoint/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1462,7 +1462,7 @@ impl Connection {
14621462
///
14631463
/// [`PathInfo::is_selected`]: crate::magicsock::PathInfo::is_selected
14641464
/// [`PathInfo`]: crate::magicsock::PathInfo
1465-
pub fn paths(&self) -> impl Watcher<Value = PathInfoList> {
1465+
pub fn paths(&self) -> impl Watcher<Value = PathInfoList> + Unpin + Send + Sync + 'static {
14661466
self.paths.watch(self.inner.weak_handle())
14671467
}
14681468

iroh/src/magicsock.rs

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,7 @@ pub(crate) mod transports;
7777

7878
use mapped_addrs::{EndpointIdMappedAddr, MappedAddr};
7979

80-
pub use self::{
81-
endpoint_map::{ConnectionType, PathInfo},
82-
metrics::Metrics,
83-
};
80+
pub use self::{endpoint_map::PathInfo, metrics::Metrics};
8481

8582
// TODO: Use this
8683
// /// How long we consider a QAD-derived endpoint valid for. UDP NAT mappings typically
@@ -394,20 +391,6 @@ impl MagicSock {
394391
})
395392
}
396393

397-
/// Returns a [`n0_watcher::Direct`] that reports the [`ConnectionType`] we have to the
398-
/// given `endpoint_id`.
399-
///
400-
/// This gets us a copy of the [`n0_watcher::Direct`] for the [`Watchable`] with a
401-
/// [`ConnectionType`] that the `EndpointMap` stores for each `endpoint_id`'s endpoint.
402-
///
403-
/// # Errors
404-
///
405-
/// Will return `None` if there is no address information known about the
406-
/// given `endpoint_id`.
407-
pub(crate) fn conn_type(&self, eid: EndpointId) -> Option<n0_watcher::Direct<ConnectionType>> {
408-
self.endpoint_map.conn_type(eid)
409-
}
410-
411394
// TODO: Build better info to expose to the user about remote nodes. We probably want
412395
// to expose this as part of path information instead.
413396
pub(crate) async fn latency(&self, eid: EndpointId) -> Option<Duration> {

iroh/src/magicsock/endpoint_map.rs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ mod path_state;
2525

2626
pub(super) use endpoint_state::EndpointStateMessage;
2727
pub(crate) use endpoint_state::PathsWatchable;
28-
pub use endpoint_state::{ConnectionType, PathInfo, PathInfoList};
2928
use endpoint_state::{EndpointStateActor, EndpointStateHandle};
29+
pub use endpoint_state::{PathInfo, PathInfoList};
3030

3131
// TODO: use this
3232
// /// Number of endpoints that are inactive for which we keep info about. This limit is enforced
@@ -107,19 +107,6 @@ impl EndpointMap {
107107
self.endpoint_mapped_addrs.get(&eid)
108108
}
109109

110-
/// Returns a [`n0_watcher::Direct`] for given endpoint's [`ConnectionType`].
111-
///
112-
/// # Errors
113-
///
114-
/// Will return `None` if there is not an entry in the [`EndpointMap`] for
115-
/// the `endpoint_id`
116-
pub(super) fn conn_type(
117-
&self,
118-
_endpoint_id: EndpointId,
119-
) -> Option<n0_watcher::Direct<ConnectionType>> {
120-
todo!();
121-
}
122-
123110
/// Returns the sender for the [`EndpointStateActor`].
124111
///
125112
/// If needed a new actor is started on demand.

iroh/src/magicsock/endpoint_map/endpoint_state.rs

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use n0_watcher::{Watchable, Watcher};
1616
use quinn::{PathStats, WeakConnectionHandle};
1717
use quinn_proto::{PathError, PathEvent, PathId, PathStatus};
1818
use rustc_hash::FxHashMap;
19-
use serde::{Deserialize, Serialize};
2019
use smallvec::SmallVec;
2120
use tokio::sync::{mpsc, oneshot};
2221
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
@@ -1055,27 +1054,6 @@ struct HolepunchAttempt {
10551054
remote_addrs: BTreeSet<SocketAddr>,
10561055
}
10571056

1058-
/// The type of connection we have to the endpoint.
1059-
#[derive(derive_more::Display, Default, Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
1060-
pub enum ConnectionType {
1061-
/// Direct UDP connection
1062-
#[display("direct({_0})")]
1063-
Direct(SocketAddr),
1064-
/// Relay connection over relay
1065-
#[display("relay({_0})")]
1066-
Relay(RelayUrl),
1067-
/// Both a UDP and a relay connection are used.
1068-
///
1069-
/// This is the case if we do have a UDP address, but are missing a recent confirmation that
1070-
/// the address works.
1071-
#[display("mixed(udp: {_0}, relay: {_1})")]
1072-
Mixed(SocketAddr, RelayUrl),
1073-
/// We have no verified connection to this PublicKey
1074-
#[default]
1075-
#[display("none")]
1076-
None,
1077-
}
1078-
10791057
/// Newtype to track Connections.
10801058
///
10811059
/// The wrapped value is the [`quinn::Connection::stable_id`] value, and is thus only valid
@@ -1167,7 +1145,7 @@ impl PathsWatchable {
11671145
pub(crate) fn watch(
11681146
&self,
11691147
conn_handle: WeakConnectionHandle,
1170-
) -> impl Watcher<Value = PathInfoList> {
1148+
) -> impl Watcher<Value = PathInfoList> + Unpin + Send + Sync + 'static {
11711149
let joined_watcher = (self.open_paths.watch(), self.selected_path.watch());
11721150
joined_watcher.map(move |(open_paths, selected_path)| {
11731151
let selected_path: Option<TransportAddr> = selected_path.map(Into::into);

0 commit comments

Comments
 (0)