From 4c5da99e0069af2b8c00d628b3d3defcd515fce6 Mon Sep 17 00:00:00 2001
From: Joseph Shearer
Date: Tue, 19 Aug 2025 16:20:30 -0400
Subject: [PATCH] dekaf: Improve timeout handling when refreshing tokens

There was an event early this morning that ended up causing a lot of
connection drops. After investigation, I believe it has to do with waiting
indefinitely on requests to `/authorize/task` and `/authorize/dekaf`.

I was not able to find any logs in agent-api indicating long requests that
eventually succeeded, but other evidence includes:

* Dekaf logs complaining about expired tokens:
  `verifying Authorization: token has invalid claims: token is expired`
* Dekaf logs from `Read::next_batch()` saying `second time provided was later than self`
  * This comes from `SystemTime::duration_since`. The only way I could imagine
    this happening is if `new_stream`, which fetches the latest `TaskState`
    from its `TaskStateListener`, were to get an expired token.

The assumption is that `TaskStateListener::get()` will either return a valid,
non-expired token or an error. If the TaskManager's loop were to get stuck
waiting on one of the network requests it makes, that assumption would be
broken, and we would see exactly the behavior we observed.

So, to fix the problem:

* I updated `TaskManager` to proactively refresh its tokens earlier, added
  timeouts to the network calls, and made it resilient in the face of those
  timeouts: while the token is still valid, keep retrying and returning the
  cached token if refresh requests time out.
* I updated `Read` to be a bit smarter about calculating its timeout.
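In rough pseudocode, the refresh path now behaves like the sketch below. This
is only an illustration of the intent, not the code in `task_manager.rs`: the
`Token` type, `fetch_token()` helper, and the constants here are hypothetical
stand-ins.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::Context;

/// Hypothetical stand-in for the real claims / auth structs.
struct Token {
    /// Unix timestamp (seconds) at which the token expires.
    exp: u64,
}

/// Start refreshing this long before the token actually expires.
const REFRESH_START_AT: Duration = Duration::from_secs(60 * 5);

fn now_unix() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the unix epoch")
        .as_secs()
}

/// Stand-in for the real `/authorize/*` request.
async fn fetch_token() -> anyhow::Result<Token> {
    Ok(Token { exp: now_unix() + 3600 })
}

/// Refresh proactively, bound the request with a timeout, and keep serving the
/// cached token for as long as it is still valid.
async fn get_or_refresh_token(
    cached: Option<Token>,
    request_timeout: Duration,
) -> anyhow::Result<Token> {
    if let Some(cached) = cached {
        // Still comfortably inside the token's lifetime: don't refresh yet.
        if now_unix() + REFRESH_START_AT.as_secs() < cached.exp {
            return Ok(cached);
        }
        // Time to refresh, but never wait on the network indefinitely.
        match tokio::time::timeout(request_timeout, fetch_token()).await {
            Ok(fresh) => fresh,
            // Timed out: fall back to the cached token if it hasn't expired yet.
            Err(_elapsed) if now_unix() < cached.exp => Ok(cached),
            Err(_elapsed) => {
                anyhow::bail!("token refresh timed out and the cached token is expired")
            }
        }
    } else {
        // No cached token to fall back on: a timeout here is a hard error.
        tokio::time::timeout(request_timeout, fetch_token())
            .await
            .context("timed out fetching the initial token")?
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // The first call has nothing cached; later calls pass the previous token back in.
    let token = get_or_refresh_token(None, Duration::from_secs(30)).await?;
    let _token = get_or_refresh_token(Some(token), Duration::from_secs(30)).await?;
    Ok(())
}
```

The important property is that a slow or hung `/authorize/*` request can no
longer wedge the refresh loop: every network call is bounded by a timeout, and
as long as the cached token has not actually expired, readers keep getting a
usable token instead of an error.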
---
 crates/dekaf/src/main.rs         |   7 +-
 crates/dekaf/src/read.rs         |   3 +
 crates/dekaf/src/task_manager.rs | 171 +++++++++++++++++++++++++++----
 crates/dekaf/src/topology.rs     |   1 +
 4 files changed, 161 insertions(+), 21 deletions(-)

diff --git a/crates/dekaf/src/main.rs b/crates/dekaf/src/main.rs
index ed578219e81..b807dec7a7d 100644
--- a/crates/dekaf/src/main.rs
+++ b/crates/dekaf/src/main.rs
@@ -87,10 +87,14 @@ pub struct Cli {
     #[arg(long, env = "IDLE_SESSION_TIMEOUT", value_parser = humantime::parse_duration, default_value = "30s")]
     idle_session_timeout: std::time::Duration,
 
-    /// How long to cache materialization specs and other task metadata for before re-refreshing
+    /// How long to cache materialization specs and other task metadata for before refreshing
     #[arg(long, env = "TASK_REFRESH_INTERVAL", value_parser = humantime::parse_duration, default_value = "30s")]
     task_refresh_interval: std::time::Duration,
 
+    /// How long before a request for materialization specs and other task metadata times out
+    #[arg(long, env = "TASK_REQUEST_TIMEOUT", value_parser = humantime::parse_duration, default_value = "30s")]
+    task_request_timeout: std::time::Duration,
+
     /// Timeout for TLS handshake completion
     #[arg(long, env = "TLS_HANDSHAKE_TIMEOUT", value_parser = humantime::parse_duration, default_value = "10s")]
     tls_handshake_timeout: std::time::Duration,
@@ -223,6 +227,7 @@ async fn async_main(cli: Cli) -> anyhow::Result<()> {
 
     let task_manager = Arc::new(TaskManager::new(
         cli.task_refresh_interval,
+        cli.task_request_timeout,
         client_base.clone(),
         cli.data_plane_fqdn.clone(),
         signing_token.clone(),
diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs
index 5897869d127..7f4bbc14d25 100644
--- a/crates/dekaf/src/read.rs
+++ b/crates/dekaf/src/read.rs
@@ -232,6 +232,9 @@ impl Read {
             if timeout_at > self.stream_exp {
                 timeout_at = self.stream_exp;
             }
+            if timeout_at < now {
+                anyhow::bail!("Encountered a read stream with token expiring in the past. This should not happen, cancelling the read.");
+            }
             tokio::time::Instant::now() + timeout_at.duration_since(now)?
         };
 
diff --git a/crates/dekaf/src/task_manager.rs b/crates/dekaf/src/task_manager.rs
index 3a0c1a9bcb0..be755492168 100644
--- a/crates/dekaf/src/task_manager.rs
+++ b/crates/dekaf/src/task_manager.rs
@@ -45,7 +45,10 @@ impl fmt::Display for SharedError {
 
 pub type Result<T> = core::result::Result<T, SharedError>;
 
+/// How long to keep a TaskManager alive without any listeners.
 const TASK_TIMEOUT: Duration = Duration::from_secs(60 * 3);
+/// How long before an access token expires we should start trying to refresh it
+const REFRESH_START_AT: Duration = Duration::from_secs(60 * 5);
 
 #[derive(Clone)]
 pub enum TaskState {
@@ -85,11 +88,24 @@ impl TaskStateListener {
         // Scope to force the borrow to end before awaiting
         {
             let current_value = temp_rx.borrow_and_update();
-            if let Some(ref result) = *current_value {
-                return result
-                    .as_ref()
-                    .map(|arc| Arc::clone(arc))
-                    .map_err(|e| anyhow::Error::from(e.clone()));
+            match &*current_value {
+                Some(Ok(state)) => match state.as_ref() {
+                    TaskState::Authorized {
+                        access_token_claims,
+                        ..
+                    } if access_token_claims.exp
+                        <= time::OffsetDateTime::now_utc().unix_timestamp() as u64 =>
+                    {
+                        anyhow::bail!("Access token has expired and the task manager has been unable to refresh it.");
+                    }
+                    _ => return Ok(state.clone()),
+                },
+                Some(res) => {
+                    return res.clone().map_err(anyhow::Error::from);
+                }
+                None => {
+                    tracing::debug!("No task state available yet, waiting for the next update");
+                }
             }
         }
 
@@ -125,6 +141,7 @@ pub struct TaskManager {
         >,
     >,
     interval: Duration,
+    timeout: Duration,
     client: flow_client::Client,
     data_plane_fqdn: String,
     data_plane_signer: jsonwebtoken::EncodingKey,
@@ -132,6 +149,7 @@ impl TaskManager {
 
     pub fn new(
         interval: Duration,
+        timeout: Duration,
        client: flow_client::Client,
         data_plane_fqdn: String,
         data_plane_signer: jsonwebtoken::EncodingKey,
@@ -139,6 +157,7 @@ impl TaskManager {
         TaskManager {
             tasks: std::sync::Mutex::new(HashMap::new()),
             interval,
+            timeout,
             client,
             data_plane_fqdn,
             data_plane_signer: data_plane_signer,
@@ -228,6 +247,8 @@ impl TaskManager {
         let mut cached_ops_stats_client: Option<Result<(journal::Client, proto_gazette::Claims)>> = None;
 
+        let mut cached_dekaf_auth: Option<DekafTaskAuth> = None;
+
         let mut timeout_start = None;
 
         loop {
@@ -258,21 +279,23 @@ impl TaskManager {
            let mut has_been_migrated = false;
 
            let loop_result: Result<()> = async {
-                // For the moment, let's just refresh this every tick in order to have relatively
-                // fresh MaterializationSpecs, even if the access token may live for a while.
-                let dekaf_auth = fetch_dekaf_task_auth(
+                let dekaf_auth = get_or_refresh_dekaf_auth(
+                    cached_dekaf_auth.take(),
                     &self.client,
                     &task_name,
                     &self.data_plane_fqdn,
                     &self.data_plane_signer,
+                    self.timeout,
                 )
                 .await
-                .context("error fetching dekaf task auth")?;
+                .context("error fetching or refreshing dekaf task auth")?;
+                cached_dekaf_auth = Some(dekaf_auth.clone());
 
                 match dekaf_auth {
                     DekafTaskAuth::Redirect {
                         target_dataplane_fqdn,
                         spec,
+                        ..
                     } => {
                         if !has_been_migrated {
                             has_been_migrated = true;
@@ -306,6 +329,7 @@ impl TaskManager {
                             &task_name,
                             &spec,
                             std::mem::take(&mut partitions_and_clients),
+                            self.timeout,
                         )
                         .await?;
 
@@ -325,6 +349,7 @@ impl TaskManager {
                             cached_ops_logs_client
                                 .as_ref()
                                 .and_then(|r| r.as_ref().ok()),
+                            self.timeout,
                         )
                         .await
                         .map_err(SharedError::from);
@@ -346,6 +371,7 @@ impl TaskManager {
                             cached_ops_stats_client
                                 .as_ref()
                                 .and_then(|r| r.as_ref().ok()),
+                            self.timeout,
                         )
                         .await
                         .map_err(SharedError::from);
@@ -372,11 +398,11 @@ impl TaskManager {
                             .collect_vec(),
                         ops_logs_client: cached_ops_logs_client
                             .as_ref()
-                            .expect("this is guarinteed to be present")
+                            .expect("this is guaranteed to be present")
                            .clone(),
                         ops_stats_client: cached_ops_stats_client
                             .as_ref()
-                            .expect("this is guarinteed to be present")
+                            .expect("this is guaranteed to be present")
                            .clone(),
                     }))));
 
@@ -413,6 +439,7 @@ async fn update_partition_info(
     task_name: &str,
     spec: &MaterializationSpec,
     mut info: HashMap)>>,
+    timeout: Duration,
 ) -> anyhow::Result)>>> {
     let mut tasks = Vec::with_capacity(spec.bindings.len());
 
@@ -452,6 +479,7 @@ async fn update_partition_info(
                 exclude: None,
             },
             existing_client.as_ref(),
+            timeout
         )
         .await;
 
@@ -496,11 +524,13 @@ async fn get_or_refresh_journal_client(
     capability: u32,
     selector: broker::LabelSelector,
     cached_client_and_claims: Option<&(journal::Client, proto_gazette::Claims)>,
+    timeout: Duration,
 ) -> anyhow::Result<(journal::Client, proto_gazette::Claims)> {
     if let Some((cached_client, claims)) = cached_client_and_claims {
         let now_unix = time::OffsetDateTime::now_utc().unix_timestamp();
-        // Add a buffer to token expiry check
-        if claims.exp > now_unix as u64 + 60 {
+        // Refresh the client if its token is closer than REFRESH_START_AT to its expiration.
+        let refresh_from = (claims.exp - REFRESH_START_AT.as_secs()) as i64;
+        if now_unix < refresh_from {
             tracing::debug!(task=%task_name, "Re-using existing journal client.");
             return Ok((cached_client.clone(), claims.clone()));
         } else {
@@ -508,17 +538,41 @@ async fn get_or_refresh_journal_client(
         }
     }
 
+    let timeouts_allowed_until = if let Some((client, claims)) = cached_client_and_claims {
+        // If we have a cached client, we can use its expiration time to determine how long we can wait for the new client to be fetched.
+        Some((claims.exp, client, claims))
+    } else {
+        None
+    };
+
     tracing::debug!(task=%task_name, capability, "Fetching new task authorization for journal client.");
     metrics::counter!("dekaf_fetch_auth", "endpoint" => "/authorize/task", "task_name" => task_name.to_owned()).increment(1);
 
-    flow_client::fetch_task_authorization(
-        flow_client,
-        &crate::dekaf_shard_template_id(task_name),
-        data_plane_fqdn,
-        data_plane_signer,
-        capability,
-        selector,
+    match tokio::time::timeout(
+        timeout,
+        flow_client::fetch_task_authorization(
+            flow_client,
+            &crate::dekaf_shard_template_id(task_name),
+            data_plane_fqdn,
+            data_plane_signer,
+            capability,
+            selector,
+        ),
     )
     .await
+    {
+        Ok(resp) => resp,
+        Err(_) => {
+            if let Some((allowed_until, cached_client, cached_claims)) = timeouts_allowed_until {
+                if time::OffsetDateTime::now_utc().unix_timestamp() < allowed_until as i64 {
+                    tracing::warn!(task=%task_name, allowed_until, "Timed out while fetching task authorization for journal client within acceptable retry window.");
+                    return Ok((cached_client.clone(), cached_claims.clone()));
+                }
+            }
+            Err(anyhow::anyhow!(
+                "Timed out while fetching task authorization for journal client."
+            ))
+        }
+    }
 }
 
 /// Fetch the journals of a collection and map into stable-order partitions.
@@ -560,14 +614,17 @@ pub async fn fetch_partitions(
 // Claims returned by `/authorize/dekaf`
 #[derive(Debug, Clone, serde::Deserialize)]
 pub struct AccessTokenClaims {
+    pub iat: u64,
     pub exp: u64,
 }
 
+#[derive(Debug, Clone)]
 pub enum DekafTaskAuth {
     /// Task has been migrated to a different dataplane, and the session should redirect to it.
     Redirect {
         target_dataplane_fqdn: String,
         spec: MaterializationSpec,
+        fetched_at: time::OffsetDateTime,
     },
     /// Task authorization data.
     Auth {
@@ -579,6 +636,79 @@ pub enum DekafTaskAuth {
     },
 }
 
+impl DekafTaskAuth {
+    fn exp(&self) -> u64 {
+        match self {
+            DekafTaskAuth::Redirect { fetched_at, .. } => {
+                // Redirects are valid for 10 minutes
+                fetched_at.unix_timestamp() as u64 + 60 * 10
+            }
+            DekafTaskAuth::Auth { claims, .. } => claims.exp,
+        }
+    }
+    fn refresh_at(&self) -> u64 {
+        // Refresh once the token is closer than REFRESH_START_AT to its expiration.
+        self.exp() - REFRESH_START_AT.as_secs()
+    }
+}
+
+async fn get_or_refresh_dekaf_auth(
+    cached: Option<DekafTaskAuth>,
+    client: &flow_client::Client,
+    shard_template_id: &str,
+    data_plane_fqdn: &str,
+    data_plane_signer: &jsonwebtoken::EncodingKey,
+    timeout: Duration,
+) -> anyhow::Result<DekafTaskAuth> {
+    let now = time::OffsetDateTime::now_utc().unix_timestamp() as u64;
+
+    if let Some(cached_auth) = cached {
+        if now < cached_auth.refresh_at() {
+            tracing::debug!("DekafTaskAuth is still valid, no need to refresh.");
+            return Ok(cached_auth);
+        }
+
+        // Try to refresh, but fall back to the cached value if the request times out and the token is still valid.
+        match tokio::time::timeout(
+            timeout,
+            fetch_dekaf_task_auth(
+                client,
+                shard_template_id,
+                data_plane_fqdn,
+                data_plane_signer,
+            ),
+        )
+        .await
+        {
+            Ok(resp) => resp,
+            Err(_) => {
+                if time::OffsetDateTime::now_utc().unix_timestamp() < cached_auth.exp() as i64 {
+                    tracing::warn!(
+                        "Timed out while refreshing DekafTaskAuth, but the token is still valid."
+                    );
+                    return Ok(cached_auth);
+                }
+                anyhow::bail!(
+                    "Timed out while refreshing DekafTaskAuth, and the token is expired."
+                );
+            }
+        }
+    } else {
+        // No cached value, fetch a new one
+        tokio::time::timeout(
+            timeout,
+            fetch_dekaf_task_auth(
+                client,
+                shard_template_id,
+                data_plane_fqdn,
+                data_plane_signer,
+            ),
+        )
+        .await
+        .map_err(|_| anyhow::anyhow!("Timed out while fetching dekaf task auth"))?
+    }
+}
+
 #[tracing::instrument(skip(client, data_plane_signer), err)]
 async fn fetch_dekaf_task_auth(
     client: &flow_client::Client,
@@ -647,6 +777,7 @@ async fn fetch_dekaf_task_auth(
         return Ok(DekafTaskAuth::Redirect {
             target_dataplane_fqdn: redirect_fqdn,
             spec: parsed_spec,
+            fetched_at: time::OffsetDateTime::now_utc(),
         });
     }
 
diff --git a/crates/dekaf/src/topology.rs b/crates/dekaf/src/topology.rs
index f493fcfadc2..9ec12d80d88 100644
--- a/crates/dekaf/src/topology.rs
+++ b/crates/dekaf/src/topology.rs
@@ -1,5 +1,6 @@
 use crate::{connector, utils, SessionAuthentication, TaskState};
 use anyhow::{anyhow, bail, Context};
+use futures::StreamExt;
 use gazette::{
     broker::{self, journal_spec, ReadResponse},
     journal, uuid,