Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 156 additions & 13 deletions tansu-schema-registry/src/lake/berg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use async_trait::async_trait;
use iceberg::{
Catalog, NamespaceIdent, TableCreation, TableIdent,
io::{FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY},
spec::{DataFileFormat, Schema},
spec::{DataFileFormat, Schema, Transform, UnboundPartitionField, UnboundPartitionSpec},
table::Table,
transaction::Transaction,
writer::{
Expand Down Expand Up @@ -163,7 +163,12 @@ impl Iceberg {
Ok(namespace_ident)
}

async fn load_or_create_table(&self, name: &str, schema: Schema) -> Result<Table> {
async fn load_or_create_table(
&self,
name: &str,
schema: Schema,
config: Config,
) -> Result<Table> {
if let Some(table) = self.tables.lock().map(|guard| guard.get(name).cloned())? {
return Ok(table);
}
Expand All @@ -177,14 +182,20 @@ impl Iceberg {
.await
.inspect_err(|err| debug!(?err))?
} else {
let partition_spec = UnboundPartitionSpec::builder()
.add_partition_fields(config.partition_fields(&schema))
.map(|builder| builder.build())
.and_then(|partition_spec| partition_spec.bind(schema.clone()))
.inspect(|partition_spec| debug!(?partition_spec))?;

let table_creation = TableCreation::builder()
.name(name.into())
.schema(schema.clone())
.partition_spec(partition_spec)
.build();

self.catalog
.create_table(
&namespace_ident,
TableCreation::builder()
.name(name.into())
.schema(schema.clone())
.build(),
)
.create_table(&namespace_ident, table_creation)
.await
.inspect(|table| debug!(?table))
.inspect_err(|err| debug!(?err))?
Expand Down Expand Up @@ -224,7 +235,7 @@ impl LakeHouse for Iceberg {
.inspect_err(|err| debug!(?err))?;

let table = self
.load_or_create_table(topic, schema.clone())
.load_or_create_table(topic, schema, Config::from(config))
.await
.inspect(|table| {
for field in table.metadata().current_schema().as_struct().fields() {
Expand Down Expand Up @@ -288,13 +299,64 @@ impl LakeHouse for Iceberg {
}
}

#[derive(Clone, Debug, Default)]
struct Config(Vec<(String, String)>);

impl From<DescribeConfigsResult> for Config {
fn from(config: DescribeConfigsResult) -> Self {
Self(config.configs.map_or(vec![], |configs| {
configs
.into_iter()
.filter_map(|config| config.value.map(|value| (config.name, value)))
.collect::<Vec<(String, String)>>()
}))
}
}

impl Config {
fn as_columns(&self, name: &str) -> Vec<String> {
self.0
.iter()
.flat_map(|(key, value)| {
if key == name {
value
.split(",")
.map(str::trim)
.map(String::from)
.collect::<Vec<_>>()
.into_iter()
} else {
vec![].into_iter()
}
})
.collect::<Vec<_>>()
}

fn partition_fields(&self, schema: &Schema) -> impl Iterator<Item = UnboundPartitionField> {
self.as_columns("tansu.lake.partition")
.into_iter()
.filter_map(|field_name| {
schema.field_by_name(&field_name).map(|field| {
UnboundPartitionField::builder()
.source_id(field.id)
.name(field_name)
.transform(Transform::Identity)
.build()
})
})
}
}

#[cfg(test)]
mod tests {
use super::*;
use dotenv::dotenv;
use iceberg::spec::{NestedField, PrimitiveType, Type};
use rand::{distr::Alphanumeric, prelude::*, rng};
use std::{env::var, fs::File, marker::PhantomData, sync::Arc, thread};
use tansu_kafka_sans_io::{
ConfigResource, ErrorCode, describe_configs_response::DescribeConfigsResourceResult,
};
use tracing::subscriber::DefaultGuard;
use tracing_subscriber::EnvFilter;

Expand Down Expand Up @@ -416,10 +478,20 @@ mod tests {

let table_name = alphanumeric_string(5);

let table = lake_house.load_or_create_table(&table_name, schema).await?;
let table = lake_house
.load_or_create_table(&table_name, schema, Config::default())
.await?;
assert_eq!(table_name, table.identifier().name());
assert_eq!(namespace, table.identifier().namespace().to_url_string());

assert!(
table
.metadata()
.default_partition_type()
.fields()
.is_empty()
);

Ok(())
}

Expand Down Expand Up @@ -453,7 +525,7 @@ mod tests {
)?;

let table = lake_house
.load_or_create_table(&table_name, schema.clone())
.load_or_create_table(&table_name, schema.clone(), Config::default())
.await?;
assert_eq!(table_name, table.identifier().name());
assert_eq!(namespace, table.identifier().namespace().to_url_string());
Expand All @@ -467,11 +539,82 @@ mod tests {
.namespace(Some(namespace.clone())),
)?;

let table = lake_house.load_or_create_table(&table_name, schema).await?;
let table = lake_house
.load_or_create_table(&table_name, schema, Config::default())
.await?;
assert_eq!(table_name, table.identifier().name());
assert_eq!(namespace, table.identifier().namespace().to_url_string());
}

Ok(())
}

#[tokio::test]
async fn create_partitioned_table() -> Result<()> {
dotenv().ok();
let _guard = init_tracing()?;

let catalog_uri = &var("ICEBERG_CATALOG").unwrap_or("http://localhost:8181".into())[..];
let location_uri = &var("DATA_LAKE").unwrap_or("s3://lake".into())[..];
let namespace = alphanumeric_string(5);

let lake_house = Iceberg::try_from(
Builder::<PhantomData<Url>, PhantomData<Url>>::default()
.location(Url::parse(location_uri)?)
.catalog(Url::parse(catalog_uri)?)
.namespace(Some(namespace.clone())),
)?;

let schema = Schema::builder()
.with_fields(vec![
NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
NestedField::optional(4, "pqr", Type::Primitive(PrimitiveType::Timestamp)).into(),
])
.with_schema_id(1)
.with_identifier_field_ids(vec![2])
.build()?;

let topic = alphanumeric_string(5);

let partition = String::from("bar");

let config = DescribeConfigsResult {
error_code: ErrorCode::None.into(),
error_message: None,
resource_type: ConfigResource::Topic.into(),
resource_name: topic.clone(),
configs: Some(vec![DescribeConfigsResourceResult {
name: String::from("tansu.lake.partition"),
value: Some(partition.clone()),
read_only: true,
is_default: None,
config_source: None,
is_sensitive: false,
synonyms: None,
config_type: None,
documentation: None,
}]),
};

let table = lake_house
.load_or_create_table(&topic, schema, Config::from(config))
.await?;
assert_eq!(topic, table.identifier().name());
assert_eq!(namespace, table.identifier().namespace().to_url_string());

assert_eq!(
vec![partition],
table
.metadata()
.default_partition_type()
.fields()
.iter()
.map(|field| field.name.clone())
.collect::<Vec<_>>()
);

Ok(())
}
}