Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 62 additions & 2 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
//! This module contains the iceberg REST catalog implementation.

use std::any::Any;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::str::FromStr;
use std::sync::OnceLock;

use async_trait::async_trait;
use iceberg::io::{self, FileIO};
Expand All @@ -37,6 +38,7 @@ use reqwest::{Client, Method, StatusCode, Url};
use tokio::sync::OnceCell;
use typed_builder::TypedBuilder;

use crate::Endpoint;
use crate::client::{
HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
};
Expand All @@ -55,6 +57,33 @@ const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
const PATH_V1: &str = "v1";

static DEFAULT_ENDPOINTS: OnceLock<HashSet<Endpoint>> = OnceLock::new();

fn default_endpoints() -> &'static HashSet<Endpoint> {
DEFAULT_ENDPOINTS.get_or_init(|| {
[
Endpoint::v1_config(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can remove config from this list

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! good point, it's implicit that config is supported.

Endpoint::v1_list_namespaces(),
Endpoint::v1_create_namespace(),
Endpoint::v1_load_namespace(),
Endpoint::v1_update_namespace(),
Endpoint::v1_delete_namespace(),
Endpoint::v1_list_tables(),
Endpoint::v1_create_table(),
Endpoint::v1_load_table(),
Endpoint::v1_update_table(),
Endpoint::v1_delete_table(),
Endpoint::v1_rename_table(),
Endpoint::v1_register_table(),
Endpoint::v1_report_metrics(),
Endpoint::v1_commit_transaction(),
]
.into_iter()
.cloned()
.collect()
})
}

/// Builder for [`RestCatalog`].
#[derive(Debug)]
pub struct RestCatalogBuilder(RestCatalogConfig);
Expand All @@ -67,6 +96,7 @@ impl Default for RestCatalogBuilder {
warehouse: None,
props: HashMap::new(),
client: None,
endpoints: default_endpoints().clone(),
})
}
}
Expand Down Expand Up @@ -142,6 +172,9 @@ pub(crate) struct RestCatalogConfig {

#[builder(default)]
client: Option<Client>,

#[builder(default)]
endpoints: HashSet<Endpoint>,
}

impl RestCatalogConfig {
Expand Down Expand Up @@ -304,6 +337,13 @@ impl RestCatalogConfig {
props.extend(config.overrides);

self.props = props;
self.endpoints = if config.endpoints.is_empty() {
default_endpoints().clone()
} else {
eprintln!("Endpoints are {:?}", config.endpoints);
config.endpoints
};

self
}
}
Expand Down Expand Up @@ -353,7 +393,6 @@ impl RestCatalog {
let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
let config = self.user_config.clone().merge_with_config(catalog_config);
let client = client.update_with(&config)?;

Ok(RestContext { config, client })
})
.await
Expand Down Expand Up @@ -442,6 +481,7 @@ impl Catalog for RestCatalog {
parent: Option<&NamespaceIdent>,
) -> Result<Vec<NamespaceIdent>> {
let context = self.context().await?;
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_list_namespaces())?;
let endpoint = context.config.namespaces_endpoint();
let mut namespaces = Vec::new();
let mut next_token = None;
Expand Down Expand Up @@ -492,6 +532,7 @@ impl Catalog for RestCatalog {
properties: HashMap<String, String>,
) -> Result<Namespace> {
let context = self.context().await?;
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_create_namespace())?;

let request = context
.client
Expand Down Expand Up @@ -520,6 +561,7 @@ impl Catalog for RestCatalog {

async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
let context = self.context().await?;
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_load_namespace())?;

let request = context
.client
Expand All @@ -544,6 +586,7 @@ impl Catalog for RestCatalog {

async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
let context = self.context().await?;
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_namespace_exists())?;

let request = context
.client
Expand Down Expand Up @@ -572,6 +615,7 @@ impl Catalog for RestCatalog {

async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
let context = self.context().await?;
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_delete_namespace())?;

let request = context
.client
Expand All @@ -592,6 +636,7 @@ impl Catalog for RestCatalog {

async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
let context = self.context().await?;
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_list_tables())?;
let endpoint = context.config.tables_endpoint(namespace);
let mut identifiers = Vec::new();
let mut next_token = None;
Expand Down Expand Up @@ -642,6 +687,7 @@ impl Catalog for RestCatalog {
creation: TableCreation,
) -> Result<Table> {
let context = self.context().await?;
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_create_table())?;

let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());

Expand Down Expand Up @@ -714,6 +760,15 @@ impl Catalog for RestCatalog {
/// provided locally to the `RestCatalog` will take precedence.
async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
let context = self.context().await?;
eprintln!(
"DEBUG: endpoints value in load_table: {:?}",
context.config.endpoints
);
eprintln!(
"DEBUG: looking for endpoint: {:?}",
Endpoint::v1_load_table()
);
Comment on lines +763 to +770
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to clean this up

Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_load_table())?;

let request = context
.client
Expand Down Expand Up @@ -760,6 +815,7 @@ impl Catalog for RestCatalog {
/// Drop a table from the catalog.
async fn drop_table(&self, table: &TableIdent) -> Result<()> {
let context = self.context().await?;
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_delete_table())?;

let request = context
.client
Expand All @@ -781,6 +837,7 @@ impl Catalog for RestCatalog {
/// Check if a table exists in the catalog.
async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
let context = self.context().await?;
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_table_exists())?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may break compatibility with older implementations where table existence checks happened through LoadTable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's why I left this in draft, I wanted to think through more how to handle this particular case. We have a choice here, either we say that the rust client needs to work with those older servers so we take the complexity of not doing this strict check, or we opt for a cleaner solution which has some stricter assumptions.

I don't think it's too much extra work to handle the compatibility so I'd probably just address it.


let request = context
.client
Expand All @@ -799,6 +856,7 @@ impl Catalog for RestCatalog {
/// Rename a table in the catalog.
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
let context = self.context().await?;
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_rename_table())?;

let request = context
.client
Expand Down Expand Up @@ -831,6 +889,7 @@ impl Catalog for RestCatalog {
metadata_location: String,
) -> Result<Table> {
let context = self.context().await?;
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_register_table())?;

let request = context
.client
Expand Down Expand Up @@ -885,6 +944,7 @@ impl Catalog for RestCatalog {

async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
let context = self.context().await?;
Endpoint::check_supported(&context.config.endpoints, Endpoint::v1_update_table())?;

let request = context
.client
Expand Down
Loading
Loading