Skip to content

Commit ab90dd2

Browse files
committed
Only collect cycle heads one level deep
1 parent bdab78a commit ab90dd2

File tree

4 files changed

+103
-86
lines changed

4 files changed

+103
-86
lines changed

src/cycle.rs

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,19 @@ pub struct CycleHead {
114114
removed: AtomicBool,
115115
}
116116

117+
impl CycleHead {
118+
pub const fn new(
119+
database_key_index: DatabaseKeyIndex,
120+
iteration_count: IterationCount,
121+
) -> Self {
122+
Self {
123+
database_key_index,
124+
iteration_count: AtomicIterationCount(AtomicU8::new(iteration_count.0)),
125+
removed: AtomicBool::new(false),
126+
}
127+
}
128+
}
129+
117130
impl Clone for CycleHead {
118131
fn clone(&self) -> Self {
119132
Self {
@@ -147,6 +160,10 @@ impl IterationCount {
147160
Self(u8::MAX)
148161
}
149162

163+
pub(crate) const fn is_panicked(self) -> bool {
164+
self.0 == u8::MAX
165+
}
166+
150167
pub(crate) const fn increment(self) -> Option<Self> {
151168
let next = Self(self.0 + 1);
152169
if next.0 <= MAX_ITERATIONS.0 {
@@ -248,6 +265,11 @@ impl CycleHeads {
248265
}
249266
}
250267

268+
pub(crate) fn iter_skip_own(&self, own: DatabaseKeyIndex) -> impl Iterator<Item = &CycleHead> {
269+
self.iter()
270+
.filter(move |head| head.database_key_index != own)
271+
}
272+
251273
pub(crate) fn contains(&self, value: &DatabaseKeyIndex) -> bool {
252274
self.into_iter()
253275
.any(|head| head.database_key_index == *value && !head.removed.load(Ordering::Relaxed))
@@ -307,17 +329,20 @@ impl CycleHeads {
307329
self.0.reserve(other.0.len());
308330

309331
for head in other {
310-
self.insert(head);
332+
debug_assert!(!head.removed.load(Ordering::Relaxed));
333+
self.insert(head.database_key_index, head.iteration_count.load());
311334
}
312335
}
313336

314-
pub(crate) fn insert(&mut self, head: &CycleHead) -> bool {
315-
debug_assert!(!head.removed.load(Ordering::Relaxed));
316-
337+
pub(crate) fn insert(
338+
&mut self,
339+
database_key_index: DatabaseKeyIndex,
340+
iteration_count: IterationCount,
341+
) -> bool {
317342
if let Some(existing) = self
318343
.0
319344
.iter_mut()
320-
.find(|candidate| candidate.database_key_index == head.database_key_index)
345+
.find(|candidate| candidate.database_key_index == database_key_index)
321346
{
322347
let removed = existing.removed.get_mut();
323348

@@ -327,18 +352,18 @@ impl CycleHeads {
327352
true
328353
} else {
329354
let existing_count = existing.iteration_count.load_mut();
330-
let head_count = head.iteration_count.load();
331355

332356
assert_eq!(
333-
existing_count, head_count,
334-
"Can't merge cycle heads {:?} with different iteration counts ({existing_count:?}, {head_count:?})",
357+
existing_count, iteration_count,
358+
"Can't merge cycle heads {:?} with different iteration counts ({existing_count:?}, {iteration_count:?})",
335359
existing.database_key_index
336360
);
337361

338362
false
339363
}
340364
} else {
341-
self.0.push(head.clone());
365+
self.0
366+
.push(CycleHead::new(database_key_index, iteration_count));
342367
true
343368
}
344369
}

src/function/execute.rs

Lines changed: 66 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,14 @@ where
160160
let mut iteration_count = IterationCount::initial();
161161

162162
if let Some(old_memo) = opt_old_memo {
163+
let memo_iteration_count = old_memo.revisions.iteration();
164+
163165
if old_memo.verified_at.load() == zalsa.current_revision()
164166
&& old_memo.cycle_heads().contains(&database_key_index)
167+
&& !memo_iteration_count.is_panicked()
165168
{
166169
previous_memo = Some(old_memo);
167-
iteration_count = old_memo.revisions.iteration();
170+
iteration_count = memo_iteration_count;
168171
}
169172
}
170173

@@ -185,44 +188,55 @@ where
185188

186189
// If there are no cycle heads, break out of the loop (`cycle_heads_mut` returns `None` if the cycle head list is empty)
187190
let Some(cycle_heads) = completed_query.revisions.cycle_heads_mut() else {
191+
claim_guard.set_release_mode(ReleaseMode::SelfOnly);
188192
break (new_value, completed_query);
189193
};
190194

191-
// TODO: Remove "removed" cycle heads"
192195
let mut cycle_heads = std::mem::take(cycle_heads);
196+
let mut missing_heads: SmallVec<[(DatabaseKeyIndex, IterationCount); 1]> =
197+
SmallVec::new_const();
198+
let mut max_iteration_count = iteration_count;
193199

194-
// Recursively resolve all cycle heads that this head depends on.
195-
// This isn't required in a single-threaded execution but it's not guaranteed that all nested cycles are listed
196-
// in cycle heads in a multi-threaded execution:
200+
// Ensure that we resolve the latest cycle heads from any provisional value this query depended on during execution.
201+
// This isn't required in a single-threaded execution, but it's not guaranteed that `cycle_heads` contains all cycles
202+
// in a multi-threaded execution:
197203
//
198204
// t1: a -> b
199205
// t2: c -> b (blocks on t1)
200206
// t1: a -> b -> c (cycle, returns fixpoint initial with c(0) in heads)
201207
// t1: a -> b (completes b, b has c(0) in its cycle heads, releases `b`, which resumes `t2`, and `retry_provisional` blocks on `c` (t2))
202208
// t2: c -> a (cycle, returns fixpoint initial for a with a(0) in heads)
203209
// t2: completes c, `provisional_retry` blocks on `a` (t2)
204-
// t1: a (complets `b` with `c` in heads)
210+
// t1: a (completes `b` with `c` in heads)
205211
//
206212
// Note how `a` only depends on `c` but not `a`. This is because `a` only saw the initial value of `c` and wasn't updated when `c` completed.
207213
// That's why we need to resolve the cycle heads recursively to `cycle_heads` contains all cycle heads at the moment this query completed.
208-
let mut queue: SmallVec<[DatabaseKeyIndex; 4]> = cycle_heads
209-
.iter()
210-
.map(|head| head.database_key_index)
211-
.filter(|head| *head != database_key_index)
212-
.collect();
213-
214-
// TODO: Can we also resolve whether the cycles have converged here?
215-
while let Some(head) = queue.pop() {
216-
let ingredient = zalsa.lookup_ingredient(head.ingredient_index());
217-
let nested_heads = ingredient.cycle_heads(zalsa, head.key_index());
218-
219-
for head in nested_heads {
220-
if cycle_heads.insert(head) && !queue.contains(&head.database_key_index) {
221-
queue.push(head.database_key_index);
214+
for head in &cycle_heads {
215+
let ingredient =
216+
zalsa.lookup_ingredient(head.database_key_index.ingredient_index());
217+
let nested_heads =
218+
ingredient.cycle_heads(zalsa, head.database_key_index.key_index());
219+
220+
max_iteration_count = max_iteration_count.max(head.iteration_count.load());
221+
222+
for nested_head in nested_heads {
223+
let nested_tuple = (
224+
nested_head.database_key_index,
225+
nested_head.iteration_count.load(),
226+
);
227+
if !cycle_heads.contains(&nested_head.database_key_index)
228+
&& !missing_heads.contains(&nested_tuple)
229+
{
230+
max_iteration_count = max_iteration_count.max(nested_tuple.1);
231+
missing_heads.push(nested_tuple);
222232
}
223233
}
224234
}
225235

236+
for (head_key, iteration_count) in missing_heads {
237+
cycle_heads.insert(head_key, iteration_count);
238+
}
239+
226240
let outer_cycle = outer_cycle(zalsa, zalsa_local, &cycle_heads, database_key_index);
227241

228242
// Did the new result we got depend on our own provisional value, in a cycle?
@@ -265,25 +279,14 @@ where
265279
I am a cycle head, comparing last provisional value with new value"
266280
);
267281

268-
// determine if it is a nested query.
269-
// This is a nested query if it depends on any other cycle head than itself
270-
// where claiming it results in a cycle. In that case, both queries form a single connected component
271-
// that we can iterate together rather than having separate nested fixpoint iterations.
272-
273282
let this_converged = C::values_equal(&new_value, last_provisional_value);
274283

275284
iteration_count = if outer_cycle.is_some() {
276285
iteration_count
277286
} else {
278-
cycle_heads
279-
.iter()
280-
.map(|head| head.iteration_count.load())
281-
.max()
282-
.unwrap_or(iteration_count)
287+
max_iteration_count
283288
};
284289

285-
// If the new result is equal to the last provisional result, the cycle has
286-
// converged and we are done.
287290
if !this_converged {
288291
// We are in a cycle that hasn't converged; ask the user's
289292
// cycle-recovery function what to do:
@@ -295,60 +298,53 @@ where
295298
) {
296299
crate::CycleRecoveryAction::Iterate => {}
297300
crate::CycleRecoveryAction::Fallback(fallback_value) => {
298-
crate::tracing::debug!(
301+
tracing::debug!(
299302
"{database_key_index:?}: execute: user cycle_fn says to fall back"
300303
);
301304
new_value = fallback_value;
302305
}
303306
}
304307
}
305308

306-
completed_query
307-
.revisions
308-
.set_cycle_converged(this_converged);
309-
310309
if let Some(outer_cycle) = outer_cycle {
311310
tracing::debug!(
312311
"Detected nested cycle {database_key_index:?}, iterate it as part of the outer cycle {outer_cycle:?}"
313312
);
314313

315314
completed_query.revisions.set_cycle_heads(cycle_heads);
315+
// Store whether this cycle has converged, so that the outer cycle can check it.
316+
completed_query
317+
.revisions
318+
.set_cycle_converged(this_converged);
316319
claim_guard.set_release_mode(ReleaseMode::TransferTo(outer_cycle));
317320

318321
break (new_value, completed_query);
319322
}
320323

321-
// Verify that all cycles have converged, including all inner cycles.
324+
// Verify that this cycle and all inner cycles have converged.
322325
let converged = this_converged
323-
&& cycle_heads
324-
.iter()
325-
.filter(|head| head.database_key_index != database_key_index)
326-
.all(|head| {
327-
let ingredient =
328-
zalsa.lookup_ingredient(head.database_key_index.ingredient_index());
326+
&& cycle_heads.iter_skip_own(database_key_index).all(|head| {
327+
let ingredient =
328+
zalsa.lookup_ingredient(head.database_key_index.ingredient_index());
329329

330-
let converged =
331-
ingredient.cycle_converged(zalsa, head.database_key_index.key_index());
330+
let converged =
331+
ingredient.cycle_converged(zalsa, head.database_key_index.key_index());
332332

333-
if !converged {
334-
tracing::debug!("inner cycle {database_key_index:?} has not converged");
335-
}
333+
if !converged {
334+
tracing::debug!("inner cycle {database_key_index:?} has not converged");
335+
}
336336

337-
converged
338-
});
337+
converged
338+
});
339339

340340
if converged {
341-
crate::tracing::debug!(
342-
"{database_key_index:?}: execute: fixpoint iteration has a final value after {iteration_count:?} iterations"
343-
);
341+
tracing::debug!(
342+
"{database_key_index:?}: execute: fixpoint iteration has a final value after {iteration_count:?} iterations"
343+
);
344344

345345
// Set the nested cycles as verified. This is necessary because
346346
// `validate_provisional` doesn't follow cycle heads recursively (and the inner memos now depend on all cycle heads).
347-
for head in cycle_heads {
348-
if head.database_key_index == database_key_index {
349-
continue;
350-
}
351-
347+
for head in cycle_heads.iter_skip_own(database_key_index) {
352348
let ingredient =
353349
zalsa.lookup_ingredient(head.database_key_index.ingredient_index());
354350
ingredient.finalize_cycle_head(zalsa, head.database_key_index.key_index());
@@ -359,10 +355,7 @@ where
359355
break (new_value, completed_query);
360356
}
361357

362-
completed_query.revisions.set_cycle_heads(cycle_heads);
363-
364-
// `iteration_count` can't overflow as we check it against `MAX_ITERATIONS`
365-
// which is less than `u32::MAX`.
358+
// The fixpoint iteration hasn't converged. Iterate again...
366359
iteration_count = iteration_count.increment().unwrap_or_else(|| {
367360
::tracing::warn!("{database_key_index:?}: execute: too many cycle iterations");
368361
panic!("{database_key_index:?}: execute: too many cycle iterations")
@@ -379,31 +372,29 @@ where
379372
"{database_key_index:?}: execute: iterate again ({iteration_count:?})...",
380373
);
381374

382-
completed_query
383-
.revisions
384-
.update_iteration_count_mut(database_key_index, iteration_count);
385-
386-
for head in completed_query.revisions.cycle_heads() {
375+
// Update the iteration count of nested cycles
376+
for head in &cycle_heads {
387377
if head.database_key_index == database_key_index {
388378
continue;
389379
}
390380

391381
let ingredient =
392382
zalsa.lookup_ingredient(head.database_key_index.ingredient_index());
393383

394-
// let iteration_count = if was_initial && !head.iteration_count.load().is_initial() {
395-
// IterationCount::first_after_restart()
396-
// } else {
397-
// iteration_count
398-
// };
399-
400384
ingredient.set_cycle_iteration_count(
401385
zalsa,
402386
head.database_key_index.key_index(),
403387
iteration_count,
404388
);
405389
}
406390

391+
// Update the iteration count of this cycle head, but only after restoring
392+
// the cycle heads array.
393+
completed_query.revisions.set_cycle_heads(cycle_heads);
394+
completed_query
395+
.revisions
396+
.update_iteration_count_mut(database_key_index, iteration_count);
397+
407398
let new_memo = self.insert_memo(
408399
zalsa,
409400
id,
@@ -527,8 +518,7 @@ fn outer_cycle(
527518
current_key: DatabaseKeyIndex,
528519
) -> Option<DatabaseKeyIndex> {
529520
cycle_heads
530-
.iter()
531-
.filter(|head| head.database_key_index != current_key)
521+
.iter_skip_own(current_key)
532522
.find(|head| {
533523
// SAFETY: We don't call into with_query_stack recursively
534524
let is_on_stack = unsafe {

src/zalsa_local.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,8 @@ struct QueryRevisionsExtraInner {
564564

565565
iteration: AtomicIterationCount,
566566

567+
/// Stores for nested cycle heads whether they've converged in the last iteration.
568+
/// This value is always `false` for other queries.
567569
cycle_converged: bool,
568570
}
569571

tests/cycle.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -920,7 +920,7 @@ fn cycle_unchanged_nested() {
920920
a.assert_value(&db, 59);
921921
b.assert_value(&db, 60);
922922

923-
db.assert_logs_len(15);
923+
db.assert_logs_len(13);
924924

925925
// next revision, we change only A, which is not part of the cycle and the cycle does not
926926
// depend on.

0 commit comments

Comments
 (0)