Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,11 @@ unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)", "cfg(iroh_l
[workspace.lints.clippy]
unused-async = "warn"


[patch.crates-io]
netwatch = { git = "https://github.com/n0-computer/net-tools", branch = "feat-multipath" }
portmapper = { git = "https://github.com/n0-computer/net-tools", branch = "feat-multipath" }
netwatch = { git = "https://github.com/n0-computer/net-tools", branch = "Frando/on_closed" }
portmapper = { git = "https://github.com/n0-computer/net-tools", branch = "Frando/on_closed" }

[patch."https://github.com/n0-computer/quinn"]
# [patch."https://github.com/n0-computer/quinn"]
# iroh-quinn = { path = "../iroh-quinn/quinn" }
# iroh-quinn-proto = { path = "../iroh-quinn/quinn-proto" }
# iroh-quinn-udp = { path = "../iroh-quinn/quinn-udp" }
4 changes: 2 additions & 2 deletions iroh-relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ postcard = { version = "1", default-features = false, features = [
"use-std",
"experimental-derive",
] }
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "main-iroh", default-features = false, features = ["rustls-ring"] }
quinn-proto = { package = "iroh-quinn-proto", git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed", default-features = false, features = ["rustls-ring"] }
quinn-proto = { package = "iroh-quinn-proto", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed" }
rand = "0.9.2"
reqwest = { version = "0.12", default-features = false, features = [
"rustls-tls",
Expand Down
8 changes: 4 additions & 4 deletions iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ n0-watcher = "0.5"
netwatch = { version = "0.12" }
pin-project = "1"
pkarr = { version = "5", default-features = false, features = ["relays"] }
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "main-iroh", default-features = false, features = ["rustls-ring"] }
quinn-proto = { package = "iroh-quinn-proto", git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
quinn-udp = { package = "iroh-quinn-udp", git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed", default-features = false, features = ["rustls-ring"] }
quinn-proto = { package = "iroh-quinn-proto", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed" }
quinn-udp = { package = "iroh-quinn-udp", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed" }
rand = "0.9.2"
reqwest = { version = "0.12", default-features = false, features = [
"rustls-tls",
Expand Down Expand Up @@ -90,7 +90,7 @@ hickory-resolver = "0.25.1"
igd-next = { version = "0.16", features = ["aio_tokio"] }
netdev = { version = "0.38.1" }
portmapper = { version = "0.12", default-features = false }
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "main-iroh", default-features = false, features = ["runtime-tokio", "rustls-ring"] }
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed", default-features = false, features = ["runtime-tokio", "rustls-ring"] }
tokio = { version = "1", features = [
"io-util",
"macros",
Expand Down
2 changes: 1 addition & 1 deletion iroh/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ iroh = { path = ".." }
iroh-metrics = "0.37"
n0-future = "0.3.0"
n0-error = "0.1.0"
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed" }
rand = "0.9.2"
rcgen = "0.14"
rustls = { version = "0.23.33", default-features = false, features = ["ring"] }
Expand Down
135 changes: 135 additions & 0 deletions iroh/examples/monitor-connections.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use std::{sync::Arc, time::Duration};

use iroh::{
Endpoint, RelayMode,
endpoint::{ConnectionInfo, ConnectionMonitor},
};
use n0_error::{Result, StackResultExt, StdResultExt, ensure_any};
use n0_future::task::AbortOnDropHandle;
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender},
task::JoinSet,
};
use tracing::{Instrument, info, info_span};

const ALPN: &[u8] = b"iroh/test";

#[tokio::main]
async fn main() -> Result {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
)
.init();

let monitor = Monitor::new();
let server = Endpoint::empty_builder(RelayMode::Disabled)
.alpns(vec![ALPN.to_vec()])
.monitor_connections(monitor.clone())
.bind()
.instrument(info_span!("server"))
.await?;
let server_addr = server.addr();

let count = 2;

let client_task = tokio::spawn(
async move {
let client = Endpoint::empty_builder(RelayMode::Disabled)
.bind()
.instrument(info_span!("client"))
.await?;
for _i in 0..count {
let conn = client.connect(server_addr.clone(), ALPN).await?;
let mut s = conn.accept_uni().await.anyerr()?;
let data = s.read_to_end(2).await.anyerr()?;
ensure_any!(data == b"hi", "unexpected data");
conn.close(23u32.into(), b"bye");
}
client.close().await;
n0_error::Ok(client)
}
.instrument(info_span!("client")),
);

let server_task = tokio::spawn(
async move {
for _i in 0..count {
let conn = server
.accept()
.await
.context("server endpoint closed")?
.await?;
let mut s = conn.open_uni().await.anyerr()?;
s.write_all(b"hi").await.anyerr()?;
conn.closed().await;
}
info!("done");
server.close().await;
n0_error::Ok(())
}
.instrument(info_span!("server")),
);
client_task.await.std_context("client")?.context("client")?;
server_task.await.std_context("server")?.context("server")?;
tokio::time::sleep(Duration::from_secs(1)).await;
drop(monitor);
Ok(())
}

/// Our connection monitor impl.
///
/// This here only logs connection open and close events via tracing.
/// It could also maintain a datastructure of all connections, or send the stats to some metrics service.
#[derive(Clone)]
struct Monitor {
tx: UnboundedSender<ConnectionInfo>,
_task: Arc<AbortOnDropHandle<()>>,
}

impl ConnectionMonitor for Monitor {
fn on_connection(&self, connection: ConnectionInfo) {
self.tx.send(connection).ok();
}
}

impl Monitor {
fn new() -> Self {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let task = tokio::spawn(Self::run(rx).instrument(info_span!("watcher")));
Self {
tx,
_task: Arc::new(AbortOnDropHandle::new(task)),
}
}

async fn run(mut rx: UnboundedReceiver<ConnectionInfo>) {
let mut tasks = JoinSet::new();
loop {
tokio::select! {
Some(conn) = rx.recv() => {
let alpn = String::from_utf8_lossy(conn.alpn()).to_string();
let remote = conn.remote_id().fmt_short();
info!(%remote, %alpn, rtt=?conn.rtt(), "new connection");
tasks.spawn(async move {
match conn.closed().await {
Some((close_reason, stats)) => {
// We have access to the final stats of the connection!
info!(%remote, %alpn, ?close_reason, udp_rx=stats.udp_rx.bytes, udp_tx=stats.udp_tx.bytes, "connection closed");
}
None => {
// The connection was closed before we could register our stats-on-close listener.
info!(%remote, %alpn, "connection closed before tracking started");
}
}
}.instrument(tracing::Span::current()));
}
Some(res) = tasks.join_next(), if !tasks.is_empty() => res.expect("conn close task panicked"),
else => break,
}
while let Some(res) = tasks.join_next().await {
res.expect("conn close task panicked");
}
}
}
}
42 changes: 42 additions & 0 deletions iroh/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ pub struct Builder {
#[cfg(any(test, feature = "test-utils"))]
path_selection: PathSelection,
max_tls_tickets: usize,
#[debug("{}", connection_monitor.as_ref().map(|_| "Some(Box<dyn ConnectionMonitor>)").unwrap_or("None"))]
connection_monitor: Option<Box<dyn ConnectionMonitor>>,
}

impl Builder {
Expand Down Expand Up @@ -158,6 +160,7 @@ impl Builder {
#[cfg(any(test, feature = "test-utils"))]
path_selection: PathSelection::default(),
max_tls_tickets: DEFAULT_MAX_TLS_TICKETS,
connection_monitor: None,
}
}

Expand Down Expand Up @@ -208,6 +211,7 @@ impl Builder {
// #[cfg(any(test, feature = "test-utils"))]
// path_selection: self.path_selection,
metrics,
connection_monitor: self.connection_monitor,
};

let msock = magicsock::MagicSock::spawn(msock_opts).await?;
Expand Down Expand Up @@ -432,6 +436,44 @@ impl Builder {
self.max_tls_tickets = n;
self
}

// TODO docs
/// Register a handler that is invoked for each connection the endpoint accepts or initiates.
///
/// The [`ConnectionMonitor::on_connection`] method is invoked synchronosuly, from within a tokio
/// context. So you can spawn tasks if needed.
/// Make sure that whatever you do with the connection info here is non-blocking.
/// Usually you'd want to send the info over a broadcast or unbounded channel,
/// or insert it into some persistent datastructure.
///
/// The `ConnectionInfo` internally contains a weak reference to the connection,
/// so keeping the struct alive does not keep the connection alive.
/// Note however that `ConnectionInfo` does keep an allocation per connection alive
/// so to not leak memory you should drop the `ConnectionInfo` eventually
///
/// [`ConnectionMonitor`] is implemented for `Fn(ConnectionInfo)`, so you can
/// also pass a closure that takes [`ConnectionInfo`] to this function.
pub fn monitor_connections(mut self, monitor: impl ConnectionMonitor) -> Self {
self.connection_monitor = Some(Box::new(monitor));
self
}
}

/// Monitor each connection accepted or initiated by the endpoint.
pub trait ConnectionMonitor: Send + Sync + 'static {
/// Called for each new connection the endpoint accepts or initiates.
///
/// This is only called when a connection is fully established.
fn on_connection(&self, connection: ConnectionInfo);
}

impl<T> ConnectionMonitor for T
where
T: Fn(ConnectionInfo) + Send + Sync + 'static,
{
fn on_connection(&self, connection: ConnectionInfo) {
(self)(connection)
}
}

/// Configuration for a [`quinn::Endpoint`] that cannot be changed at runtime.
Expand Down
8 changes: 8 additions & 0 deletions iroh/src/endpoint/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1608,6 +1608,14 @@ impl ConnectionInfo {
pub fn side(&self) -> Side {
self.side
}

/// Waits for the connection to be closed, and returns the close reason and final connection stats.
///
/// Returns `None` if the connection has been dropped already before this call.
pub async fn closed(&self) -> Option<(ConnectionError, ConnectionStats)> {
let fut = self.inner.upgrade()?.on_closed();
Some(fut.await)
}
}

#[cfg(test)]
Expand Down
Loading
Loading