Skip to content

Commit 0ab1ebc

Browse files
rabizanesq
authored andcommitted
fix: use BEGIN IMMEDIATE to prevent SQLite deadlocks (#7429)
Signed-off-by: rabi <ramishra@redhat.com>
1 parent 2ef804e commit 0ab1ebc

1 file changed

Lines changed: 99 additions & 8 deletions

File tree

crates/goose/src/session/session_manager.rs

Lines changed: 99 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ impl SessionStorage {
665665
}
666666

667667
async fn import_legacy_session(pool: &Pool<Sqlite>, session: &Session) -> Result<()> {
668-
let mut tx = pool.begin().await?;
668+
let mut tx = pool.begin_with("BEGIN IMMEDIATE").await?;
669669

670670
let recipe_json = match &session.recipe {
671671
Some(recipe) => Some(serde_json::to_string(recipe)?),
@@ -724,7 +724,7 @@ impl SessionStorage {
724724
}
725725

726726
async fn run_migrations(pool: &Pool<Sqlite>) -> Result<()> {
727-
let mut tx = pool.begin().await?;
727+
let mut tx = pool.begin_with("BEGIN IMMEDIATE").await?;
728728

729729
let current_version = Self::get_schema_version(&mut tx).await?;
730730

@@ -899,7 +899,7 @@ impl SessionStorage {
899899
session_type: SessionType,
900900
) -> Result<Session> {
901901
let pool = self.pool().await?;
902-
let mut tx = pool.begin().await?;
902+
let mut tx = pool.begin_with("BEGIN IMMEDIATE").await?;
903903

904904
let today = chrono::Utc::now().format("%Y%m%d").to_string();
905905
let session = sqlx::query_as(
@@ -1071,7 +1071,7 @@ impl SessionStorage {
10711071
}
10721072

10731073
let pool = self.pool().await?;
1074-
let mut tx = pool.begin().await?;
1074+
let mut tx = pool.begin_with("BEGIN IMMEDIATE").await?;
10751075
q = q.bind(&builder.session_id);
10761076
q.execute(&mut *tx).await?;
10771077

@@ -1116,7 +1116,7 @@ impl SessionStorage {
11161116

11171117
async fn add_message(&self, session_id: &str, message: &Message) -> Result<()> {
11181118
let pool = self.pool().await?;
1119-
let mut tx = pool.begin().await?;
1119+
let mut tx = pool.begin_with("BEGIN IMMEDIATE").await?;
11201120

11211121
let metadata_json = serde_json::to_string(&message.metadata)?;
11221122

@@ -1154,7 +1154,7 @@ impl SessionStorage {
11541154
session_id: &str,
11551155
conversation: &Conversation,
11561156
) -> Result<()> {
1157-
let mut tx = pool.begin().await?;
1157+
let mut tx = pool.begin_with("BEGIN IMMEDIATE").await?;
11581158

11591159
sqlx::query("DELETE FROM messages WHERE session_id = ?")
11601160
.bind(session_id)
@@ -1237,7 +1237,7 @@ impl SessionStorage {
12371237

12381238
async fn delete_session(&self, session_id: &str) -> Result<()> {
12391239
let pool = self.pool().await?;
1240-
let mut tx = pool.begin().await?;
1240+
let mut tx = pool.begin_with("BEGIN IMMEDIATE").await?;
12411241

12421242
let exists =
12431243
sqlx::query_scalar::<_, bool>("SELECT EXISTS(SELECT 1 FROM sessions WHERE id = ?)")
@@ -1416,7 +1416,7 @@ impl SessionStorage {
14161416
) -> crate::conversation::message::MessageMetadata,
14171417
{
14181418
let pool = self.pool().await?;
1419-
let mut tx = pool.begin().await?;
1419+
let mut tx = pool.begin_with("BEGIN IMMEDIATE").await?;
14201420

14211421
let current_metadata_json = sqlx::query_scalar::<_, String>(
14221422
"SELECT metadata_json FROM messages WHERE message_id = ? AND session_id = ?",
@@ -1455,6 +1455,97 @@ mod tests {
14551455

14561456
const NUM_CONCURRENT_SESSIONS: i32 = 10;
14571457

1458+
async fn run_lock_upgrade_attempt(
1459+
pool: Pool<Sqlite>,
1460+
session_id: String,
1461+
begin_statement: &'static str,
1462+
worker_id: i32,
1463+
barrier: Option<Arc<tokio::sync::Barrier>>,
1464+
) -> anyhow::Result<()> {
1465+
let mut tx = pool.begin_with(begin_statement).await?;
1466+
1467+
sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM sessions WHERE id = ?")
1468+
.bind(&session_id)
1469+
.fetch_one(&mut *tx)
1470+
.await?;
1471+
1472+
if let Some(barrier) = barrier {
1473+
barrier.wait().await;
1474+
}
1475+
1476+
sqlx::query("UPDATE sessions SET total_tokens = ? WHERE id = ?")
1477+
.bind(worker_id)
1478+
.bind(&session_id)
1479+
.execute(&mut *tx)
1480+
.await?;
1481+
1482+
tx.commit().await?;
1483+
Ok(())
1484+
}
1485+
1486+
async fn run_lock_upgrade_race(
1487+
pool: Pool<Sqlite>,
1488+
session_id: String,
1489+
begin_statement: &'static str,
1490+
use_barrier: bool,
1491+
) -> Vec<anyhow::Result<()>> {
1492+
let barrier = if use_barrier {
1493+
Some(Arc::new(tokio::sync::Barrier::new(2)))
1494+
} else {
1495+
None
1496+
};
1497+
let mut handles = Vec::new();
1498+
1499+
for worker_id in 0..2 {
1500+
let pool = pool.clone();
1501+
let session_id = session_id.clone();
1502+
let barrier = barrier.clone();
1503+
handles.push(tokio::spawn(async move {
1504+
run_lock_upgrade_attempt(pool, session_id, begin_statement, worker_id, barrier)
1505+
.await
1506+
}));
1507+
}
1508+
1509+
let mut results = Vec::new();
1510+
for handle in handles {
1511+
results.push(handle.await.expect("lock-upgrade task panicked"));
1512+
}
1513+
results
1514+
}
1515+
1516+
#[tokio::test]
1517+
async fn test_begin_immediate_prevents_lock_upgrade_deadlock() {
1518+
let temp_dir = TempDir::new().unwrap();
1519+
let session_manager = SessionManager::new(temp_dir.path().to_path_buf());
1520+
1521+
let session = session_manager
1522+
.create_session(
1523+
PathBuf::from("/tmp/lock-upgrade-test"),
1524+
"Lock Upgrade Session".to_string(),
1525+
SessionType::User,
1526+
)
1527+
.await
1528+
.unwrap();
1529+
1530+
let pool = session_manager.storage().pool.clone();
1531+
1532+
let results = run_lock_upgrade_race(pool.clone(), session.id.clone(), "BEGIN", true).await;
1533+
assert!(
1534+
results.iter().any(Result::is_err),
1535+
"BEGIN (DEFERRED) should cause SQLITE_BUSY when two tasks try to upgrade SHARED → RESERVED"
1536+
);
1537+
1538+
let results = run_lock_upgrade_race(pool, session.id, "BEGIN IMMEDIATE", false).await;
1539+
assert!(
1540+
results.iter().all(Result::is_ok),
1541+
"BEGIN IMMEDIATE should serialize contention without SQLITE_BUSY: {:?}",
1542+
results
1543+
.iter()
1544+
.filter_map(|r| r.as_ref().err().map(ToString::to_string))
1545+
.collect::<Vec<_>>()
1546+
);
1547+
}
1548+
14581549
#[tokio::test]
14591550
async fn test_concurrent_session_creation() {
14601551
let temp_dir = TempDir::new().unwrap();

0 commit comments

Comments
 (0)