@@ -109,7 +109,7 @@ impl<I> WalkPredecessors<'_, I>
109109where
110110 I : Stream < Item = OpStoreResult < Operation > > + Unpin ,
111111{
112- fn try_next ( & mut self ) -> Result < Option < CommitEvolutionEntry > , WalkPredecessorsError > {
112+ async fn try_next ( & mut self ) -> Result < Option < CommitEvolutionEntry > , WalkPredecessorsError > {
113113 while !self . to_visit . is_empty ( ) && self . queued . is_empty ( ) {
114114 let Some ( op) = self . op_ancestors . next ( ) . block_on ( ) . transpose ( ) ? else {
115115 // Scanned all operations, no fallback needed.
@@ -120,16 +120,16 @@ where
120120 // There may be concurrent ops, but let's simply switch to the
121121 // legacy commit traversal. Operation history should be mostly
122122 // linear.
123- self . scan_commits ( ) ?;
123+ self . scan_commits ( ) . await ?;
124124 break ;
125125 }
126- self . visit_op ( & op) ?;
126+ self . visit_op ( & op) . await ?;
127127 }
128128 Ok ( self . queued . pop_front ( ) )
129129 }
130130
131131 /// Looks for predecessors within the given operation.
132- fn visit_op ( & mut self , op : & Operation ) -> Result < ( ) , WalkPredecessorsError > {
132+ async fn visit_op ( & mut self , op : & Operation ) -> Result < ( ) , WalkPredecessorsError > {
133133 let mut to_emit = Vec :: new ( ) ; // transitive edges should be short
134134 let mut has_dup = false ;
135135 let mut i = 0 ;
@@ -166,7 +166,7 @@ where
166166 async |& id| op. predecessors_for_commit ( id) . into_iter ( ) . flatten ( ) . map ( Ok ) ,
167167 |id| id, // Err(&CommitId) if graph has cycle
168168 )
169- . block_on ( )
169+ . await
170170 . map_err ( |id| WalkPredecessorsError :: CycleDetected ( id. clone ( ) ) ) ?;
171171 for & id in & sorted_ids {
172172 if op. predecessors_for_commit ( id) . is_some ( ) {
@@ -179,7 +179,7 @@ where
179179 }
180180
181181 /// Traverses predecessors from remainder commits.
182- fn scan_commits ( & mut self ) -> Result < ( ) , WalkPredecessorsError > {
182+ async fn scan_commits ( & mut self ) -> Result < ( ) , WalkPredecessorsError > {
183183 let store = self . repo . store ( ) ;
184184 let index = self . repo . index ( ) ;
185185 let mut commit_predecessors: HashMap < CommitId , Vec < CommitId > > = HashMap :: new ( ) ;
@@ -222,7 +222,7 @@ where
222222 } ,
223223 |_| panic ! ( "graph has cycle" ) ,
224224 )
225- . block_on ( ) ?;
225+ . await ?;
226226 self . queued . extend ( commits. into_iter ( ) . map ( |commit| {
227227 let predecessors = commit_predecessors
228228 . remove ( commit. id ( ) )
@@ -253,14 +253,15 @@ where
253253 }
254254}
255255
256+ // TODO: Convert to `Stream`.
256257impl < I > Iterator for WalkPredecessors < ' _ , I >
257258where
258259 I : Stream < Item = OpStoreResult < Operation > > + Unpin ,
259260{
260261 type Item = Result < CommitEvolutionEntry , WalkPredecessorsError > ;
261262
262263 fn next ( & mut self ) -> Option < Self :: Item > {
263- self . try_next ( ) . transpose ( )
264+ self . try_next ( ) . block_on ( ) . transpose ( )
264265 }
265266}
266267
@@ -271,7 +272,7 @@ where
271272/// between `old_ops` and `new_ops`. If `old_ops` and `new_ops` have ancestors
272273/// and descendants each other, or if criss-crossed merges exist between these
273274/// operations, the returned mapping would be lossy.
274- pub fn accumulate_predecessors (
275+ pub async fn accumulate_predecessors (
275276 new_ops : & [ Operation ] ,
276277 old_ops : & [ Operation ] ,
277278) -> Result < BTreeMap < CommitId , Vec < CommitId > > , WalkPredecessorsError > {
@@ -287,36 +288,38 @@ pub fn accumulate_predecessors(
287288 return Ok ( BTreeMap :: new ( ) ) ;
288289 } ;
289290 return resolve_transitive_edges ( map, map. keys ( ) )
291+ . await
290292 . map_err ( |id| WalkPredecessorsError :: CycleDetected ( id. clone ( ) ) ) ;
291293 }
292294
293295 // Follow reverse edges from the common ancestor to old_ops. Here we use
294296 // BTreeMap to stabilize order of the reversed edges.
295297 let mut accumulated = BTreeMap :: new ( ) ;
296298 let reverse_ops = op_walk:: walk_ancestors_range ( old_ops, new_ops) ;
297- if !try_collect_predecessors_into ( & mut accumulated, reverse_ops) ? {
299+ if !try_collect_predecessors_into ( & mut accumulated, reverse_ops) . await ? {
298300 return Ok ( BTreeMap :: new ( ) ) ;
299301 }
300302 let mut accumulated = reverse_edges ( accumulated) ;
301303 // Follow forward edges from new_ops to the common ancestor.
302304 let forward_ops = op_walk:: walk_ancestors_range ( new_ops, old_ops) ;
303- if !try_collect_predecessors_into ( & mut accumulated, forward_ops) ? {
305+ if !try_collect_predecessors_into ( & mut accumulated, forward_ops) . await ? {
304306 return Ok ( BTreeMap :: new ( ) ) ;
305307 }
306308 let new_commit_ids = new_ops
307309 . iter ( )
308310 . filter_map ( |op| op. store_operation ( ) . commit_predecessors . as_ref ( ) )
309311 . flat_map ( |map| map. keys ( ) ) ;
310312 resolve_transitive_edges ( & accumulated, new_commit_ids)
313+ . await
311314 . map_err ( |id| WalkPredecessorsError :: CycleDetected ( id. clone ( ) ) )
312315}
313316
314- fn try_collect_predecessors_into (
317+ async fn try_collect_predecessors_into (
315318 collected : & mut BTreeMap < CommitId , Vec < CommitId > > ,
316319 ops : impl Stream < Item = OpStoreResult < Operation > > ,
317320) -> OpStoreResult < bool > {
318321 let mut ops = pin ! ( ops) ;
319- while let Some ( op) = ops. next ( ) . block_on ( ) {
322+ while let Some ( op) = ops. next ( ) . await {
320323 let op = op?;
321324 let Some ( map) = & op. store_operation ( ) . commit_predecessors else {
322325 return Ok ( false ) ;
@@ -330,7 +333,7 @@ fn try_collect_predecessors_into(
330333/// Resolves transitive edges in `graph` starting from the `start` nodes,
331334/// returns new DAG. The returned DAG only includes edges reachable from the
332335/// `start` nodes.
333- fn resolve_transitive_edges < ' a : ' b , ' b > (
336+ async fn resolve_transitive_edges < ' a : ' b , ' b > (
334337 graph : & ' a BTreeMap < CommitId , Vec < CommitId > > ,
335338 start : impl IntoIterator < Item = & ' b CommitId > ,
336339) -> Result < BTreeMap < CommitId , Vec < CommitId > > , & ' b CommitId > {
@@ -341,7 +344,7 @@ fn resolve_transitive_edges<'a: 'b, 'b>(
341344 async |& id| graph. get ( id) . into_iter ( ) . flatten ( ) . map ( Ok ) ,
342345 |id| id, // Err(&CommitId) if graph has cycle
343346 )
344- . block_on ( ) ?;
347+ . await ?;
345348 for cur_id in sorted_ids {
346349 let Some ( neighbors) = graph. get ( cur_id) else {
347350 continue ;
0 commit comments