Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
172 commits
Select commit Hold shift + click to select a range
c17a870
feat: use quinn multipath
dignifiedquire Jul 7, 2025
7fe570d
update iroh-quinn
dignifiedquire Jul 7, 2025
2f469ac
start opening paths
dignifiedquire Jul 8, 2025
870716f
add more paths
dignifiedquire Jul 8, 2025
346a7c2
set keep alive and idle timeouts for new paths
dignifiedquire Jul 8, 2025
68b1769
insert relay path
dignifiedquire Jul 9, 2025
0eb3fde
set relay path as backup
dignifiedquire Jul 11, 2025
79ec17f
start removing ping logic from the node_map
dignifiedquire Jul 11, 2025
c4baca8
start tracking path events
dignifiedquire Jul 11, 2025
549adee
start figuring out more details
dignifiedquire Jul 12, 2025
9ef5765
wip
dignifiedquire Jul 14, 2025
75d5525
get some stuff to work again
dignifiedquire Jul 18, 2025
4f78898
remove ip_mapped_addresses
dignifiedquire Jul 21, 2025
130710f
use correct relay addr on recv
dignifiedquire Jul 21, 2025
dba89df
ensure connection registration
dignifiedquire Jul 21, 2025
aab083d
remove rtt_actor, this is now done inside quinn on a per path basis
dignifiedquire Jul 21, 2025
d4484da
open additional paths after the initial connection
dignifiedquire Jul 21, 2025
6cb94e4
ensure path open
dignifiedquire Jul 22, 2025
998e283
some debugging
dignifiedquire Jul 23, 2025
99cee61
update quinn branch
dignifiedquire Jul 23, 2025
4b60a9a
fixups
dignifiedquire Jul 28, 2025
04b714c
update deps
dignifiedquire Aug 1, 2025
59efdcf
bunch of renames and doc updates, no functional changes
flub Aug 28, 2025
f9924cd
switch to main multipath branch
flub Aug 29, 2025
3058a8e
another rename
flub Aug 29, 2025
11dd04d
Set max_idle_time to a good value
flub Aug 29, 2025
6869faa
fix typo
flub Sep 1, 2025
539a514
Start hooking up a new NodeStateActor
flub Sep 16, 2025
a1a7d89
Rename AllPathsMappedAddr to NodeIdMappedAddr
flub Sep 16, 2025
4b00ad7
Move all mapped addrs to one module
flub Sep 16, 2025
31ab033
allow me to send via the NodeStateActor
flub Sep 16, 2025
5ae71ed
Unify NodeIdMappedAddr and RelayMappedAddr a bit more
flub Sep 16, 2025
599f25d
start implementing AddConnection
flub Sep 17, 2025
2417e6b
make add_node_addr async and send it to the NodeStateActor
flub Sep 18, 2025
9820e60
start handling AddNodeAddr message
flub Sep 18, 2025
ef7a6f3
start sending datagrams from the actor
flub Sep 18, 2025
76f3768
Start adding state for holepunching decisions
flub Sep 23, 2025
1366161
refactor to allow scheduling holepunching attempts
flub Sep 23, 2025
3436423
plug in DiscoState to the NodeStateActor
flub Sep 23, 2025
01bf15c
a sane method to send disco messages
flub Sep 23, 2025
001d16d
Implement starting of holepunching
flub Sep 24, 2025
7a9023f
handle receiving pings
flub Sep 24, 2025
92dd37c
handle receiving CallMeMaybe messages
flub Sep 24, 2025
ac2db4d
open a path when we receive a pong
flub Sep 25, 2025
6d69555
Move open path to not block the actor
flub Sep 29, 2025
12d12c0
select the right path
flub Sep 30, 2025
9e7be2d
close paths on all connections
flub Sep 30, 2025
c1bfdf5
send connections to the NodeStateActor
flub Sep 30, 2025
7b4f056
remove PingActions from magicsock Actor Message
flub Sep 30, 2025
d3b81e7
Kill a bunch of dead code
flub Sep 30, 2025
d9ed9be
use a better way to send to this channel
flub Sep 30, 2025
2c521b0
Hook up connecting check for a valid send addr
flub Oct 1, 2025
21894e7
Do not send incoming packets through the NodeMap
flub Oct 1, 2025
fc97009
delete a whole bunch of unused code
flub Oct 1, 2025
0b4214b
delete some more unused code
flub Oct 1, 2025
a0d5ba1
fine tune logging
flub Oct 2, 2025
65c5d30
try_send is removed. do not log poll_send span twice
flub Oct 2, 2025
e7d9268
Do not holepunch on sending the first message
flub Oct 2, 2025
924f41b
postpone solving this, make the test work
flub Oct 2, 2025
8b1fd0f
Add the path from a new connection, call select path
flub Oct 3, 2025
0522c0e
Do not close the last direct path on a connection
flub Oct 3, 2025
50cc92f
Open some more paths when needed
flub Oct 3, 2025
4a1375f
Add a first relay test
flub Oct 3, 2025
ee20734
Merge branch 'main' into feat-multipath
flub Oct 6, 2025
fdeb6f4
tidy up NodeMap creation
flub Oct 6, 2025
d4fc291
Simplify delayed discovery start
flub Oct 6, 2025
8ba3fd9
Remove try_send impls, no longer used
flub Oct 6, 2025
23ef6fe
remove dead code
flub Oct 6, 2025
7fbd5df
Merge branch 'main' into feat-multipath
flub Oct 6, 2025
3f58e95
Remove MagicStack and mesh_stacks
flub Oct 7, 2025
7ac4dec
dead code
flub Oct 7, 2025
f38e5c2
remove test and delete then unused function
flub Oct 7, 2025
1eeb4bb
start removing old nodestate
flub Oct 7, 2025
145cb18
delete more: path_validity mod is gone
flub Oct 7, 2025
ccb3287
remove more!
flub Oct 7, 2025
80157d4
and more gone
flub Oct 7, 2025
b0957cb
rename temporary name, now the name is free again
flub Oct 7, 2025
a3e58f8
slightly better logging
flub Oct 7, 2025
ce564f1
avoid unneeded mut when not testing
flub Oct 7, 2025
4eb0f07
Convert to canonical IP address in IpSender
flub Oct 7, 2025
28bf51c
remove duplicate adding
flub Oct 8, 2025
befde29
clearer bounds writing
flub Oct 8, 2025
2b2f3c2
Merge branch 'main' into feat-multipath
flub Oct 8, 2025
79cd50c
fix AddrMap impl to update both maps
flub Oct 8, 2025
c2a192e
Always use IPv6 addresses
flub Oct 8, 2025
04e1e3d
tweak logging, this is too noisy
flub Oct 8, 2025
660c39a
random lost import
flub Oct 8, 2025
9da6c1c
keep reducing redundant logging
flub Oct 8, 2025
85cedca
Make transports Addrs use the canonical form
flub Oct 9, 2025
7ee963f
insert PathId::ZERO in the path_id_map
flub Oct 9, 2025
9de1055
fix addr selection for holepunching
flub Oct 9, 2025
ae07d68
bunch of logging imporvements
flub Oct 9, 2025
0ce64c9
remove redundant logging
flub Oct 9, 2025
9a592ce
plug through a minimal PathInfo
flub Oct 9, 2025
eb92bb7
We don't pend CallMeMaybe anymore
flub Oct 9, 2025
6e31e4d
clippy, at last some code quality
flub Oct 9, 2025
008648c
care to enable multipath?
flub Oct 9, 2025
cdc0f90
refactor into more methods, just mechanical
flub Oct 10, 2025
b80de1a
Log transports::Addr a bit more compactly
flub Oct 14, 2025
4200059
some minimal docs
flub Oct 14, 2025
5988e94
rework the test a little to have easier spans
flub Oct 14, 2025
77dfc32
log the correct packet lengths
flub Oct 14, 2025
8b8321b
Set the path status for the initial path
flub Oct 14, 2025
50f98b8
next thing to work on
flub Oct 14, 2025
b4114fa
chore: update git deps
dignifiedquire Oct 14, 2025
583cf9e
Merge remote-tracking branch 'origin/main' into feat-multipath
dignifiedquire Oct 14, 2025
5071e56
Merge remote-tracking branch 'origin/main' into feat-multipath
dignifiedquire Oct 14, 2025
1976993
improve logging and reduce max concurrent paths to 16
flub Oct 15, 2025
e10ac11
Merge branch 'main' into feat-multipath
flub Oct 16, 2025
ce5bdd2
newtype the connection id
flub Oct 16, 2025
7839a4c
make sure that the transport addrs use canonical addresses
flub Oct 17, 2025
fcaea95
No longer need to patch rustls
flub Oct 18, 2025
7d86eff
Only select the path based on open paths
flub Oct 20, 2025
203204c
some fixes to path selection
flub Oct 20, 2025
de2074e
small cleanups, review comments
flub Oct 21, 2025
55d1c29
Merge branch 'main' into feat-multipath
flub Oct 21, 2025
bd680f1
Merge branch 'main' into feat-multipath
flub Oct 21, 2025
9fcdd71
Use TransportAddr to improve PathInfo exposed
flub Oct 21, 2025
de6eefc
compile on wasm again
flub Oct 22, 2025
c2b131f
add relay to a new connection that is direct (#3569)
flub Oct 25, 2025
5930eb5
Merge remote-tracking branch 'origin/main' into feat-multipath-mergemain
Frando Oct 29, 2025
a0edf73
target main-iroh branch
flub Oct 29, 2025
e5ad681
test improvements
flub Oct 31, 2025
e6625cd
Merge remote-tracking branch 'origin/main' into feat-multipath
dignifiedquire Nov 3, 2025
a5ca365
chore: fix typos
dignifiedquire Nov 3, 2025
a710f9d
cleanup docs and references
dignifiedquire Nov 3, 2025
a08af73
wasm fixes
dignifiedquire Nov 3, 2025
4ddc94f
Merge remote-tracking branch 'origin/main' into feat-multipath
dignifiedquire Nov 4, 2025
212e13a
Merge remote-tracking branch 'origin/main' into feat-multipath
Frando Nov 5, 2025
6bec028
Remove obsolete test
flub Nov 5, 2025
dc46e18
fix doc links and clippy
flub Nov 5, 2025
a59f62f
Remove EndpointStateMapInner
flub Nov 5, 2025
8f1cb97
fix: removal of path selection missed a cfg attribute
Frando Nov 6, 2025
0659765
Merge remote-tracking branch 'origin/main' into feat-multipath
Frando Nov 6, 2025
9921a35
bump quinn branch
flub Nov 6, 2025
1a5c4dd
chore: only patch quinn directly
dignifiedquire Nov 6, 2025
147e6bb
bench: add ipv6 option and metrics feature
dignifiedquire Nov 7, 2025
d1c1dab
refactor: improve path watching, add path stats (#3622)
Frando Nov 7, 2025
7887fb5
refactor: minor cleanups in endpoint state (#3626)
Frando Nov 10, 2025
1d5937c
refactor: use Connection::on_closed in endpoint state actor (#3627)
Frando Nov 10, 2025
e0f10ce
refactor(multipath): Make registering connections with the magicsock …
Frando Nov 10, 2025
9d795d3
Merge remote-tracking branch 'origin/main' into feat-multipath
dignifiedquire Nov 11, 2025
6380246
refactor: remove the TransportsSenderActor
dignifiedquire Nov 11, 2025
d0707e8
chore: fixup deny
dignifiedquire Nov 11, 2025
50fdda3
refactor: disallow certain Source variants to be constructed externally
dignifiedquire Nov 11, 2025
2f924d9
refactor: remove Endpoint::conn_type (#3647)
Frando Nov 11, 2025
492b74e
refactor: use boxed watcher, not watchable, on connection (#3632)
Frando Nov 11, 2025
25fe805
refactor(multipath): Stop inactive endpoint actors (#3643)
Frando Nov 12, 2025
e7bf47d
Bump quinn
flub Nov 12, 2025
4b6824c
fix(iroh): Clear `EndpointStateActor::selected_path` once the last co…
matheus23 Nov 12, 2025
8d819f0
perf: various improvements
dignifiedquire Nov 13, 2025
3745e7e
bump quinn
flub Nov 14, 2025
dab9d5f
fix(iroh)!: Correct the error structure (#3663)
flub Nov 14, 2025
c24c5d4
Cleanup some broken tests
flub Nov 14, 2025
103e3c5
fix
flub Nov 14, 2025
13fe787
fix(tests): Also run the tests in isolation in the default profile (#…
flub Nov 14, 2025
1f2db9f
fix test by calling stream.finish() (#3665)
flub Nov 14, 2025
7052392
Merge remote-tracking branch 'origin/main' into feat-multipath
Frando Nov 17, 2025
6ef582d
deps(multipath): bump netdev (#3667)
Frando Nov 17, 2025
7f17d98
feat: relay only configuration
dignifiedquire Nov 17, 2025
1a7a88b
refactor: remove Endpoint::path_selection (#3668)
Frando Nov 17, 2025
34f52c6
refactor(multipath): rename EndpointMap/EndpointState to RemoteMap/Re…
Frando Nov 18, 2025
d538b11
multipath: merge main (#3674)
Frando Nov 18, 2025
59a7e85
test(iroh): Fix `test_two_devices_setup_teardown` hanging (#3675)
matheus23 Nov 18, 2025
01545ee
refactor(multipath): move discovery into `EndpointStateActor` (#3645)
Frando Nov 18, 2025
d328bf2
fix(multipath): fix remote state actor termination (#3676)
Frando Nov 20, 2025
160d535
test(iroh): Fix `test_active_relay_inactive` test being flaky (#3680)
matheus23 Nov 20, 2025
c85bbbd
Merge remote-tracking branch 'origin/main' into feat-multipath-merge
dignifiedquire Nov 23, 2025
e620b5e
test: mark test_active_relay_inactive as non flaky
dignifiedquire Nov 23, 2025
d243702
Merge pull request #3696 from n0-computer/feat-multipath-merge
dignifiedquire Nov 23, 2025
94c150a
Switch to QUIC-NAT-Traversal instead of DISCO (#3595)
flub Nov 23, 2025
1efd2b5
feat(iroh): introduce EndpointHooks (#3688)
Frando Nov 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor(multipath): Stop inactive endpoint actors (#3643)
## Description

Fixes #3638 (partially)

This is a first, small solution to stop inactive endpoint actors after
an idle timeout. I implemented it such that the *actor* decides once to
stop, while making sure that we *never* create senders to actors that
are shutting down.

Logic in the actor:
* The actor enters an idle timeout (set to 60 seconds) once it has no
active connections, an empty inbox, and no inbox senders
* Once the timeout expires, it is rechecked that the idle conditions
hold, and if so the actor exits
* Once any of the idle conditions don't hold anymore, the idle timeout
is deactivated and restarted once the conditions are met again

The actor checks if the inbox's sender strong count equals 1, which
means that no senders exist apart from the one held in the endpoint map.
This check is protected with a mutex, to enter a critical section for
closing the inbox while the lock is held in case the conditions are met.
This is to ensure that there cannot be a race condition where a sender
is cloned out right after the check in the actor returns true, but
before the inbox is closed.

Logic in the endpoint map:
* When handing out senders, we acquire the shared lock, and check that
the channel is not closed while the lock is held. This ensures that the
actor never closes while a sender is alive. If the actor is closed, we
remove the handle to the dead actor and create a new actor.
* On regular intervals (set to 60 seconds) the magicsock actor removes
handles to dead actors.


## Breaking Changes

<!-- Optional, if there are any breaking changes document them,
including how to migrate older code. -->

## Notes & open questions

* I *think* my logic around the critical section and ensuring that we
never close the actor while senders exist is sound. However, it needs
careful review and tests. I'll do some thinking on how to best test
this.
* Instead of employing an interval to remove dead actor handles, we
could use a channel where the actor informs an outside-task which
endpoint actors terminated, so that the outside-task can then lock the
endpoint map and remove just those. Not sure if that's worth it.
* Another solution here might be to spawn the actor tasks into a join
set in the magicsock actor. However this would need further refactoring
and would likely make spawning actors async. I think I'd prefer to keep
that sync because it makes the surrounding code a lot simpler.
* This does not yet implement some of the more advanced reasoning that
#3638 proposes. I think we should start with something simple that
prevents memory exhaustion and tweak as needed. However, it could also
be argued that we should start with a more featureful design right away.

## Change checklist
<!-- Remove any that are not relevant. -->
- [ ] Self-review.
- [ ] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- [ ] Tests if relevant.
- [ ] All breaking changes documented.
- [ ] List all breaking changes in the above "Breaking Changes" section.
- [ ] Open an issue or PR on any number0 repos that are affected by this
breaking change. Give guidance on how the updates should be handled or
do the actual updates themselves. The major ones are:
    - [ ] [`quic-rpc`](https://github.com/n0-computer/quic-rpc)
    - [ ] [`iroh-gossip`](https://github.com/n0-computer/iroh-gossip)
    - [ ] [`iroh-blobs`](https://github.com/n0-computer/iroh-blobs)
    - [ ] [`dumbpipe`](https://github.com/n0-computer/dumbpipe)
    - [ ] [`sendme`](https://github.com/n0-computer/sendme)
  • Loading branch information
Frando authored Nov 12, 2025
commit 25fe805979806b8102ab5b00e98ce96e6e0ab1c7
6 changes: 6 additions & 0 deletions iroh/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1388,6 +1388,9 @@ impl Actor {
// ensure we are doing an initial publish of our addresses
self.msock.publish_my_addr();

// Interval timer to remove closed `EndpointStateActor` handles from the endpoint map.
let mut endpoint_map_gc = time::interval(endpoint_map::ENDPOINT_MAP_GC_INTERVAL);

loop {
self.msock.metrics.magicsock.actor_tick_main.inc();
#[cfg(not(wasm_browser))]
Expand Down Expand Up @@ -1499,6 +1502,9 @@ impl Actor {
warn!(%dst, endpoint = %dst_key.fmt_short(), ?err, "failed to send disco message (UDP)");
}
}
_ = endpoint_map_gc.tick() => {
self.msock.endpoint_map.remove_closed_endpoint_state_actors();
}
}
}
}
Expand Down
74 changes: 51 additions & 23 deletions iroh/src/magicsock/endpoint_map.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::{
collections::BTreeSet,
collections::{BTreeSet, hash_map},
hash::Hash,
net::{IpAddr, SocketAddr},
sync::{Arc, Mutex},
time::Duration,
};

use iroh_base::{EndpointAddr, EndpointId, RelayUrl};
Expand All @@ -27,6 +28,9 @@ use crate::disco;
mod endpoint_state;
mod path_state;

/// Interval in which handles to closed [`EndpointStateActor`]s should be removed.
pub(super) const ENDPOINT_MAP_GC_INTERVAL: Duration = Duration::from_secs(60);

// TODO: use this
// /// Number of endpoints that are inactive for which we keep info about. This limit is enforced
// /// periodically via [`NodeMap::prune_inactive`].
Expand Down Expand Up @@ -106,6 +110,15 @@ impl EndpointMap {
self.endpoint_mapped_addrs.get(&eid)
}

/// Removes the handles for terminated [`EndpointStateActor`]s from the endpoint map.
///
/// This should be called periodically to remove handles to endpoint state actors
/// that have shutdown after their idle timeout expired.
pub(super) fn remove_closed_endpoint_state_actors(&self) {
let mut handles = self.actor_handles.lock().expect("poisoned");
handles.retain(|_eid, handle| !handle.sender.is_closed())
}

/// Returns the sender for the [`EndpointStateActor`].
///
/// If needed a new actor is started on demand.
Expand All @@ -116,33 +129,48 @@ impl EndpointMap {
eid: EndpointId,
) -> mpsc::Sender<EndpointStateMessage> {
let mut handles = self.actor_handles.lock().expect("poisoned");
match handles.get(&eid) {
Some(handle) => handle.sender.clone(),
None => {
// Create a new EndpointStateActor and insert it into the endpoint map.
let local_addrs = self.local_addrs.clone();
let disco = self.disco.clone();
let metrics = self.metrics.clone();
let actor = EndpointStateActor::new(
eid,
self.local_endpoint_id,
local_addrs,
disco,
self.relay_mapped_addrs.clone(),
metrics,
self.sender.clone(),
);
let handle = actor.start();
let sender = handle.sender.clone();
handles.insert(eid, handle);

// Ensure there is a EndpointMappedAddr for this EndpointId.
self.endpoint_mapped_addrs.get(&eid);
match handles.entry(eid) {
hash_map::Entry::Occupied(mut entry) => {
if let Some(sender) = entry.get().sender.get() {
sender
} else {
// The actor is dead: Start a new actor.
let (handle, sender) = self.start_endpoint_state_actor(eid);
entry.insert(handle);
sender
}
}
hash_map::Entry::Vacant(entry) => {
let (handle, sender) = self.start_endpoint_state_actor(eid);
entry.insert(handle);
sender
}
}
}

/// Starts a new endpoint state actor and returns a handle and a sender.
///
/// The handle is not inserted into the endpoint map, this must be done by the caller of this function.
fn start_endpoint_state_actor(
&self,
eid: EndpointId,
) -> (EndpointStateHandle, mpsc::Sender<EndpointStateMessage>) {
// Ensure there is a EndpointMappedAddr for this EndpointId.
self.endpoint_mapped_addrs.get(&eid);
let handle = EndpointStateActor::new(
eid,
self.local_endpoint_id,
self.local_addrs.clone(),
self.disco.clone(),
self.relay_mapped_addrs.clone(),
self.metrics.clone(),
self.sender.clone(),
)
.start();
let sender = handle.sender.get().expect("just created");
(handle, sender)
}

pub(super) fn handle_ping(&self, msg: disco::Ping, sender: EndpointId, src: transports::Addr) {
if msg.endpoint_key != sender {
warn!("DISCO Ping EndpointId mismatch, ignoring ping");
Expand Down
51 changes: 43 additions & 8 deletions iroh/src/magicsock/endpoint_map/endpoint_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ use quinn::{PathStats, WeakConnectionHandle};
use quinn_proto::{PathError, PathEvent, PathId, PathStatus};
use rustc_hash::FxHashMap;
use smallvec::SmallVec;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
use tracing::{Instrument, Level, debug, error, event, info_span, instrument, trace, warn};

use self::guarded_channel::{GuardedReceiver, GuardedSender, guarded_channel};
use super::{Source, path_state::PathState};
// TODO: Use this
// #[cfg(any(test, feature = "test-utils"))]
// use crate::endpoint::PathSelection;
use crate::{
disco::{self},
endpoint::DirectAddr,
Expand All @@ -37,6 +35,12 @@ use crate::{
util::MaybeFuture,
};

// TODO: Use this
// #[cfg(any(test, feature = "test-utils"))]
// use crate::endpoint::PathSelection;

mod guarded_channel;

// TODO: use this
// /// Number of addresses that are not active that we keep around per endpoint.
// ///
Expand Down Expand Up @@ -67,6 +71,14 @@ use crate::{
// TODO: Quinn should just do this. Also, I made this value up.
const APPLICATION_ABANDON_PATH: u8 = 30;

/// The time after which an idle [`EndpointStateActor`] stops.
///
/// The actor only enters the idle state if no connections are active and no inbox senders exist
/// apart from the one stored in the endpoint map. Stopping and restarting the actor in this state
/// is not an issue; a timeout here serves the purpose of not stopping-and-recreating actors
/// in a high frequency, and to keep data about previous path around for subsequent connections.
const ACTOR_MAX_IDLE_TIMEOUT: Duration = Duration::from_secs(60);

/// A stream of events from all paths for all connections.
///
/// The connection is identified using [`ConnId`]. The event `Err` variant happens when the
Expand Down Expand Up @@ -172,7 +184,7 @@ impl EndpointStateActor {
}

pub(super) fn start(mut self) -> EndpointStateHandle {
let (tx, rx) = mpsc::channel(16);
let (tx, rx) = guarded_channel(16);
let me = self.local_endpoint_id;
let endpoint_id = self.endpoint_id;

Expand Down Expand Up @@ -207,9 +219,11 @@ impl EndpointStateActor {
/// discipline is needed to not turn pending for a long time.
async fn run(
&mut self,
mut inbox: mpsc::Receiver<EndpointStateMessage>,
mut inbox: GuardedReceiver<EndpointStateMessage>,
) -> n0_error::Result<()> {
trace!("actor started");
let idle_timeout = MaybeFuture::None;
tokio::pin!(idle_timeout);
loop {
let scheduled_path_open = match self.scheduled_open_path {
Some(when) => MaybeFuture::Some(time::sleep_until(when)),
Expand Down Expand Up @@ -252,6 +266,22 @@ impl EndpointStateActor {
self.scheduled_holepunch = None;
self.trigger_holepunching().await;
}
_ = &mut idle_timeout => {
if self.connections.is_empty() && inbox.close_if_idle() {
trace!("idle timeout expired and still idle: terminate actor");
break;
}
}
}

if self.connections.is_empty() && inbox.is_idle() && idle_timeout.is_none() {
trace!("start idle timeout");
idle_timeout
.as_mut()
.set_future(time::sleep(ACTOR_MAX_IDLE_TIMEOUT));
} else if idle_timeout.is_some() {
trace!("abort idle timeout");
idle_timeout.as_mut().set_none()
}
}
trace!("actor terminating");
Expand Down Expand Up @@ -1028,11 +1058,16 @@ pub(crate) enum EndpointStateMessage {

/// A handle to a [`EndpointStateActor`].
///
/// Dropping this will stop the actor.
/// Dropping this will stop the actor. The actor will also stop after an idle timeout
/// if it has no connections, an empty inbox, and no other senders than the one stored
/// in the endpoint map exist.
#[derive(Debug)]
pub(super) struct EndpointStateHandle {
/// Sender for the channel into the [`EndpointStateActor`].
pub(super) sender: mpsc::Sender<EndpointStateMessage>,
///
/// This is a [`GuardedSender`], from which we can get a sender but only if the receiver
/// hasn't been closed.
pub(super) sender: GuardedSender<EndpointStateMessage>,
_task: AbortOnDropHandle<()>,
}

Expand Down
78 changes: 78 additions & 0 deletions iroh/src/magicsock/endpoint_map/endpoint_state/guarded_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::sync::{Arc, Mutex};

use tokio::sync::mpsc;

/// Creates a new [`mpsc`] channel where the receiver can only close if there are no active senders.
pub(super) fn guarded_channel<T>(cap: usize) -> (GuardedSender<T>, GuardedReceiver<T>) {
let (tx, rx) = mpsc::channel(cap);
let tx = Arc::new(Mutex::new(Some(tx)));
(GuardedSender { tx: tx.clone() }, GuardedReceiver { tx, rx })
}

#[derive(Debug)]
pub(crate) struct GuardedSender<T> {
tx: Arc<Mutex<Option<mpsc::Sender<T>>>>,
}

impl<T> GuardedSender<T> {
/// Returns a sender to the channel.
///
/// Returns a new sender if the channel is not closed. It is guaranteed that
/// [`GuardedReceiver::close_if_idle`] will not return `true` until the sender is dropped.
/// Returns `None` if the channel has been closed.
pub(crate) fn get(&self) -> Option<mpsc::Sender<T>> {
self.tx.lock().expect("poisoned").clone()
}

/// Returns `true` if the channel has been closed.
pub(crate) fn is_closed(&self) -> bool {
self.tx.lock().expect("poisoned").is_none()
}
}

#[derive(Debug)]
pub(super) struct GuardedReceiver<T> {
rx: mpsc::Receiver<T>,
tx: Arc<Mutex<Option<mpsc::Sender<T>>>>,
}

impl<T> GuardedReceiver<T> {
/// Receives the next value for this receiver.
///
/// See [`mpsc::Receiver::recv`].
pub(super) async fn recv(&mut self) -> Option<T> {
self.rx.recv().await
}

/// Returns `true` if the inbox is empty and no senders to the inbox exist.
pub(super) fn is_idle(&self) -> bool {
self.rx.is_empty() && self.rx.sender_strong_count() <= 1
}

/// Closes the channel if the channel is idle.
///
/// Returns `true` if the channel is idle and has now been closed, and `false` if the channel
/// is not idle and therefore has not been not closed.
///
/// Uses a lock internally to make sure that there cannot be a race condition between
/// calling this and a new sender being created.
pub(super) fn close_if_idle(&mut self) -> bool {
let mut guard = self.tx.lock().expect("poisoned");
if self.is_idle() {
*guard = None;
self.rx.close();
true
} else {
false
}
}
}

impl<T> Drop for GuardedReceiver<T> {
fn drop(&mut self) {
let mut guard = self.tx.lock().expect("poisoned");
*guard = None;
self.rx.close();
drop(guard)
}
}
Loading