Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip
  • Loading branch information
niclaflamme committed Aug 7, 2025
commit 0097c3b5220ad608dc142452204708e12f156439
144 changes: 140 additions & 4 deletions pgdog/src/frontend/logical_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,11 +483,9 @@ mod tests {
#[test]
fn test_set_manual_shard_after_touch_different_errors() {
let mut tx = LogicalTransaction::new();

// touch shard 0
tx.soft_begin().unwrap();
tx.execute_query(Shard::Direct(0)).unwrap();

// manually set shard 1
let err = tx.set_manual_shard(Shard::Direct(1)).unwrap_err();
assert!(matches!(err, TransactionError::ShardConflict));
Expand All @@ -497,10 +495,8 @@ mod tests {
fn test_manual_then_dirty_conflict() {
let mut tx = LogicalTransaction::new();
tx.soft_begin().unwrap();

// pin to shard 0
tx.set_manual_shard(Shard::Direct(0)).unwrap();

// touching another shard must fail
let err = tx.execute_query(Shard::Direct(1)).unwrap_err();
assert!(matches!(err, TransactionError::ShardConflict));
Expand All @@ -527,6 +523,146 @@ mod tests {
tx.set_manual_shard(Shard::Direct(1)).unwrap();
assert_eq!(tx.active_shard(), Some(Shard::Direct(1)));
}

#[test]
fn test_rollback_from_idle_errors() {
let mut tx = LogicalTransaction::new();
let err = tx.rollback().unwrap_err();
assert!(matches!(err, TransactionError::NoPendingBegins));
}

#[test]
fn test_commit_after_rollback_errors() {
let mut tx = LogicalTransaction::new();
tx.soft_begin().unwrap();
tx.execute_query(Shard::Direct(0)).unwrap();
tx.rollback().unwrap();
let err = tx.commit().unwrap_err();
assert!(matches!(err, TransactionError::AlreadyFinalized));
}

#[test]
fn test_rollback_after_commit_errors() {
let mut tx = LogicalTransaction::new();
tx.soft_begin().unwrap();
tx.execute_query(Shard::Direct(0)).unwrap();
tx.commit().unwrap();
let err = tx.rollback().unwrap_err();
assert!(matches!(err, TransactionError::AlreadyFinalized));
}

#[test]
fn test_rollback_already_rolledback_errors() {
let mut tx = LogicalTransaction::new();
tx.soft_begin().unwrap();
tx.execute_query(Shard::Direct(0)).unwrap();
tx.rollback().unwrap();
let err = tx.rollback().unwrap_err();
assert!(matches!(err, TransactionError::AlreadyFinalized));
}

#[test]
fn test_execute_query_after_rollback_errors() {
let mut tx = LogicalTransaction::new();
tx.soft_begin().unwrap();
tx.execute_query(Shard::Direct(0)).unwrap();
tx.rollback().unwrap();
let err = tx.execute_query(Shard::Direct(0)).unwrap_err();
assert!(matches!(err, TransactionError::AlreadyFinalized));
}

#[test]
fn test_set_manual_shard_multiple_changes_before_execute() {
let mut tx = LogicalTransaction::new();
tx.soft_begin().unwrap();
tx.set_manual_shard(Shard::Direct(1)).unwrap();
tx.set_manual_shard(Shard::Direct(2)).unwrap();
assert_eq!(tx.manual_shard, Some(Shard::Direct(2)));
tx.execute_query(Shard::Direct(2)).unwrap();
let err = tx.execute_query(Shard::Direct(1)).unwrap_err();
assert!(matches!(err, TransactionError::ShardConflict));
}

#[test]
fn test_set_manual_shard_after_commit_same_ok() {
let mut tx = LogicalTransaction::new();
tx.soft_begin().unwrap();
tx.execute_query(Shard::Direct(0)).unwrap();
tx.commit().unwrap();
tx.set_manual_shard(Shard::Direct(0)).unwrap();
assert_eq!(tx.manual_shard, Some(Shard::Direct(0)));
}

#[test]
fn test_set_manual_shard_after_commit_different_errors() {
let mut tx = LogicalTransaction::new();
tx.soft_begin().unwrap();
tx.execute_query(Shard::Direct(0)).unwrap();
tx.commit().unwrap();
let err = tx.set_manual_shard(Shard::Direct(1)).unwrap_err();
assert!(matches!(err, TransactionError::ShardConflict));
}

#[test]
fn test_set_manual_shard_after_rollback_same_ok() {
let mut tx = LogicalTransaction::new();
tx.soft_begin().unwrap();
tx.execute_query(Shard::Direct(0)).unwrap();
tx.rollback().unwrap();
tx.set_manual_shard(Shard::Direct(0)).unwrap();
assert_eq!(tx.manual_shard, Some(Shard::Direct(0)));
}

#[test]
fn test_set_manual_shard_after_rollback_different_errors() {
let mut tx = LogicalTransaction::new();
tx.soft_begin().unwrap();
tx.execute_query(Shard::Direct(0)).unwrap();
tx.rollback().unwrap();
let err = tx.set_manual_shard(Shard::Direct(1)).unwrap_err();
assert!(matches!(err, TransactionError::ShardConflict));
}

#[test]
fn test_active_shard_none() {
let tx = LogicalTransaction::new();
assert_eq!(tx.active_shard(), None);
}

#[test]
fn test_set_manual_shard_in_idle() {
let mut tx = LogicalTransaction::new();
tx.set_manual_shard(Shard::Direct(0)).unwrap();
assert_eq!(tx.manual_shard, Some(Shard::Direct(0)));
}

#[test]
fn test_soft_begin_after_reset_from_finalized() {
let mut tx = LogicalTransaction::new();
tx.soft_begin().unwrap();
tx.execute_query(Shard::Direct(0)).unwrap();
tx.commit().unwrap();
tx.reset();
tx.soft_begin().unwrap();
assert_eq!(tx.status, TransactionStatus::BeginPending);
}

#[test]
fn test_active_shard_both_same() {
let mut tx = LogicalTransaction::new();
tx.set_manual_shard(Shard::Direct(3)).unwrap();
tx.soft_begin().unwrap();
tx.execute_query(Shard::Direct(3)).unwrap();
assert_eq!(tx.active_shard(), Some(Shard::Direct(3)));
}

#[test]
fn test_statements_executed_remains_zero_after_execute() {
let mut tx = LogicalTransaction::new();
tx.soft_begin().unwrap();
tx.execute_query(Shard::Direct(0)).unwrap();
assert_eq!(tx.statements_executed, 0);
}
}

// -----------------------------------------------------------------------------
Expand Down