Skip to content

Conversation

@jshearer
Copy link
Contributor

@jshearer jshearer commented Aug 22, 2025

Description:

Partially suspended journals were incorrectly returning a value of 0 as the high-watermark, confusing some consumers and preventing them from reading data. The issue occurred because fragment listings with begin_mod_time=i64::MAX return empty for partially suspended journals, since there is no open fragment.

The new behavior is now:

  • Request earliest offset
    • suspend::Level::Fullsuspend.offset
    • suspend::Level::Partial | suspend::Level::None: fragment listing with begin_mod_time = 0, return 0th fragment's begin
  • Request latest offset
    • suspend::Level::Full | suspend::Level::Partialsuspend.offset
    • suspend::Level::None: write offset returned by non-blocking read at offset = -1
      • This is different than before where we would issue a fragment listing with max begin_mod_time. The behavior ought to be the same, but this is more correct.

Testing

Collection: joseph/test-dekaf-migrations/events
Materialization: joseph/test-dekaf-migrations/dekaf-generic
Data Plane: gcp: us-central1 c1

Request latest offset against dekaf-dev pre-deploy, pre-suspend

$ kcat -o beginning  -b dekaf-dev.estuary-data.com:9092 \
-X security.protocol=SASL_SSL \
-X sasl.mechanism=PLAIN \
-X sasl.username='joseph/test-dekaf-migrations/dekaf-generic' \
-X sasl.password="..." -Q -t "events2:0:-1"
events2 [0] offset 414002871

Suspending

$ flowctl raw gazctl-env --data-plane ops/dp/public/gcp-us-central1-c1 --prefix joseph/test-dekaf-migrations/ --admin

export BROKER_ADDRESS=https://us-central1.v1.estuary-data.dev
... redacted ...

$ flowctl-go journals suspend -l name=joseph/test-dekaf-migrations/events/11eaa81bb5000214/pivot=00 --force

$ flowctl-go journals list -l name:prefix=joseph/test-dekaf-migrations/events -ojson | jq

{
  "status": "OK",
  ...
    "suspend": {
	  "level": "PARTIAL",
	  "offset": "414002872"
    }
  ...
}

Request latest offset against dekaf-dev pre-deploy, post-suspend: ❌

$ kcat -o beginning  -b dekaf-dev.estuary-data.com:9092 \   -X security.protocol=SASL_SSL \
-X sasl.mechanism=PLAIN \
-X sasl.username='joseph/test-dekaf-migrations/dekaf-generic' \
-X sasl.password="..." -Q -t "events2:0:-1"
events2 [0] offset 0

Request latest offset against dekaf-dev post-deploy, post-suspend: ✅

$ kcat -o beginning  -b dekaf-dev.estuary-data.com:9092 \   
-X security.protocol=SASL_SSL \
-X sasl.mechanism=PLAIN \
-X sasl.username='joseph/test-dekaf-migrations/dekaf-generic' \
-X sasl.password="..." -Q -t "events2:0:-1"
events2 [0] offset 414002872

This change is Reviewable

Partially suspended journals were incorrectly returning offset 0 for both earliest and latest positions, preventing Kafka consumers from reading available data. The issue occurred because fragment listings with `begin_mod_time=i64::MAX` return empty for suspended journals. For partially suspended journals, we now:

* Return `suspend.offset` as the latest position (the write head)
* Continue using fragment listings for the earliest position (readable data still exists)
@jshearer jshearer marked this pull request as ready for review August 25, 2025 15:49
@jshearer jshearer requested a review from jgraettinger August 25, 2025 15:50
}
}
_ => {
// If fully suspended, return the suspend offset. There will be no fragments.
Copy link
Contributor Author

@jshearer jshearer Aug 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced this is right... We don't have any notion of a timestamp in the partition.spec.suspend data, so what if the requested timestamp is way in the past? Maybe this really should return 0? Or an error message like OFFSET_NOT_AVAILABLE 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suppose you have a kafka partition with no current records in it, but which has had data in the past (so offset is != zero). How does it behave? We should do the same.

I'd guess that right behavior is returning partition.spec.suspend.offset, but that's ☝️ the real question.

Copy link
Contributor Author

@jshearer jshearer Aug 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the full Kafka codebase, Gemini says says that the Kafka server returns UNKNOWN_OFFSET (-1) when asked to list offsets at a positive timestamp for a topic that has had data written to it, but that data has since expired out due to the retention policy:

Here's the function that builds that response:

    def buildErrorResponse(e: Errors, partition: ListOffsetsPartition): ListOffsetsPartitionResponse = {
      new ListOffsetsPartitionResponse()
        .setPartitionIndex(partition.partitionIndex)
        .setErrorCode(e.code)
        .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
        .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
    }

And here's where it's called when no offsets can be found:

              } else if (resultHolder.timestampAndOffsetOpt.isEmpty && resultHolder.futureHolderOpt.isEmpty) {
                // This is an empty offset response scenario
                resultHolder.maybeOffsetsError.map(e => throw e)
                ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.NONE, partition))).build()
              }

I also asked about -1 and -2, and the behavior we have here matches: return the known high-watermark, aka partition.spec.suspend.offset.


Positive Timestamp

When a consumer provides a specific positive timestamp, the broker searches its log segments to find the first message with a timestamp greater than or equal to the one provided.

In this scenario, since all log segments have been deleted by retention, the partition contains no data. Therefore, the broker's search will find no matching message for any positive timestamp.

The code in KafkaApis#handleListOffsetRequest and ReplicaManager#fetchOffset shows that when no offset is found for a given timestamp, the broker constructs a response for that partition with an offset of -1 (UNKNOWN_OFFSET) and a timestamp of -1 (UNKNOWN_TIMESTAMP).

Behavior: The API call will return a response indicating that no offset could be found for the given timestamp. The response for the partition will have an offset of -1. This signals to the consumer that there are no messages at or after the requested timestamp.

// No fragments found. This can happen if there are no fragments at all, or if the requested timestamp is
// after the latest fragment's begin_mod_time and there is no currently open fragment. In this case, return
// the high-water mark as the requested timestamp is beyond any known offset.
None => self.fetch_write_head(partition_index).await?,
Copy link
Contributor Author

@jshearer jshearer Aug 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is the right behavior:

  • The request timestamp is > any existing fragment's begin_mod_time, and there just isn't an open fragment since the journal hasn't seen writes for longer than its flush interval, or is partially suspended or whatever the reason may be. In that case, the high-water-mark seems to me to be the clearly correct response
  • There are no fragments at all, but the journal isn't fully suspended. This must be because all of the fragments have expired or were deleted from cloud storage. What will a ReadRequest's metadata response return here? Whatever it is, I feel like that's the right thing to return.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a suspended journal cannot have a write head different from spec.suspend.offset. If you issue a ReadRequest, it will give you that same offset.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like I was wrong here. Specifically, the cases where this line hits are:

  • suspend::Level::Partial so there is no open fragment, and the provided timestamp is after any existing persisted fragment's mod_time (and there cannot be an open fragment since the journal is partially suspended)
  • Not suspended, and either all fragments have expired from cloud storage, no data has ever been written, or the provided timestamp is after any persisted fragment's mod_time and there is no open fragment (maybe the collection hasn't seen any new data for longer than its flush interval?)

I now believe that both of these cases are the same case as above when the journal is fully suspended: a request for offsets when there are no covering fragments. As I discovered above, Kafka returns UNKNOWN_OFFSET (-1) in this case, so I believe that Dekaf should too.

@jshearer jshearer self-assigned this Aug 25, 2025
The Kafka server returns `UNKNOWN_OFFSET` (-1) when asked to list offsets at a positive timestamp for a topic that has had data written to it, but that data has since expired out of retention
@jshearer jshearer force-pushed the dekaf/fix_partition_fetching_for_partially_suspended_journals branch from 48ab4b1 to 5fdf23a Compare August 28, 2025 17:39
@jshearer jshearer requested a review from jgraettinger August 29, 2025 14:11
Copy link
Member

@jgraettinger jgraettinger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@jshearer jshearer merged commit 44565ea into master Aug 29, 2025
11 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants