Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip
  • Loading branch information
niclaflamme committed Aug 11, 2025
commit a3f6d46b67d530c06a0a704720dbf468ca21dd45
15 changes: 0 additions & 15 deletions pgdog/src/frontend/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,16 +635,12 @@ impl Client {
let message = message.backend();
let has_more_messages = inner.backend.has_more_messages();

println!("\n --> ONE");

// Messages that we need to send to the client immediately.
// ReadyForQuery (B) | CopyInResponse (B) | ErrorResponse(B) | NoticeResponse(B) | NotificationResponse (B)
let flush = matches!(code, 'Z' | 'G' | 'E' | 'N' | 'A')
|| !has_more_messages
|| message.streaming();

println!("\n --> TWO");

// Server finished executing a query.
// ReadyForQuery (B)
if code == 'Z' {
Expand All @@ -660,16 +656,13 @@ impl Client {
}
}

println!("\n --> THREE");

inner.stats.sent(message.len());

// Release the connection back into the pool
// before flushing data to client.
// Flushing can take a minute and we don't want to block
// the connection from being reused.
if inner.backend.done() {
println!("\n --> FOUR");
let changed_params = inner.backend.changed_params();
if inner.transaction_mode() && !self.replication_mode {
inner.disconnect();
Expand All @@ -681,8 +674,6 @@ impl Client {
inner.stats.last_transaction_time.as_secs_f64() * 1000.0
);

println!("\n --> FIVE");

// Update client params with values
// sent from the server using ParameterStatus(B) messages.
if !changed_params.is_empty() {
Expand All @@ -694,23 +685,17 @@ impl Client {
}
}

println!("\n --> SIX");

if flush {
self.stream.send_flush(&message).await?;
} else {
self.stream.send(&message).await?;
}

println!("\n --> SEVEN");

// Pooler is offline or the client requested to disconnect and the transaction is done.
if inner.backend.done() && (inner.comms.offline() || self.shutdown) && !self.admin {
return Ok(true);
}

println!("\n --> EIGHT");

Ok(false)
}

Expand Down
24 changes: 5 additions & 19 deletions pgdog/src/frontend/client/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,6 @@ async fn test_transaction_state() {

assert!(client.in_transaction());
assert!(inner.router.route().is_write());
assert!(client.logical_transaction.in_transaction());

conn.write_all(&buffer!(
{ Parse::named("test", "SELECT $1") },
Expand All @@ -471,7 +470,6 @@ async fn test_transaction_state() {
assert!(inner.router.routed());
assert!(client.in_transaction());
assert!(inner.router.route().is_write());
assert!(client.logical_transaction.in_transaction());

for c in ['1', 't', 'T', 'Z'] {
let msg = inner.backend.read().await.unwrap();
Expand Down Expand Up @@ -507,47 +505,35 @@ async fn test_transaction_state() {
let msg = inner.backend.read().await.unwrap();
assert_eq!(msg.code(), c);

println!("Loop -> {} :: pre-message {:?}", c, inner.router.route());
println!("Before: {}: routed?{}", c, inner.router.routed());

client.server_message(&mut inner.get(), msg).await.unwrap();
println!("Loop -> {} :: post-message {:?}", c, inner.router.route());
}

println!("Out of loop route {:?}", inner.router.route());
println!("After: {}: routed? {}", c, inner.router.routed());
}

read!(conn, ['2', 'D', 'C', 'Z']);

// assert!(inner.router.routed());
assert!(inner.router.routed());
assert!(client.in_transaction());
assert!(inner.router.route().is_write());
assert!(client.logical_transaction.in_transaction());

conn.write_all(&buffer!({ Query::new("COMMIT") }))
.await
.unwrap();

println!("I JUST COMMITED IN THE TESTS!");

client.buffer(&State::Idle).await.unwrap();
client.client_messages(inner.get()).await.unwrap();

println!("\nONE");

for c in ['C', 'Z'] {
println!("{} A", c);
let msg = inner.backend.read().await.unwrap();
assert_eq!(msg.code(), c);
println!("{} B", c);

client.server_message(&mut inner.get(), msg).await.unwrap();
println!("{} C", c);
}

println!("\nTWO");

read!(conn, ['C', 'Z']);

println!("\nTHRE");

assert!(!client.in_transaction());
assert!(!inner.router.routed());
}
Expand Down