Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
refactor(imap): move resync request from Context to Imap
For multiple transports we will need to run
multiple IMAP clients in parallel.
UID validity change detected by one IMAP client
should not result in UID resync
for another IMAP client.
  • Loading branch information
link2xt committed Nov 6, 2025
commit ee2582b3024f1c191b60d4c1a5692f591bd8daf9
8 changes: 0 additions & 8 deletions src/configure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,14 +565,6 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result<Option<&'

progress!(ctx, 910);

if let Some(configured_addr) = ctx.get_config(Config::ConfiguredAddr).await? {
if configured_addr != param.addr {
// Switched account, all server UIDs we know are invalid
info!(ctx, "Scheduling resync because the address has changed.");
ctx.schedule_resync().await?;
}
}

let provider = configured_param.provider;
configured_param
.save_to_transports_table(ctx, param)
Expand Down
12 changes: 1 addition & 11 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::{BTreeMap, HashMap};
use std::ffi::OsString;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, OnceLock};
use std::time::Duration;

Expand Down Expand Up @@ -243,9 +243,6 @@ pub struct InnerContext {
/// Set to `None` if quota was never tried to load.
pub(crate) quota: RwLock<Option<QuotaInfo>>,

/// IMAP UID resync request.
pub(crate) resync_request: AtomicBool,

/// Notify about new messages.
///
/// This causes [`Context::wait_next_msgs`] to wake up.
Expand Down Expand Up @@ -457,7 +454,6 @@ impl Context {
scheduler: SchedulerState::new(),
ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow at least 1 message every 10 seconds + a burst of 6.
quota: RwLock::new(None),
resync_request: AtomicBool::new(false),
new_msgs_notify,
server_id: RwLock::new(None),
metadata: RwLock::new(None),
Expand Down Expand Up @@ -616,12 +612,6 @@ impl Context {
Ok(())
}

pub(crate) async fn schedule_resync(&self) -> Result<()> {
self.resync_request.store(true, Ordering::Relaxed);
self.scheduler.interrupt_inbox().await;
Ok(())
}

/// Returns a reference to the underlying SQL instance.
///
/// Warning: this is only here for testing, not part of the public API.
Expand Down
14 changes: 12 additions & 2 deletions src/imap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ pub(crate) struct Imap {
/// immediately after logging in or returning an error in response to LOGIN command
/// due to internal server error.
ratelimit: Ratelimit,

/// IMAP UID resync request sender.
pub(crate) resync_request_sender: async_channel::Sender<()>,

/// IMAP UID resync request receiver.
pub(crate) resync_request_receiver: async_channel::Receiver<()>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -254,6 +260,7 @@ impl Imap {
oauth2: bool,
idle_interrupt_receiver: Receiver<()>,
) -> Self {
let (resync_request_sender, resync_request_receiver) = async_channel::bounded(1);
Imap {
idle_interrupt_receiver,
addr: addr.to_string(),
Expand All @@ -268,6 +275,8 @@ impl Imap {
conn_backoff_ms: 0,
// 1 connection per minute + a burst of 2.
ratelimit: Ratelimit::new(Duration::new(120, 0), 2.0),
resync_request_sender,
resync_request_receiver,
}
}

Expand Down Expand Up @@ -392,6 +401,7 @@ impl Imap {
match login_res {
Ok(mut session) => {
let capabilities = determine_capabilities(&mut session).await?;
let resync_request_sender = self.resync_request_sender.clone();

let session = if capabilities.can_compress {
info!(context, "Enabling IMAP compression.");
Expand All @@ -402,9 +412,9 @@ impl Imap {
})
.await
.context("Failed to enable IMAP compression")?;
Session::new(compressed_session, capabilities)
Session::new(compressed_session, capabilities, resync_request_sender)
} else {
Session::new(session, capabilities)
Session::new(session, capabilities, resync_request_sender)
};

// Store server ID in the context to display in account info.
Expand Down
4 changes: 2 additions & 2 deletions src/imap/select_folder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl ImapSession {
"The server illegally decreased the uid_next of folder {folder:?} from {old_uid_next} to {new_uid_next} without changing validity ({new_uid_validity}), resyncing UIDs...",
);
set_uid_next(context, folder, new_uid_next).await?;
context.schedule_resync().await?;
self.resync_request_sender.try_send(()).ok();
}

// If UIDNEXT changed, there are new emails.
Expand Down Expand Up @@ -243,7 +243,7 @@ impl ImapSession {
.await?;

if old_uid_validity != 0 || old_uid_next != 0 {
context.schedule_resync().await?;
self.resync_request_sender.try_send(()).ok();
}
info!(
context,
Expand Down
4 changes: 4 additions & 0 deletions src/imap/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub(crate) struct Session {
///
/// Should be false if no folder is currently selected.
pub new_mail: bool,

pub resync_request_sender: async_channel::Sender<()>,
}

impl Deref for Session {
Expand All @@ -68,6 +70,7 @@ impl Session {
pub(crate) fn new(
inner: ImapSession<Box<dyn SessionStream>>,
capabilities: Capabilities,
resync_request_sender: async_channel::Sender<()>,
) -> Self {
Self {
inner,
Expand All @@ -77,6 +80,7 @@ impl Session {
selected_folder_needs_expunge: false,
last_full_folder_scan: Mutex::new(None),
new_mail: false,
resync_request_sender,
}
}

Expand Down
6 changes: 2 additions & 4 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::cmp;
use std::iter::{self, once};
use std::num::NonZeroUsize;
use std::sync::atomic::Ordering;

use anyhow::{Context as _, Error, Result, bail};
use async_channel::{self as channel, Receiver, Sender};
Expand Down Expand Up @@ -481,11 +480,10 @@ async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session)
}
}

let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed);
if resync_requested {
if let Ok(()) = imap.resync_request_receiver.try_recv() {
if let Err(err) = session.resync_folders(ctx).await {
warn!(ctx, "Failed to resync folders: {:#}.", err);
ctx.resync_request.store(true, Ordering::Relaxed);
imap.resync_request_sender.try_send(()).ok();
}
}

Expand Down