Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
initial commit
  • Loading branch information
tingold committed May 28, 2025
commit 5fe831d82d73fdcaf07e80d8ed81c43047223f2b
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pgdog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ uuid = { version = "1", features = ["v4"] }
url = "2"
ratatui = { version = "0.30.0-alpha.1", optional = true }
rmp-serde = "1"
chrono = "0.4"
chrono = { version = "0.4", features = ["serde"] }
hyper = { version = "1", features = ["full"] }
http-body-util = "0.1"
hyper-util = { version = "0.1", features = ["full"] }
Expand Down
3 changes: 3 additions & 0 deletions pgdog/src/backend/pool/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ mod test {
data_type: DataType::Bigint,
centroids_path: None,
centroid_probes: 1,
sharding_method: None,
shard_range_map: None,
shard_list_map: None
}],
vec!["sharded_omni".into()],
false,
Expand Down
15 changes: 14 additions & 1 deletion pgdog/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod convert;
pub mod error;
pub mod overrides;
pub mod url;
mod shards;

use error::Error;
pub use overrides::Overrides;
Expand All @@ -21,6 +22,7 @@ use serde::{Deserialize, Serialize};
use tracing::info;
use tracing::warn;

pub(crate) use crate::config::shards::{ShardingMethod, ShardListMap, ShardRangeMap};
use crate::net::messages::Vector;
use crate::util::{human_duration_optional, random_string};

Expand Down Expand Up @@ -826,6 +828,12 @@ pub struct ShardedTable {
/// How many centroids to probe.
#[serde(default)]
pub centroid_probes: usize,
#[serde(default)]
pub sharding_method: Option<ShardingMethod>,

pub shard_range_map: Option<ShardRangeMap>,

pub shard_list_map: Option<ShardListMap>
}

impl ShardedTable {
Expand Down Expand Up @@ -865,6 +873,10 @@ pub enum DataType {
Bigint,
Uuid,
Vector,
// TODO: implement more types?
// String,
// DateTimeUTC
// Float
}

#[derive(Serialize, Deserialize, PartialEq, Debug, Clone, Default)]
Expand Down Expand Up @@ -955,8 +967,8 @@ pub struct MultiTenant {

#[cfg(test)]
pub mod test {

use crate::backend::databases::init;

use super::*;

pub fn load_test() {
Expand Down Expand Up @@ -1052,4 +1064,5 @@ column = "tenant_id"
assert_eq!(config.tcp.retries().unwrap(), 5);
assert_eq!(config.multi_tenant.unwrap().column, "tenant_id");
}

}
230 changes: 230 additions & 0 deletions pgdog/src/config/shards.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
use std::collections::HashMap;

use serde::{Deserialize, Deserializer, Serialize, Serializer};

use crate::frontend::router::parser::{Shard};


// =============================================================================
// Serialization Helper Module
// =============================================================================

/// Helper module for (de)serializing maps with usize keys as strings
mod usize_map_keys_as_strings {
use super::*;

pub fn serialize<S, V>(map: &HashMap<usize, V>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
V: Serialize,
{
let string_map: HashMap<String, &V> = map
.iter()
.map(|(k, v)| (k.to_string(), v))
.collect();
string_map.serialize(serializer)
}

pub fn deserialize<'de, D, V>(deserializer: D) -> Result<HashMap<usize, V>, D::Error>
where
D: Deserializer<'de>,
V: Deserialize<'de>,
{
let string_map = HashMap::<String, V>::deserialize(deserializer)?;
string_map
.into_iter()
.map(|(s, v)| {
s.parse::<usize>()
.map(|k| (k, v))
.map_err(serde::de::Error::custom)
})
.collect()
}
}

// =============================================================================
// Core Sharding Types
// =============================================================================

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "snake_case")]
pub enum ShardingMethod {
#[default]
Hash,
Range,
List,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ShardRange {
pub start: Option<i64>,
pub end: Option<i64>,
#[serde(default)]
pub no_max: bool,
#[serde(default)]
pub no_min: bool,
}

impl ShardRange {
/// Check if a value falls within this range
pub fn contains(&self, value: i64) -> bool {
// Check lower bound
if !self.no_min {
if let Some(start) = self.start {
if value < start {
return false;
}
}
}

// Check upper bound
if !self.no_max {
if let Some(end) = self.end {
if value >= end {
// Using >= for exclusive upper bound
return false;
}
}
}

true
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ShardList {
pub values: Vec<i64>,
}

impl ShardList {
/// Check if a value is contained in this list
pub fn contains(&self, value: i64) -> bool {
self.values.contains(&value)
}
}

// =============================================================================
// Shard Map Types
// =============================================================================

/// A map of shard IDs to their range definitions
#[derive(Debug, Clone, PartialEq)]
pub struct ShardRangeMap(pub HashMap<usize, ShardRange>);

impl ShardRangeMap {
pub fn new() -> Self {
Self::default()
}

/// Find the shard key for a given value based on range containment
pub fn find_shard_key(&self, value: i64) -> Option<Shard> {
for (shard_id, range) in &self.0 {
if range.contains(value) {
return Some(Shard::Direct(*shard_id));
}
}
None
}
}

impl Default for ShardRangeMap {
fn default() -> Self {
Self(HashMap::new())
}
}

impl Serialize for ShardRangeMap {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
usize_map_keys_as_strings::serialize(&self.0, serializer)
}
}

impl<'de> Deserialize<'de> for ShardRangeMap {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
Ok(ShardRangeMap(usize_map_keys_as_strings::deserialize(
deserializer,
)?))
}
}

/// A map of shard IDs to their list definitions
#[derive(Debug, Clone, PartialEq)]
pub struct ShardListMap(pub HashMap<usize, ShardList>);

impl ShardListMap {
pub fn new() -> Self {
Self::default()
}

/// Find the shard key for a given value based on list containment
pub fn find_shard_key(&self, value: i64) -> Option<Shard> {
for (shard_id, list) in &self.0 {
if list.contains(value) {
return Some(Shard::Direct(*shard_id));
}
}
None
}
}

impl Default for ShardListMap {
fn default() -> Self {
Self(HashMap::new())
}
}

impl Serialize for ShardListMap {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
usize_map_keys_as_strings::serialize(&self.0, serializer)
}
}

impl<'de> Deserialize<'de> for ShardListMap {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
Ok(ShardListMap(usize_map_keys_as_strings::deserialize(
deserializer,
)?))
}
}

// =============================================================================
// Shardable Trait and Implementations
// =============================================================================

/// Trait for types that can provide sharding functionality
pub trait Shardable {
/// Get the shard ID for a given value
fn shard(&self, value: i64) -> Shard;
}

impl Shardable for ShardRangeMap {
fn shard(&self, value: i64) -> Shard {
if self.0.is_empty() {
return Shard::All;
}

self.find_shard_key(value).unwrap_or_else(|| Shard::All)
}
}

impl Shardable for ShardListMap {
fn shard(&self, value: i64) -> Shard {
if self.0.is_empty() {
return Shard::All;
}

self.find_shard_key(value).unwrap_or_else(|| Shard::All)
}
}
12 changes: 10 additions & 2 deletions pgdog/src/frontend/router/sharding/context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::frontend::router::parser::Shard;

use super::{Error, Operator, Value};

#[derive(Debug)]
Expand All @@ -16,7 +15,6 @@ impl<'a> Context<'a> {
return Ok(Shard::Direct(hash as usize % shards));
}
}

Operator::Centroids {
shards,
probes,
Expand All @@ -26,6 +24,16 @@ impl<'a> Context<'a> {
return Ok(centroids.shard(&vector, *shards, *probes));
}
}
Operator::Ranges(srm)=> {
if let Some(i) = self.value.int()? {
return Ok(srm.find_shard_key(i).unwrap())
}
}
Operator::Lists(slm)=> {
if let Some(i) = self.value.int()? {
return Ok(slm.find_shard_key(i).unwrap())
}
}
}

Ok(Shard::All)
Expand Down
Loading