223 changes: 170 additions & 53 deletions crates/dekaf/src/topology.rs
@@ -2,7 +2,7 @@ use crate::{connector, utils, SessionAuthentication, TaskAuth, TaskState, UserAu
use anyhow::{anyhow, bail, Context};
use futures::{StreamExt, TryStreamExt};
use gazette::{
broker::{self, journal_spec},
broker::{self, journal_spec, ReadResponse},
journal, uuid,
};
use models::RawValue;
@@ -105,15 +105,8 @@ pub struct PartitionOffset {
pub mod_time: i64,
}

impl Default for PartitionOffset {
fn default() -> Self {
Self {
mod_time: -1, // UNKNOWN_TIMESTAMP
fragment_start: 0,
offset: 0,
}
}
}
const OFFSET_REQUEST_EARLIEST: i64 = -2;
const OFFSET_REQUEST_LATEST: i64 = -1;
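
// For context: these match Kafka's ListOffsets sentinel timestamps. The sketch below is
// illustrative only and not part of this change; `Lookup` and `classify` are hypothetical
// names showing how a request timestamp dispatches against the sentinels before falling
// through to a timestamp-based fragment lookup.
enum Lookup {
    Earliest,
    Latest,
    ByTimestamp(i64), // milliseconds since the Unix epoch
}

fn classify(timestamp_millis: i64) -> Lookup {
    match timestamp_millis {
        OFFSET_REQUEST_LATEST => Lookup::Latest,
        OFFSET_REQUEST_EARLIEST => Lookup::Earliest,
        other => Lookup::ByTimestamp(other),
    }
}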

impl Collection {
/// Build a Collection by fetching its spec, an authenticated data-plane access token, and its partitions.
@@ -332,6 +325,12 @@ impl Collection {
}

/// Map a partition and timestamp into the newest covering fragment offset.
/// Requesting the latest offset (`timestamp_millis == OFFSET_REQUEST_LATEST`):
/// - `suspend::Level::Full | suspend::Level::Partial`: `suspend.offset`
/// - `suspend::Level::None`: the write head returned by a non-blocking read at `offset = -1`
/// Requesting the earliest offset (`timestamp_millis == OFFSET_REQUEST_EARLIEST`):
/// - `suspend::Level::Full`: `suspend.offset`
/// - `suspend::Level::Partial | suspend::Level::None`: fragment listing with `begin_mod_time = 0`, returning the 0th fragment's begin offset
pub async fn fetch_partition_offset(
&self,
partition_index: usize,
@@ -341,31 +340,63 @@
return Ok(None);
};

match partition.spec.suspend {
Some(suspend) if suspend.level == journal_spec::suspend::Level::Full as i32 => {
return Ok(Some(PartitionOffset {
fragment_start: suspend.offset,
offset: suspend.offset,
mod_time: -1, //UNKNOWN_TIMESTAMP
}));
let offset_data = match timestamp_millis {
OFFSET_REQUEST_LATEST => {
match partition.spec.suspend {
Some(suspend)
if suspend.level == journal_spec::suspend::Level::Full as i32
|| suspend.level == journal_spec::suspend::Level::Partial as i32 =>
{
Some(PartitionOffset {
fragment_start: suspend.offset,
offset: suspend.offset,
mod_time: -1, // UNKNOWN_TIMESTAMP
})
}
// Not suspended, so return high-water mark.
_ => self.fetch_write_head(partition_index).await?,
}
}
OFFSET_REQUEST_EARLIEST => {
match partition.spec.suspend {
Some(suspend) if suspend.level == journal_spec::suspend::Level::Full as i32 => {
Some(PartitionOffset {
fragment_start: suspend.offset,
offset: suspend.offset,
mod_time: -1, // UNKNOWN_TIMESTAMP
})
}
// Not suspended or partially suspended, so return earliest available fragment offset.
_ => self.fetch_earliest_offset(partition_index).await?,
}
}
_ => {
// If fully suspended, there are no actual fragments to search through, so we have no way to correlate
// timestamps with offsets. Kafka returns UNKNOWN_OFFSET in this case, so we do the same.
if let Some(suspend) = &partition.spec.suspend {
if suspend.level == journal_spec::suspend::Level::Full as i32 {
return Ok(Some(PartitionOffset {
fragment_start: suspend.offset,
offset: -1, // UNKNOWN_OFFSET
mod_time: -1, // UNKNOWN_TIMESTAMP
}));
}
}

// Otherwise, list fragments with begin_mod_time <= timestamp_millis and return the latest fragment's begin offset.
// This will return the currently open fragment if there is one and `timestamp_millis` is after every other fragment's
// `begin_mod_time`, because the still-open fragment hasn't been persisted to cloud storage and so doesn't have a
// `begin_mod_time` at all. Not all journals will have an open fragment though, so we need to account for that.
let (not_before_sec, _) = self
.not_before
.map(|not_before| not_before.to_unix())
.unwrap_or((0, 0));

let begin_mod_time = if timestamp_millis == -1 {
i64::MAX // Sentinel for "largest available offset",
} else if timestamp_millis == -2 {
0 // Sentinel for "first available offset"
let timestamp = timestamp_millis / 1_000;
let begin_mod_time = if timestamp < not_before_sec as i64 {
not_before_sec as i64
} else {
let timestamp = timestamp_millis / 1_000;
if timestamp < not_before_sec as i64 {
not_before_sec as i64
} else {
timestamp as i64
}
timestamp as i64
};
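// Worked example with illustrative numbers (not from the source): timestamp_millis = 1_700_000_000_000
// becomes timestamp = 1_700_000_000 seconds; if this collection's `not_before` is 1_710_000_000 seconds,
// begin_mod_time is clamped up to 1_710_000_000 so the fragment listing never reaches behind the
// collection's not-before horizon.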

let request = broker::FragmentsRequest {
@@ -376,37 +407,123 @@
};
let response = self.journal_client.list_fragments(request).await?;

let offset_data = match response.fragments.get(0) {
match response.fragments.get(0) {
// We found a fragment covering the requested timestamp, or we found the currently open fragment.
Some(broker::fragments_response::Fragment {
spec: Some(spec), ..
}) => {
if timestamp_millis == -1 {
PartitionOffset {
fragment_start: spec.begin,
// Subtract one to reflect the largest fetch-able offset of the fragment.
offset: spec.end - 1,
mod_time: spec.mod_time,
}
} else {
PartitionOffset {
fragment_start: spec.begin,
offset: spec.begin,
mod_time: spec.mod_time,
}
}
}) => Some(PartitionOffset {
fragment_start: spec.begin,
offset: spec.begin,
mod_time: spec.mod_time,
}),
// The cases where this arm is hit are:
// * `suspend::Level::Partial`: there is no open fragment (the journal is partially suspended), and the
//   provided timestamp is after every existing persisted fragment's `mod_time`
// * Not suspended, and either all fragments have expired from cloud storage, no data has ever been written,
//   or the provided timestamp is after every persisted fragment's `mod_time` and there is no open fragment
//   (perhaps the collection hasn't seen any new data for longer than its flush interval?)
// Both reduce to the same situation as the fully-suspended case above: a request for offsets when there are
// no covering fragments. As I discovered above, Kafka returns `UNKNOWN_OFFSET` (-1) in this case, so I believe
// that Dekaf should too.
None => Some(PartitionOffset {
fragment_start: -1,
offset: -1, // UNKNOWN_OFFSET
mod_time: -1, // UNKNOWN_TIMESTAMP
}),
Some(broker::fragments_response::Fragment { spec: None, .. }) => {
anyhow::bail!("fragment missing spec");
}
_ => PartitionOffset::default(),
};
}
}
};

tracing::debug!(
collection = self.spec.name,
?offset_data,
partition_index,
timestamp_millis,
"fetched offset"
);
tracing::debug!(
collection = self.spec.name,
?offset_data,
partition_index,
timestamp_millis,
"fetched offset"
);

Ok(offset_data)
}

return Ok(Some(offset_data));
#[tracing::instrument(skip(self))]
async fn fetch_earliest_offset(
&self,
partition_index: usize,
) -> anyhow::Result<Option<PartitionOffset>> {
let Some(partition) = self.partitions.get(partition_index) else {
return Ok(None);
};

let request = broker::FragmentsRequest {
journal: partition.spec.name.clone(),
begin_mod_time: 0, // Fetch earliest offset
page_limit: 1,
..Default::default()
};
let response = self
.journal_client
.list_fragments(request)
.await
.context("listing fragments to fetch earliest offset")?;

match response.fragments.get(0) {
Some(broker::fragments_response::Fragment {
spec: Some(spec), ..
}) => Ok(Some(PartitionOffset {
fragment_start: spec.begin,
offset: spec.begin,
mod_time: spec.mod_time,
})),
_ => Ok(None),
}
}
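
// Hypothetical usage sketch, not part of this change: resolving both sentinel offsets for
// partition 0 of an already-built `Collection`. The function name and the choice of
// partition index are assumptions made for illustration.
async fn resolve_partition_bounds(collection: &Collection) -> anyhow::Result<()> {
    let earliest = collection
        .fetch_partition_offset(0, OFFSET_REQUEST_EARLIEST)
        .await?;
    let latest = collection
        .fetch_partition_offset(0, OFFSET_REQUEST_LATEST)
        .await?;
    tracing::debug!(?earliest, ?latest, "resolved partition bounds");
    Ok(())
}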

/// Fetch the write head of a journal by issuing a non-blocking read request at offset -1
#[tracing::instrument(skip(self))]
async fn fetch_write_head(
&self,
partition_index: usize,
) -> anyhow::Result<Option<PartitionOffset>> {
let Some(partition) = self.partitions.get(partition_index) else {
return Ok(None);
};

let request = broker::ReadRequest {
journal: partition.spec.name.clone(),
offset: -1, // Fetch write head
..Default::default()
};
let response_stream = self.journal_client.clone().read(request);
tokio::pin!(response_stream);

// Continue polling the stream until we get Ok or a non-transient error
loop {
match response_stream.next().await {
Some(Ok(ReadResponse {
write_head,
fragment,
..
})) => {
return Ok(Some(PartitionOffset {
fragment_start: fragment.map(|f| f.begin).unwrap_or(0),
offset: write_head,
mod_time: -1,
}))
}
Some(Err(e)) => {
if e.inner.is_transient() {
continue;
} else {
return Err(anyhow::Error::new(e.inner).context(format!(
"failed to fetch write head after {} retries",
e.attempt
)));
}
}
None => anyhow::bail!("read stream ended unexpectedly"),
}
}
}
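
// Illustrative sketch, not part of this change: the retry shape used in `fetch_write_head`,
// written as a standalone helper. `is_transient` is a caller-supplied predicate standing in
// for gazette's transient-error check; all names here are hypothetical.
async fn first_ok<S, T, E>(mut stream: S, is_transient: impl Fn(&E) -> bool) -> anyhow::Result<T>
where
    S: futures::Stream<Item = Result<T, E>> + Unpin,
    E: std::error::Error + Send + Sync + 'static,
{
    use futures::StreamExt as _;
    loop {
        match stream.next().await {
            // First successful item wins.
            Some(Ok(item)) => return Ok(item),
            // Transient errors are retried by polling the stream again.
            Some(Err(err)) if is_transient(&err) => continue,
            // Permanent errors are surfaced immediately.
            Some(Err(err)) => return Err(anyhow::Error::new(err)),
            None => anyhow::bail!("stream ended unexpectedly"),
        }
    }
}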