Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
evolution: convert some functions to async
  • Loading branch information
bnjmnt4n committed Mar 7, 2026
commit 879589f9de106a08f9fc483fe2fa602dccdbefd2
7 changes: 4 additions & 3 deletions cli/src/commands/operation/diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ pub async fn show_op_diff(
with_content_format: &LogContentFormat,
diff_renderer: Option<&DiffRenderer<'_>>,
) -> Result<(), CommandError> {
let changes = compute_operation_commits_diff(current_repo, from_repo, to_repo)?;
let changes = compute_operation_commits_diff(current_repo, from_repo, to_repo).await?;
if !changes.is_empty() {
let revset =
RevsetExpression::commits(changes.keys().cloned().collect()).evaluate(current_repo)?;
Expand Down Expand Up @@ -528,7 +528,7 @@ impl ModifiedChange {
/// Returns a map of [`ModifiedChange`]s containing the new and old commits. For
/// created/rewritten commits, the map entries are indexed by new ids. For
/// abandoned commits, the entries are indexed by old ids.
fn compute_operation_commits_diff(
async fn compute_operation_commits_diff(
repo: &dyn Repo,
from_repo: &ReadonlyRepo,
to_repo: &ReadonlyRepo,
Expand All @@ -542,7 +542,8 @@ fn compute_operation_commits_diff(
let predecessor_commits = accumulate_predecessors(
slice::from_ref(to_repo.operation()),
slice::from_ref(from_repo.operation()),
)?;
)
.await?;

// Collect hidden commits to find abandoned/rewritten changes.
let mut hidden_commits_by_change: HashMap<ChangeId, CommitId> = HashMap::new();
Expand Down
57 changes: 31 additions & 26 deletions lib/src/evolution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,27 +109,27 @@ impl<I> WalkPredecessors<'_, I>
where
I: Stream<Item = OpStoreResult<Operation>> + Unpin,
{
fn try_next(&mut self) -> Result<Option<CommitEvolutionEntry>, WalkPredecessorsError> {
async fn try_next(&mut self) -> Result<Option<CommitEvolutionEntry>, WalkPredecessorsError> {
while !self.to_visit.is_empty() && self.queued.is_empty() {
let Some(op) = self.op_ancestors.next().block_on().transpose()? else {
let Some(op) = self.op_ancestors.next().await.transpose()? else {
// Scanned all operations, no fallback needed.
self.flush_commits()?;
self.flush_commits().await?;
break;
};
if !op.stores_commit_predecessors() {
// There may be concurrent ops, but let's simply switch to the
// legacy commit traversal. Operation history should be mostly
// linear.
self.scan_commits()?;
self.scan_commits().await?;
break;
}
self.visit_op(&op)?;
self.visit_op(&op).await?;
}
Ok(self.queued.pop_front())
}

/// Looks for predecessors within the given operation.
fn visit_op(&mut self, op: &Operation) -> Result<(), WalkPredecessorsError> {
async fn visit_op(&mut self, op: &Operation) -> Result<(), WalkPredecessorsError> {
let mut to_emit = Vec::new(); // transitive edges should be short
let mut has_dup = false;
let mut i = 0;
Expand All @@ -147,8 +147,8 @@ where
}

let store = self.repo.store();
let mut emit = |id: &CommitId| -> BackendResult<()> {
let commit = store.get_commit(id)?;
let mut emit = async |id: &CommitId| -> BackendResult<()> {
let commit = store.get_commit_async(id).await?;
self.queued.push_back(CommitEvolutionEntry {
commit,
operation: Some(op.clone()),
Expand All @@ -158,19 +158,19 @@ where
};
match &*to_emit {
[] => {}
[id] if !has_dup => emit(id)?,
[id] if !has_dup => emit(id).await?,
_ => {
let sorted_ids = dag_walk::topo_order_reverse_ok(
to_emit.iter().map(Ok),
|&id| id,
async |&id| op.predecessors_for_commit(id).into_iter().flatten().map(Ok),
|id| id, // Err(&CommitId) if graph has cycle
)
.block_on()
.await
.map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))?;
for &id in &sorted_ids {
if op.predecessors_for_commit(id).is_some() {
emit(id)?;
emit(id).await?;
}
}
}
Expand All @@ -179,16 +179,18 @@ where
}

/// Traverses predecessors from remainder commits.
fn scan_commits(&mut self) -> Result<(), WalkPredecessorsError> {
async fn scan_commits(&mut self) -> Result<(), WalkPredecessorsError> {
let store = self.repo.store();
let index = self.repo.index();
let mut commit_predecessors: HashMap<CommitId, Vec<CommitId>> = HashMap::new();
let commits = dag_walk::topo_order_reverse_ok(
self.to_visit.drain(..).map(|id| {
join_all(self.to_visit.drain(..).map(async |id| {
store
.get_commit(&id)
.get_commit_async(&id)
.await
.map_err(WalkPredecessorsError::Backend)
}),
}))
.await,
|commit: &Commit| commit.id().clone(),
async |commit: &Commit| {
let ids = match commit_predecessors.entry(commit.id().clone()) {
Expand Down Expand Up @@ -222,7 +224,7 @@ where
},
|_| panic!("graph has cycle"),
)
.block_on()?;
.await?;
self.queued.extend(commits.into_iter().map(|commit| {
let predecessors = commit_predecessors
.remove(commit.id())
Expand All @@ -237,10 +239,10 @@ where
}

/// Moves remainder commits to output queue.
fn flush_commits(&mut self) -> BackendResult<()> {
async fn flush_commits(&mut self) -> BackendResult<()> {
self.queued.reserve(self.to_visit.len());
for id in self.to_visit.drain(..) {
let commit = self.repo.store().get_commit(&id)?;
let commit = self.repo.store().get_commit_async(&id).await?;
self.queued.push_back(CommitEvolutionEntry {
commit,
operation: None,
Expand All @@ -253,14 +255,15 @@ where
}
}

// TODO: Convert to `Stream`.
impl<I> Iterator for WalkPredecessors<'_, I>
where
I: Stream<Item = OpStoreResult<Operation>> + Unpin,
{
type Item = Result<CommitEvolutionEntry, WalkPredecessorsError>;

fn next(&mut self) -> Option<Self::Item> {
self.try_next().transpose()
self.try_next().block_on().transpose()
}
}

Expand All @@ -271,7 +274,7 @@ where
/// between `old_ops` and `new_ops`. If `old_ops` and `new_ops` have ancestors
/// and descendants each other, or if criss-crossed merges exist between these
/// operations, the returned mapping would be lossy.
pub fn accumulate_predecessors(
pub async fn accumulate_predecessors(
new_ops: &[Operation],
old_ops: &[Operation],
) -> Result<BTreeMap<CommitId, Vec<CommitId>>, WalkPredecessorsError> {
Expand All @@ -287,36 +290,38 @@ pub fn accumulate_predecessors(
return Ok(BTreeMap::new());
};
return resolve_transitive_edges(map, map.keys())
.await
.map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()));
}

// Follow reverse edges from the common ancestor to old_ops. Here we use
// BTreeMap to stabilize order of the reversed edges.
let mut accumulated = BTreeMap::new();
let reverse_ops = op_walk::walk_ancestors_range(old_ops, new_ops);
if !try_collect_predecessors_into(&mut accumulated, reverse_ops)? {
if !try_collect_predecessors_into(&mut accumulated, reverse_ops).await? {
return Ok(BTreeMap::new());
}
let mut accumulated = reverse_edges(accumulated);
// Follow forward edges from new_ops to the common ancestor.
let forward_ops = op_walk::walk_ancestors_range(new_ops, old_ops);
if !try_collect_predecessors_into(&mut accumulated, forward_ops)? {
if !try_collect_predecessors_into(&mut accumulated, forward_ops).await? {
return Ok(BTreeMap::new());
}
let new_commit_ids = new_ops
.iter()
.filter_map(|op| op.store_operation().commit_predecessors.as_ref())
.flat_map(|map| map.keys());
resolve_transitive_edges(&accumulated, new_commit_ids)
.await
.map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))
}

fn try_collect_predecessors_into(
async fn try_collect_predecessors_into(
collected: &mut BTreeMap<CommitId, Vec<CommitId>>,
ops: impl Stream<Item = OpStoreResult<Operation>>,
) -> OpStoreResult<bool> {
let mut ops = pin!(ops);
while let Some(op) = ops.next().block_on() {
while let Some(op) = ops.next().await {
let op = op?;
let Some(map) = &op.store_operation().commit_predecessors else {
return Ok(false);
Expand All @@ -330,7 +335,7 @@ fn try_collect_predecessors_into(
/// Resolves transitive edges in `graph` starting from the `start` nodes,
/// returns new DAG. The returned DAG only includes edges reachable from the
/// `start` nodes.
fn resolve_transitive_edges<'a: 'b, 'b>(
async fn resolve_transitive_edges<'a: 'b, 'b>(
graph: &'a BTreeMap<CommitId, Vec<CommitId>>,
start: impl IntoIterator<Item = &'b CommitId>,
) -> Result<BTreeMap<CommitId, Vec<CommitId>>, &'b CommitId> {
Expand All @@ -341,7 +346,7 @@ fn resolve_transitive_edges<'a: 'b, 'b>(
async |&id| graph.get(id).into_iter().flatten().map(Ok),
|id| id, // Err(&CommitId) if graph has cycle
)
.block_on()?;
.await?;
for cur_id in sorted_ids {
let Some(neighbors) = graph.get(cur_id) else {
continue;
Expand Down
13 changes: 11 additions & 2 deletions lib/tests/test_evolution_predecessors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,16 +541,21 @@ fn test_accumulate_predecessors() {
let repo_d = tx.commit("d").block_on().unwrap();

// Empty old/new ops
let predecessors = accumulate_predecessors(&[], slice::from_ref(repo_c.operation())).unwrap();
let predecessors = accumulate_predecessors(&[], slice::from_ref(repo_c.operation()))
.block_on()
.unwrap();
assert!(predecessors.is_empty());
let predecessors = accumulate_predecessors(slice::from_ref(repo_c.operation()), &[]).unwrap();
let predecessors = accumulate_predecessors(slice::from_ref(repo_c.operation()), &[])
.block_on()
.unwrap();
assert!(predecessors.is_empty());

// Empty range
let predecessors = accumulate_predecessors(
slice::from_ref(repo_c.operation()),
slice::from_ref(repo_c.operation()),
)
.block_on()
.unwrap();
assert!(predecessors.is_empty());

Expand All @@ -559,6 +564,7 @@ fn test_accumulate_predecessors() {
slice::from_ref(repo_c.operation()),
slice::from_ref(repo_b.operation()),
)
.block_on()
.unwrap();
assert_eq!(
predecessors,
Expand All @@ -574,6 +580,7 @@ fn test_accumulate_predecessors() {
slice::from_ref(repo_c.operation()),
slice::from_ref(repo_a.operation()),
)
.block_on()
.unwrap();
assert_eq!(
predecessors,
Expand All @@ -591,6 +598,7 @@ fn test_accumulate_predecessors() {
slice::from_ref(repo_a.operation()),
slice::from_ref(repo_c.operation()),
)
.block_on()
.unwrap();
assert_eq!(
predecessors,
Expand All @@ -609,6 +617,7 @@ fn test_accumulate_predecessors() {
slice::from_ref(repo_d.operation()),
slice::from_ref(repo_c.operation()),
)
.block_on()
.unwrap();
assert_eq!(
predecessors,
Expand Down