167 changes: 122 additions & 45 deletions crates/dekaf/src/topology.rs
@@ -2,7 +2,7 @@ use crate::{connector, utils, SessionAuthentication, TaskAuth, TaskState, UserAu
use anyhow::{anyhow, bail, Context};
use futures::{StreamExt, TryStreamExt};
use gazette::{
broker::{self, journal_spec, ReadResponse},
journal, uuid,
};
use models::RawValue;
@@ -332,6 +332,12 @@ impl Collection {
}

/// Map a partition and timestamp into the newest covering fragment offset.
///
/// Requesting the earliest offset (`timestamp_millis == -2`):
/// - `suspend::Level::Full`: use `suspend.offset`
/// - `suspend::Level::Partial | suspend::Level::None`: list fragments with `begin_mod_time = 0` and return the 0th fragment's begin
///
/// Requesting the latest offset (`timestamp_millis == -1`):
/// - `suspend::Level::Full | suspend::Level::Partial`: use `suspend.offset`
/// - `suspend::Level::None`: use the write head returned by a non-blocking read at `offset = -1`
///
/// An illustrative caller sketch follows this method.
pub async fn fetch_partition_offset(
&self,
partition_index: usize,
@@ -341,31 +347,53 @@
timestamp_millis: i64,
) -> anyhow::Result<Option<PartitionOffset>> {
let Some(partition) = self.partitions.get(partition_index) else {
return Ok(None);
};

let offset_data = match timestamp_millis {
// Request latest offset
-1 => {
match partition.spec.suspend {
Some(suspend)
if suspend.level == journal_spec::suspend::Level::Full as i32
|| suspend.level == journal_spec::suspend::Level::Partial as i32 =>
{
PartitionOffset {
fragment_start: suspend.offset,
offset: suspend.offset,
mod_time: -1, // UNKNOWN_TIMESTAMP
}
}
_ => self
.fetch_write_head(partition_index)
.await?
.unwrap_or_default(),
}
}
// Request earliest offset
-2 => {
match partition.spec.suspend {
Some(suspend) if suspend.level == journal_spec::suspend::Level::Full as i32 => {
PartitionOffset {
fragment_start: suspend.offset,
offset: suspend.offset,
mod_time: -1, // UNKNOWN_TIMESTAMP
}
}
_ => self
.fetch_earliest_offset(partition_index)
.await?
.unwrap_or_default(),
}
}
_ => {
let (not_before_sec, _) = self
.not_before
.map(|not_before| not_before.to_unix())
.unwrap_or((0, 0));

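// Worked example of the clamp below (values are illustrative, not from this
// change): with `not_before` = 2024-01-01T00:00:00Z (unix seconds
// 1_704_067_200), a request for timestamp_millis = 1_600_000_000_000
// (Sept 2020) clamps up to 1_704_067_200, while 1_710_000_000_000
// (March 2024) passes through as 1_710_000_000.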
let timestamp = timestamp_millis / 1_000;
let begin_mod_time = if timestamp < not_before_sec as i64 {
not_before_sec as i64
} else {
timestamp as i64
};

let request = broker::FragmentsRequest {
@@ -376,38 +404,87 @@
};
let response = self.journal_client.list_fragments(request).await?;

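// Fragment listings are ordered by journal offset, and `begin_mod_time`
// filters to fragments persisted at or after that time, so the 0th result
// is the oldest fragment that can cover the requested timestamp.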
match response.fragments.get(0) {
Some(broker::fragments_response::Fragment {
spec: Some(spec), ..
}) => PartitionOffset {
fragment_start: spec.begin,
offset: spec.begin,
mod_time: spec.mod_time,
},
_ => PartitionOffset::default(),
}
}
};

tracing::debug!(
collection = self.spec.name,
?offset_data,
partition_index,
timestamp_millis,
"fetched offset"
);

return Ok(Some(offset_data));
}
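
For illustration, a Kafka ListOffsets handler would drive this method roughly as follows. The handler name and response plumbing here are hypothetical (not part of this change); the sentinel values -1 ("latest") and -2 ("earliest") are the ones the Kafka protocol defines:

// Hypothetical caller sketch: resolve one ListOffsets partition request.
async fn resolve_list_offsets(
    collection: &Collection,
    partition: usize,
    kafka_timestamp_millis: i64,
) -> anyhow::Result<i64> {
    // -1 and -2 are handled by the match above; any other value is taken
    // as a millisecond timestamp and mapped to a covering fragment.
    Ok(collection
        .fetch_partition_offset(partition, kafka_timestamp_millis)
        .await?
        .map(|o| o.offset)
        .unwrap_or(-1))
}

/// Fetch the earliest available offset by listing fragments with `begin_mod_time = 0`.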
async fn fetch_earliest_offset(
&self,
partition_index: usize,
) -> anyhow::Result<Option<PartitionOffset>> {
let Some(partition) = self.partitions.get(partition_index) else {
return Ok(None);
};

let request = broker::FragmentsRequest {
journal: partition.spec.name.clone(),
begin_mod_time: 0, // Fetch earliest offset
page_limit: 1,
..Default::default()
};
let response = self.journal_client.list_fragments(request).await?;

match response.fragments.get(0) {
Some(broker::fragments_response::Fragment {
spec: Some(spec), ..
}) => Ok(Some(PartitionOffset {
fragment_start: spec.begin,
offset: spec.begin,
mod_time: spec.mod_time,
})),
_ => Ok(None),
}
}

/// Fetch the write head of a journal by issuing a non-blocking read request at offset -1
async fn fetch_write_head(
&self,
partition_index: usize,
) -> anyhow::Result<Option<PartitionOffset>> {
let Some(partition) = self.partitions.get(partition_index) else {
return Ok(None);
};

let request = broker::ReadRequest {
journal: partition.spec.name.clone(),
offset: -1, // Fetch write head
..Default::default()
};
let response_stream = self.journal_client.clone().read(request);
tokio::pin!(response_stream);
let response = response_stream.next().await;

match response {
Some(Ok(ReadResponse {
write_head,
fragment,
..
})) => Ok(Some(PartitionOffset {
fragment_start: fragment.map(|f| f.begin).unwrap_or(0),
offset: write_head,
mod_time: -1,
})),
_ => Ok(None),
}
}
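
In Gazette, the first `ReadResponse` of a read is a metadata message: for a non-blocking read at `offset: -1` it carries the resolved `write_head` (and, when one covers it, the current `fragment`) even though no content follows, which is what the match above extracts. A minimal caller sketch (illustrative, not part of this change):

// Illustrative only: log the current high-water mark of partition 0.
if let Some(head) = self.fetch_write_head(0).await? {
    tracing::debug!(offset = head.offset, "resolved journal write head");
}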
