Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
wip
  • Loading branch information
niclaflamme committed Aug 12, 2025
commit b8f5fe672381540611d0ca101ca2810be5aba0f6
1 change: 1 addition & 0 deletions pgdog/src/backend/pool/connection/binding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl Binding {

Binding::Admin(backend) => Ok(backend.read().await?),
Binding::MultiShard(shards, state) => {
println!("2.1");
if shards.is_empty() {
loop {
debug!("multi-shard binding suspended");
Expand Down
86 changes: 59 additions & 27 deletions pgdog/src/frontend/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time::Instant;

use bytes::BytesMut;
use engine::EngineContext;
use pg_query::protobuf::PartitionElem;
use smallvec::SmallVec;
use timeouts::Timeouts;
use tokio::time::timeout;
Expand Down Expand Up @@ -373,9 +374,19 @@ impl Client {

/// Handle client messages.
async fn client_messages(&mut self, mut inner: InnerBorrow<'_>) -> Result<bool, Error> {
println!("");
println!("");
println!("Starting Inner.router.route: {}", inner.router.route());
// We don't start a transaction on the servers until a client is actually executing something.
// This prevents us holding open connections to multiple servers
if self.should_trigger_buffered_begin() {
println!("");
println!("****************************************");
println!("****************************************");
println!("****************************************");
println!("****************************************");
println!("****************************************");
println!("");
inner.backend.begin().await?;
self.logical_transaction.record_begin();
}

inner
.stats
Expand Down Expand Up @@ -439,6 +450,14 @@ impl Client {
}
};

println!("");
println!("COMMAND: {:?}", command);
println!("-- connected? {}", connected);

// AAAAAAA
// I decided that transactions keep the connection open. which makes total sense.
// how can we release a transaction to the connection pool and have it be reused by another client?

if !connected {
// Simulate transaction starting
// until client sends an actual query.
Expand All @@ -450,6 +469,13 @@ impl Client {
// to a shard.
//
match command {
Some(Command::CommitTransaction) => {
println!("HELLOOOOOOOO?");
self.end_transaction(false).await?;

inner.done(self.in_transaction());
return Ok(false);
}
Some(Command::StartTransaction(query)) => {
if let BufferedQuery::Query(_) = query {
self.start_transaction().await?;
Expand All @@ -464,12 +490,6 @@ impl Client {
inner.done(self.in_transaction());
return Ok(false);
}
Some(Command::CommitTransaction) => {
self.end_transaction(false).await?;

inner.done(self.in_transaction());
return Ok(false);
}
// How many shards are configured.
Some(Command::Shards(shards)) => {
let rd = RowDescription::new(&[Field::bigint("shards")]);
Expand Down Expand Up @@ -499,8 +519,6 @@ impl Client {

Some(Command::Query(route)) => {
if self.in_transaction() {
println!("I am in a transaction!");
println!("shard: {}", route.shard().clone());
let shard = route.shard().clone();
self.logical_transaction.execute_query(shard)?;
}
Expand Down Expand Up @@ -580,14 +598,6 @@ impl Client {

println!("Inner.router.route: {}", inner.router.route());

// We don't start a transaction on the servers until
// a client is actually executing something.
//
// This prevents us holding open connections to multiple servers
if self.should_trigger_buffered_begin() {
inner.backend.begin().await?;
}

for msg in self.request_buffer.iter() {
if let ProtocolMessage::Bind(bind) = msg {
inner.backend.bind(bind)?
Expand Down Expand Up @@ -635,6 +645,11 @@ impl Client {
let message = message.backend();
let has_more_messages = inner.backend.has_more_messages();

println!("");
println!("");
println!("");
println!("BACKKEND: \n{:?}", message);

// Messages that we need to send to the client immediately.
// ReadyForQuery (B) | CopyInResponse (B) | ErrorResponse(B) | NoticeResponse(B) | NotificationResponse (B)
let flush = matches!(code, 'Z' | 'G' | 'E' | 'N' | 'A')
Expand All @@ -658,19 +673,20 @@ impl Client {

inner.stats.sent(message.len());

println!("server_message: 1");

// Release the connection back into the pool
// before flushing data to client.
// Flushing can take a minute and we don't want to block
// the connection from being reused.
if inner.backend.done() {
let changed_params = inner.backend.changed_params();
if inner.transaction_mode() && !self.replication_mode {
if inner.transaction_mode() && !self.replication_mode && !self.in_transaction() {
inner.disconnect();
}

let changed_params = inner.backend.changed_params();

inner.stats.transaction();
inner.reset_router();

debug!(
"transaction finished [{:.3}ms]",
inner.stats.last_transaction_time.as_secs_f64() * 1000.0
Expand All @@ -695,12 +711,9 @@ impl Client {

// Pooler is offline or the client requested to disconnect and the transaction is done.
if inner.backend.done() && (inner.comms.offline() || self.shutdown) && !self.admin {
println!("i am exiting");
return Ok(true);
}

println!("i am exiting");

Ok(false)
}

Expand Down Expand Up @@ -812,6 +825,7 @@ impl Client {
/// This avoids connecting to servers when clients start and commit transactions
/// with no queries.
async fn end_transaction(&mut self, rollback: bool) -> Result<(), Error> {
println!("ENDING??? ---");
// stack‐allocate up to 3 messages: NOTICE + COMMIT/ROLLBACK + READY
let mut messages: SmallVec<[Message; 3]> = SmallVec::new();

Expand All @@ -821,6 +835,8 @@ impl Client {
self.logical_transaction.commit()
};

println!("ENDING --- {:?}", logical_result);

match logical_result {
Err(TransactionError::ExpectedActive) => {
messages.push(
Expand Down Expand Up @@ -882,7 +898,23 @@ impl Client {
}

fn should_trigger_buffered_begin(&self) -> bool {
self.request_buffer.executable() && !self.in_transaction()
let executable = self.request_buffer.executable();
let should_trigger_begin = self.logical_transaction.should_trigger_begin();

println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("buffer: {:?}", self.request_buffer);
println!("");
println!(
"Executable: {}, should_trigger_begin: {}",
executable, should_trigger_begin
);

executable && should_trigger_begin
}
}

Expand Down
36 changes: 26 additions & 10 deletions pgdog/src/frontend/client/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,29 @@ async fn test_lock_session() {
async fn test_transaction_state() {
let (mut conn, mut client, mut inner) = new_client!(true);

println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("");
println!("");

conn.write_all(&buffer!({ Query::new("BEGIN") }))
.await
.unwrap();
Expand Down Expand Up @@ -496,34 +519,26 @@ async fn test_transaction_state() {
.await
.unwrap();

assert!(!inner.router.routed());
client.buffer(&State::Idle).await.unwrap();
client.client_messages(inner.get()).await.unwrap();
assert!(inner.router.routed());

for c in ['2', 'D', 'C', 'Z'] {
let msg = inner.backend.read().await.unwrap();
assert_eq!(msg.code(), c);

println!("Before: {}: routed?{}", c, inner.router.routed());

client.server_message(&mut inner.get(), msg).await.unwrap();

println!("After: {}: routed? {}", c, inner.router.routed());
}

read!(conn, ['2', 'D', 'C', 'Z']);

// assert!(inner.router.routed());
// assert!(client.in_transaction());
assert!(client.in_transaction());
assert!(inner.router.route().is_write());
println!("1.");

conn.write_all(&buffer!({ Query::new("COMMIT") }))
.await
.unwrap();

println!("2.");
assert!(client.in_transaction());

client.buffer(&State::Idle).await.unwrap();

Expand All @@ -536,6 +551,7 @@ async fn test_transaction_state() {
for c in ['C', 'Z'] {
println!("3.1");
let msg = inner.backend.read().await.unwrap();
println!("mssage: {:?}", &msg);
assert_eq!(msg.code(), c);

println!("3.2");
Expand Down
13 changes: 12 additions & 1 deletion pgdog/src/frontend/logical_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use super::router::parser::Shard;
#[derive(Debug)]
pub struct LogicalTransaction {
pub status: TransactionStatus,
begin_dispatched: bool,
manual_shard: Option<Shard>,
dirty_shard: Option<Shard>,
}
Expand All @@ -45,6 +46,7 @@ impl LogicalTransaction {
pub fn new() -> Self {
Self {
status: TransactionStatus::Idle,
begin_dispatched: false,
manual_shard: None,
dirty_shard: None,
}
Expand Down Expand Up @@ -164,10 +166,19 @@ impl LogicalTransaction {
/// Sets status to `Idle`, clears manual and dirty shard
/// Safe to call in any state.
pub fn reset(&mut self) {
println!("RESET");
self.status = TransactionStatus::Idle;
self.manual_shard = None;
self.dirty_shard = None;
self.begin_dispatched = false;
}

/// TODO
pub fn record_begin(&mut self) {
self.begin_dispatched = true;
}

pub fn should_trigger_begin(&self) -> bool {
self.in_transaction() && !self.begin_dispatched
}

/// Pin the transaction to a specific shard.
Expand Down
Loading