dekaf: Fix partially suspended journals returning zero offsets #2358
@@ -2,7 +2,7 @@ use crate::{connector, utils, SessionAuthentication, TaskAuth, TaskState, UserAu
 use anyhow::{anyhow, bail, Context};
 use futures::{StreamExt, TryStreamExt};
 use gazette::{
-    broker::{self, journal_spec},
+    broker::{self, journal_spec, ReadResponse},
     journal, uuid,
 };
 use models::RawValue;
@@ -115,6 +115,9 @@ impl Default for PartitionOffset {
     }
 }

+const OFFSET_REQUEST_EARLIEST: i64 = -2;
+const OFFSET_REQUEST_LATEST: i64 = -1;
+
 impl Collection {
     /// Build a Collection by fetching its spec, an authenticated data-plane access token, and its partitions.
     pub async fn new(
@@ -332,6 +335,12 @@ impl Collection {
     }

     /// Map a partition and timestamp into the newest covering fragment offset.
+    /// Request latest offset
+    /// - `suspend::Level::Full | suspend::Level::Partial`: `suspend.offset`
+    /// - `suspend::Level::None`: write offset returned by non-blocking read at `offset = -1`
+    /// Request earliest offset
+    /// - `suspend::Level::Full`: `suspend.offset`
+    /// - `suspend::Level::Partial | suspend::Level::None`: fragment listing with `begin_mod_time = 0`, return 0th fragment's begin
     pub async fn fetch_partition_offset(
         &self,
         partition_index: usize,
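These sentinel timestamps mirror the Kafka ListOffsets API, where a consumer passes -2 to request a partition's earliest offset and -1 to request its latest. A minimal consumer-side sketch of what drives these two paths, assuming the rust-rdkafka crate (the topic name and timeout are illustrative):

use rdkafka::consumer::{BaseConsumer, Consumer};
use std::time::Duration;

// The low/high watermarks a Kafka client fetches correspond to ListOffsets
// queries at the EARLIEST (-2) and LATEST (-1) sentinel timestamps, i.e. the
// OFFSET_REQUEST_EARLIEST / OFFSET_REQUEST_LATEST paths handled below.
fn watermarks(consumer: &BaseConsumer) -> rdkafka::error::KafkaResult<(i64, i64)> {
    consumer.fetch_watermarks("demo-topic", 0, Duration::from_secs(5))
}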
@@ -341,31 +350,62 @@ impl Collection {
             return Ok(None);
         };

-        match partition.spec.suspend {
-            Some(suspend) if suspend.level == journal_spec::suspend::Level::Full as i32 => {
-                return Ok(Some(PartitionOffset {
-                    fragment_start: suspend.offset,
-                    offset: suspend.offset,
-                    mod_time: -1, //UNKNOWN_TIMESTAMP
-                }));
+        let offset_data = match timestamp_millis {
+            OFFSET_REQUEST_LATEST => {
+                match partition.spec.suspend {
+                    Some(suspend)
+                        if suspend.level == journal_spec::suspend::Level::Full as i32
+                            || suspend.level == journal_spec::suspend::Level::Partial as i32 =>
+                    {
+                        Some(PartitionOffset {
+                            fragment_start: suspend.offset,
+                            offset: suspend.offset,
+                            mod_time: -1, // UNKNOWN_TIMESTAMP
+                        })
+                    }
+                    // Not suspended, so return high-water mark.
+                    _ => self.fetch_write_head(partition_index).await?,
+                }
+            }
+            OFFSET_REQUEST_EARLIEST => {
+                match partition.spec.suspend {
+                    Some(suspend) if suspend.level == journal_spec::suspend::Level::Full as i32 => {
+                        Some(PartitionOffset {
+                            fragment_start: suspend.offset,
+                            offset: suspend.offset,
+                            mod_time: -1, // UNKNOWN_TIMESTAMP
+                        })
+                    }
+                    // Not suspended or partially suspended, so return earliest available fragment offset.
+                    _ => self.fetch_earliest_offset(partition_index).await?,
+                }
+            }
+            _ => {
+                // If fully suspended, return the suspend offset. There will be no fragments.
+                if let Some(suspend) = &partition.spec.suspend {
+                    if suspend.level == journal_spec::suspend::Level::Full as i32 {
+                        return Ok(Some(PartitionOffset {
+                            fragment_start: suspend.offset,
+                            offset: suspend.offset,
+                            mod_time: -1, // UNKNOWN_TIMESTAMP
+                        }));
+                    }
+                }

                 // Otherwise, list fragments with begin_mod_time <= timestamp_millis and return the latest fragment's begin offset.
                 // This will return the currently open fragment if there is one and `timestamp_millis` is after any other fragment's
                 // `begin_mod_time`, since the still-open fragment hasn't been persisted to cloud storage and so doesn't have a
                 // `begin_mod_time` at all. Not all journals will have an open fragment though, so we need to consider that.
                 let (not_before_sec, _) = self
                     .not_before
                     .map(|not_before| not_before.to_unix())
                     .unwrap_or((0, 0));

-        let begin_mod_time = if timestamp_millis == -1 {
-            i64::MAX // Sentinel for "largest available offset"
-        } else if timestamp_millis == -2 {
-            0 // Sentinel for "first available offset"
+                let timestamp = timestamp_millis / 1_000;
+                let begin_mod_time = if timestamp < not_before_sec as i64 {
+                    not_before_sec as i64
                 } else {
-            let timestamp = timestamp_millis / 1_000;
-            if timestamp < not_before_sec as i64 {
-                not_before_sec as i64
-            } else {
-                timestamp as i64
-            }
+                    timestamp as i64
                 };

                 let request = broker::FragmentsRequest {
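A quick worked example of the clamp introduced above (values are hypothetical): the requested millisecond timestamp is truncated to seconds and floored at the collection's `not_before` bound before being used as `begin_mod_time`:

// Illustrative values only.
let timestamp_millis: i64 = 1_700_000_123_456; // requested timestamp, in ms
let not_before_sec: u64 = 1_705_000_000; // collection's not_before, in seconds

let timestamp = timestamp_millis / 1_000; // 1_700_000_123 seconds
let begin_mod_time = if timestamp < not_before_sec as i64 {
    not_before_sec as i64 // clamped up to not_before
} else {
    timestamp as i64
};
assert_eq!(begin_mod_time, 1_705_000_000);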
@@ -376,37 +416,113 @@ impl Collection {
                 };
                 let response = self.journal_client.list_fragments(request).await?;

-        let offset_data = match response.fragments.get(0) {
+                match response.fragments.get(0) {
                     // We found a fragment covering the requested timestamp, or we found the currently open fragment.
                     Some(broker::fragments_response::Fragment {
                         spec: Some(spec), ..
-            }) => {
-                if timestamp_millis == -1 {
-                    PartitionOffset {
-                        fragment_start: spec.begin,
-                        // Subtract one to reflect the largest fetch-able offset of the fragment.
-                        offset: spec.end - 1,
-                        mod_time: spec.mod_time,
-                    }
-                } else {
-                    PartitionOffset {
-                        fragment_start: spec.begin,
-                        offset: spec.begin,
-                        mod_time: spec.mod_time,
-                    }
-                }
-            }
+                    }) => Some(PartitionOffset {
+                        fragment_start: spec.begin,
+                        offset: spec.begin,
+                        mod_time: spec.mod_time,
+                    }),
+                    // No fragments found. This can happen if there are no fragments at all, or if the requested timestamp is
+                    // after the latest fragment's begin_mod_time and there is no currently open fragment. In this case, return
+                    // the high-water mark as the requested timestamp is beyond any known offset.
+                    None => self.fetch_write_head(partition_index).await?,
+                    Some(broker::fragments_response::Fragment { spec: None, .. }) => {
+                        anyhow::bail!("fragment missing spec");
+                    }
-            _ => PartitionOffset::default(),
-        };
+                }
+            }
+        };

         tracing::debug!(
             collection = self.spec.name,
             ?offset_data,
             partition_index,
             timestamp_millis,
             "fetched offset"
         );

-        return Ok(Some(offset_data));
+        Ok(offset_data)
     }

+    #[tracing::instrument(skip(self))]
+    async fn fetch_earliest_offset(
+        &self,
+        partition_index: usize,
+    ) -> anyhow::Result<Option<PartitionOffset>> {
+        let Some(partition) = self.partitions.get(partition_index) else {
+            return Ok(None);
+        };
+
+        let request = broker::FragmentsRequest {
+            journal: partition.spec.name.clone(),
+            begin_mod_time: 0, // Fetch earliest offset
+            page_limit: 1,
+            ..Default::default()
+        };
+        let response = self
+            .journal_client
+            .list_fragments(request)
+            .await
+            .context("listing fragments to fetch earliest offset")?;
+
+        match response.fragments.get(0) {
+            Some(broker::fragments_response::Fragment {
+                spec: Some(spec), ..
+            }) => Ok(Some(PartitionOffset {
+                fragment_start: spec.begin,
+                offset: spec.begin,
+                mod_time: spec.mod_time,
+            })),
+            _ => Ok(None),
+        }
+    }
+
+    /// Fetch the write head of a journal by issuing a non-blocking read request at offset -1
+    #[tracing::instrument(skip(self))]
+    async fn fetch_write_head(
+        &self,
+        partition_index: usize,
+    ) -> anyhow::Result<Option<PartitionOffset>> {
+        let Some(partition) = self.partitions.get(partition_index) else {
+            return Ok(None);
+        };
+
+        let request = broker::ReadRequest {
+            journal: partition.spec.name.clone(),
+            offset: -1, // Fetch write head
+            ..Default::default()
+        };
+        let response_stream = self.journal_client.clone().read(request);
+        tokio::pin!(response_stream);
+
+        // Continue polling the stream until we get Ok or a non-transient error
+        loop {
+            match response_stream.next().await {
+                Some(Ok(ReadResponse {
+                    write_head,
+                    fragment,
+                    ..
+                })) => {
+                    return Ok(Some(PartitionOffset {
+                        fragment_start: fragment.map(|f| f.begin).unwrap_or(0),
+                        offset: write_head,
+                        mod_time: -1,
+                    }))
+                }
+                Some(Err(e)) => {
+                    if e.inner.is_transient() {
+                        continue;
+                    } else {
+                        return Err(anyhow::Error::new(e.inner).context(format!(
+                            "failed to fetch write head after {} retries",
+                            e.attempt
+                        )));
+                    }
+                }
+                None => anyhow::bail!("read stream ended unexpectedly"),
+            }
+        }
+    }
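To make the fix concrete: previously, an OFFSET_REQUEST_LATEST probe against a partially suspended journal fell through to the fragment listing, matched nothing (a partially suspended journal has no open fragment), and landed in the removed `_ => PartitionOffset::default()` arm, reporting offset zero. A minimal sketch of the observable difference, using a mirrored `PartitionOffset` and a hypothetical suspend offset:

// Mirror of the PR's PartitionOffset, for illustration only.
#[derive(Debug, Default, PartialEq)]
struct PartitionOffset {
    fragment_start: i64,
    offset: i64,
    mod_time: i64,
}

fn main() {
    let suspend_offset: i64 = 4096; // hypothetical suspend.offset of a partially suspended journal

    // Before: the fall-through produced the zero-valued default.
    let before = PartitionOffset::default();
    assert_eq!(before.offset, 0); // consumers saw offset 0

    // After: the suspend offset is surfaced as the high-water mark.
    let after = PartitionOffset {
        fragment_start: suspend_offset,
        offset: suspend_offset,
        mod_time: -1, // UNKNOWN_TIMESTAMP
    };
    assert_eq!(after.offset, 4096);
}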
I'm not convinced this is right... We don't have any notion of a timestamp in the `partition.spec.suspend` data, so what if the requested timestamp is way in the past? Maybe this really should return 0? Or an error message like `OFFSET_NOT_AVAILABLE`? 🤔
Suppose you have a kafka partition with no current records in it, but which has had data in the past (so the offset is != zero). How does it behave? We should do the same.

I'd guess that the right behavior is returning `partition.spec.suspend.offset`, but that's ☝️ the real question.
Given the full Kafka codebase, Gemini says that the Kafka server returns `UNKNOWN_OFFSET` (-1) when asked to list offsets at a positive timestamp for a topic that has had data written to it, but whose data has since expired out due to the retention policy. Here's the function that builds that response:

And here's where it's called when no offsets can be found:

I also asked about `-1` and `-2`, and the behavior we have here matches: return the known high-water mark, aka `partition.spec.suspend.offset`.

Positive Timestamp

When a consumer provides a specific positive timestamp, the broker searches its log segments to find the first message with a timestamp greater than or equal to the one provided. In this scenario, since all log segments have been deleted by retention, the partition contains no data, so the broker's search will find no matching message for any positive timestamp.

The code in `KafkaApis#handleListOffsetRequest` and `ReplicaManager#fetchOffset` shows that when no offset is found for a given timestamp, the broker constructs a response for that partition with an offset of -1 (`UNKNOWN_OFFSET`) and a timestamp of -1 (`UNKNOWN_TIMESTAMP`).

Behavior: the API call will return a response indicating that no offset could be found for the given timestamp. The response for the partition will have an offset of -1, which signals to the consumer that there are no messages at or after the requested timestamp.