Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Send back leftover messages
  • Loading branch information
matheus23 committed Nov 20, 2025
commit 4c4d11ddeaa170e266f32150e8c90c9cf9c4316c
32 changes: 27 additions & 5 deletions iroh/src/magicsock/remote_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub(crate) struct RemoteMap {
disco: DiscoState,
sender: TransportsSender,
discovery: ConcurrentDiscovery,
actor_tasks: Mutex<JoinSet<Vec<RemoteStateMessage>>>,
actor_tasks: Mutex<JoinSet<(EndpointId, Vec<RemoteStateMessage>)>>,
}

impl RemoteMap {
Expand Down Expand Up @@ -105,7 +105,22 @@ impl RemoteMap {
senders.retain(|_eid, sender| !sender.is_closed());
while let Some(result) = self.actor_tasks.lock().expect("poisoned").try_join_next() {
match result {
Ok(leftover_msgs) => debug!(?leftover_msgs, "TODO: handle leftover messages"),
Ok((eid, leftover_msgs)) => {
let entry = senders.entry(eid);
if leftover_msgs.is_empty() {
match entry {
hash_map::Entry::Occupied(occupied_entry) => occupied_entry.remove(),
hash_map::Entry::Vacant(_) => {
panic!("this should be impossible TODO(matheus23)");
}
};
} else {
// The remote actor got messages while it was closing, so we're restarting
debug!(%eid, "restarting terminated remote state actor: messages received during shutdown");
let sender = self.start_remote_state_actor(eid, leftover_msgs);
entry.insert_entry(sender);
}
}
Err(err) => {
if let Ok(panic) = err.try_into_panic() {
error!("RemoteStateActor panicked.");
Expand All @@ -126,7 +141,7 @@ impl RemoteMap {
match handles.entry(eid) {
hash_map::Entry::Occupied(entry) => entry.get().clone(),
hash_map::Entry::Vacant(entry) => {
let sender = self.start_remote_state_actor(eid);
let sender = self.start_remote_state_actor(eid, vec![]);
entry.insert(sender.clone());
sender
}
Expand All @@ -136,7 +151,11 @@ impl RemoteMap {
/// Starts a new remote state actor and returns a handle and a sender.
///
/// The handle is not inserted into the endpoint map, this must be done by the caller of this function.
fn start_remote_state_actor(&self, eid: EndpointId) -> mpsc::Sender<RemoteStateMessage> {
fn start_remote_state_actor(
&self,
eid: EndpointId,
initial_msgs: Vec<RemoteStateMessage>,
) -> mpsc::Sender<RemoteStateMessage> {
// Ensure there is a RemoteMappedAddr for this EndpointId.
self.endpoint_mapped_addrs.get(&eid);
RemoteStateActor::new(
Expand All @@ -149,7 +168,10 @@ impl RemoteMap {
self.sender.clone(),
self.discovery.clone(),
)
.start(self.actor_tasks.lock().expect("poisoned").deref_mut())
.start(
initial_msgs,
self.actor_tasks.lock().expect("poisoned").deref_mut(),
)
}

pub(super) fn handle_ping(&self, msg: disco::Ping, sender: EndpointId, src: transports::Addr) {
Expand Down
13 changes: 9 additions & 4 deletions iroh/src/magicsock/remote_map/remote_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ impl RemoteStateActor {

pub(super) fn start(
self,
tasks: &mut JoinSet<Vec<RemoteStateMessage>>,
initial_msgs: Vec<RemoteStateMessage>,
tasks: &mut JoinSet<(EndpointId, Vec<RemoteStateMessage>)>,
) -> mpsc::Sender<RemoteStateMessage> {
let (tx, rx) = mpsc::channel(16);
let me = self.local_endpoint_id;
Expand All @@ -221,7 +222,7 @@ impl RemoteStateActor {
// we don't explicitly set a span we get the spans from whatever call happens to
// first create the actor, which is often very confusing as it then keeps those
// spans for all logging of the actor.
tasks.spawn(self.run(rx).instrument(info_span!(
tasks.spawn(self.run(initial_msgs, rx).instrument(info_span!(
parent: None,
"RemoteStateActor",
me = %me.fmt_short(),
Expand All @@ -237,9 +238,13 @@ impl RemoteStateActor {
/// discipline is needed to not turn pending for a long time.
async fn run(
mut self,
initial_msgs: Vec<RemoteStateMessage>,
mut inbox: mpsc::Receiver<RemoteStateMessage>,
) -> Vec<RemoteStateMessage> {
) -> (EndpointId, Vec<RemoteStateMessage>) {
trace!("actor started");
for msg in initial_msgs {
self.handle_message(msg).await;
}
let idle_timeout = time::sleep(ACTOR_MAX_IDLE_TIMEOUT);
n0_future::pin!(idle_timeout);
let leftover_msgs = loop {
Expand Down Expand Up @@ -318,7 +323,7 @@ impl RemoteStateActor {
};

trace!("actor terminating");
leftover_msgs
(self.endpoint_id, leftover_msgs)
}

/// Handles an actor message.
Expand Down
Loading