From 019959198608f328d8aadc611f0a402bae74a322 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Mon, 9 May 2022 17:49:51 +0200 Subject: [PATCH] query: also teard down on ctx done --- flatfs.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/flatfs.go b/flatfs.go index b8d4ce3..fa9dc89 100644 --- a/flatfs.go +++ b/flatfs.go @@ -766,7 +766,7 @@ func (fs *Datastore) Query(ctx context.Context, q query.Query) (query.Results, e // to `Close`. b := query.NewResultBuilder(q) b.Process.Go(func(p goprocess.Process) { - err := fs.walkTopLevel(fs.path, b) + err := fs.walkTopLevel(ctx, fs.path, b) if err == nil { return } @@ -782,7 +782,7 @@ func (fs *Datastore) Query(ctx context.Context, q query.Query) (query.Results, e return query.NaiveQueryApply(q, b.Results()), nil } -func (fs *Datastore) walkTopLevel(path string, result *query.ResultBuilder) error { +func (fs *Datastore) walkTopLevel(ctx context.Context, path string, result *query.ResultBuilder) error { dir, err := os.Open(path) if err != nil { return err @@ -801,13 +801,15 @@ func (fs *Datastore) walkTopLevel(path string, result *query.ResultBuilder) erro continue } - err = fs.walk(filepath.Join(path, dir), result) + err = fs.walk(ctx, filepath.Join(path, dir), result) if err != nil { return err } // Are we closing? select { + case <-ctx.Done(): + return ctx.Err() case <-result.Process.Closing(): return nil default: @@ -1123,7 +1125,7 @@ func (fs *Datastore) tempFileOnce() (*os.File, error) { } // only call this on directories. -func (fs *Datastore) walk(path string, qrb *query.ResultBuilder) error { +func (fs *Datastore) walk(ctx context.Context, path string, qrb *query.ResultBuilder) error { dir, err := os.Open(path) if err != nil { if os.IsNotExist(err) { @@ -1174,6 +1176,8 @@ func (fs *Datastore) walk(path string, qrb *query.ResultBuilder) error { select { case qrb.Output <- result: + case <-ctx.Done(): + return ctx.Err() case <-qrb.Process.Closing(): return nil }