Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip
  • Loading branch information
niclaflamme committed Aug 8, 2025
commit 6f88b4200b9daeeb8410c599b9faaf5a03816518
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pgdog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ indexmap = "2.9"
lru = "0.16"
hickory-resolver = "0.25.2"
lazy_static = "1"
smallvec = "1.15.1"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"
Expand Down
67 changes: 38 additions & 29 deletions pgdog/src/frontend/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@ use std::time::Instant;

use bytes::BytesMut;
use engine::EngineContext;
use smallvec::SmallVec;
use timeouts::Timeouts;
use tokio::time::timeout;
use tokio::{select, spawn};
use tracing::{debug, enabled, error, info, trace, Level as LogLevel};

use super::logical_transaction::{LogicalTransaction, TransactionStatus};
use super::logical_transaction::{LogicalTransaction, TransactionError, TransactionStatus};
use super::{Buffer, Command, Comms, Error, PreparedStatements};

use crate::auth::{md5, scram::Server};
use crate::backend::{
databases,
pool::{Connection, Request},
};
use crate::config::{self, AuthType};
use crate::frontend::buffer::BufferedQuery;
use crate::frontend::logical_transaction::TransactionError;
#[cfg(debug_assertions)]
use crate::frontend::QueryLogger;
use crate::net::messages::{
Expand Down Expand Up @@ -645,17 +646,18 @@ impl Client {
// ReadyForQuery (B)
if code == 'Z' {
inner.stats.query();
// 1) Should we logically be in‐txn?
// In transaction if buffered BEGIN from client or server is telling us we are.
// 1) Does the backend server say we're in a transaction?
let should_be_tx = message.in_transaction() || inner.start_transaction.is_some();

// 2) Is the frontend client in a logical transaction?
let in_transaction = self.logical_transaction.in_transaction();

// 2) Reconcile against our LogicalTransaction
// 3) Reconcile against our LogicalTransaction
if should_be_tx && !in_transaction {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably clarify that the query parser isn't always active and therefore we don't catch BEGIN statements always. So sometimes, the only way to know we're inside a transaction is because the server told us.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right, I also think it might be cheap to check twice. I'll check 👀

self.logical_transaction.soft_begin()?;
}
if !should_be_tx && in_transaction {
self.logical_transaction.reset(); // COMMIT/ROLLBACK just happened?
self.logical_transaction.reset();
}

inner.stats.idle(self.logical_transaction.in_transaction());
Expand Down Expand Up @@ -786,15 +788,27 @@ impl Client {

/// Tell the client we started a transaction.
async fn start_transaction(&mut self) -> Result<(), Error> {
self.logical_transaction.soft_begin()?;
// stack‐allocate up to 3 messages: optional NOTICE + BEGIN + Ready
let mut messages: SmallVec<[Message; 3]> = SmallVec::new();

self.stream
.send_many(&[
CommandComplete::new_begin().message()?.backend(),
ReadyForQuery::in_transaction(true).message()?,
])
.await?;
match self.logical_transaction.soft_begin() {
Err(TransactionError::ExpectedActive) => {
let notice = NoticeResponse::from(ErrorResponse::already_in_transaction())
.message()?
.backend();

messages.push(notice);
}
Err(e) => return Err(e.into()), // any other error is fatal
Ok(()) => {}
}

// push the BEGIN + in-transaction ready
messages.push(CommandComplete::new_begin().message()?.backend());
messages.push(ReadyForQuery::in_transaction(true).message()?.backend());

// send all messages
self.stream.send_many(&messages).await?;
debug!("transaction started");

Ok(())
Expand All @@ -805,41 +819,36 @@ impl Client {
/// This avoids connecting to servers when clients start and commit transactions
/// with no queries.
async fn end_transaction(&mut self, rollback: bool) -> Result<(), Error> {
// attempt to commit or rollback the logical transaction
// stack‐allocate up to 3 messages: NOTICE + COMMIT/ROLLBACK + READY
let mut messages: SmallVec<[Message; 3]> = SmallVec::new();

let logical_result = if rollback {
self.logical_transaction.rollback()
} else {
self.logical_transaction.commit()
};

match logical_result {
// no transaction in progress → send a NOTICE and READY
Err(TransactionError::ExpectedActive) => {
let notice = NoticeResponse::from(ErrorResponse::no_transaction())
.message()?
.backend();
let ready = ReadyForQuery::idle().message()?.backend();
self.stream.send_many(&[notice, ready]).await?;
return Ok(());
messages.push(
NoticeResponse::from(ErrorResponse::no_transaction())
.message()?
.backend(),
);
}
// any other logical‐txn error is fatal
Err(e) => return Err(e.into()),
// on Ok, fall through to send CommandComplete
Ok(()) => {}
}

// build and send COMMIT/ROLLBACK + READY
let cmd = if rollback {
CommandComplete::new_rollback()
} else {
CommandComplete::new_commit()
};
messages.push(cmd.message()?.backend());
messages.push(ReadyForQuery::idle().message()?.backend());

let complete_msg = cmd.message()?.backend();
let ready_msg = ReadyForQuery::idle().message()?.backend();

self.stream.send_many(&[complete_msg, ready_msg]).await?;

self.stream.send_many(&messages).await?;
debug!("transaction ended");
Ok(())
}
Expand Down
10 changes: 10 additions & 0 deletions pgdog/src/net/messages/error_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ impl ErrorResponse {
..Default::default()
}
}

/// Warning for issuing BEGIN inside an existing transaction.
pub fn already_in_transaction() -> ErrorResponse {
ErrorResponse {
severity: "WARNING".into(),
code: "25001".into(),
message: "there is already a transaction in progress".into(),
..Default::default()
}
}
}

impl Display for ErrorResponse {
Expand Down
Loading