Merged · Changes from 1 commit
111 changes: 75 additions & 36 deletions crates/dekaf/src/topology.rs
@@ -115,6 +115,9 @@ impl Default for PartitionOffset {
}
}

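// Kafka's ListOffsets API reserves two sentinel timestamps: -2 requests the
// earliest available offset and -1 requests the latest offset (the high-water mark).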
const OFFSET_REQUEST_EARLIEST: i64 = -2;
const OFFSET_REQUEST_LATEST: i64 = -1;

impl Collection {
/// Build a Collection by fetching its spec, an authenticated data-plane access token, and its partitions.
pub async fn new(
@@ -332,12 +335,12 @@ impl Collection {
}

/// Map a partition and timestamp into the newest covering fragment offset.
/// Request latest offset
/// - `suspend::Level::Full | suspend::Level::Partial`: `suspend.offset`
/// - `suspend::Level::None`: write offset returned by non-blocking read at `offset = -1`
/// Request earliest offset
/// - `suspend::Level::Full`: `suspend.offset`
/// - `suspend::Level::Partial | suspend::Level::None`: fragment listing with `begin_mod_time = 0`, return 0th fragment’s begin
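/// A hypothetical usage sketch (the caller-side `collection` binding and error propagation are assumed for illustration; not part of this diff):
/// ```ignore
/// // Latest (high-water mark) and earliest offsets for partition 0:
/// let latest = collection.fetch_partition_offset(0, OFFSET_REQUEST_LATEST).await?;
/// let earliest = collection.fetch_partition_offset(0, OFFSET_REQUEST_EARLIEST).await?;
/// // Offset of the newest fragment covering a wall-clock timestamp (milliseconds):
/// let at_time = collection.fetch_partition_offset(0, 1_724_544_000_000).await?;
/// ```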
pub async fn fetch_partition_offset(
&self,
partition_index: usize,
@@ -348,42 +351,51 @@
};

let offset_data = match timestamp_millis {
// Request latest offset
OFFSET_REQUEST_LATEST => {
match partition.spec.suspend {
Some(suspend)
if suspend.level == journal_spec::suspend::Level::Full as i32
|| suspend.level == journal_spec::suspend::Level::Partial as i32 =>
{
Some(PartitionOffset {
fragment_start: suspend.offset,
offset: suspend.offset,
mod_time: -1, // UNKNOWN_TIMESTAMP
})
}
// Not suspended, so return high-water mark.
_ => self.fetch_write_head(partition_index).await?,
}
}
// Request earliest offset
OFFSET_REQUEST_EARLIEST => {
match partition.spec.suspend {
Some(suspend) if suspend.level == journal_spec::suspend::Level::Full as i32 => {
Some(PartitionOffset {
fragment_start: suspend.offset,
offset: suspend.offset,
mod_time: -1, // UNKNOWN_TIMESTAMP
})
}
// Not suspended or partially suspended, so return earliest available fragment offset.
_ => self.fetch_earliest_offset(partition_index).await?,
}
}
_ => {
// If fully suspended, return the suspend offset. There will be no fragments.

Review comment from @jshearer (Contributor Author), Aug 25, 2025:

I'm not convinced this is right... We don't have any notion of a timestamp in the partition.spec.suspend data, so what if the requested timestamp is way in the past? Maybe this really should return 0? Or an error message like OFFSET_NOT_AVAILABLE 🤔

Reply from a Member:

Suppose you have a Kafka partition with no current records in it, but which has had data in the past (so the offset is != zero). How does it behave? We should do the same.

I'd guess the right behavior is returning partition.spec.suspend.offset, but that's ☝️ the real question.

@jshearer (Contributor Author) replied on Aug 28, 2025:

Given the full Kafka codebase, Gemini says that the Kafka server returns UNKNOWN_OFFSET (-1) when asked to list offsets at a positive timestamp for a topic that has had data written to it, but whose data has since expired due to the retention policy:

Here's the function that builds that response:

    def buildErrorResponse(e: Errors, partition: ListOffsetsPartition): ListOffsetsPartitionResponse = {
      new ListOffsetsPartitionResponse()
        .setPartitionIndex(partition.partitionIndex)
        .setErrorCode(e.code)
        .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
        .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
    }

And here's where it's called when no offsets can be found:

              } else if (resultHolder.timestampAndOffsetOpt.isEmpty && resultHolder.futureHolderOpt.isEmpty) {
                // This is an empty offset response scenario
                resultHolder.maybeOffsetsError.map(e => throw e)
                ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.NONE, partition))).build()
              }

I also asked about -1 and -2, and the behavior we have here matches: return the known high-watermark, aka partition.spec.suspend.offset.


Positive Timestamp

When a consumer provides a specific positive timestamp, the broker searches its log segments to find the first message with a timestamp greater than or equal to the one provided.

In this scenario, since all log segments have been deleted by retention, the partition contains no data. Therefore, the broker's search will find no matching message for any positive timestamp.

The code in KafkaApis#handleListOffsetRequest and ReplicaManager#fetchOffset shows that when no offset is found for a given timestamp, the broker constructs a response for that partition with an offset of -1 (UNKNOWN_OFFSET) and a timestamp of -1 (UNKNOWN_TIMESTAMP).

Behavior: The API call will return a response indicating that no offset could be found for the given timestamp. The response for the partition will have an offset of -1. This signals to the consumer that there are no messages at or after the requested timestamp.

if let Some(suspend) = &partition.spec.suspend {
if suspend.level == journal_spec::suspend::Level::Full as i32 {
return Ok(Some(PartitionOffset {
fragment_start: suspend.offset,
offset: suspend.offset,
mod_time: -1, // UNKNOWN_TIMESTAMP
}));
}
}

// Otherwise, list fragments with begin_mod_time <= timestamp_millis and return the latest fragment's begin offset.
// This will return the currently open fragment if there is one and `timestamp_millis` is after every other fragment's
// `begin_mod_time`: because the open fragment hasn't yet been persisted to cloud storage, it doesn't
// have a `begin_mod_time` at all. Not all journals will have an open fragment though, so we need to consider that.
let (not_before_sec, _) = self
.not_before
.map(|not_before| not_before.to_unix())
@@ -405,14 +417,21 @@
let response = self.journal_client.list_fragments(request).await?;

match response.fragments.get(0) {
// We found a fragment covering the requested timestamp, or we found the currently open fragment.
Some(broker::fragments_response::Fragment {
spec: Some(spec), ..
}) => Some(PartitionOffset {
fragment_start: spec.begin,
offset: spec.begin,
mod_time: spec.mod_time,
}),
// No fragments found. This can happen if there are no fragments at all, or if the requested timestamp is
// after the latest fragment's begin_mod_time and there is no currently open fragment. In this case, return
// the high-water mark as the requested timestamp is beyond any known offset.
None => self.fetch_write_head(partition_index).await?,

Review comment from @jshearer (Contributor Author), Aug 25, 2025:

I believe this is the right behavior:

  • The request timestamp is > any existing fragment's begin_mod_time, and there just isn't an open fragment since the journal hasn't seen writes for longer than its flush interval, or is partially suspended, or whatever the reason may be. In that case, the high-water mark seems to me to be the clearly correct response.
  • There are no fragments at all, but the journal isn't fully suspended. This must be because all of the fragments have expired or were deleted from cloud storage. What will a ReadRequest's metadata response return here? Whatever it is, I feel like that's the right thing to return.

Reply from a Member:

A suspended journal cannot have a write head different from spec.suspend.offset. If you issue a ReadRequest, it will give you that same offset.

@jshearer (Contributor Author) replied:

It looks like I was wrong here. Specifically, the cases where this line hits are:

  • suspend::Level::Partial so there is no open fragment, and the provided timestamp is after any existing persisted fragment's mod_time (and there cannot be an open fragment since the journal is partially suspended)
  • Not suspended, and either all fragments have expired from cloud storage, no data has ever been written, or the provided timestamp is after any persisted fragment's mod_time and there is no open fragment (maybe the collection hasn't seen any new data for longer than its flush interval?)

I now believe that both of these cases are the same case as above when the journal is fully suspended: a request for offsets when there are no covering fragments. As I discovered above, Kafka returns UNKNOWN_OFFSET (-1) in this case, so I believe that Dekaf should too.
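
Purely as an illustration, and not part of this PR: a minimal sketch of what adopting that convention could look like for this `None` arm, reusing the -1 sentinel the code already uses for `mod_time` (whether `PartitionOffset` should carry -1 offsets at all, versus surfacing an error, is exactly the open question above):

    // Hypothetical: mirror Kafka's UNKNOWN_OFFSET / UNKNOWN_TIMESTAMP response
    // when no covering fragment exists, instead of falling back to the write head.
    None => Some(PartitionOffset {
        fragment_start: -1, // UNKNOWN_OFFSET
        offset: -1,         // UNKNOWN_OFFSET
        mod_time: -1,       // UNKNOWN_TIMESTAMP
    }),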

Some(broker::fragments_response::Fragment { spec: None, .. }) => {
anyhow::bail!("fragment missing spec");
}
}
}
};
@@ -425,9 +444,10 @@
"fetched offset"
);

Ok(offset_data)
}

#[tracing::instrument(skip(self))]
async fn fetch_earliest_offset(
&self,
partition_index: usize,
@@ -442,7 +462,11 @@
page_limit: 1,
..Default::default()
};
let response = self
.journal_client
.list_fragments(request)
.await
.context("listing fragments to fetch earliest offset")?;

match response.fragments.get(0) {
Some(broker::fragments_response::Fragment {
@@ -457,6 +481,7 @@
}

/// Fetch the write head of a journal by issuing a non-blocking read request at offset -1
#[tracing::instrument(skip(self))]
async fn fetch_write_head(
&self,
partition_index: usize,
@@ -472,19 +497,33 @@
};
let response_stream = self.journal_client.clone().read(request);
tokio::pin!(response_stream);

// Continue polling the stream until we get Ok or a non-transient error
loop {
match response_stream.next().await {
Some(Ok(ReadResponse {
write_head,
fragment,
..
})) => {
return Ok(Some(PartitionOffset {
fragment_start: fragment.map(|f| f.begin).unwrap_or(0),
offset: write_head,
mod_time: -1,
}))
}
Some(Err(e)) => {
if e.inner.is_transient() {
continue;
} else {
return Err(anyhow::Error::new(e.inner).context(format!(
"failed to fetch write head after {} retries",
e.attempt
)));
}
}
None => anyhow::bail!("read stream ended unexpectedly"),
}
}
}
