dekaf: Improve timeout handling when refreshing tokens #2348
29c58cf to 709ecc5
    let current_value = temp_rx.borrow_and_update();
    if let Some(ref result) = *current_value {
    return result.clone().map_err(anyhow::Error::from);
    match &*current_value {
We used to happily hand out expired tokens here, just assuming that the TaskManager loop could keep up. That's the real cause of the problem.
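To illustrate the point above, here is a minimal std-only sketch of a getter that refuses to hand out an expired cached token. `CachedToken`, `GetError`, and `get_valid` are hypothetical names for illustration, not the types used in this PR, and the real code reads from a watch channel rather than an `Option`.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical cached auth result; `exp` is seconds since the Unix epoch.
#[derive(Clone, Debug, PartialEq)]
struct CachedToken {
    token: String,
    exp: u64,
}

/// Hypothetical error type: the caller gets an error instead of a stale token.
#[derive(Debug, PartialEq)]
enum GetError {
    NotYetFetched,
    Expired { exp: u64, now: u64 },
}

/// Return the cached token only if it has not expired as of `now`.
fn get_valid(cached: Option<&CachedToken>, now: SystemTime) -> Result<CachedToken, GetError> {
    // A clock before the epoch is treated as second 0.
    let now_secs = now
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);

    match cached {
        None => Err(GetError::NotYetFetched),
        // The check that was effectively missing: never return an expired token.
        Some(t) if t.exp <= now_secs => Err(GetError::Expired { exp: t.exp, now: now_secs }),
        Some(t) => Ok(t.clone()),
    }
}
```

With this shape, a stuck refresh loop surfaces as an explicit error at the call site rather than as an expired token being used downstream.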
    fn exp(&self) -> u64 {
    match self {
    DekafTaskAuth::Redirect { fetched_at, .. } => {
    // Redirects are valid for 10 minutes
I just made this up. Since redirects don't get a token, they don't get an agent-api-specified expiration. 10 minutes seems more than fine, as the only time where this would matter is if a task were migrated such that it is no longer a redirect.
🤔 Does this mean that we'll now start to expire Redirects, where previously we haven't? Just wanting to double check whether that poses any risk for existing tasks that may not have encountered that before.
We used to re-fetch every 30s no matter what. Now we only fetch when the (made up) expiration is coming up. I don't think this substantively changes the behavior, just the length of time a redirect response can be cached.
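A rough sketch of the behavior discussed in this thread: a redirect gets a synthetic expiration ten minutes after it was fetched, and a refresh is only attempted once the expiration is close. The enum here is a simplified stand-in for `DekafTaskAuth`, and `REFRESH_AHEAD` and `needs_refresh` are assumptions made for illustration.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Redirects carry no token, so they get a made-up lifetime of 10 minutes.
const REDIRECT_TTL: Duration = Duration::from_secs(10 * 60);
/// Assumed lead time: start refreshing this long before expiration.
const REFRESH_AHEAD: Duration = Duration::from_secs(5 * 60);

/// Simplified stand-in for the PR's `DekafTaskAuth`.
enum TaskAuth {
    Redirect { fetched_at: SystemTime },
    Token { exp: u64 },
}

impl TaskAuth {
    /// Expiration in seconds since the Unix epoch.
    fn exp(&self) -> u64 {
        match self {
            TaskAuth::Redirect { fetched_at } => (*fetched_at + REDIRECT_TTL)
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
            TaskAuth::Token { exp } => *exp,
        }
    }

    /// True once `now` is within `REFRESH_AHEAD` of the expiration.
    fn needs_refresh(&self, now: SystemTime) -> bool {
        let now_secs = now
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        now_secs + REFRESH_AHEAD.as_secs() >= self.exp()
    }
}
```

Under these assumed constants, a cached redirect is reused for about five minutes before the first refresh attempt, instead of being re-fetched every 30s.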
This has been running on […]. One thing I want to do before merging is to test the behavior during a migration, as that has a similar effect of causing tokens to be unable to be refreshed for a period of time.
psFried left a comment
Couple of questions on this, but nothing major.
0c500bd to 4811675
There was an event early this morning that ended up causing a lot of connection drops. After investigation, I believe it has to do with waiting indefinitely on requests to `/authorize/task` and `/authorize/dekaf`. I was not able to find any logs in agent-api indicating long requests that eventually succeeded, but other evidence includes:

* Dekaf logs complaining about expired tokens: `verifying Authorization: token has invalid claims: token is expired`
* Dekaf logs from `Read::next_batch()` saying `second time provided was later than self`
  * This is coming from `SystemTime::duration_since`. The only way I could imagine this happening is if `new_stream`, which fetches the latest `TaskState` from its `TaskStateListener`, were to get an expired token. The assumption is that `TaskStateListener::get()` will either return a valid non-expired token, or an error.

If the TaskManager's loop were to get stuck waiting on one of the network requests it makes, this assumption would be broken and we would see the behavior that we saw.

So, to fix the problem:

* I updated `TaskManager` to proactively refresh its tokens earlier, added timeouts to the network calls, and made it resilient in the face of those timeouts: while the token is still valid, keep retrying and returning the cached token if refresh requests time out.
* I updated `Read` to be a bit smarter about calculating its timeout.
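The "keep returning the cached token while refreshes time out" idea can be sketched roughly as below. This is a std-only stand-in that bounds a blocking call with a thread and `mpsc::Receiver::recv_timeout`; the actual change presumably uses async timeouts. `Token`, `call_with_timeout`, and `refresh_or_cached` are hypothetical names.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical token; `exp` is seconds since the Unix epoch.
#[derive(Clone, Debug, PartialEq)]
struct Token {
    value: String,
    exp: u64,
}

/// Run `f` on a worker thread and wait at most `timeout` for its result.
/// Returns `None` if the deadline passes (or the worker panics).
fn call_with_timeout<T, F>(f: F, timeout: Duration) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may already be gone if we timed out; ignore that.
        let _ = tx.send(f());
    });
    rx.recv_timeout(timeout).ok()
}

/// Try to refresh; on timeout fall back to the cached token, but only
/// while it is still valid. An expired token is never returned.
fn refresh_or_cached<F>(
    cached: Option<Token>,
    fetch: F,
    timeout: Duration,
    now_secs: u64,
) -> Result<Token, String>
where
    F: FnOnce() -> Token + Send + 'static,
{
    match call_with_timeout(fetch, timeout) {
        Some(fresh) => Ok(fresh),
        None => match cached {
            Some(t) if t.exp > now_secs => Ok(t),
            Some(t) => Err(format!(
                "refresh timed out and cached token expired at {}",
                t.exp
            )),
            None => Err("refresh timed out and no token is cached".to_string()),
        },
    }
}
```

The caller would retry the refresh on its next loop iteration; the point of the sketch is only that a hung request can no longer block the loop indefinitely or leak an expired token.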
4811675 to 4c5da99
psFried left a comment
LGTM
Since #2348 changed `TaskManager`'s behavior to only call `/authorize/dekaf` ~5 minutes before token expiration, we would only fetch updated task specs that frequently. This is a bug: we need to be refreshing task specs much more frequently than that. So instead, let's set a fairly short upper bound on how stale materialization specs can be and start trying to refresh after that passes.
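One way to picture the follow-up fix is a refresh condition with two triggers: the token is close to expiring, or the cached spec is older than some bound. The struct, the function, and both duration parameters below are assumptions for illustration; the actual staleness bound is not stated here.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical cached state: when it was fetched and when its token expires.
struct CachedState {
    fetched_at: SystemTime,
    token_expires_at: SystemTime,
}

/// Refresh when the token is nearly expired OR the spec has grown too stale.
fn should_refresh(
    state: &CachedState,
    now: SystemTime,
    refresh_ahead: Duration,
    max_spec_age: Duration,
) -> bool {
    // Trigger 1: within `refresh_ahead` of token expiration.
    let token_due = now + refresh_ahead >= state.token_expires_at;

    // Trigger 2: the spec was fetched at least `max_spec_age` ago.
    // If `fetched_at` is in the future (clock skew), treat it as fresh.
    let spec_stale = now
        .duration_since(state.fetched_at)
        .map(|age| age >= max_spec_age)
        .unwrap_or(false);

    token_due || spec_stale
}
```

With a long-lived token and a short `max_spec_age`, the second trigger dominates, which restores frequent spec refreshes without tying them to token lifetime.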
Description:
There was an event early this morning that ended up causing a lot of connection drops. After investigation, I believe it has to do with waiting indefinitely on requests to `/authorize/task` and `/authorize/dekaf`. I was not able to find any logs in agent-api indicating long requests that eventually succeeded, but other evidence includes:

* Dekaf logs complaining about expired tokens: `verifying Authorization: token has invalid claims: token is expired`. This means that we're attempting to use a journal client with an expired token.
* Dekaf logs from `Read::next_batch()` saying `second time provided was later than self`
  * This is coming from `SystemTime::duration_since`. The only way I could imagine this happening is if `Read::new_stream()`, which fetches the latest `TaskState` from its `TaskStateListener`, were to get an expired token. The assumption is that `TaskStateListener::get()` will either return a valid non-expired token, or an error.

If the TaskManager's loop were to get stuck waiting on one of the network requests it makes, this assumption would be broken and we would see the behavior that we saw.

So, to fix the problem:

* `TaskManager` to:
* `Read` to error if, even after attempting to refresh its token, the provided expiration is still in the past.
* `TASK_REQUEST_TIMEOUT`/`--task-request-timeout`, which allows configuring the TaskManager timeout. I defaulted it to 20s, which seems long, but agent-api has a backoff mechanism that includes multi-second delays which are covered under this timeout, and I wanted to avoid false-positive timeouts.
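For context on the `second time provided was later than self` message: `SystemTime::duration_since` returns an error when the argument is later than the receiver, which is what happens when an expiration that is already in the past is compared against the current time. A minimal sketch of turning that case into an explicit error follows; `time_until_expiry` is a hypothetical helper, not the function in `Read`.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Time remaining until `exp_secs` (seconds since the Unix epoch), or an
/// error if the expiration is already in the past as of `now`.
fn time_until_expiry(exp_secs: u64, now: SystemTime) -> Result<Duration, String> {
    let expires_at = UNIX_EPOCH + Duration::from_secs(exp_secs);

    // `duration_since` fails when `now` is later than `expires_at`;
    // the error carries how far in the past the expiration is.
    expires_at
        .duration_since(now)
        .map_err(|err| format!("token expired {:?} ago", err.duration()))
}
```

A caller could use the `Ok` value as its read timeout and treat the `Err` case as a hard failure after one refresh attempt, in line with the second bullet above.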