diff --git a/Cargo.toml b/Cargo.toml index cc1cd0347..9c419e339 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ intrusive-collections = "0.9.7" parking_lot = "0.12" portable-atomic = "1" rustc-hash = "2" -smallvec = "1" +smallvec = { version = "1", features = ["const_new"] } thin-vec = { version = "0.2.14" } tracing = { version = "0.1", default-features = false, features = ["std"] } diff --git a/src/active_query.rs b/src/active_query.rs index 0b2231052..d830fece1 100644 --- a/src/active_query.rs +++ b/src/active_query.rs @@ -498,7 +498,7 @@ impl fmt::Display for Backtrace { if full { write!(fmt, " -> ({changed_at:?}, {durability:#?}")?; if !cycle_heads.is_empty() || !iteration_count.is_initial() { - write!(fmt, ", iteration = {iteration_count:?}")?; + write!(fmt, ", iteration = {iteration_count}")?; } write!(fmt, ")")?; } @@ -517,7 +517,7 @@ impl fmt::Display for Backtrace { } write!( fmt, - "{:?} -> {:?}", + "{:?} -> iteration = {}", head.database_key_index, head.iteration_count )?; } diff --git a/src/cancelled.rs b/src/cancelled.rs index 2f2f315d9..3c31bae5a 100644 --- a/src/cancelled.rs +++ b/src/cancelled.rs @@ -20,6 +20,7 @@ pub enum Cancelled { } impl Cancelled { + #[cold] pub(crate) fn throw(self) -> ! { // We use resume and not panic here to avoid running the panic // hook (that is, to avoid collecting and printing backtrace). diff --git a/src/cycle.rs b/src/cycle.rs index 12cb1cdc9..c9a9b82c1 100644 --- a/src/cycle.rs +++ b/src/cycle.rs @@ -44,14 +44,18 @@ //! result in a stable, converged cycle. If it does not (that is, if the result of another //! iteration of the cycle is not the same as the fallback value), we'll panic. //! -//! In nested cycle cases, the inner cycle head will iterate until its own cycle is resolved, but -//! the "final" value it then returns will still be provisional on the outer cycle head. The outer -//! cycle head may then iterate, which may result in a new set of iterations on the inner cycle, -//! for each iteration of the outer cycle. - +//! In nested cycle cases, the inner cycles are iterated as part of the outer cycle iteration. This helps +//! to significantly reduce the number of iterations needed to reach a fixpoint. For nested cycles, +//! the inner cycles head will transfer their lock ownership to the outer cycle. This ensures +//! that, over time, the outer cycle will hold all necessary locks to complete the fixpoint iteration. +//! Without this, different threads would compete for the locks of inner cycle heads, leading to potential +//! hangs (but not deadlocks). + +use std::iter::FusedIterator; use thin_vec::{thin_vec, ThinVec}; use crate::key::DatabaseKeyIndex; +use crate::sync::atomic::{AtomicBool, AtomicU8, Ordering}; use crate::sync::OnceLock; use crate::Revision; @@ -96,14 +100,47 @@ pub enum CycleRecoveryStrategy { /// would be the cycle head. It returns an "initial value" when the cycle is encountered (if /// fixpoint iteration is enabled for that query), and then is responsible for re-iterating the /// cycle until it converges. -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Debug)] #[cfg_attr(feature = "persistence", derive(serde::Serialize, serde::Deserialize))] pub struct CycleHead { pub(crate) database_key_index: DatabaseKeyIndex, - pub(crate) iteration_count: IterationCount, + pub(crate) iteration_count: AtomicIterationCount, + + /// Marks a cycle head as removed within its `CycleHeads` container. 
+ /// + /// Cycle heads are marked as removed when the memo from the last iteration (a provisional memo) + /// is used as the initial value for the next iteration. It's necessary to remove all but its own + /// head from the `CycleHeads` container, because the query might now depend on fewer cycles + /// (in case of conditional dependencies). However, we can't actually remove the cycle head + /// within `fetch_cold_cycle` because we only have a readonly memo. That's what `removed` is used for. + #[cfg_attr(feature = "persistence", serde(skip))] + removed: AtomicBool, +} + +impl CycleHead { + pub const fn new( + database_key_index: DatabaseKeyIndex, + iteration_count: IterationCount, + ) -> Self { + Self { + database_key_index, + iteration_count: AtomicIterationCount(AtomicU8::new(iteration_count.0)), + removed: AtomicBool::new(false), + } + } } -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Default)] +impl Clone for CycleHead { + fn clone(&self) -> Self { + Self { + database_key_index: self.database_key_index, + iteration_count: self.iteration_count.load().into(), + removed: self.removed.load(Ordering::Relaxed).into(), + } + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Default, PartialOrd, Ord)] #[cfg_attr(feature = "persistence", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "persistence", serde(transparent))] pub struct IterationCount(u8); @@ -131,11 +168,69 @@ impl IterationCount { } } +impl std::fmt::Display for IterationCount { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +#[derive(Debug)] +pub(crate) struct AtomicIterationCount(AtomicU8); + +impl AtomicIterationCount { + pub(crate) fn load(&self) -> IterationCount { + IterationCount(self.0.load(Ordering::Relaxed)) + } + + pub(crate) fn load_mut(&mut self) -> IterationCount { + IterationCount(*self.0.get_mut()) + } + + pub(crate) fn store(&self, value: IterationCount) { + self.0.store(value.0, Ordering::Release); + } + + pub(crate) fn store_mut(&mut self, value: IterationCount) { + *self.0.get_mut() = value.0; + } +} + +impl std::fmt::Display for AtomicIterationCount { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.load().fmt(f) + } +} + +impl From for AtomicIterationCount { + fn from(iteration_count: IterationCount) -> Self { + AtomicIterationCount(iteration_count.0.into()) + } +} + +#[cfg(feature = "persistence")] +impl serde::Serialize for AtomicIterationCount { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + self.load().serialize(serializer) + } +} + +#[cfg(feature = "persistence")] +impl<'de> serde::Deserialize<'de> for AtomicIterationCount { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + IterationCount::deserialize(deserializer).map(Into::into) + } +} + /// Any provisional value generated by any query in a cycle will track the cycle head(s) (can be /// plural in case of nested cycles) representing the cycles it is part of, and the current /// iteration count for each cycle head. This struct tracks these cycle heads. 
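+///
+/// For example (purely illustrative; `a` and `b` stand in for arbitrary query keys): if query
+/// `a` calls `b` and `b` calls back into `a`, then `a` is the cycle head and the provisional
+/// memos of both `a` and `b` carry a single head for `a`:
+///
+/// ```ignore
+/// // a -> b -> a: `a` is the cycle head.
+/// // Both provisional memos record roughly:
+/// //   cycle_heads = [CycleHead { database_key_index: a, iteration_count: 0, .. }]
+/// let heads = CycleHeads::initial(a, IterationCount::initial());
+/// assert!(heads.contains(&a));
+/// ```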
#[derive(Clone, Debug, Default)] -#[cfg_attr(feature = "persistence", derive(serde::Serialize, serde::Deserialize))] pub struct CycleHeads(ThinVec); impl CycleHeads { @@ -143,15 +238,30 @@ impl CycleHeads { self.0.is_empty() } - pub(crate) fn initial(database_key_index: DatabaseKeyIndex) -> Self { + pub(crate) fn initial( + database_key_index: DatabaseKeyIndex, + iteration_count: IterationCount, + ) -> Self { Self(thin_vec![CycleHead { database_key_index, - iteration_count: IterationCount::initial(), + iteration_count: iteration_count.into(), + removed: false.into() }]) } - pub(crate) fn iter(&self) -> std::slice::Iter<'_, CycleHead> { - self.0.iter() + pub(crate) fn iter(&self) -> CycleHeadsIterator<'_> { + CycleHeadsIterator { + inner: self.0.iter(), + } + } + + /// Iterates over all cycle heads that aren't equal to `own`. + pub(crate) fn iter_not_eq( + &self, + own: DatabaseKeyIndex, + ) -> impl DoubleEndedIterator { + self.iter() + .filter(move |head| head.database_key_index != own) } pub(crate) fn contains(&self, value: &DatabaseKeyIndex) -> bool { @@ -159,17 +269,25 @@ impl CycleHeads { .any(|head| head.database_key_index == *value) } - pub(crate) fn remove(&mut self, value: &DatabaseKeyIndex) -> bool { - let found = self - .0 - .iter() - .position(|&head| head.database_key_index == *value); - let Some(found) = found else { return false }; - self.0.swap_remove(found); - true + /// Removes all cycle heads except `except` by marking them as removed. + /// + /// Note that the heads aren't actually removed. They're only marked as removed and will be + /// skipped when iterating. This is because we might not have a mutable reference. + pub(crate) fn remove_all_except(&self, except: DatabaseKeyIndex) { + for head in self.0.iter() { + if head.database_key_index == except { + continue; + } + + head.removed.store(true, Ordering::Release); + } } - pub(crate) fn update_iteration_count( + /// Updates the iteration count for the head `cycle_head_index` to `new_iteration_count`. + /// + /// Unlike [`update_iteration_count`], this method takes a `&mut self` reference. It should + /// be preferred if possible, as it avoids atomic operations. + pub(crate) fn update_iteration_count_mut( &mut self, cycle_head_index: DatabaseKeyIndex, new_iteration_count: IterationCount, @@ -179,7 +297,24 @@ impl CycleHeads { .iter_mut() .find(|cycle_head| cycle_head.database_key_index == cycle_head_index) { - cycle_head.iteration_count = new_iteration_count; + cycle_head.iteration_count.store_mut(new_iteration_count); + } + } + + /// Updates the iteration count for the head `cycle_head_index` to `new_iteration_count`. + /// + /// Unlike [`update_iteration_count_mut`], this method takes a `&self` reference. 
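+    ///
+    /// Because the count is stored in an [`AtomicIterationCount`], a shared reference is enough.
+    /// A rough sketch of a call site (illustrative only; `nested_head` and `next_iteration` are
+    /// placeholders, not values defined here):
+    ///
+    /// ```ignore
+    /// // Bump the iteration count recorded for a nested cycle head through a shared reference.
+    /// memo.cycle_heads().update_iteration_count(nested_head, next_iteration);
+    /// ```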
+ pub(crate) fn update_iteration_count( + &self, + cycle_head_index: DatabaseKeyIndex, + new_iteration_count: IterationCount, + ) { + if let Some(cycle_head) = self + .0 + .iter() + .find(|cycle_head| cycle_head.database_key_index == cycle_head_index) + { + cycle_head.iteration_count.store(new_iteration_count); } } @@ -188,15 +323,42 @@ impl CycleHeads { self.0.reserve(other.0.len()); for head in other { - if let Some(existing) = self - .0 - .iter() - .find(|candidate| candidate.database_key_index == head.database_key_index) - { - assert_eq!(existing.iteration_count, head.iteration_count); + debug_assert!(!head.removed.load(Ordering::Relaxed)); + self.insert(head.database_key_index, head.iteration_count.load()); + } + } + + pub(crate) fn insert( + &mut self, + database_key_index: DatabaseKeyIndex, + iteration_count: IterationCount, + ) -> bool { + if let Some(existing) = self + .0 + .iter_mut() + .find(|candidate| candidate.database_key_index == database_key_index) + { + let removed = existing.removed.get_mut(); + + if *removed { + *removed = false; + + true } else { - self.0.push(*head); + let existing_count = existing.iteration_count.load_mut(); + + assert_eq!( + existing_count, iteration_count, + "Can't merge cycle heads {:?} with different iteration counts ({existing_count:?}, {iteration_count:?})", + existing.database_key_index + ); + + false } + } else { + self.0 + .push(CycleHead::new(database_key_index, iteration_count)); + true } } @@ -206,6 +368,37 @@ impl CycleHeads { } } +#[cfg(feature = "persistence")] +impl serde::Serialize for CycleHeads { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + use serde::ser::SerializeSeq; + + let mut seq = serializer.serialize_seq(None)?; + for e in self { + if e.removed.load(Ordering::Relaxed) { + continue; + } + + seq.serialize_element(e)?; + } + seq.end() + } +} + +#[cfg(feature = "persistence")] +impl<'de> serde::Deserialize<'de> for CycleHeads { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let vec: ThinVec = serde::Deserialize::deserialize(deserializer)?; + Ok(CycleHeads(vec)) + } +} + impl IntoIterator for CycleHeads { type Item = CycleHead; type IntoIter = as IntoIterator>::IntoIter; @@ -215,9 +408,44 @@ impl IntoIterator for CycleHeads { } } +pub struct CycleHeadsIterator<'a> { + inner: std::slice::Iter<'a, CycleHead>, +} + +impl<'a> Iterator for CycleHeadsIterator<'a> { + type Item = &'a CycleHead; + + fn next(&mut self) -> Option { + loop { + let next = self.inner.next()?; + + if next.removed.load(Ordering::Relaxed) { + continue; + } + + return Some(next); + } + } +} + +impl FusedIterator for CycleHeadsIterator<'_> {} +impl DoubleEndedIterator for CycleHeadsIterator<'_> { + fn next_back(&mut self) -> Option { + loop { + let next = self.inner.next_back()?; + + if next.removed.load(Ordering::Relaxed) { + continue; + } + + return Some(next); + } + } +} + impl<'a> std::iter::IntoIterator for &'a CycleHeads { type Item = &'a CycleHead; - type IntoIter = std::slice::Iter<'a, CycleHead>; + type IntoIter = CycleHeadsIterator<'a>; fn into_iter(self) -> Self::IntoIter { self.iter() @@ -248,21 +476,3 @@ pub enum ProvisionalStatus { }, FallbackImmediate, } - -impl ProvisionalStatus { - pub(crate) const fn iteration(&self) -> Option { - match self { - ProvisionalStatus::Provisional { iteration, .. } => Some(*iteration), - ProvisionalStatus::Final { iteration, .. 
} => Some(*iteration),
-            ProvisionalStatus::FallbackImmediate => None,
-        }
-    }
-
-    pub(crate) const fn verified_at(&self) -> Option<Revision> {
-        match self {
-            ProvisionalStatus::Provisional { verified_at, .. } => Some(*verified_at),
-            ProvisionalStatus::Final { verified_at, .. } => Some(*verified_at),
-            ProvisionalStatus::FallbackImmediate => None,
-        }
-    }
-}
diff --git a/src/function.rs b/src/function.rs
index 58f773895..7499b575b 100644
--- a/src/function.rs
+++ b/src/function.rs
@@ -1,5 +1,5 @@
 pub(crate) use maybe_changed_after::{VerifyCycleHeads, VerifyResult};
-pub(crate) use sync::SyncGuard;
+pub(crate) use sync::{ClaimGuard, ClaimResult, Reentrant, SyncGuard, SyncOwnerId, SyncTable};
 
 use std::any::Any;
 use std::fmt;
@@ -8,11 +8,11 @@ use std::sync::atomic::Ordering;
 use std::sync::OnceLock;
 
 use crate::cycle::{
-    empty_cycle_heads, CycleHeads, CycleRecoveryAction, CycleRecoveryStrategy, ProvisionalStatus,
+    empty_cycle_heads, CycleHeads, CycleRecoveryAction, CycleRecoveryStrategy, IterationCount,
+    ProvisionalStatus,
 };
 use crate::database::RawDatabase;
 use crate::function::delete::DeletedEntries;
-use crate::function::sync::{ClaimResult, SyncTable};
 use crate::hash::{FxHashSet, FxIndexSet};
 use crate::ingredient::{Ingredient, WaitForResult};
 use crate::key::DatabaseKeyIndex;
@@ -92,7 +92,18 @@ pub trait Configuration: Any {
     /// Decide whether to iterate a cycle again or fallback. `value` is the provisional return
     /// value from the latest iteration of this cycle. `count` is the number of cycle iterations
-    /// we've already completed.
+    /// completed so far.
+    ///
+    /// # Iteration count semantics
+    ///
+    /// The `count` parameter isn't guaranteed to start from zero or to be contiguous:
+    ///
+    /// * **Initial value**: `count` may be non-zero on the first call for a given query if that
+    ///   query becomes the outermost cycle head after a nested cycle completes a few iterations.
+    ///   In this case, `count` continues from the nested cycle's iteration count rather than resetting to zero.
+    /// * **Non-contiguous values**: This function isn't called if this cycle is part of an outer
+    ///   cycle and the value for this query remains unchanged for one iteration, even though the
+    ///   outer cycle might keep iterating because other heads keep changing.
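+    ///
+    /// For illustration only (the concrete parameter and output types depend on the query; the
+    /// ones below are stand-ins), a recovery function that iterates up to a bound and then falls
+    /// back to the latest provisional value could look like this:
+    ///
+    /// ```ignore
+    /// // Illustrative sketch, not a real `Configuration` implementation.
+    /// fn recover_from_cycle<'db>(
+    ///     _db: &'db dyn Database,
+    ///     value: &u32,
+    ///     count: u32,
+    ///     _input: Id,
+    /// ) -> CycleRecoveryAction<u32> {
+    ///     // `count` may already be non-zero here if this query became the outermost
+    ///     // head of a nested cycle (see above).
+    ///     if count < 10 {
+    ///         CycleRecoveryAction::Iterate
+    ///     } else {
+    ///         CycleRecoveryAction::Fallback(*value)
+    ///     }
+    /// }
+    /// ```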
fn recover_from_cycle<'db>( db: &'db Self::DbView, value: &Self::Output<'db>, @@ -358,6 +369,41 @@ where }) } + fn set_cycle_iteration_count(&self, zalsa: &Zalsa, input: Id, iteration_count: IterationCount) { + let Some(memo) = + self.get_memo_from_table_for(zalsa, input, self.memo_ingredient_index(zalsa, input)) + else { + return; + }; + + memo.revisions + .set_iteration_count(Self::database_key_index(self, input), iteration_count); + } + + fn finalize_cycle_head(&self, zalsa: &Zalsa, input: Id) { + let Some(memo) = + self.get_memo_from_table_for(zalsa, input, self.memo_ingredient_index(zalsa, input)) + else { + return; + }; + + memo.revisions.verified_final.store(true, Ordering::Release); + } + + fn cycle_converged(&self, zalsa: &Zalsa, input: Id) -> bool { + let Some(memo) = + self.get_memo_from_table_for(zalsa, input, self.memo_ingredient_index(zalsa, input)) + else { + return true; + }; + + memo.revisions.cycle_converged() + } + + fn mark_as_transfer_target(&self, key_index: Id) -> Option { + self.sync_table.mark_as_transfer_target(key_index) + } + fn cycle_heads<'db>(&self, zalsa: &'db Zalsa, input: Id) -> &'db CycleHeads { self.get_memo_from_table_for(zalsa, input, self.memo_ingredient_index(zalsa, input)) .map(|memo| memo.cycle_heads()) @@ -372,9 +418,9 @@ where /// * [`WaitResult::Cycle`] Claiming the `key_index` results in a cycle because it's on the current's thread query stack or /// running on another thread that is blocked on this thread. fn wait_for<'me>(&'me self, zalsa: &'me Zalsa, key_index: Id) -> WaitForResult<'me> { - match self.sync_table.try_claim(zalsa, key_index) { + match self.sync_table.try_claim(zalsa, key_index, Reentrant::Deny) { ClaimResult::Running(blocked_on) => WaitForResult::Running(blocked_on), - ClaimResult::Cycle => WaitForResult::Cycle, + ClaimResult::Cycle { inner } => WaitForResult::Cycle { inner }, ClaimResult::Claimed(_) => WaitForResult::Available, } } @@ -435,10 +481,6 @@ where unreachable!("function does not allocate pages") } - fn cycle_recovery_strategy(&self) -> CycleRecoveryStrategy { - C::CYCLE_STRATEGY - } - #[cfg(feature = "accumulator")] unsafe fn accumulated<'db>( &'db self, diff --git a/src/function/execute.rs b/src/function/execute.rs index 9521a9dce..b48b3250d 100644 --- a/src/function/execute.rs +++ b/src/function/execute.rs @@ -1,12 +1,18 @@ +use smallvec::SmallVec; + use crate::active_query::CompletedQuery; -use crate::cycle::{CycleRecoveryStrategy, IterationCount}; +use crate::cycle::{CycleHeads, CycleRecoveryStrategy, IterationCount}; use crate::function::memo::Memo; -use crate::function::{Configuration, IngredientImpl}; +use crate::function::sync::ReleaseMode; +use crate::function::{ClaimGuard, Configuration, IngredientImpl}; +use crate::ingredient::WaitForResult; use crate::plumbing::ZalsaLocal; use crate::sync::atomic::{AtomicBool, Ordering}; +use crate::sync::thread; use crate::tracked_struct::Identity; use crate::zalsa::{MemoIngredientIndex, Zalsa}; use crate::zalsa_local::{ActiveQueryGuard, QueryRevisions}; +use crate::{tracing, Cancelled}; use crate::{DatabaseKeyIndex, Event, EventKind, Id}; impl IngredientImpl @@ -26,12 +32,15 @@ where pub(super) fn execute<'db>( &'db self, db: &'db C::DbView, - zalsa: &'db Zalsa, + mut claim_guard: ClaimGuard<'db>, zalsa_local: &'db ZalsaLocal, - database_key_index: DatabaseKeyIndex, opt_old_memo: Option<&Memo<'db, C>>, ) -> &'db Memo<'db, C> { + let database_key_index = claim_guard.database_key_index(); + let zalsa = claim_guard.zalsa(); + let id = database_key_index.key_index(); + 
let memo_ingredient_index = self.memo_ingredient_index(zalsa, id); crate::tracing::info!("{:?}: executing query", database_key_index); @@ -40,7 +49,6 @@ where database_key: database_key_index, }) }); - let memo_ingredient_index = self.memo_ingredient_index(zalsa, id); let (new_value, mut completed_query) = match C::CYCLE_STRATEGY { CycleRecoveryStrategy::Panic => Self::execute_query( @@ -94,9 +102,8 @@ where CycleRecoveryStrategy::Fixpoint => self.execute_maybe_iterate( db, opt_old_memo, - zalsa, + &mut claim_guard, zalsa_local, - database_key_index, memo_ingredient_index, ), }; @@ -117,6 +124,7 @@ where // outputs and update the tracked struct IDs for seeding the next revision. self.diff_outputs(zalsa, database_key_index, old_memo, &completed_query); } + self.insert_memo( zalsa, id, @@ -133,25 +141,52 @@ where &'db self, db: &'db C::DbView, opt_old_memo: Option<&Memo<'db, C>>, - zalsa: &'db Zalsa, + claim_guard: &mut ClaimGuard<'db>, zalsa_local: &'db ZalsaLocal, - database_key_index: DatabaseKeyIndex, memo_ingredient_index: MemoIngredientIndex, ) -> (C::Output<'db>, CompletedQuery) { + claim_guard.set_release_mode(ReleaseMode::Default); + + let database_key_index = claim_guard.database_key_index(); + let zalsa = claim_guard.zalsa(); + let id = database_key_index.key_index(); - let mut iteration_count = IterationCount::initial(); - let mut active_query = zalsa_local.push_query(database_key_index, iteration_count); // Our provisional value from the previous iteration, when doing fixpoint iteration. - // Initially it's set to None, because the initial provisional value is created lazily, - // only when a cycle is actually encountered. - let mut opt_last_provisional: Option<&Memo<'db, C>> = None; + // This is different from `opt_old_memo` which might be from a different revision. + let mut last_provisional_memo: Option<&Memo<'db, C>> = None; + + // TODO: Can we seed those somehow? let mut last_stale_tracked_ids: Vec<(Identity, Id)> = Vec::new(); - let _guard = ClearCycleHeadIfPanicking::new(self, zalsa, id, memo_ingredient_index); + let mut iteration_count = IterationCount::initial(); + + if let Some(old_memo) = opt_old_memo { + if old_memo.verified_at.load() == zalsa.current_revision() + && old_memo.cycle_heads().contains(&database_key_index) + { + let memo_iteration_count = old_memo.revisions.iteration(); + + // The `DependencyGraph` locking propagates panics when another thread is blocked on a panicking query. + // However, the locking doesn't handle the case where a thread fetches the result of a panicking cycle head query **after** all locks were released. + // That's what we do here. We could consider re-executing the entire cycle but: + // a) It's tricky to ensure that all queries participating in the cycle will re-execute + // (we can't rely on `iteration_count` being updated for nested cycles because the nested cycles may have completed successfully). + // b) It's guaranteed that this query will panic again anyway. + // That's why we simply propagate the panic here. It simplifies our lives and it also avoids duplicate panic messages. 
+                if old_memo.value.is_none() {
+                    ::tracing::warn!("Propagating panic for cycle head that panicked in an earlier execution in that revision");
+                    Cancelled::PropagatedPanic.throw();
+                }
+                last_provisional_memo = Some(old_memo);
+                iteration_count = memo_iteration_count;
+            }
+        }
 
-        loop {
-            let previous_memo = opt_last_provisional.or(opt_old_memo);
+        let _poison_guard =
+            PoisonProvisionalIfPanicking::new(self, zalsa, id, memo_ingredient_index);
+        let mut active_query = zalsa_local.push_query(database_key_index, iteration_count);
+
+        let (new_value, completed_query) = loop {
             // Tracked struct ids that existed in the previous revision
             // but weren't recreated in the last iteration. It's important that we seed the next
             // query with these ids because the query might re-create them as part of the next iteration.
             // This is important because there's a high chance that the provisional value will be the same
             // if they aren't recreated when reaching the final iteration.
             active_query.seed_tracked_struct_ids(&last_stale_tracked_ids);
 
-            let (mut new_value, mut completed_query) =
-                Self::execute_query(db, zalsa, active_query, previous_memo);
+            let (mut new_value, mut completed_query) = Self::execute_query(
+                db,
+                zalsa,
+                active_query,
+                last_provisional_memo.or(opt_old_memo),
+            );
+
+            // If there are no cycle heads, break out of the loop (`cycle_heads_mut` returns `None`
+            // if the cycle head list is empty).
+            let Some(cycle_heads) = completed_query.revisions.cycle_heads_mut() else {
+                claim_guard.set_release_mode(ReleaseMode::SelfOnly);
+                break (new_value, completed_query);
+            };
+
+            // Take the cycle heads to avoid fighting Rust's borrow checker.
+            let mut cycle_heads = std::mem::take(cycle_heads);
+            let mut missing_heads: SmallVec<[(DatabaseKeyIndex, IterationCount); 1]> =
+                SmallVec::new_const();
+            let mut max_iteration_count = iteration_count;
+            let mut depends_on_self = false;
+
+            // Ensure that we resolve the latest cycle heads from any provisional value this query depended on during execution.
+            // This isn't required in a single-threaded execution, but it's not guaranteed that `cycle_heads` contains all cycles
+            // in a multi-threaded execution:
+            //
+            // t1: a -> b
+            // t2: c -> b (blocks on t1)
+            // t1: a -> b -> c (cycle, returns fixpoint initial with c(0) in heads)
+            // t1: a -> b (completes b, b has c(0) in its cycle heads, releases `b`, which resumes `t2`, and `retry_provisional` blocks on `c` (t2))
+            // t2: c -> a (cycle, returns fixpoint initial for a with a(0) in heads)
+            // t2: completes c, `provisional_retry` blocks on `a` (t2)
+            // t1: a (completes `b` with `c` in heads)
+            //
+            // Note how `a` ends up depending only on `c` but not on `a` itself. This is because `a` only saw the
+            // initial value of `c` and wasn't updated when `c` completed.
+            // That's why we need to resolve the cycle heads recursively, so that `cycle_heads` contains all cycle
+            // heads at the moment this query completed.
+ for head in &cycle_heads { + max_iteration_count = max_iteration_count.max(head.iteration_count.load()); + depends_on_self |= head.database_key_index == database_key_index; + + let ingredient = + zalsa.lookup_ingredient(head.database_key_index.ingredient_index()); + + for nested_head in + ingredient.cycle_heads(zalsa, head.database_key_index.key_index()) + { + let nested_as_tuple = ( + nested_head.database_key_index, + nested_head.iteration_count.load(), + ); + + if !cycle_heads.contains(&nested_head.database_key_index) + && !missing_heads.contains(&nested_as_tuple) + { + missing_heads.push(nested_as_tuple); + } + } + } + + for (head_key, iteration_count) in missing_heads { + max_iteration_count = max_iteration_count.max(iteration_count); + depends_on_self |= head_key == database_key_index; + + cycle_heads.insert(head_key, iteration_count); + } + + let outer_cycle = outer_cycle(zalsa, zalsa_local, &cycle_heads, database_key_index); // Did the new result we got depend on our own provisional value, in a cycle? - if let Some(cycle_heads) = completed_query - .revisions - .cycle_heads_mut() - .filter(|cycle_heads| cycle_heads.contains(&database_key_index)) - { - let last_provisional_value = if let Some(last_provisional) = opt_last_provisional { - // We have a last provisional value from our previous time around the loop. - last_provisional.value.as_ref() + // If not, return because this query is not a cycle head. + if !depends_on_self { + // For as long as this query participates in any cycle, don't release its lock, instead + // transfer it to the outermost cycle head (if any). This prevents any other thread + // from claiming this query (all cycle heads are potential entry points to the same cycle), + // which would result in them competing for the same locks (we want the locks to converge to a single cycle head). + if let Some(outer_cycle) = outer_cycle { + claim_guard.set_release_mode(ReleaseMode::TransferTo(outer_cycle)); } else { - // This is our first time around the loop; a provisional value must have been - // inserted into the memo table when the cycle was hit, so let's pull our - // initial provisional value from there. - let memo = self - .get_memo_from_table_for(zalsa, id, memo_ingredient_index) - .filter(|memo| memo.verified_at.load() == zalsa.current_revision()) - .unwrap_or_else(|| { - unreachable!( - "{database_key_index:#?} is a cycle head, \ + claim_guard.set_release_mode(ReleaseMode::SelfOnly); + } + + completed_query.revisions.set_cycle_heads(cycle_heads); + break (new_value, completed_query); + } + + // Get the last provisional value for this query so that we can compare it with the new value + // to test if the cycle converged. + let last_provisional_value = if let Some(last_provisional) = last_provisional_memo { + // We have a last provisional value from our previous time around the loop. + last_provisional.value.as_ref() + } else { + // This is our first time around the loop; a provisional value must have been + // inserted into the memo table when the cycle was hit, so let's pull our + // initial provisional value from there. 
+ let memo = self + .get_memo_from_table_for(zalsa, id, memo_ingredient_index) + .unwrap_or_else(|| { + unreachable!( + "{database_key_index:#?} is a cycle head, \ but no provisional memo found" - ) - }); + ) + }); - debug_assert!(memo.may_be_provisional()); - memo.value.as_ref() - }; + debug_assert!(memo.may_be_provisional()); + memo.value.as_ref() + }; - let last_provisional_value = last_provisional_value.expect( - "`fetch_cold_cycle` should have inserted a provisional memo with Cycle::initial", - ); - crate::tracing::debug!( - "{database_key_index:?}: execute: \ - I am a cycle head, comparing last provisional value with new value" - ); - // If the new result is equal to the last provisional result, the cycle has - // converged and we are done. - if !C::values_equal(&new_value, last_provisional_value) { - // We are in a cycle that hasn't converged; ask the user's - // cycle-recovery function what to do: - match C::recover_from_cycle( - db, - &new_value, - iteration_count.as_u32(), - C::id_to_input(zalsa, id), - ) { - crate::CycleRecoveryAction::Iterate => {} - crate::CycleRecoveryAction::Fallback(fallback_value) => { - crate::tracing::debug!( - "{database_key_index:?}: execute: user cycle_fn says to fall back" - ); - new_value = fallback_value; - } - } - // `iteration_count` can't overflow as we check it against `MAX_ITERATIONS` - // which is less than `u32::MAX`. - iteration_count = iteration_count.increment().unwrap_or_else(|| { - tracing::warn!( - "{database_key_index:?}: execute: too many cycle iterations" + let last_provisional_value = last_provisional_value.expect( + "`fetch_cold_cycle` should have inserted a provisional memo with Cycle::initial", + ); + tracing::debug!( + "{database_key_index:?}: execute: \ + I am a cycle head, comparing last provisional value with new value" + ); + + let this_converged = C::values_equal(&new_value, last_provisional_value); + + // If this is the outermost cycle, use the maximum iteration count of all cycles. + // This is important for when later iterations introduce new cycle heads (that then + // become the outermost cycle). We want to ensure that the iteration count keeps increasing + // for all queries or they won't be re-executed because `validate_same_iteration` would + // pass when we go from 1 -> 0 and then increment by 1 to 1). + iteration_count = if outer_cycle.is_none() { + max_iteration_count + } else { + // Otherwise keep the iteration count because outer cycles + // already have a cycle head with this exact iteration count (and we don't allow + // heads from different iterations). 
+ iteration_count + }; + + if !this_converged { + // We are in a cycle that hasn't converged; ask the user's + // cycle-recovery function what to do: + match C::recover_from_cycle( + db, + &new_value, + iteration_count.as_u32(), + C::id_to_input(zalsa, id), + ) { + crate::CycleRecoveryAction::Iterate => {} + crate::CycleRecoveryAction::Fallback(fallback_value) => { + tracing::debug!( + "{database_key_index:?}: execute: user cycle_fn says to fall back" ); - panic!("{database_key_index:?}: execute: too many cycle iterations") - }); - zalsa.event(&|| { - Event::new(EventKind::WillIterateCycle { - database_key: database_key_index, - iteration_count, - }) - }); - cycle_heads.update_iteration_count(database_key_index, iteration_count); - completed_query - .revisions - .update_iteration_count(iteration_count); - crate::tracing::info!("{database_key_index:?}: execute: iterate again...",); - opt_last_provisional = Some(self.insert_memo( - zalsa, - id, - Memo::new( - Some(new_value), - zalsa.current_revision(), - completed_query.revisions, - ), - memo_ingredient_index, - )); - last_stale_tracked_ids = completed_query.stale_tracked_structs; - - active_query = zalsa_local.push_query(database_key_index, iteration_count); - - continue; + new_value = fallback_value; + } } - crate::tracing::debug!( - "{database_key_index:?}: execute: fixpoint iteration has a final value" + } + + if let Some(outer_cycle) = outer_cycle { + tracing::info!( + "Detected nested cycle {database_key_index:?}, iterate it as part of the outer cycle {outer_cycle:?}" ); - cycle_heads.remove(&database_key_index); - - if cycle_heads.is_empty() { - // If there are no more cycle heads, we can mark this as verified. - completed_query - .revisions - .verified_final - .store(true, Ordering::Relaxed); + + completed_query.revisions.set_cycle_heads(cycle_heads); + // Store whether this cycle has converged, so that the outer cycle can check it. + completed_query + .revisions + .set_cycle_converged(this_converged); + + // Transfer ownership of this query to the outer cycle, so that it can claim it + // and other threads don't compete for the same lock. + claim_guard.set_release_mode(ReleaseMode::TransferTo(outer_cycle)); + + break (new_value, completed_query); + } + + // If this is the outermost cycle, test if all inner cycles have converged as well. + let converged = this_converged + && cycle_heads.iter_not_eq(database_key_index).all(|head| { + let ingredient = + zalsa.lookup_ingredient(head.database_key_index.ingredient_index()); + + let converged = + ingredient.cycle_converged(zalsa, head.database_key_index.key_index()); + + if !converged { + tracing::debug!("inner cycle {database_key_index:?} has not converged"); + } + + converged + }); + + if converged { + tracing::debug!( + "{database_key_index:?}: execute: fixpoint iteration has a final value after {iteration_count:?} iterations" + ); + + // Set the nested cycles as verified. This is necessary because + // `validate_provisional` doesn't follow cycle heads recursively (and the memos now depend on all cycle heads). + for head in cycle_heads.iter_not_eq(database_key_index) { + let ingredient = + zalsa.lookup_ingredient(head.database_key_index.ingredient_index()); + ingredient.finalize_cycle_head(zalsa, head.database_key_index.key_index()); } + + *completed_query.revisions.verified_final.get_mut() = true; + + break (new_value, completed_query); + } + + // The fixpoint iteration hasn't converged. Iterate again... 
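+            // Illustrative trace (not from a real run) for a simple `a -> b -> a` cycle:
+            //   iteration 0: `a` executes against the fixpoint-initial provisional value and produces v0
+            //   iteration 1: `a` re-executes against v0 and produces v1
+            //   ... and so on, until `C::values_equal` reports convergence or the iteration
+            //   count exceeds `MAX_ITERATIONS` and we panic below.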
+ iteration_count = iteration_count.increment().unwrap_or_else(|| { + ::tracing::warn!("{database_key_index:?}: execute: too many cycle iterations"); + panic!("{database_key_index:?}: execute: too many cycle iterations") + }); + + zalsa.event(&|| { + Event::new(EventKind::WillIterateCycle { + database_key: database_key_index, + iteration_count, + }) + }); + + tracing::info!( + "{database_key_index:?}: execute: iterate again ({iteration_count:?})...", + ); + + // Update the iteration count of nested cycles. + for head in cycle_heads.iter_not_eq(database_key_index) { + let ingredient = + zalsa.lookup_ingredient(head.database_key_index.ingredient_index()); + + ingredient.set_cycle_iteration_count( + zalsa, + head.database_key_index.key_index(), + iteration_count, + ); } - crate::tracing::debug!( - "{database_key_index:?}: execute: result.revisions = {revisions:#?}", - revisions = &completed_query.revisions + // Update the iteration count of this cycle head, but only after restoring + // the cycle heads array (or this becomes a no-op). + completed_query.revisions.set_cycle_heads(cycle_heads); + completed_query + .revisions + .update_iteration_count_mut(database_key_index, iteration_count); + + let new_memo = self.insert_memo( + zalsa, + id, + Memo::new( + Some(new_value), + zalsa.current_revision(), + completed_query.revisions, + ), + memo_ingredient_index, ); - break (new_value, completed_query); - } + last_provisional_memo = Some(new_memo); + + last_stale_tracked_ids = completed_query.stale_tracked_structs; + active_query = zalsa_local.push_query(database_key_index, iteration_count); + + continue; + }; + + tracing::debug!( + "{database_key_index:?}: execute_maybe_iterate: result.revisions = {revisions:#?}", + revisions = &completed_query.revisions + ); + + (new_value, completed_query) } #[inline] @@ -325,14 +509,14 @@ where /// a new fix point initial value if that happens. /// /// We could insert a fixpoint initial value here, but it seems unnecessary. -struct ClearCycleHeadIfPanicking<'a, C: Configuration> { +struct PoisonProvisionalIfPanicking<'a, C: Configuration> { ingredient: &'a IngredientImpl, zalsa: &'a Zalsa, id: Id, memo_ingredient_index: MemoIngredientIndex, } -impl<'a, C: Configuration> ClearCycleHeadIfPanicking<'a, C> { +impl<'a, C: Configuration> PoisonProvisionalIfPanicking<'a, C> { fn new( ingredient: &'a IngredientImpl, zalsa: &'a Zalsa, @@ -348,9 +532,9 @@ impl<'a, C: Configuration> ClearCycleHeadIfPanicking<'a, C> { } } -impl Drop for ClearCycleHeadIfPanicking<'_, C> { +impl Drop for PoisonProvisionalIfPanicking<'_, C> { fn drop(&mut self) { - if std::thread::panicking() { + if thread::panicking() { let revisions = QueryRevisions::fixpoint_initial(self.ingredient.database_key_index(self.id)); @@ -360,3 +544,44 @@ impl Drop for ClearCycleHeadIfPanicking<'_, C> { } } } + +/// Returns the key of any potential outer cycle head or `None` if there is no outer cycle. +/// +/// That is, any query that's currently blocked on the result computed by this query (claiming it results in a cycle). +fn outer_cycle( + zalsa: &Zalsa, + zalsa_local: &ZalsaLocal, + cycle_heads: &CycleHeads, + current_key: DatabaseKeyIndex, +) -> Option { + // First, look for the outer most cycle head on the same thread. + // Using the outer most over the inner most should reduce the need + // for transitive transfers. 
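+    // For example (illustrative): with a local query stack `a -> b -> c` where both `a` and `b`
+    // are heads in `cycle_heads`, we pick `a` (the outermost) so that ownership of `c`'s lock is
+    // transferred directly to `a` rather than first to `b` and then from `b` on to `a`.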
+ // SAFETY: We don't call into with_query_stack recursively + if let Some(same_thread) = unsafe { + zalsa_local.with_query_stack_unchecked(|stack| { + stack + .iter() + .find(|active_query| { + cycle_heads.contains(&active_query.database_key_index) + && active_query.database_key_index != current_key + }) + .map(|active_query| active_query.database_key_index) + }) + } { + return Some(same_thread); + } + + // Check for any outer cycle head running on a different thread. + cycle_heads + .iter_not_eq(current_key) + .rfind(|head| { + let ingredient = zalsa.lookup_ingredient(head.database_key_index.ingredient_index()); + + matches!( + ingredient.wait_for(zalsa, head.database_key_index.key_index()), + WaitForResult::Cycle { inner: false } + ) + }) + .map(|head| head.database_key_index) +} diff --git a/src/function/fetch.rs b/src/function/fetch.rs index a1b6658f6..2285bbd2e 100644 --- a/src/function/fetch.rs +++ b/src/function/fetch.rs @@ -4,7 +4,7 @@ use crate::cycle::{CycleHeads, CycleRecoveryStrategy, IterationCount}; use crate::function::maybe_changed_after::VerifyCycleHeads; use crate::function::memo::Memo; use crate::function::sync::ClaimResult; -use crate::function::{Configuration, IngredientImpl}; +use crate::function::{Configuration, IngredientImpl, Reentrant}; use crate::zalsa::{MemoIngredientIndex, Zalsa}; use crate::zalsa_local::{QueryRevisions, ZalsaLocal}; use crate::{DatabaseKeyIndex, Id}; @@ -13,6 +13,7 @@ impl IngredientImpl where C: Configuration, { + #[inline] pub fn fetch<'db>( &'db self, db: &'db C::DbView, @@ -57,11 +58,19 @@ where id: Id, ) -> &'db Memo<'db, C> { let memo_ingredient_index = self.memo_ingredient_index(zalsa, id); + let mut retry_count = 0; loop { if let Some(memo) = self .fetch_hot(zalsa, id, memo_ingredient_index) .or_else(|| { - self.fetch_cold_with_retry(zalsa, zalsa_local, db, id, memo_ingredient_index) + self.fetch_cold_with_retry( + zalsa, + zalsa_local, + db, + id, + memo_ingredient_index, + &mut retry_count, + ) }) { return memo; @@ -95,7 +104,6 @@ where } } - #[inline(never)] fn fetch_cold_with_retry<'db>( &'db self, zalsa: &'db Zalsa, @@ -103,6 +111,7 @@ where db: &'db C::DbView, id: Id, memo_ingredient_index: MemoIngredientIndex, + retry_count: &mut u32, ) -> Option<&'db Memo<'db, C>> { let memo = self.fetch_cold(zalsa, zalsa_local, db, id, memo_ingredient_index)?; @@ -114,7 +123,7 @@ where // That is only correct for fixpoint cycles, though: `FallbackImmediate` cycles // never have provisional entries. if C::CYCLE_STRATEGY == CycleRecoveryStrategy::FallbackImmediate - || !memo.provisional_retry(zalsa, zalsa_local, self.database_key_index(id)) + || !memo.provisional_retry(zalsa, zalsa_local, self.database_key_index(id), retry_count) { Some(memo) } else { @@ -132,21 +141,21 @@ where ) -> Option<&'db Memo<'db, C>> { let database_key_index = self.database_key_index(id); // Try to claim this query: if someone else has claimed it already, go back and start again. 
- let claim_guard = match self.sync_table.try_claim(zalsa, id) { + let claim_guard = match self.sync_table.try_claim(zalsa, id, Reentrant::Allow) { ClaimResult::Claimed(guard) => guard, ClaimResult::Running(blocked_on) => { blocked_on.block_on(zalsa); - let memo = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index); + if C::CYCLE_STRATEGY == CycleRecoveryStrategy::FallbackImmediate { + let memo = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index); - if let Some(memo) = memo { - // This isn't strictly necessary, but if this is a provisional memo for an inner cycle, - // await all outer cycle heads to give the thread driving it a chance to complete - // (we don't want multiple threads competing for the queries participating in the same cycle). - if memo.value.is_some() && memo.may_be_provisional() { - memo.block_on_heads(zalsa, zalsa_local); + if let Some(memo) = memo { + if memo.value.is_some() { + memo.block_on_heads(zalsa, zalsa_local); + } } } + return None; } ClaimResult::Cycle { .. } => { @@ -200,39 +209,10 @@ where // still valid for the current revision. return unsafe { Some(self.extend_memo_lifetime(old_memo)) }; } - - // If this is a provisional memo from the same revision, await all its cycle heads because - // we need to ensure that only one thread is iterating on a cycle at a given time. - // For example, if we have a nested cycle like so: - // ``` - // a -> b -> c -> b - // -> a - // - // d -> b - // ``` - // thread 1 calls `a` and `a` completes the inner cycle `b -> c` but hasn't finished the outer cycle `a` yet. - // thread 2 now calls `b`. We don't want that thread 2 iterates `b` while thread 1 is iterating `a` at the same time - // because it can result in thread b overriding provisional memos that thread a has accessed already and still relies upon. - // - // By waiting, we ensure that thread 1 completes a (based on a provisional value for `b`) and `b` - // becomes the new outer cycle, which thread 2 drives to completion. - if old_memo.may_be_provisional() - && old_memo.verified_at.load() == zalsa.current_revision() - { - // Try to claim all cycle heads of the provisional memo. If we can't because - // some head is running on another thread, drop our claim guard to give that thread - // a chance to take ownership of this query and complete it as part of its fixpoint iteration. - // We will then block on the cycle head and retry once all cycle heads completed. - if !old_memo.try_claim_heads(zalsa, zalsa_local) { - drop(claim_guard); - old_memo.block_on_heads(zalsa, zalsa_local); - return None; - } - } } } - let memo = self.execute(db, zalsa, zalsa_local, database_key_index, opt_old_memo); + let memo = self.execute(db, claim_guard, zalsa_local, opt_old_memo); Some(memo) } @@ -257,6 +237,19 @@ where let can_shallow_update = self.shallow_verify_memo(zalsa, database_key_index, memo); if can_shallow_update.yes() { self.update_shallow(zalsa, database_key_index, memo, can_shallow_update); + + if C::CYCLE_STRATEGY == CycleRecoveryStrategy::Fixpoint { + memo.revisions + .cycle_heads() + .remove_all_except(database_key_index); + } + + crate::tracing::debug!( + "hit cycle at {database_key_index:#?}, \ + returning last provisional value: {:#?}", + memo.revisions + ); + // SAFETY: memo is present in memo_map. 
return unsafe { self.extend_memo_lifetime(memo) }; } @@ -299,7 +292,10 @@ where let mut completed_query = active_query.pop(); completed_query .revisions - .set_cycle_heads(CycleHeads::initial(database_key_index)); + .set_cycle_heads(CycleHeads::initial( + database_key_index, + IterationCount::initial(), + )); // We need this for `cycle_heads()` to work. We will unset this in the outer `execute()`. *completed_query.revisions.verified_final.get_mut() = false; self.insert_memo( diff --git a/src/function/maybe_changed_after.rs b/src/function/maybe_changed_after.rs index 4f69655cd..bbe62a608 100644 --- a/src/function/maybe_changed_after.rs +++ b/src/function/maybe_changed_after.rs @@ -2,10 +2,10 @@ use rustc_hash::FxHashMap; #[cfg(feature = "accumulator")] use crate::accumulator::accumulated_map::InputAccumulatedValues; -use crate::cycle::{CycleRecoveryStrategy, ProvisionalStatus}; -use crate::function::memo::Memo; +use crate::cycle::{CycleHeads, CycleRecoveryStrategy, ProvisionalStatus}; +use crate::function::memo::{Memo, TryClaimCycleHeadsIter, TryClaimHeadsResult}; use crate::function::sync::ClaimResult; -use crate::function::{Configuration, IngredientImpl}; +use crate::function::{Configuration, IngredientImpl, Reentrant}; use crate::key::DatabaseKeyIndex; use crate::sync::atomic::Ordering; @@ -141,7 +141,7 @@ where ) -> Option { let database_key_index = self.database_key_index(key_index); - let _claim_guard = match self.sync_table.try_claim(zalsa, key_index) { + let claim_guard = match self.sync_table.try_claim(zalsa, key_index, Reentrant::Deny) { ClaimResult::Claimed(guard) => guard, ClaimResult::Running(blocked_on) => { blocked_on.block_on(zalsa); @@ -175,10 +175,8 @@ where // If `validate_maybe_provisional` returns `true`, but only because all cycle heads are from the same iteration, // carry over the cycle heads so that the caller verifies them. - if old_memo.may_be_provisional() { - for head in old_memo.cycle_heads() { - cycle_heads.insert_head(head.database_key_index); - } + for head in old_memo.cycle_heads() { + cycle_heads.insert_head(head.database_key_index); } return Some(if old_memo.revisions.changed_at > revision { @@ -227,7 +225,7 @@ where // `in_cycle` tracks if the enclosing query is in a cycle. `deep_verify.cycle_heads` tracks // if **this query** encountered a cycle (which means there's some provisional value somewhere floating around). if old_memo.value.is_some() && !cycle_heads.has_any() { - let memo = self.execute(db, zalsa, zalsa_local, database_key_index, Some(old_memo)); + let memo = self.execute(db, claim_guard, zalsa_local, Some(old_memo)); let changed_at = memo.revisions.changed_at; // Always assume that a provisional value has changed. @@ -323,12 +321,11 @@ where } let last_changed = zalsa.last_changed_revision(memo.revisions.durability); - crate::tracing::debug!( - "{database_key_index:?}: check_durability(memo = {memo:#?}, last_changed={:?} <= verified_at={:?}) = {:?}", + crate::tracing::trace!( + "{database_key_index:?}: check_durability({database_key_index:#?}, last_changed={:?} <= verified_at={:?}) = {:?}", last_changed, verified_at, last_changed <= verified_at, - memo = memo.tracing_debug() ); if last_changed <= verified_at { // No input of the suitable durability has changed since last verified. 
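+            // For example (illustrative): if this memo only read high-durability inputs and the
+            // last change to any high-durability input happened in revision R3 while the memo was
+            // verified in R5, then `last_changed (R3) <= verified_at (R5)` holds and we can skip
+            // checking each dependency edge individually.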
@@ -365,28 +362,48 @@ where database_key_index: DatabaseKeyIndex, memo: &Memo<'_, C>, ) -> bool { - !memo.may_be_provisional() - || self.validate_provisional(zalsa, database_key_index, memo) - || self.validate_same_iteration(zalsa, zalsa_local, database_key_index, memo) + if !memo.may_be_provisional() { + return true; + } + + let cycle_heads = memo.cycle_heads(); + + if cycle_heads.is_empty() { + return true; + } + + crate::tracing::trace!( + "{database_key_index:?}: validate_may_be_provisional(memo = {memo:#?})", + memo = memo.tracing_debug() + ); + + let verified_at = memo.verified_at.load(); + + self.validate_provisional(zalsa, database_key_index, memo, verified_at, cycle_heads) + || self.validate_same_iteration( + zalsa, + zalsa_local, + database_key_index, + verified_at, + cycle_heads, + ) } /// Check if this memo's cycle heads have all been finalized. If so, mark it verified final and /// return true, if not return false. - #[inline] fn validate_provisional( &self, zalsa: &Zalsa, database_key_index: DatabaseKeyIndex, memo: &Memo<'_, C>, + memo_verified_at: Revision, + cycle_heads: &CycleHeads, ) -> bool { crate::tracing::trace!( - "{database_key_index:?}: validate_provisional(memo = {memo:#?})", - memo = memo.tracing_debug() + "{database_key_index:?}: validate_provisional({database_key_index:?})", ); - let memo_verified_at = memo.verified_at.load(); - - for cycle_head in memo.revisions.cycle_heads() { + for cycle_head in cycle_heads { // Test if our cycle heads (with the same revision) are now finalized. let Some(kind) = zalsa .lookup_ingredient(cycle_head.database_key_index.ingredient_index()) @@ -413,7 +430,7 @@ where // // If we don't account for the iteration, then `a` (from iteration 0) will be finalized // because its cycle head `b` is now finalized, but `b` never pulled `a` in the last iteration. - if iteration != cycle_head.iteration_count { + if iteration != cycle_head.iteration_count.load() { return false; } @@ -449,92 +466,61 @@ where &self, zalsa: &Zalsa, zalsa_local: &ZalsaLocal, - database_key_index: DatabaseKeyIndex, - memo: &Memo<'_, C>, + memo_database_key_index: DatabaseKeyIndex, + memo_verified_at: Revision, + cycle_heads: &CycleHeads, ) -> bool { - crate::tracing::trace!( - "{database_key_index:?}: validate_same_iteration(memo = {memo:#?})", - memo = memo.tracing_debug() - ); - - let cycle_heads = memo.revisions.cycle_heads(); - if cycle_heads.is_empty() { - return true; - } - - let verified_at = memo.verified_at.load(); + crate::tracing::trace!("validate_same_iteration({memo_database_key_index:?})",); // This is an optimization to avoid unnecessary re-execution within the same revision. // Don't apply it when verifying memos from past revisions. We want them to re-execute // to verify their cycle heads and all participating queries. - if verified_at != zalsa.current_revision() { + if memo_verified_at != zalsa.current_revision() { return false; } - // SAFETY: We do not access the query stack reentrantly. - unsafe { - zalsa_local.with_query_stack_unchecked(|stack| { - cycle_heads.iter().all(|cycle_head| { + // Always return `false` for cycle initial values "unless" they are running in the same thread. + if cycle_heads + .iter() + .all(|head| head.database_key_index == memo_database_key_index) + { + // SAFETY: We do not access the query stack reentrantly. 
+ let on_stack = unsafe { + zalsa_local.with_query_stack_unchecked(|stack| { stack .iter() .rev() - .find(|query| query.database_key_index == cycle_head.database_key_index) - .map(|query| query.iteration_count()) - .or_else(|| { - // If the cycle head isn't on our stack because: - // - // * another thread holds the lock on the cycle head (but it waits for the current query to complete) - // * we're in `maybe_changed_after` because `maybe_changed_after` doesn't modify the cycle stack - // - // check if the latest memo has the same iteration count. - - // However, we've to be careful to skip over fixpoint initial values: - // If the head is the memo we're trying to validate, always return `None` - // to force a re-execution of the query. This is necessary because the query - // has obviously not completed its iteration yet. - // - // This should be rare but the `cycle_panic` test fails on some platforms (mainly GitHub actions) - // without this check. What happens there is that: - // - // * query a blocks on query b - // * query b tries to claim a, fails to do so and inserts the fixpoint initial value - // * query b completes and has `a` as head. It returns its query result Salsa blocks query b from - // exiting inside `block_on` (or the thread would complete before the cycle iteration is complete) - // * query a resumes but panics because of the fixpoint iteration function - // * query b resumes. It rexecutes its own query which then tries to fetch a (which depends on itself because it's a fixpoint initial value). - // Without this check, `validate_same_iteration` would return `true` because the latest memo for `a` is the fixpoint initial value. - // But it should return `false` so that query b's thread re-executes `a` (which then also causes the panic). - // - // That's why we always return `None` if the cycle head is the same as the current database key index. - if cycle_head.database_key_index == database_key_index { - return None; - } + .any(|query| query.database_key_index == memo_database_key_index) + }) + }; - let ingredient = zalsa.lookup_ingredient( - cycle_head.database_key_index.ingredient_index(), - ); - let wait_result = ingredient - .wait_for(zalsa, cycle_head.database_key_index.key_index()); + return on_stack; + } - if !wait_result.is_cycle() { - return None; - } + let cycle_heads_iter = TryClaimCycleHeadsIter::new(zalsa, zalsa_local, cycle_heads); - let provisional_status = ingredient.provisional_status( - zalsa, - cycle_head.database_key_index.key_index(), - )?; + for cycle_head in cycle_heads_iter { + match cycle_head { + TryClaimHeadsResult::Cycle { + head_iteration_count, + memo_iteration_count: current_iteration_count, + verified_at: head_verified_at, + } => { + if head_verified_at != memo_verified_at { + return false; + } - if provisional_status.verified_at() == Some(verified_at) { - provisional_status.iteration() - } else { - None - } - }) - == Some(cycle_head.iteration_count) - }) - }) + if head_iteration_count != current_iteration_count { + return false; + } + } + _ => { + return false; + } + } } + + true } /// VerifyResult::Unchanged if the memo's value and `changed_at` time is up-to-date in the @@ -553,6 +539,12 @@ where cycle_heads: &mut VerifyCycleHeads, can_shallow_update: ShallowUpdate, ) -> VerifyResult { + // If the value is from the same revision but is still provisional, consider it changed + // because we're now in a new iteration. 
+ if can_shallow_update == ShallowUpdate::Verified && old_memo.may_be_provisional() { + return VerifyResult::changed(); + } + crate::tracing::debug!( "{database_key_index:?}: deep_verify_memo(old_memo = {old_memo:#?})", old_memo = old_memo.tracing_debug() @@ -562,12 +554,6 @@ where match old_memo.revisions.origin.as_ref() { QueryOriginRef::Derived(edges) => { - // If the value is from the same revision but is still provisional, consider it changed - // because we're now in a new iteration. - if can_shallow_update == ShallowUpdate::Verified && old_memo.may_be_provisional() { - return VerifyResult::changed(); - } - #[cfg(feature = "accumulator")] let mut inputs = InputAccumulatedValues::Empty; let mut child_cycle_heads = Vec::new(); diff --git a/src/function/memo.rs b/src/function/memo.rs index 793f4832a..302ca73c3 100644 --- a/src/function/memo.rs +++ b/src/function/memo.rs @@ -3,10 +3,11 @@ use std::fmt::{Debug, Formatter}; use std::mem::transmute; use std::ptr::NonNull; -use crate::cycle::{empty_cycle_heads, CycleHead, CycleHeads, IterationCount, ProvisionalStatus}; +use crate::cycle::{ + empty_cycle_heads, CycleHeads, CycleHeadsIterator, IterationCount, ProvisionalStatus, +}; use crate::function::{Configuration, IngredientImpl}; -use crate::hash::FxHashSet; -use crate::ingredient::{Ingredient, WaitForResult}; +use crate::ingredient::WaitForResult; use crate::key::DatabaseKeyIndex; use crate::revision::AtomicRevision; use crate::runtime::Running; @@ -143,21 +144,23 @@ impl<'db, C: Configuration> Memo<'db, C> { zalsa: &Zalsa, zalsa_local: &ZalsaLocal, database_key_index: DatabaseKeyIndex, + retry_count: &mut u32, ) -> bool { - if self.revisions.cycle_heads().is_empty() { - return false; - } - - if !self.may_be_provisional() { - return false; - }; - if self.block_on_heads(zalsa, zalsa_local) { // If we get here, we are a provisional value of // the cycle head (either initial value, or from a later iteration) and should be // returned to caller to allow fixpoint iteration to proceed. false } else { + assert!( + *retry_count <= 20000, + "Provisional memo retry limit exceeded for {database_key_index:?}; \ + this usually indicates a bug in salsa's cycle caching/locking. \ + (retried {retry_count} times)", + ); + + *retry_count += 1; + // all our cycle heads are complete; re-fetch // and we should get a non-provisional memo. crate::tracing::debug!( @@ -176,33 +179,50 @@ impl<'db, C: Configuration> Memo<'db, C> { // IMPORTANT: If you make changes to this function, make sure to run `cycle_nested_deep` with // shuttle with at least 10k iterations. - // The most common case is that the entire cycle is running in the same thread. - // If that's the case, short circuit and return `true` immediately. - if self.all_cycles_on_stack(zalsa_local) { + let cycle_heads = self.cycle_heads(); + if cycle_heads.is_empty() { return true; } - // Otherwise, await all cycle heads, recursively. 
- return block_on_heads_cold(zalsa, self.cycle_heads()); + return block_on_heads_cold(zalsa, zalsa_local, cycle_heads); #[inline(never)] - fn block_on_heads_cold(zalsa: &Zalsa, heads: &CycleHeads) -> bool { + fn block_on_heads_cold( + zalsa: &Zalsa, + zalsa_local: &ZalsaLocal, + heads: &CycleHeads, + ) -> bool { let _entered = crate::tracing::debug_span!("block_on_heads").entered(); - let mut cycle_heads = TryClaimCycleHeadsIter::new(zalsa, heads); + let cycle_heads = TryClaimCycleHeadsIter::new(zalsa, zalsa_local, heads); let mut all_cycles = true; - while let Some(claim_result) = cycle_heads.next() { + for claim_result in cycle_heads { match claim_result { - TryClaimHeadsResult::Cycle => {} - TryClaimHeadsResult::Finalized => { - all_cycles = false; + TryClaimHeadsResult::Cycle { + memo_iteration_count: current_iteration_count, + head_iteration_count, + .. + } => { + // We need to refetch if the head now has a new iteration count. + // This is to avoid a race between thread A and B: + // * thread A is in `blocks_on` (`retry_provisional`) for the memo `c`. It owns the lock for `e` + // * thread B owns `d` and calls `c`. `c` didn't depend on `e` in the first iteration. + // Thread B completes the first iteration (which bumps the iteration count on `c`). + // `c` now depends on E in the second iteration, introducing a new cycle head. + // Thread B transfers ownership of `c` to thread A (which awakes A). + // * Thread A now continues, there are no other cycle heads, so all queries result in a cycle. + // However, `d` has now a new iteration count, so it's important that we refetch `c`. + + if current_iteration_count != head_iteration_count { + all_cycles = false; + } } TryClaimHeadsResult::Available => { all_cycles = false; } TryClaimHeadsResult::Running(running) => { all_cycles = false; - running.block_on(&mut cycle_heads); + running.block_on(zalsa); } } } @@ -211,51 +231,6 @@ impl<'db, C: Configuration> Memo<'db, C> { } } - /// Tries to claim all cycle heads to see if they're finalized or available. - /// - /// Unlike `block_on_heads`, this code does not block on any cycle head. Instead it returns `false` if - /// claiming all cycle heads failed because one of them is running on another thread. - pub(super) fn try_claim_heads(&self, zalsa: &Zalsa, zalsa_local: &ZalsaLocal) -> bool { - let _entered = crate::tracing::debug_span!("try_claim_heads").entered(); - if self.all_cycles_on_stack(zalsa_local) { - return true; - } - - let cycle_heads = TryClaimCycleHeadsIter::new(zalsa, self.revisions.cycle_heads()); - - for claim_result in cycle_heads { - match claim_result { - TryClaimHeadsResult::Cycle - | TryClaimHeadsResult::Finalized - | TryClaimHeadsResult::Available => {} - TryClaimHeadsResult::Running(_) => { - return false; - } - } - } - - true - } - - fn all_cycles_on_stack(&self, zalsa_local: &ZalsaLocal) -> bool { - let cycle_heads = self.revisions.cycle_heads(); - if cycle_heads.is_empty() { - return true; - } - - // SAFETY: We do not access the query stack reentrantly. - unsafe { - zalsa_local.with_query_stack_unchecked(|stack| { - cycle_heads.iter().all(|cycle_head| { - stack - .iter() - .rev() - .any(|query| query.database_key_index == cycle_head.database_key_index) - }) - }) - } - } - /// Cycle heads that should be propagated to dependent queries. #[inline(always)] pub(super) fn cycle_heads(&self) -> &CycleHeads { @@ -473,118 +448,111 @@ mod persistence { } pub(super) enum TryClaimHeadsResult<'me> { - /// Claiming every cycle head results in a cycle head. 
- Cycle, - - /// The cycle head has been finalized. - Finalized, + /// Claiming the cycle head results in a cycle. + Cycle { + head_iteration_count: IterationCount, + memo_iteration_count: IterationCount, + verified_at: Revision, + }, /// The cycle head is not finalized, but it can be claimed. Available, /// The cycle head is currently executed on another thread. - Running(RunningCycleHead<'me>), -} - -pub(super) struct RunningCycleHead<'me> { - inner: Running<'me>, - ingredient: &'me dyn Ingredient, -} - -impl<'a> RunningCycleHead<'a> { - fn block_on(self, cycle_heads: &mut TryClaimCycleHeadsIter<'a>) { - let key_index = self.inner.database_key().key_index(); - self.inner.block_on(cycle_heads.zalsa); - - cycle_heads.queue_ingredient_heads(self.ingredient, key_index); - } + Running(Running<'me>), } /// Iterator to try claiming the transitive cycle heads of a memo. -struct TryClaimCycleHeadsIter<'a> { +pub(super) struct TryClaimCycleHeadsIter<'a> { zalsa: &'a Zalsa, - queue: Vec, - queued: FxHashSet, + zalsa_local: &'a ZalsaLocal, + cycle_heads: CycleHeadsIterator<'a>, } impl<'a> TryClaimCycleHeadsIter<'a> { - fn new(zalsa: &'a Zalsa, heads: &CycleHeads) -> Self { - let queue: Vec<_> = heads.iter().copied().collect(); - let queued: FxHashSet<_> = queue.iter().copied().collect(); - + pub(super) fn new( + zalsa: &'a Zalsa, + zalsa_local: &'a ZalsaLocal, + cycle_heads: &'a CycleHeads, + ) -> Self { Self { zalsa, - queue, - queued, + zalsa_local, + cycle_heads: cycle_heads.iter(), } } - - fn queue_ingredient_heads(&mut self, ingredient: &dyn Ingredient, key: Id) { - // Recursively wait for all cycle heads that this head depends on. It's important - // that we fetch those from the updated memo because the cycle heads can change - // between iterations and new cycle heads can be added if a query depeonds on - // some cycle heads depending on a specific condition being met - // (`a` calls `b` and `c` in iteration 0 but `c` and `d` in iteration 1 or later). - // IMPORTANT: It's critical that we get the cycle head from the latest memo - // here, in case the memo has become part of another cycle (we need to block on that too!). - self.queue.extend( - ingredient - .cycle_heads(self.zalsa, key) - .iter() - .copied() - .filter(|head| self.queued.insert(*head)), - ) - } } impl<'me> Iterator for TryClaimCycleHeadsIter<'me> { type Item = TryClaimHeadsResult<'me>; fn next(&mut self) -> Option { - let head = self.queue.pop()?; + let head = self.cycle_heads.next()?; let head_database_key = head.database_key_index; + let head_iteration_count = head.iteration_count.load(); + + // The most common case is that the head is already in the query stack. So let's check that first. + // SAFETY: We do not access the query stack reentrantly. 
+ if let Some(current_iteration_count) = unsafe { + self.zalsa_local.with_query_stack_unchecked(|stack| { + stack + .iter() + .rev() + .find(|query| query.database_key_index == head_database_key) + .map(|query| query.iteration_count()) + }) + } { + crate::tracing::trace!( + "Waiting for {head_database_key:?} results in a cycle (because it is already in the query stack)" + ); + return Some(TryClaimHeadsResult::Cycle { + head_iteration_count, + memo_iteration_count: current_iteration_count, + verified_at: self.zalsa.current_revision(), + }); + } + let head_key_index = head_database_key.key_index(); let ingredient = self .zalsa .lookup_ingredient(head_database_key.ingredient_index()); - let cycle_head_kind = ingredient - .provisional_status(self.zalsa, head_key_index) - .unwrap_or(ProvisionalStatus::Provisional { - iteration: IterationCount::initial(), - verified_at: Revision::start(), - }); + match ingredient.wait_for(self.zalsa, head_key_index) { + WaitForResult::Cycle { .. } => { + // We hit a cycle blocking on the cycle head; this means this query actively + // participates in the cycle and some other query is blocked on this thread. + crate::tracing::trace!("Waiting for {head_database_key:?} results in a cycle"); + + let provisional_status = ingredient + .provisional_status(self.zalsa, head_key_index) + .expect("cycle head memo to exist"); + let (current_iteration_count, verified_at) = match provisional_status { + ProvisionalStatus::Provisional { + iteration, + verified_at, + } + | ProvisionalStatus::Final { + iteration, + verified_at, + } => (iteration, verified_at), + ProvisionalStatus::FallbackImmediate => { + (IterationCount::initial(), self.zalsa.current_revision()) + } + }; - match cycle_head_kind { - ProvisionalStatus::Final { .. } | ProvisionalStatus::FallbackImmediate => { - // This cycle is already finalized, so we don't need to wait on it; - // keep looping through cycle heads. - crate::tracing::trace!("Dependent cycle head {head:?} has been finalized."); - Some(TryClaimHeadsResult::Finalized) + Some(TryClaimHeadsResult::Cycle { + memo_iteration_count: current_iteration_count, + head_iteration_count, + verified_at, + }) } - ProvisionalStatus::Provisional { .. } => { - match ingredient.wait_for(self.zalsa, head_key_index) { - WaitForResult::Cycle { .. } => { - // We hit a cycle blocking on the cycle head; this means this query actively - // participates in the cycle and some other query is blocked on this thread. 
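The fast path above scans the thread-local query stack before touching any sync-table locks. A reduced sketch of that scan, with plain structs standing in for salsa's stack entries (all names here are illustrative only):

```rust
// Sketch of the fast path: before taking any locks, scan this thread's query
// stack from the innermost frame outward and, if the head is already active
// here, report a cycle together with its current iteration count.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
struct DatabaseKeyIndex(u32);

struct ActiveQuery {
    database_key_index: DatabaseKeyIndex,
    iteration_count: u8,
}

fn iteration_if_on_stack(stack: &[ActiveQuery], head: DatabaseKeyIndex) -> Option<u8> {
    stack
        .iter()
        .rev() // innermost frames are at the end of the stack
        .find(|query| query.database_key_index == head)
        .map(|query| query.iteration_count)
}

fn main() {
    let stack = vec![
        ActiveQuery { database_key_index: DatabaseKeyIndex(1), iteration_count: 0 },
        ActiveQuery { database_key_index: DatabaseKeyIndex(2), iteration_count: 3 },
    ];
    assert_eq!(iteration_if_on_stack(&stack, DatabaseKeyIndex(2)), Some(3));
    assert_eq!(iteration_if_on_stack(&stack, DatabaseKeyIndex(7)), None);
}
```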
- crate::tracing::debug!("Waiting for {head:?} results in a cycle"); - Some(TryClaimHeadsResult::Cycle) - } - WaitForResult::Running(running) => { - crate::tracing::debug!("Ingredient {head:?} is running: {running:?}"); + WaitForResult::Running(running) => { + crate::tracing::trace!("Ingredient {head_database_key:?} is running: {running:?}"); - Some(TryClaimHeadsResult::Running(RunningCycleHead { - inner: running, - ingredient, - })) - } - WaitForResult::Available => { - self.queue_ingredient_heads(ingredient, head_key_index); - Some(TryClaimHeadsResult::Available) - } - } + Some(TryClaimHeadsResult::Running(running)) } + WaitForResult::Available => Some(TryClaimHeadsResult::Available), } } } diff --git a/src/function/sync.rs b/src/function/sync.rs index 0a88844af..09225cde7 100644 --- a/src/function/sync.rs +++ b/src/function/sync.rs @@ -1,9 +1,13 @@ use rustc_hash::FxHashMap; +use std::collections::hash_map::OccupiedEntry; use crate::key::DatabaseKeyIndex; -use crate::runtime::{BlockResult, Running, WaitResult}; -use crate::sync::thread::{self, ThreadId}; +use crate::runtime::{ + BlockOnTransferredOwner, BlockResult, BlockTransferredResult, Running, WaitResult, +}; +use crate::sync::thread::{self}; use crate::sync::Mutex; +use crate::tracing; use crate::zalsa::Zalsa; use crate::{Id, IngredientIndex}; @@ -20,17 +24,36 @@ pub(crate) enum ClaimResult<'a> { /// Can't claim the query because it is running on an other thread. Running(Running<'a>), /// Claiming the query results in a cycle. - Cycle, + Cycle { + /// `true` if this is a cycle with an inner query. For example, if `a` transferred its ownership to + /// `b`. If the thread claiming `b` tries to claim `a`, then this results in a cycle except when calling + /// [`SyncTable::try_claim`] with [`Reentrant::Allow`]. + inner: bool, + }, /// Successfully claimed the query. Claimed(ClaimGuard<'a>), } pub(crate) struct SyncState { - id: ThreadId, + /// The thread id that is owning this query (actively executing it or iterating it as part of a larger cycle). + id: SyncOwnerId, /// Set to true if any other queries are blocked, /// waiting for this query to complete. anyone_waiting: bool, + + /// Whether any other query has transferred its lock ownership to this query. + /// This is only an optimization so that the expensive unblocking of transferred queries + /// can be skipped if `false`. This field might be `true` in cases where queries *were* transferred + /// to this query, but have since then been transferred to another query (in a later iteration). + is_transfer_target: bool, + + /// Whether this query has been claimed by the query that currently owns it. + /// + /// If `a` has been transferred to `b` and the stack for t1 is `b -> a`, then `a` can be claimed + /// and `claimed_transferred` is set to `true`. However, t2 won't be able to claim `a` because + /// it doesn't own `b`. + claimed_twice: bool, } impl SyncTable { @@ -41,14 +64,34 @@ impl SyncTable { } } - pub(crate) fn try_claim<'me>(&'me self, zalsa: &'me Zalsa, key_index: Id) -> ClaimResult<'me> { + /// Claims the given key index, or blocks if it is running on another thread. 
+ pub(crate) fn try_claim<'me>( + &'me self, + zalsa: &'me Zalsa, + key_index: Id, + reentrant: Reentrant, + ) -> ClaimResult<'me> { let mut write = self.syncs.lock(); match write.entry(key_index) { std::collections::hash_map::Entry::Occupied(occupied_entry) => { + let id = match occupied_entry.get().id { + SyncOwnerId::Thread(id) => id, + SyncOwnerId::Transferred => { + return match self.try_claim_transferred(zalsa, occupied_entry, reentrant) { + Ok(claimed) => claimed, + Err(other_thread) => match other_thread.block(write) { + BlockResult::Cycle => ClaimResult::Cycle { inner: false }, + BlockResult::Running(running) => ClaimResult::Running(running), + }, + } + } + }; + let &mut SyncState { - id, ref mut anyone_waiting, + .. } = occupied_entry.into_mut(); + // NB: `Ordering::Relaxed` is sufficient here, // as there are no loads that are "gated" on this // value. Everything that is written is also protected @@ -62,22 +105,112 @@ impl SyncTable { write, ) { BlockResult::Running(blocked_on) => ClaimResult::Running(blocked_on), - BlockResult::Cycle => ClaimResult::Cycle, + BlockResult::Cycle => ClaimResult::Cycle { inner: false }, } } std::collections::hash_map::Entry::Vacant(vacant_entry) => { vacant_entry.insert(SyncState { - id: thread::current().id(), + id: SyncOwnerId::Thread(thread::current().id()), anyone_waiting: false, + is_transfer_target: false, + claimed_twice: false, }); ClaimResult::Claimed(ClaimGuard { key_index, zalsa, sync_table: self, + mode: ReleaseMode::Default, }) } } } + + #[cold] + #[inline(never)] + fn try_claim_transferred<'me>( + &'me self, + zalsa: &'me Zalsa, + mut entry: OccupiedEntry, + reentrant: Reentrant, + ) -> Result, Box>> { + let key_index = *entry.key(); + let database_key_index = DatabaseKeyIndex::new(self.ingredient, key_index); + let thread_id = thread::current().id(); + + match zalsa + .runtime() + .block_transferred(database_key_index, thread_id) + { + BlockTransferredResult::ImTheOwner if reentrant.is_allow() => { + let SyncState { + id, claimed_twice, .. + } = entry.into_mut(); + debug_assert!(!*claimed_twice); + + *id = SyncOwnerId::Thread(thread_id); + *claimed_twice = true; + + Ok(ClaimResult::Claimed(ClaimGuard { + key_index, + zalsa, + sync_table: self, + mode: ReleaseMode::SelfOnly, + })) + } + BlockTransferredResult::ImTheOwner => Ok(ClaimResult::Cycle { inner: true }), + BlockTransferredResult::OwnedBy(other_thread) => { + entry.get_mut().anyone_waiting = true; + Err(other_thread) + } + BlockTransferredResult::Released => { + entry.insert(SyncState { + id: SyncOwnerId::Thread(thread_id), + anyone_waiting: false, + is_transfer_target: false, + claimed_twice: false, + }); + Ok(ClaimResult::Claimed(ClaimGuard { + key_index, + zalsa, + sync_table: self, + mode: ReleaseMode::Default, + })) + } + } + } + + /// Marks `key_index` as a transfer target. + /// + /// Returns the `SyncOwnerId` of the thread that currently owns this query. + /// + /// Note: The result of this method will immediately become stale unless the thread owning `key_index` + /// is currently blocked on this thread (claiming `key_index` from this thread results in a cycle). 
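The branches of `try_claim_transferred` boil down to a small decision table: is the current thread on the owner's critical path, and are reentrant claims allowed? A hypothetical, simplified rendering of that table (not salsa's actual types or API):

```rust
// Sketch only: outcomes of claiming a query whose lock ownership was transferred.
#[derive(Debug, PartialEq)]
enum Outcome {
    ClaimedReentrantly, // release must later use the "self only" mode
    InnerCycle,
    MustBlock,
    ClaimedFresh,
}

enum TransferState {
    OwnedByThisThread,
    OwnedByOtherThread,
    Released,
}

fn claim_transferred(state: TransferState, allow_reentrant: bool) -> Outcome {
    match state {
        TransferState::OwnedByThisThread if allow_reentrant => Outcome::ClaimedReentrantly,
        TransferState::OwnedByThisThread => Outcome::InnerCycle,
        TransferState::OwnedByOtherThread => Outcome::MustBlock,
        TransferState::Released => Outcome::ClaimedFresh,
    }
}

fn main() {
    assert_eq!(claim_transferred(TransferState::OwnedByThisThread, true), Outcome::ClaimedReentrantly);
    assert_eq!(claim_transferred(TransferState::OwnedByThisThread, false), Outcome::InnerCycle);
    assert_eq!(claim_transferred(TransferState::OwnedByOtherThread, true), Outcome::MustBlock);
    assert_eq!(claim_transferred(TransferState::Released, false), Outcome::ClaimedFresh);
}
```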
+ pub(super) fn mark_as_transfer_target(&self, key_index: Id) -> Option { + let mut syncs = self.syncs.lock(); + syncs.get_mut(&key_index).map(|state| { + state.anyone_waiting = true; + state.is_transfer_target = true; + + state.id + }) + } +} + +#[derive(Copy, Clone, Debug)] +pub enum SyncOwnerId { + /// Query is owned by this thread + Thread(thread::ThreadId), + + /// The query's lock ownership has been transferred to another query. + /// E.g. if `a` transfers its ownership to `b`, then only the thread in the critical path + /// to complete b` can claim `a` (in most instances, only the thread owning `b` can claim `a`). + /// + /// The thread owning `a` is stored in the `DependencyGraph`. + /// + /// A query can be marked as `Transferred` even if it has since then been released by the owning query. + /// In that case, the query is effectively unclaimed and the `Transferred` state is stale. The reason + /// for this is that it avoids the need for locking each sync table when releasing the transferred queries. + Transferred, } /// Marks an active 'claim' in the synchronization map. The claim is @@ -87,33 +220,147 @@ pub(crate) struct ClaimGuard<'me> { key_index: Id, zalsa: &'me Zalsa, sync_table: &'me SyncTable, + mode: ReleaseMode, } -impl ClaimGuard<'_> { - fn remove_from_map_and_unblock_queries(&self) { +impl<'me> ClaimGuard<'me> { + pub(crate) const fn zalsa(&self) -> &'me Zalsa { + self.zalsa + } + + pub(crate) const fn database_key_index(&self) -> DatabaseKeyIndex { + DatabaseKeyIndex::new(self.sync_table.ingredient, self.key_index) + } + + pub(crate) fn set_release_mode(&mut self, mode: ReleaseMode) { + self.mode = mode; + } + + #[cold] + #[inline(never)] + fn release_panicking(&self) { let mut syncs = self.sync_table.syncs.lock(); + let state = syncs.remove(&self.key_index).expect("key claimed twice?"); + tracing::debug!( + "Release claim on {:?} due to panic", + self.database_key_index() + ); + + self.release(state, WaitResult::Panicked); + } + + #[inline(always)] + fn release(&self, state: SyncState, wait_result: WaitResult) { + let SyncState { + anyone_waiting, + is_transfer_target, + claimed_twice, + .. + } = state; + + if !anyone_waiting { + return; + } + + let runtime = self.zalsa.runtime(); + let database_key_index = self.database_key_index(); - let SyncState { anyone_waiting, .. 
} = - syncs.remove(&self.key_index).expect("key claimed twice?"); - - if anyone_waiting { - let database_key = DatabaseKeyIndex::new(self.sync_table.ingredient, self.key_index); - self.zalsa.runtime().unblock_queries_blocked_on( - database_key, - if thread::panicking() { - tracing::info!("Unblocking queries blocked on {database_key:?} after a panick"); - WaitResult::Panicked - } else { - WaitResult::Completed - }, - ) + if claimed_twice { + runtime.undo_transfer_lock(database_key_index); } + + if is_transfer_target { + runtime.unblock_transferred_queries_owned_by(database_key_index, wait_result); + } + + runtime.unblock_queries_blocked_on(database_key_index, wait_result); + } + + #[cold] + #[inline(never)] + fn release_self(&self) { + let mut syncs = self.sync_table.syncs.lock(); + let std::collections::hash_map::Entry::Occupied(mut state) = syncs.entry(self.key_index) + else { + panic!("key should only be claimed/released once"); + }; + + if state.get().claimed_twice { + state.get_mut().claimed_twice = false; + state.get_mut().id = SyncOwnerId::Transferred; + } else { + self.release(state.remove(), WaitResult::Completed); + } + } + + #[cold] + #[inline(never)] + pub(crate) fn transfer(&self, new_owner: DatabaseKeyIndex) { + let owner_ingredient = self.zalsa.lookup_ingredient(new_owner.ingredient_index()); + + // Get the owning thread of `new_owner`. + // The thread id is guaranteed to not be stale because `new_owner` must be blocked on `self_key` + // or `transfer_lock` will panic (at least in debug builds). + let Some(new_owner_thread_id) = + owner_ingredient.mark_as_transfer_target(new_owner.key_index()) + else { + self.release( + self.sync_table + .syncs + .lock() + .remove(&self.key_index) + .expect("key should only be claimed/released once"), + WaitResult::Panicked, + ); + + panic!("new owner to be a locked query") + }; + + let mut syncs = self.sync_table.syncs.lock(); + + let self_key = self.database_key_index(); + tracing::debug!( + "Transferring lock ownership of {self_key:?} to {new_owner:?} ({new_owner_thread_id:?})" + ); + + let SyncState { + id, claimed_twice, .. + } = syncs + .get_mut(&self.key_index) + .expect("key should only be claimed/released once"); + + self.zalsa + .runtime() + .transfer_lock(self_key, new_owner, new_owner_thread_id); + + *id = SyncOwnerId::Transferred; + *claimed_twice = false; } } impl Drop for ClaimGuard<'_> { fn drop(&mut self) { - self.remove_from_map_and_unblock_queries() + if thread::panicking() { + self.release_panicking(); + return; + } + + match self.mode { + ReleaseMode::Default => { + let mut syncs = self.sync_table.syncs.lock(); + let state = syncs + .remove(&self.key_index) + .expect("key should only be claimed/released once"); + + self.release(state, WaitResult::Completed); + } + ReleaseMode::SelfOnly => { + self.release_self(); + } + ReleaseMode::TransferTo(new_owner) => { + self.transfer(new_owner); + } + } } } @@ -122,3 +369,60 @@ impl std::fmt::Debug for SyncTable { f.debug_struct("SyncTable").finish() } } + +/// Controls how the lock is released when the `ClaimGuard` is dropped. +#[derive(Copy, Clone, Debug, Default)] +pub(crate) enum ReleaseMode { + /// The default release mode. + /// + /// Releases the query for which this claim guard holds the lock and any queries that have + /// transferred ownership to this query. + #[default] + Default, + + /// Only releases the lock for this query. Any query that has transferred ownership to this query + /// will remain locked. 
+ /// + /// If this thread panics, the query will be released as normal (default mode). + SelfOnly, + + /// Transfers the ownership of the lock to the specified query. + /// + /// The query will remain locked and only the thread owning the transfer target will be resumed. + /// + /// The transfer target must be a query that's blocked on this query to guarantee that the transfer target doesn't complete + /// before the transfer is finished (which would leave this query locked forever). + /// + /// If this thread panics, the query will be released as normal (default mode). + TransferTo(DatabaseKeyIndex), +} + +impl std::fmt::Debug for ClaimGuard<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ClaimGuard") + .field("key_index", &self.key_index) + .field("mode", &self.mode) + .finish_non_exhaustive() + } +} + +/// Controls whether this thread can claim a query that transferred its ownership to a query +/// this thread currently holds the lock for. +/// +/// For example: if query `a` transferred its ownership to query `b`, and this thread holds +/// the lock for `b`, then this thread can also claim `a` — but only when using [`Self::Allow`]. +#[derive(Copy, Clone, PartialEq, Eq)] +pub(crate) enum Reentrant { + /// Allow `try_claim` to reclaim a query's that transferred its ownership to a query + /// hold by this thread. + Allow, + + /// Only allow claiming queries that haven't been claimed by any thread. + Deny, +} + +impl Reentrant { + const fn is_allow(self) -> bool { + matches!(self, Reentrant::Allow) + } +} diff --git a/src/ingredient.rs b/src/ingredient.rs index 3cf36ae61..7999b08fb 100644 --- a/src/ingredient.rs +++ b/src/ingredient.rs @@ -1,7 +1,7 @@ use std::any::{Any, TypeId}; use std::fmt; -use crate::cycle::{empty_cycle_heads, CycleHeads, CycleRecoveryStrategy, ProvisionalStatus}; +use crate::cycle::{empty_cycle_heads, CycleHeads, IterationCount, ProvisionalStatus}; use crate::database::RawDatabase; use crate::function::{VerifyCycleHeads, VerifyResult}; use crate::hash::{FxHashSet, FxIndexSet}; @@ -93,9 +93,19 @@ pub trait Ingredient: Any + std::fmt::Debug + Send + Sync { /// on an other thread, it's up to caller to block until the result becomes available if desired. /// A return value of [`WaitForResult::Cycle`] means that a cycle was encountered; the waited-on query is either already claimed /// by the current thread, or by a thread waiting on the current thread. - fn wait_for<'me>(&'me self, zalsa: &'me Zalsa, key_index: Id) -> WaitForResult<'me> { - _ = (zalsa, key_index); - WaitForResult::Available + fn wait_for<'me>(&'me self, _zalsa: &'me Zalsa, _key_index: Id) -> WaitForResult<'me> { + unreachable!( + "wait_for should only be called on cycle heads and only functions can be cycle heads" + ); + } + + /// Invoked when a query transfers its lock-ownership to `_key_index`. Returns the thread + /// owning the lock for `_key_index` or `None` if `_key_index` is not claimed. + /// + /// Note: The returned `SyncOwnerId` may be outdated as soon as this function returns **unless** + /// it's guaranteed that `_key_index` is blocked on the current thread. + fn mark_as_transfer_target(&self, _key_index: Id) -> Option { + unreachable!("mark_as_transfer_target should only be called on functions"); } /// Invoked when the value `output_key` should be marked as valid in the current revision. 
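The `ReleaseMode` variants introduced in the sync-table changes above decide what the claim guard does on drop: release everything, release only its own key, or hand the lock to another query, with panics always falling back to a full release. A toy guard sketching that drop-time dispatch (assumed, simplified types; the real guard also updates the dependency graph):

```rust
// Sketch only: a guard whose Drop behaviour is selected by a release mode.
#[derive(Copy, Clone, Debug)]
enum ReleaseMode {
    Default,
    SelfOnly,
    TransferTo(u32), // stand-in for DatabaseKeyIndex
}

struct Guard {
    key: u32,
    mode: ReleaseMode,
}

impl Drop for Guard {
    fn drop(&mut self) {
        if std::thread::panicking() {
            // On panic, always fall back to a full release so waiters wake up.
            println!("panic: fully releasing {}", self.key);
            return;
        }
        match self.mode {
            ReleaseMode::Default => println!("releasing {} and everything transferred to it", self.key),
            ReleaseMode::SelfOnly => println!("releasing only {}", self.key),
            ReleaseMode::TransferTo(owner) => println!("transferring lock of {} to {}", self.key, owner),
        }
    }
}

fn main() {
    let mut guard = Guard { key: 1, mode: ReleaseMode::Default };
    // The mode can be changed while the claim is held, e.g. when a cycle head
    // decides to hand its lock to an outer cycle head.
    guard.mode = ReleaseMode::TransferTo(7);
    drop(guard);
}
```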
@@ -157,11 +167,27 @@ pub trait Ingredient: Any + std::fmt::Debug + Send + Sync { } // Function ingredient methods - /// If this ingredient is a participant in a cycle, what is its cycle recovery strategy? - /// (Really only relevant to [`crate::function::FunctionIngredient`], - /// since only function ingredients push themselves onto the active query stack.) - fn cycle_recovery_strategy(&self) -> CycleRecoveryStrategy { - unreachable!("only function ingredients can be part of a cycle") + /// Tests if the (nested) cycle head `_input` has converged in the most recent iteration. + /// + /// Returns `false` if the Memo doesn't exist or if called on a non-cycle head. + fn cycle_converged(&self, _zalsa: &Zalsa, _input: Id) -> bool { + unreachable!("cycle_converged should only be called on cycle heads and only functions can be cycle heads"); + } + + /// Updates the iteration count for the (nested) cycle head `_input` to `iteration_count`. + /// + /// This is a no-op if the memo doesn't exist or if called on a Memo without cycle heads. + fn set_cycle_iteration_count( + &self, + _zalsa: &Zalsa, + _input: Id, + _iteration_count: IterationCount, + ) { + unreachable!("increment_iteration_count should only be called on cycle heads and only functions can be cycle heads"); + } + + fn finalize_cycle_head(&self, _zalsa: &Zalsa, _input: Id) { + unreachable!("finalize_cycle_head should only be called on cycle heads and only functions can be cycle heads"); } /// What were the inputs (if any) that were used to create the value at `key_index`. @@ -302,14 +328,9 @@ pub(crate) fn fmt_index(debug_name: &str, id: Id, fmt: &mut fmt::Formatter<'_>) write!(fmt, "{debug_name}({id:?})") } +#[derive(Debug)] pub enum WaitForResult<'me> { Running(Running<'me>), Available, - Cycle, -} - -impl WaitForResult<'_> { - pub const fn is_cycle(&self) -> bool { - matches!(self, WaitForResult::Cycle) - } + Cycle { inner: bool }, } diff --git a/src/key.rs b/src/key.rs index 82d922565..364015756 100644 --- a/src/key.rs +++ b/src/key.rs @@ -18,7 +18,7 @@ pub struct DatabaseKeyIndex { impl DatabaseKeyIndex { #[inline] - pub(crate) fn new(ingredient_index: IngredientIndex, key_index: Id) -> Self { + pub(crate) const fn new(ingredient_index: IngredientIndex, key_index: Id) -> Self { Self { key_index, ingredient_index, diff --git a/src/runtime.rs b/src/runtime.rs index 8436c684d..e1f4aadf2 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,6 +1,6 @@ use self::dependency_graph::DependencyGraph; use crate::durability::Durability; -use crate::function::SyncGuard; +use crate::function::{SyncGuard, SyncOwnerId}; use crate::key::DatabaseKeyIndex; use crate::sync::atomic::{AtomicBool, Ordering}; use crate::sync::thread::{self, ThreadId}; @@ -58,6 +58,57 @@ pub(crate) enum BlockResult<'me> { Cycle, } +pub(crate) enum BlockTransferredResult<'me> { + /// The current thread is the owner of the transferred query + /// and it can claim it if it wants to. + ImTheOwner, + + /// The query is owned/running on another thread. + OwnedBy(Box>), + + /// The query has transferred its ownership to another query previously but that query has + /// since then completed and released the lock. + Released, +} + +pub(super) struct BlockOnTransferredOwner<'me> { + dg: crate::sync::MutexGuard<'me, DependencyGraph>, + /// The query that we're trying to claim. + database_key: DatabaseKeyIndex, + /// The thread that currently owns the lock for the transferred query. + other_id: ThreadId, + /// The current thread that is trying to claim the transferred query. 
+ thread_id: ThreadId, +} + +impl<'me> BlockOnTransferredOwner<'me> { + /// Block on the other thread to complete the computation. + pub(super) fn block(self, query_mutex_guard: SyncGuard<'me>) -> BlockResult<'me> { + // Cycle in the same thread. + if self.thread_id == self.other_id { + return BlockResult::Cycle; + } + + if self.dg.depends_on(self.other_id, self.thread_id) { + crate::tracing::debug!( + "block_on: cycle detected for {:?} in thread {thread_id:?} on {:?}", + self.database_key, + self.other_id, + thread_id = self.thread_id + ); + return BlockResult::Cycle; + } + + BlockResult::Running(Running(Box::new(BlockedOnInner { + dg: self.dg, + query_mutex_guard, + database_key: self.database_key, + other_id: self.other_id, + thread_id: self.thread_id, + }))) + } +} + pub struct Running<'me>(Box>); struct BlockedOnInner<'me> { @@ -69,10 +120,6 @@ struct BlockedOnInner<'me> { } impl Running<'_> { - pub(crate) fn database_key(&self) -> DatabaseKeyIndex { - self.0.database_key - } - /// Blocks on the other thread to complete the computation. pub(crate) fn block_on(self, zalsa: &Zalsa) { let BlockedOnInner { @@ -210,7 +257,7 @@ impl Runtime { let r_old = self.current_revision(); let r_new = r_old.next(); self.revisions[0] = r_new; - crate::tracing::debug!("new_revision: {r_old:?} -> {r_new:?}"); + crate::tracing::info!("new_revision: {r_old:?} -> {r_new:?}"); r_new } @@ -253,9 +300,40 @@ impl Runtime { }))) } + /// Tries to claim ownership of a transferred query where `thread_id` is the current thread and `query` + /// is the query (that had its ownership transferred) to claim. + /// + /// For this operation to be reasonable, the caller must ensure that the sync table lock on `query` is not released + /// before this operation completes. + pub(super) fn block_transferred( + &self, + query: DatabaseKeyIndex, + current_id: ThreadId, + ) -> BlockTransferredResult<'_> { + let dg = self.dependency_graph.lock(); + + let owner_thread = dg.thread_id_of_transferred_query(query, None); + + let Some(owner_thread_id) = owner_thread else { + // The query transferred its ownership but the owner has since then released the lock. + return BlockTransferredResult::Released; + }; + + if owner_thread_id == current_id || dg.depends_on(owner_thread_id, current_id) { + BlockTransferredResult::ImTheOwner + } else { + // Lock is owned by another thread, wait for it to be released. + BlockTransferredResult::OwnedBy(Box::new(BlockOnTransferredOwner { + dg, + database_key: query, + other_id: owner_thread_id, + thread_id: current_id, + })) + } + } + /// Invoked when this runtime completed computing `database_key` with - /// the given result `wait_result` (`wait_result` should be `None` if - /// computing `database_key` panicked and could not complete). + /// the given result `wait_result`. /// This function unblocks any dependent queries and allows them /// to continue executing. pub(crate) fn unblock_queries_blocked_on( @@ -268,6 +346,52 @@ impl Runtime { .unblock_runtimes_blocked_on(database_key, wait_result); } + /// Unblocks all transferred queries that are owned by `database_key` recursively. + /// + /// Invoked when a query completes that has been marked as transfer target (it has + /// queries that transferred their lock ownership to it) with the given `wait_result`. + /// + /// This function unblocks any dependent queries and allows them to continue executing. The + /// query `database_key` is not unblocked by this function. 
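`block` above refuses to wait whenever waiting would close a loop in the thread-blocking graph, which is exactly what `depends_on` checks by following `blocked_on` edges. A self-contained sketch of that walk with hypothetical thread ids:

```rust
use std::collections::HashMap;

// Sketch only: each entry says "this thread is blocked on that thread".
type Edges = HashMap<u32, u32>;

/// Returns `true` if `from` transitively blocks on `to`, in which case having
/// `to` wait for `from` would deadlock and must be reported as a cycle instead.
fn depends_on(edges: &Edges, from: u32, to: u32) -> bool {
    let mut current = from;
    while let Some(&next) = edges.get(&current) {
        if next == to {
            return true;
        }
        current = next;
    }
    current == to
}

fn main() {
    // t1 is blocked on t2, t2 is blocked on t3.
    let edges: Edges = [(1, 2), (2, 3)].into_iter().collect();
    assert!(depends_on(&edges, 1, 3));
    // t3 is not blocked on t1, so t1 may safely wait for t3.
    assert!(!depends_on(&edges, 3, 1));
}
```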
+ #[cold] + pub(crate) fn unblock_transferred_queries_owned_by( + &self, + database_key: DatabaseKeyIndex, + wait_result: WaitResult, + ) { + self.dependency_graph + .lock() + .unblock_runtimes_blocked_on_transferred_queries_owned_by(database_key, wait_result); + } + + /// Removes the ownership transfer of `query`'s lock if it exists. + /// + /// If `query` has transferred its lock ownership to another query, this function will remove that transfer, + /// so that `query` now owns its lock again. + #[cold] + pub(super) fn undo_transfer_lock(&self, query: DatabaseKeyIndex) { + self.dependency_graph.lock().undo_transfer_lock(query); + } + + /// Transfers ownership of the lock for `query` to `new_owner_key`. + /// + /// For this operation to be reasonable, the caller must ensure that the sync table lock on `query` is not released + /// and that `new_owner_key` is currently blocked on `query`. Otherwise, `new_owner_key` might + /// complete before the lock is transferred, leaving `query` locked forever. + pub(super) fn transfer_lock( + &self, + query: DatabaseKeyIndex, + new_owner_key: DatabaseKeyIndex, + new_owner_id: SyncOwnerId, + ) { + self.dependency_graph.lock().transfer_lock( + query, + thread::current().id(), + new_owner_key, + new_owner_id, + ); + } + #[cfg(feature = "persistence")] pub(crate) fn deserialize_from(&mut self, other: &mut Runtime) { // The only field that is serialized is `revisions`. diff --git a/src/runtime/dependency_graph.rs b/src/runtime/dependency_graph.rs index fd26c04fa..366a98f53 100644 --- a/src/runtime/dependency_graph.rs +++ b/src/runtime/dependency_graph.rs @@ -3,11 +3,16 @@ use std::pin::Pin; use rustc_hash::FxHashMap; use smallvec::SmallVec; +use crate::function::SyncOwnerId; use crate::key::DatabaseKeyIndex; use crate::runtime::dependency_graph::edge::EdgeCondvar; use crate::runtime::WaitResult; use crate::sync::thread::ThreadId; use crate::sync::MutexGuard; +use crate::tracing; + +type QueryDependents = FxHashMap>; +type TransferredDependents = FxHashMap>; #[derive(Debug, Default)] pub(super) struct DependencyGraph { @@ -15,16 +20,26 @@ pub(super) struct DependencyGraph { /// `K` is blocked on some query executing in the runtime `V`. /// This encodes a graph that must be acyclic (or else deadlock /// will result). - edges: FxHashMap, + edges: Edges, /// Encodes the `ThreadId` that are blocked waiting for the result /// of a given query. - query_dependents: FxHashMap>, + query_dependents: QueryDependents, /// When a key K completes which had dependent queries Qs blocked on it, /// it stores its `WaitResult` here. As they wake up, each query Q in Qs will /// come here to fetch their results. wait_results: FxHashMap, + + /// A `K -> Q` pair indicates that the query `K`'s lock is now owned by the query + /// `Q`. It's important that `transferred` always forms a tree (must be acyclic), + /// or else deadlock will result. + transferred: FxHashMap, + + /// A `K -> [Q]` pair indicates that the query `K` owns the locks of + /// `Q`. This is the reverse mapping of `transferred` to allow efficient unlocking + /// of all dependent queries when `K` completes. + transferred_dependents: TransferredDependents, } impl DependencyGraph { @@ -32,15 +47,7 @@ impl DependencyGraph { /// /// (i.e., there is a path from `from_id` to `to_id` in the graph.) 
pub(super) fn depends_on(&self, from_id: ThreadId, to_id: ThreadId) -> bool { - let mut p = from_id; - while let Some(q) = self.edges.get(&p).map(|edge| edge.blocked_on_id) { - if q == to_id { - return true; - } - - p = q; - } - p == to_id + self.edges.depends_on(from_id, to_id) } /// Modifies the graph so that `from_id` is blocked @@ -138,6 +145,381 @@ impl DependencyGraph { // notify the thread. edge.notify(); } + + /// Invoked when the query `database_key` completes and it owns the locks of other queries + /// (the queries transferred their locks to `database_key`). + pub(super) fn unblock_runtimes_blocked_on_transferred_queries_owned_by( + &mut self, + database_key: DatabaseKeyIndex, + wait_result: WaitResult, + ) { + fn unblock_recursive( + me: &mut DependencyGraph, + query: DatabaseKeyIndex, + wait_result: WaitResult, + ) { + me.transferred.remove(&query); + + for query in me.transferred_dependents.remove(&query).unwrap_or_default() { + me.unblock_runtimes_blocked_on(query, wait_result); + unblock_recursive(me, query, wait_result); + } + } + + // If `database_key` is `c` and it has been transferred to `b` earlier, remove its entry. + tracing::trace!( + "unblock_runtimes_blocked_on_transferred_queries_owned_by({database_key:?}" + ); + + if let Some((_, owner)) = self.transferred.remove(&database_key) { + // If this query previously transferred its lock ownership to another query, remove + // it from that queries dependents as it is now completing. + self.transferred_dependents + .get_mut(&owner) + .unwrap() + .remove(&database_key); + } + + unblock_recursive(self, database_key, wait_result); + } + + pub(super) fn undo_transfer_lock(&mut self, database_key: DatabaseKeyIndex) { + if let Some((_, owner)) = self.transferred.remove(&database_key) { + self.transferred_dependents + .get_mut(&owner) + .unwrap() + .remove(&database_key); + } + } + + /// Recursively resolves the thread id that currently owns the lock for `database_key`. + /// + /// Returns `None` if `database_key` hasn't (or has since then been released) transferred its lock + /// and the thread id must be looked up in the `SyncTable` instead. + pub(super) fn thread_id_of_transferred_query( + &self, + database_key: DatabaseKeyIndex, + ignore: Option, + ) -> Option { + let &(mut resolved_thread, owner) = self.transferred.get(&database_key)?; + + let mut current_owner = owner; + + while let Some(&(next_thread, next_key)) = self.transferred.get(¤t_owner) { + if Some(next_key) == ignore { + break; + } + resolved_thread = next_thread; + current_owner = next_key; + } + + Some(resolved_thread) + } + + /// Modifies the graph so that the lock on `query` (currently owned by `current_thread`) is + /// transferred to `new_owner` (which is owned by `new_owner_id`). + pub(super) fn transfer_lock( + &mut self, + query: DatabaseKeyIndex, + current_thread: ThreadId, + new_owner: DatabaseKeyIndex, + new_owner_id: SyncOwnerId, + ) { + let new_owner_thread = match new_owner_id { + SyncOwnerId::Thread(thread) => thread, + SyncOwnerId::Transferred => { + // Skip over `query` to skip over any existing mapping from `new_owner` to `query` that may + // exist from previous transfers. 
+ self.thread_id_of_transferred_query(new_owner, Some(query)) + .expect("new owner should be blocked on `query`") + } + }; + + debug_assert!( + new_owner_thread == current_thread || self.depends_on(new_owner_thread, current_thread), + "new owner {new_owner:?} ({new_owner_thread:?}) must be blocked on {query:?} ({current_thread:?})" + ); + + let thread_changed = match self.transferred.entry(query) { + std::collections::hash_map::Entry::Vacant(entry) => { + // Transfer `c -> b` and there's no existing entry for `c`. + entry.insert((new_owner_thread, new_owner)); + current_thread != new_owner_thread + } + std::collections::hash_map::Entry::Occupied(mut entry) => { + // If we transfer to the same owner as before, return immediately as this is a no-op. + if entry.get() == &(new_owner_thread, new_owner) { + return; + } + + // `Transfer `c -> b` after a previous `c -> d` mapping. + // Update the owner and remove the query from the old owner's dependents. + let &(old_owner_thread, old_owner) = entry.get(); + + // For the example below, remove `d` from `b`'s dependents.` + self.transferred_dependents + .get_mut(&old_owner) + .unwrap() + .remove(&query); + + entry.insert((new_owner_thread, new_owner)); + + // If we have `c -> a -> d` and we now insert a mapping `d -> c`, rewrite the mapping to + // `d -> c -> a` to avoid cycles. + // + // Or, starting with `e -> c -> a -> d -> b` insert `d -> c`. We need to respine the tree to + // ``` + // e -> c -> a -> b + // d / + // ``` + // + // + // A cycle between transfers can occur when a later iteration has a different outer most query than + // a previous iteration. The second iteration then hits `cycle_initial` for a different head, (e.g. for `c` where it previously was `d`). + let mut last_segment = self.transferred.entry(new_owner); + + while let std::collections::hash_map::Entry::Occupied(mut entry) = last_segment { + let source = *entry.key(); + let next_target = entry.get().1; + + // If it's `a -> d`, remove `a -> d` and insert an edge from `a -> b` + if next_target == query { + tracing::trace!( + "Remap edge {source:?} -> {next_target:?} to {source:?} -> {old_owner:?} to prevent a cycle", + ); + + // Remove `a` from the dependents of `d` and remove the mapping from `a -> d`. + self.transferred_dependents + .get_mut(&query) + .unwrap() + .remove(&source); + + // if the old mapping was `c -> d` and we now insert `d -> c`, remove `d -> c` + if old_owner == new_owner { + entry.remove(); + } else { + // otherwise (when `d` pointed to some other query, e.g. `b` in the example), + // add an edge from `a` to `b` + entry.insert((old_owner_thread, old_owner)); + self.transferred_dependents + .get_mut(&old_owner) + .unwrap() + .push(source); + } + + break; + } + + last_segment = self.transferred.entry(next_target); + } + + // We simply assume here that the thread has changed because we'd have to walk the entire + // transferred chaine of `old_owner` to know if the thread has changed. This won't save us much + // compared to just updating all dependent threads. + true + } + }; + + // Register `c` as a dependent of `b`. 
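The re-spining above exists because `transferred` must stay a tree: adding `d -> c` while a chain `c -> ... -> d` already exists would close a loop. A small sketch of that invariant check, using a plain `HashMap` and string keys in place of salsa's maps and `DatabaseKeyIndex`:

```rust
use std::collections::HashMap;

// Sketch only: "key -> owner" records that `key`'s lock is now owned by `owner`.
type Transferred = HashMap<&'static str, &'static str>;

/// Walks the ownership chain starting at `from` and reports whether it reaches `to`.
/// If it does, inserting `to -> from` directly would create a cycle, so the chain
/// has to be rewired (re-spined) before the new edge is added.
fn would_create_cycle(transferred: &Transferred, from: &str, to: &str) -> bool {
    let mut current = from;
    while let Some(&owner) = transferred.get(current) {
        if owner == to {
            return true;
        }
        current = owner;
    }
    false
}

fn main() {
    // Starting state from the comment above: e -> c -> a -> d -> b.
    let transferred: Transferred =
        [("e", "c"), ("c", "a"), ("a", "d"), ("d", "b")].into_iter().collect();

    // Inserting d -> c as-is would close the loop c -> a -> d -> c.
    assert!(would_create_cycle(&transferred, "c", "d"));
    // Inserting f -> e is fine: nothing on e's chain is f.
    assert!(!would_create_cycle(&transferred, "e", "f"));
}
```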
+ let all_dependents = self.transferred_dependents.entry(new_owner).or_default(); + debug_assert!(!all_dependents.contains(&new_owner)); + all_dependents.push(query); + + if thread_changed { + tracing::debug!("Unblocking new owner of transfer target {new_owner:?}"); + self.unblock_transfer_target(query, new_owner_thread); + self.update_transferred_edges(query, new_owner_thread); + } + } + + /// Finds the one query in the dependents of the `source_query` (the one that is transferred to a new owner) + /// on which the `new_owner_id` thread blocks on and unblocks it, to ensure progress. + fn unblock_transfer_target(&mut self, source_query: DatabaseKeyIndex, new_owner_id: ThreadId) { + /// Finds the thread that's currently blocking the `new_owner_id` thread. + /// + /// Returns `Some` if there's such a thread where the first element is the query + /// that the thread is blocked on (key into `query_dependents`) and the second element + /// is the index in the list of blocked threads (index into the `query_dependents` value) for that query. + fn find_blocked_thread( + me: &DependencyGraph, + query: DatabaseKeyIndex, + new_owner_id: ThreadId, + ) -> Option<(DatabaseKeyIndex, usize)> { + if let Some(blocked_threads) = me.query_dependents.get(&query) { + for (i, id) in blocked_threads.iter().copied().enumerate() { + if id == new_owner_id || me.edges.depends_on(new_owner_id, id) { + return Some((query, i)); + } + } + } + + me.transferred_dependents + .get(&query) + .iter() + .copied() + .flatten() + .find_map(|dependent| find_blocked_thread(me, *dependent, new_owner_id)) + } + + if let Some((query, query_dependents_index)) = + find_blocked_thread(self, source_query, new_owner_id) + { + let blocked_threads = self.query_dependents.get_mut(&query).unwrap(); + + let thread_id = blocked_threads.swap_remove(query_dependents_index); + if blocked_threads.is_empty() { + self.query_dependents.remove(&query); + } + + self.unblock_runtime(thread_id, WaitResult::Completed); + } + } + + fn update_transferred_edges(&mut self, query: DatabaseKeyIndex, new_owner_thread: ThreadId) { + fn update_transferred_edges( + edges: &mut Edges, + query_dependents: &QueryDependents, + transferred_dependents: &TransferredDependents, + query: DatabaseKeyIndex, + new_owner_thread: ThreadId, + ) { + tracing::trace!("update_transferred_edges({query:?}"); + if let Some(dependents) = query_dependents.get(&query) { + for dependent in dependents.iter() { + let edge = edges.get_mut(dependent).unwrap(); + + tracing::trace!( + "Rewrite edge from {:?} to {new_owner_thread:?}", + edge.blocked_on_id + ); + edge.blocked_on_id = new_owner_thread; + debug_assert!( + !edges.depends_on(new_owner_thread, *dependent), + "Circular reference between blocked edges: {:#?}", + edges + ); + } + }; + + if let Some(dependents) = transferred_dependents.get(&query) { + for dependent in dependents { + update_transferred_edges( + edges, + query_dependents, + transferred_dependents, + *dependent, + new_owner_thread, + ) + } + } + } + + update_transferred_edges( + &mut self.edges, + &self.query_dependents, + &self.transferred_dependents, + query, + new_owner_thread, + ) + } +} + +#[derive(Debug, Default)] +struct Edges(FxHashMap); + +impl Edges { + fn depends_on(&self, from_id: ThreadId, to_id: ThreadId) -> bool { + let mut p = from_id; + while let Some(q) = self.0.get(&p).map(|edge| edge.blocked_on_id) { + if q == to_id { + return true; + } + + p = q; + } + p == to_id + } + + fn get_mut(&mut self, id: &ThreadId) -> Option<&mut edge::Edge> { + self.0.get_mut(id) + 
} + + fn contains_key(&self, id: &ThreadId) -> bool { + self.0.contains_key(id) + } + + fn insert(&mut self, id: ThreadId, edge: edge::Edge) { + self.0.insert(id, edge); + } + + fn remove(&mut self, id: &ThreadId) -> Option { + self.0.remove(id) + } +} + +#[derive(Debug)] +struct SmallSet(SmallVec<[T; N]>); + +impl SmallSet +where + T: PartialEq, +{ + const fn new() -> Self { + Self(SmallVec::new_const()) + } + + fn push(&mut self, value: T) { + debug_assert!(!self.0.contains(&value)); + + self.0.push(value); + } + + fn contains(&self, value: &T) -> bool { + self.0.contains(value) + } + + fn remove(&mut self, value: &T) -> bool { + if let Some(index) = self.0.iter().position(|x| x == value) { + self.0.swap_remove(index); + true + } else { + false + } + } + + fn iter(&self) -> std::slice::Iter<'_, T> { + self.0.iter() + } +} + +impl IntoIterator for SmallSet { + type Item = T; + type IntoIter = smallvec::IntoIter<[T; N]>; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +impl<'a, T, const N: usize> IntoIterator for &'a SmallSet +where + T: PartialEq, +{ + type Item = &'a T; + type IntoIter = std::slice::Iter<'a, T>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + +impl Default for SmallSet +where + T: PartialEq, +{ + fn default() -> Self { + Self::new() + } } mod edge { @@ -165,7 +547,6 @@ mod edge { /// Signalled whenever a query with dependents completes. /// Allows those dependents to check if they are ready to unblock. - // condvar: unsafe<'stack_frame> Pin<&'stack_frame Condvar>, condvar: Pin<&'static EdgeCondvar>, } diff --git a/src/tracing.rs b/src/tracing.rs index 47f95d00e..d8b13e471 100644 --- a/src/tracing.rs +++ b/src/tracing.rs @@ -25,6 +25,13 @@ macro_rules! debug_span { }; } +#[expect(unused_macros)] +macro_rules! info_span { + ($($x:tt)*) => { + crate::tracing::span!(INFO, $($x)*) + }; +} + macro_rules! event { ($level:ident, $($x:tt)*) => {{ let event = { @@ -51,4 +58,5 @@ macro_rules! span { }}; } -pub(crate) use {debug, debug_span, event, info, span, trace}; +#[expect(unused_imports)] +pub(crate) use {debug, debug_span, event, info, info_span, span, trace}; diff --git a/src/zalsa_local.rs b/src/zalsa_local.rs index e332b516f..39d0c489c 100644 --- a/src/zalsa_local.rs +++ b/src/zalsa_local.rs @@ -1,4 +1,6 @@ use std::cell::{RefCell, UnsafeCell}; +use std::fmt; +use std::fmt::Formatter; use std::panic::UnwindSafe; use std::ptr::{self, NonNull}; @@ -11,7 +13,7 @@ use crate::accumulator::{ Accumulator, }; use crate::active_query::{CompletedQuery, QueryStack}; -use crate::cycle::{empty_cycle_heads, CycleHeads, IterationCount}; +use crate::cycle::{empty_cycle_heads, AtomicIterationCount, CycleHeads, IterationCount}; use crate::durability::Durability; use crate::key::DatabaseKeyIndex; use crate::runtime::Stamp; @@ -513,7 +515,8 @@ impl QueryRevisionsExtra { accumulated, cycle_heads, tracked_struct_ids, - iteration, + iteration: iteration.into(), + cycle_converged: false, })) }; @@ -521,7 +524,6 @@ impl QueryRevisionsExtra { } } -#[derive(Debug)] #[cfg_attr(feature = "persistence", derive(serde::Serialize, serde::Deserialize))] struct QueryRevisionsExtraInner { #[cfg(feature = "accumulator")] @@ -561,7 +563,12 @@ struct QueryRevisionsExtraInner { /// iterate again. cycle_heads: CycleHeads, - iteration: IterationCount, + iteration: AtomicIterationCount, + + /// Stores for nested cycle heads whether they've converged in the last iteration. + /// This value is always `false` for other queries. 
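The `SmallSet` helper added in the dependency-graph changes above trades hashing for a linear scan, which is cheap for the handful of dependents each query tracks. A dependency-free sketch of the same semantics, backed by a plain `Vec` instead of `SmallVec`:

```rust
// Sketch only: set semantics over a small vector; membership is a linear scan.
#[derive(Debug, Default)]
struct SmallSet<T>(Vec<T>);

impl<T: PartialEq> SmallSet<T> {
    fn push(&mut self, value: T) {
        debug_assert!(!self.0.contains(&value));
        self.0.push(value);
    }

    fn remove(&mut self, value: &T) -> bool {
        if let Some(index) = self.0.iter().position(|x| x == value) {
            // Order is irrelevant for a set, so swap_remove keeps removal O(1).
            self.0.swap_remove(index);
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut dependents: SmallSet<u32> = SmallSet::default();
    dependents.push(1);
    dependents.push(2);
    assert!(dependents.remove(&1));
    assert!(!dependents.remove(&3));
}
```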
+ #[cfg_attr(feature = "persistence", serde(skip))] + cycle_converged: bool, } impl QueryRevisionsExtraInner { @@ -573,6 +580,7 @@ impl QueryRevisionsExtraInner { tracked_struct_ids, cycle_heads, iteration: _, + cycle_converged: _, } = self; #[cfg(feature = "accumulator")] @@ -583,6 +591,44 @@ impl QueryRevisionsExtraInner { } } +impl fmt::Debug for QueryRevisionsExtraInner { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + struct FmtTrackedStructIds<'a>(&'a ThinVec<(Identity, Id)>); + + impl fmt::Debug for FmtTrackedStructIds<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let mut f = f.debug_list(); + + if self.0.len() > 5 { + f.entries(&self.0[..5]); + f.finish_non_exhaustive() + } else { + f.entries(self.0); + f.finish() + } + } + } + + let mut f = f.debug_struct("QueryRevisionsExtraInner"); + + f.field("cycle_heads", &self.cycle_heads) + .field("iteration", &self.iteration) + .field("cycle_converged", &self.cycle_converged); + + #[cfg(feature = "accumulator")] + { + f.field("accumulated", &self.accumulated); + } + + f.field( + "tracked_struct_ids", + &FmtTrackedStructIds(&self.tracked_struct_ids), + ); + + f.finish() + } +} + #[cfg(not(feature = "shuttle"))] #[cfg(target_pointer_width = "64")] const _: [(); std::mem::size_of::()] = [(); std::mem::size_of::<[usize; 4]>()]; @@ -605,7 +651,7 @@ impl QueryRevisions { #[cfg(feature = "accumulator")] AccumulatedMap::default(), ThinVec::default(), - CycleHeads::initial(query), + CycleHeads::initial(query, IterationCount::initial()), IterationCount::initial(), ), } @@ -654,17 +700,55 @@ impl QueryRevisions { }; } - pub(crate) const fn iteration(&self) -> IterationCount { + pub(crate) fn cycle_converged(&self) -> bool { match &self.extra.0 { - Some(extra) => extra.iteration, + Some(extra) => extra.cycle_converged, + None => false, + } + } + + pub(crate) fn set_cycle_converged(&mut self, cycle_converged: bool) { + if let Some(extra) = &mut self.extra.0 { + extra.cycle_converged = cycle_converged + } + } + + pub(crate) fn iteration(&self) -> IterationCount { + match &self.extra.0 { + Some(extra) => extra.iteration.load(), None => IterationCount::initial(), } } + pub(crate) fn set_iteration_count( + &self, + database_key_index: DatabaseKeyIndex, + iteration_count: IterationCount, + ) { + let Some(extra) = &self.extra.0 else { + return; + }; + debug_assert!(extra.iteration.load() <= iteration_count); + + extra.iteration.store(iteration_count); + + extra + .cycle_heads + .update_iteration_count(database_key_index, iteration_count); + } + /// Updates the iteration count if this query has any cycle heads. Otherwise it's a no-op. 
- pub(crate) fn update_iteration_count(&mut self, iteration_count: IterationCount) { + pub(crate) fn update_iteration_count_mut( + &mut self, + cycle_head_index: DatabaseKeyIndex, + iteration_count: IterationCount, + ) { if let Some(extra) = &mut self.extra.0 { - extra.iteration = iteration_count + extra.iteration.store_mut(iteration_count); + + extra + .cycle_heads + .update_iteration_count_mut(cycle_head_index, iteration_count); } } diff --git a/tests/backtrace.rs b/tests/backtrace.rs index 74124c1ab..b611cac86 100644 --- a/tests/backtrace.rs +++ b/tests/backtrace.rs @@ -108,7 +108,7 @@ fn backtrace_works() { at tests/backtrace.rs:32 1: query_cycle(Id(2)) at tests/backtrace.rs:45 - cycle heads: query_cycle(Id(2)) -> IterationCount(0) + cycle heads: query_cycle(Id(2)) -> iteration = 0 2: query_f(Id(2)) at tests/backtrace.rs:40 "#]] @@ -119,9 +119,9 @@ fn backtrace_works() { query stacktrace: 0: query_e(Id(3)) -> (R1, Durability::LOW) at tests/backtrace.rs:32 - 1: query_cycle(Id(3)) -> (R1, Durability::HIGH, iteration = IterationCount(0)) + 1: query_cycle(Id(3)) -> (R1, Durability::HIGH, iteration = 0) at tests/backtrace.rs:45 - cycle heads: query_cycle(Id(3)) -> IterationCount(0) + cycle heads: query_cycle(Id(3)) -> iteration = 0 2: query_f(Id(3)) -> (R1, Durability::HIGH) at tests/backtrace.rs:40 "#]] diff --git a/tests/cycle.rs b/tests/cycle.rs index 7a7e26a07..5e46cc0be 100644 --- a/tests/cycle.rs +++ b/tests/cycle.rs @@ -95,18 +95,22 @@ impl Input { } } + #[track_caller] fn assert(&self, db: &dyn Db, expected: Value) { assert_eq!(self.eval(db), expected) } + #[track_caller] fn assert_value(&self, db: &dyn Db, expected: u8) { self.assert(db, Value::N(expected)) } + #[track_caller] fn assert_bounds(&self, db: &dyn Db) { self.assert(db, Value::OutOfBounds) } + #[track_caller] fn assert_count(&self, db: &dyn Db) { self.assert(db, Value::TooManyIterations) } @@ -893,7 +897,7 @@ fn cycle_unchanged() { /// /// If nothing in a nested cycle changed in the new revision, no part of the cycle should /// re-execute. -#[test] +#[test_log::test] fn cycle_unchanged_nested() { let mut db = ExecuteValidateLoggerDatabase::default(); let a_in = Inputs::new(&db, vec![]); @@ -978,7 +982,7 @@ fn cycle_unchanged_nested_intertwined() { e.assert_value(&db, 60); } - db.assert_logs_len(15 + i); + db.assert_logs_len(13 + i); // next revision, we change only A, which is not part of the cycle and the cycle does not // depend on. 
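The `set_iteration_count`/`update_iteration_count_mut` pair above works because the count is stored in an atomic: the shared-reference setter goes through an atomic store, while the exclusive setter writes directly. A minimal sketch of that split using a bare `AtomicU8` (types and orderings here are for illustration):

```rust
use std::sync::atomic::{AtomicU8, Ordering};

// Sketch only: why the shared-reference setter and the &mut setter can coexist.
struct Revisions {
    iteration: AtomicU8,
}

impl Revisions {
    // Callable while other threads hold shared references to the same memo.
    fn set_iteration_count(&self, value: u8) {
        debug_assert!(self.iteration.load(Ordering::Relaxed) <= value);
        self.iteration.store(value, Ordering::Release);
    }

    // Callable only with exclusive access, so no atomic operation is needed.
    fn update_iteration_count_mut(&mut self, value: u8) {
        *self.iteration.get_mut() = value;
    }
}

fn main() {
    let mut revisions = Revisions { iteration: AtomicU8::new(0) };
    revisions.set_iteration_count(1);
    revisions.update_iteration_count_mut(2);
    assert_eq!(revisions.iteration.load(Ordering::Relaxed), 2);
}
```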
diff --git a/tests/cycle_tracked.rs b/tests/cycle_tracked.rs index 154ba3370..2e0c2cfd0 100644 --- a/tests/cycle_tracked.rs +++ b/tests/cycle_tracked.rs @@ -269,7 +269,7 @@ fn cycle_recover_with_structs<'db>( CycleRecoveryAction::Iterate } -#[test] +#[test_log::test] fn test_cycle_with_fixpoint_structs() { let mut db = EventLoggerDatabase::default(); diff --git a/tests/parallel/cycle_a_t1_b_t2.rs b/tests/parallel/cycle_a_t1_b_t2.rs index d9d5ca365..ad21b7963 100644 --- a/tests/parallel/cycle_a_t1_b_t2.rs +++ b/tests/parallel/cycle_a_t1_b_t2.rs @@ -62,7 +62,7 @@ fn initial(_db: &dyn KnobsDatabase) -> CycleValue { #[test_log::test] fn the_test() { crate::sync::check(|| { - tracing::debug!("New run"); + tracing::debug!("Starting new run"); let db_t1 = Knobs::default(); let db_t2 = db_t1.clone(); diff --git a/tests/parallel/cycle_a_t1_b_t2_fallback.rs b/tests/parallel/cycle_a_t1_b_t2_fallback.rs index 8005a9c23..b2d6631cc 100644 --- a/tests/parallel/cycle_a_t1_b_t2_fallback.rs +++ b/tests/parallel/cycle_a_t1_b_t2_fallback.rs @@ -55,11 +55,18 @@ fn the_test() { use crate::Knobs; crate::sync::check(|| { + tracing::debug!("Starting new run"); let db_t1 = Knobs::default(); let db_t2 = db_t1.clone(); - let t1 = thread::spawn(move || query_a(&db_t1)); - let t2 = thread::spawn(move || query_b(&db_t2)); + let t1 = thread::spawn(move || { + let _span = tracing::debug_span!("t1", thread_id = ?thread::current().id()).entered(); + query_a(&db_t1) + }); + let t2 = thread::spawn(move || { + let _span = tracing::debug_span!("t2", thread_id = ?thread::current().id()).entered(); + query_b(&db_t2) + }); let (r_t1, r_t2) = (t1.join(), t2.join()); diff --git a/tests/parallel/cycle_nested_deep.rs b/tests/parallel/cycle_nested_deep.rs index 7b7c2f42a..f2b355616 100644 --- a/tests/parallel/cycle_nested_deep.rs +++ b/tests/parallel/cycle_nested_deep.rs @@ -63,6 +63,7 @@ fn initial(_db: &dyn KnobsDatabase) -> CycleValue { #[test_log::test] fn the_test() { crate::sync::check(|| { + tracing::debug!("Starting new run"); let db_t1 = Knobs::default(); let db_t2 = db_t1.clone(); let db_t3 = db_t1.clone(); diff --git a/tests/parallel/cycle_nested_deep_conditional.rs b/tests/parallel/cycle_nested_deep_conditional.rs index 316612845..4eff75189 100644 --- a/tests/parallel/cycle_nested_deep_conditional.rs +++ b/tests/parallel/cycle_nested_deep_conditional.rs @@ -72,7 +72,7 @@ fn initial(_db: &dyn KnobsDatabase) -> CycleValue { #[test_log::test] fn the_test() { crate::sync::check(|| { - tracing::debug!("New run"); + tracing::debug!("Starting new run"); let db_t1 = Knobs::default(); let db_t2 = db_t1.clone(); let db_t3 = db_t1.clone(); diff --git a/tests/parallel/cycle_nested_deep_conditional_changed.rs b/tests/parallel/cycle_nested_deep_conditional_changed.rs index 7c96d808d..51d506456 100644 --- a/tests/parallel/cycle_nested_deep_conditional_changed.rs +++ b/tests/parallel/cycle_nested_deep_conditional_changed.rs @@ -81,7 +81,7 @@ fn the_test() { use crate::sync; use salsa::Setter as _; sync::check(|| { - tracing::debug!("New run"); + tracing::debug!("Starting new run"); // This is a bit silly but it works around https://github.com/awslabs/shuttle/issues/192 static INITIALIZE: sync::Mutex> = @@ -108,36 +108,36 @@ fn the_test() { } let t1 = thread::spawn(move || { + let _span = tracing::info_span!("t1", thread_id = ?thread::current().id()).entered(); let (db, input) = get_db(|db, input| { query_a(db, input); }); - let _span = tracing::debug_span!("t1", thread_id = ?thread::current().id()).entered(); - query_a(&db, 
input) }); let t2 = thread::spawn(move || { + let _span = tracing::info_span!("t2", thread_id = ?thread::current().id()).entered(); let (db, input) = get_db(|db, input| { query_b(db, input); }); - let _span = tracing::debug_span!("t4", thread_id = ?thread::current().id()).entered(); query_b(&db, input) }); let t3 = thread::spawn(move || { + let _span = tracing::info_span!("t3", thread_id = ?thread::current().id()).entered(); let (db, input) = get_db(|db, input| { query_d(db, input); }); - let _span = tracing::debug_span!("t2", thread_id = ?thread::current().id()).entered(); query_d(&db, input) }); let t4 = thread::spawn(move || { + let _span = tracing::info_span!("t4", thread_id = ?thread::current().id()).entered(); + let (db, input) = get_db(|db, input| { query_e(db, input); }); - let _span = tracing::debug_span!("t3", thread_id = ?thread::current().id()).entered(); query_e(&db, input) }); diff --git a/tests/parallel/cycle_nested_deep_panic.rs b/tests/parallel/cycle_nested_deep_panic.rs new file mode 100644 index 000000000..98512fbd2 --- /dev/null +++ b/tests/parallel/cycle_nested_deep_panic.rs @@ -0,0 +1,140 @@ +// Shuttle doesn't like panics inside of its runtime. +#![cfg(not(feature = "shuttle"))] + +//! Tests that salsa doesn't get stuck after a panic in a nested cycle function. + +use crate::sync::thread; +use crate::{Knobs, KnobsDatabase}; +use std::fmt; +use std::panic::catch_unwind; + +use salsa::CycleRecoveryAction; + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, salsa::Update)] +struct CycleValue(u32); + +const MIN: CycleValue = CycleValue(0); +const MAX: CycleValue = CycleValue(3); + +#[salsa::tracked(cycle_fn=cycle_fn, cycle_initial=initial)] +fn query_a(db: &dyn KnobsDatabase) -> CycleValue { + query_b(db) +} + +#[salsa::tracked(cycle_fn=cycle_fn, cycle_initial=initial)] +fn query_b(db: &dyn KnobsDatabase) -> CycleValue { + let c_value = query_c(db); + CycleValue(c_value.0 + 1).min(MAX) +} + +#[salsa::tracked] +fn query_c(db: &dyn KnobsDatabase) -> CycleValue { + let d_value = query_d(db); + + if d_value > CycleValue(0) { + let e_value = query_e(db); + let b_value = query_b(db); + CycleValue(d_value.0.max(e_value.0).max(b_value.0)) + } else { + let a_value = query_a(db); + CycleValue(d_value.0.max(a_value.0)) + } +} + +#[salsa::tracked(cycle_fn=cycle_fn, cycle_initial=initial)] +fn query_d(db: &dyn KnobsDatabase) -> CycleValue { + query_b(db) +} + +#[salsa::tracked(cycle_fn=cycle_fn, cycle_initial=initial)] +fn query_e(db: &dyn KnobsDatabase) -> CycleValue { + query_c(db) +} + +fn cycle_fn( + _db: &dyn KnobsDatabase, + _value: &CycleValue, + _count: u32, +) -> CycleRecoveryAction { + CycleRecoveryAction::Iterate +} + +fn initial(_db: &dyn KnobsDatabase) -> CycleValue { + MIN +} + +fn run() { + tracing::debug!("Starting new run"); + let db_t1 = Knobs::default(); + let db_t2 = db_t1.clone(); + let db_t3 = db_t1.clone(); + let db_t4 = db_t1.clone(); + + let t1 = thread::spawn(move || { + let _span = tracing::debug_span!("t1", thread_id = ?thread::current().id()).entered(); + catch_unwind(|| { + db_t1.wait_for(1); + query_a(&db_t1) + }) + }); + let t2 = thread::spawn(move || { + let _span = tracing::debug_span!("t2", thread_id = ?thread::current().id()).entered(); + catch_unwind(|| { + db_t2.wait_for(1); + + query_b(&db_t2) + }) + }); + let t3 = thread::spawn(move || { + let _span = tracing::debug_span!("t3", thread_id = ?thread::current().id()).entered(); + catch_unwind(|| { + db_t3.signal(2); + query_d(&db_t3) + }) + }); + + let r_t1 = t1.join().unwrap(); 
+ let r_t2 = t2.join().unwrap(); + let r_t3 = t3.join().unwrap(); + + assert_is_set_cycle_error(r_t1); + assert_is_set_cycle_error(r_t2); + assert_is_set_cycle_error(r_t3); + + // Pulling the cycle again at a later point should still result in a panic. + assert_is_set_cycle_error(catch_unwind(|| query_d(&db_t4))); +} + +#[test_log::test] +fn the_test() { + for _ in 0..200 { + run() + } +} + +#[track_caller] +fn assert_is_set_cycle_error(result: Result>) +where + T: fmt::Debug, +{ + let err = result.expect_err("expected an error"); + + if let Some(message) = err.downcast_ref::<&str>() { + assert!( + message.contains("set cycle_fn/cycle_initial to fixpoint iterate"), + "Expected error message to contain 'set cycle_fn/cycle_initial to fixpoint iterate', but got: {}", + message + ); + } else if let Some(message) = err.downcast_ref::() { + assert!( + message.contains("set cycle_fn/cycle_initial to fixpoint iterate"), + "Expected error message to contain 'set cycle_fn/cycle_initial to fixpoint iterate', but got: {}", + message + ); + } else if err.downcast_ref::().is_some() { + // This is okay, because Salsa throws a Cancelled::PropagatedPanic when a panic occurs in a query + // that it blocks on. + } else { + std::panic::resume_unwind(err); + } +} diff --git a/tests/parallel/cycle_nested_three_threads.rs b/tests/parallel/cycle_nested_three_threads.rs index c761a80f4..22232bd85 100644 --- a/tests/parallel/cycle_nested_three_threads.rs +++ b/tests/parallel/cycle_nested_three_threads.rs @@ -76,9 +76,18 @@ fn the_test() { let db_t2 = db_t1.clone(); let db_t3 = db_t1.clone(); - let t1 = thread::spawn(move || query_a(&db_t1)); - let t2 = thread::spawn(move || query_b(&db_t2)); - let t3 = thread::spawn(move || query_c(&db_t3)); + let t1 = thread::spawn(move || { + let _span = tracing::info_span!("t1", thread_id = ?thread::current().id()).entered(); + query_a(&db_t1) + }); + let t2 = thread::spawn(move || { + let _span = tracing::info_span!("t2", thread_id = ?thread::current().id()).entered(); + query_b(&db_t2) + }); + let t3 = thread::spawn(move || { + let _span = tracing::info_span!("t3", thread_id = ?thread::current().id()).entered(); + query_c(&db_t3) + }); let r_t1 = t1.join().unwrap(); let r_t2 = t2.join().unwrap(); diff --git a/tests/parallel/main.rs b/tests/parallel/main.rs index a764a864c..d2c780787 100644 --- a/tests/parallel/main.rs +++ b/tests/parallel/main.rs @@ -9,6 +9,7 @@ mod cycle_ab_peeping_c; mod cycle_nested_deep; mod cycle_nested_deep_conditional; mod cycle_nested_deep_conditional_changed; +mod cycle_nested_deep_panic; mod cycle_nested_three_threads; mod cycle_nested_three_threads_changed; mod cycle_panic; @@ -33,7 +34,7 @@ pub(crate) mod sync { pub use shuttle::thread; pub fn check(f: impl Fn() + Send + Sync + 'static) { - shuttle::check_pct(f, 1000, 50); + shuttle::check_pct(f, 10000, 50); } }
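`assert_is_set_cycle_error` in the new panic test inspects the payload that `catch_unwind` returns as `Box<dyn Any + Send>`; string panics surface as `&str` or `String` depending on whether the message was formatted. A tiny standalone illustration of that downcasting pattern:

```rust
use std::panic::catch_unwind;

// Extracts a printable message from a panic payload, if it is a string panic.
// Sketch only; the real test additionally re-raises unknown payloads.
fn panic_message(err: &(dyn std::any::Any + Send)) -> Option<&str> {
    err.downcast_ref::<&str>()
        .copied()
        .or_else(|| err.downcast_ref::<String>().map(String::as_str))
}

fn main() {
    // `panic!` with a plain literal produces a `&str` payload...
    let err = catch_unwind(|| panic!("plain message")).unwrap_err();
    assert_eq!(panic_message(&*err), Some("plain message"));

    // ...while a formatted panic produces a `String` payload.
    let value = 42;
    let err = catch_unwind(|| panic!("formatted {value}")).unwrap_err();
    assert_eq!(panic_message(&*err), Some("formatted 42"));
}
```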