@@ -27,6 +27,7 @@ use futures::future::ready;
2727use futures:: future:: try_join_all;
2828use futures:: stream;
2929use itertools:: Itertools as _;
30+ use pollster:: FutureExt as _;
3031use thiserror:: Error ;
3132
3233use crate :: dag_walk;
@@ -161,7 +162,7 @@ async fn resolve_single_op(
161162 } ?;
162163 for ( i, c) in op_postfix. chars ( ) . enumerate ( ) {
163164 let mut neighbor_ops = match c {
164- '-' => operation. parents ( ) . try_collect ( ) ?,
165+ '-' => operation. parents ( ) . await ?,
165166 '+' => find_child_ops ( head_ops. as_ref ( ) . unwrap ( ) , operation. id ( ) ) . await ?,
166167 _ => unreachable ! ( ) ,
167168 } ;
@@ -276,7 +277,13 @@ pub fn walk_ancestors(
276277 stream:: iter ( dag_walk:: topo_order_reverse_lazy_ok (
277278 head_ops. into_iter ( ) . map ( Ok ) ,
278279 |OperationByEndTime ( op) | op. id ( ) . clone ( ) ,
279- |OperationByEndTime ( op) | op. parents ( ) . map_ok ( OperationByEndTime ) . collect_vec ( ) ,
280+ |OperationByEndTime ( op) | match op. parents ( ) . block_on ( ) {
281+ Ok ( parents) => parents
282+ . into_iter ( )
283+ . map ( |parent| Ok ( OperationByEndTime ( parent) ) )
284+ . collect_vec ( ) ,
285+ Err ( err) => vec ! [ Err ( err) ] ,
286+ } ,
280287 |_| panic ! ( "graph has cycle" ) ,
281288 ) )
282289 . map_ok ( |OperationByEndTime ( op) | op)
@@ -306,7 +313,13 @@ pub fn walk_ancestors_range(
306313 let trailing_iter = dag_walk:: topo_order_reverse_lazy_ok (
307314 start_ops. into_iter ( ) . map ( Ok ) ,
308315 |OperationByEndTime ( op) | op. id ( ) . clone ( ) ,
309- |OperationByEndTime ( op) | op. parents ( ) . map_ok ( OperationByEndTime ) . collect_vec ( ) ,
316+ |OperationByEndTime ( op) | match op. parents ( ) . block_on ( ) {
317+ Ok ( parents) => parents
318+ . into_iter ( )
319+ . map ( |op| Ok ( OperationByEndTime ( op) ) )
320+ . collect_vec ( ) ,
321+ Err ( err) => vec ! [ Err ( err) ] ,
322+ } ,
310323 |_| panic ! ( "graph has cycle" ) ,
311324 )
312325 . map_ok ( |OperationByEndTime ( op) | op) ;
@@ -320,7 +333,13 @@ fn collect_ancestors_until_roots(
320333 let sorted_ops = match dag_walk:: topo_order_reverse_chunked (
321334 start_ops,
322335 |OperationByEndTime ( op) | op. id ( ) . clone ( ) ,
323- |OperationByEndTime ( op) | op. parents ( ) . map_ok ( OperationByEndTime ) . collect_vec ( ) ,
336+ |OperationByEndTime ( op) | match op. parents ( ) . block_on ( ) {
337+ Ok ( parents) => parents
338+ . into_iter ( )
339+ . map ( |op| Ok ( OperationByEndTime ( op) ) )
340+ . collect_vec ( ) ,
341+ Err ( err) => vec ! [ Err ( err) ] ,
342+ } ,
324343 |_| panic ! ( "graph has cycle" ) ,
325344 ) {
326345 Ok ( sorted_ops) => sorted_ops,
0 commit comments