Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
REesultsBuilder waits for results goroutine to finish before closing …
…output channel
  • Loading branch information
gammazero committed Feb 20, 2025
commit 1d3418bc546f0fbac85a65dfa82c75c5fa109140
29 changes: 23 additions & 6 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package query
import (
"context"
"fmt"
"sync"
"time"
)

Expand Down Expand Up @@ -149,6 +150,7 @@ type Results interface {
NextSync() (Result, bool) // blocks and waits to return the next result, second parameter returns false when results are exhausted
Rest() ([]Entry, error) // waits till processing finishes, returns all entries at once.
Close() // client may call Close to signal early exit
Done() <-chan struct{} // signals that Results is closed
}

// results implements Results
Expand Down Expand Up @@ -189,6 +191,10 @@ func (r *results) Query() Query {
return r.query
}

func (r *results) Done() <-chan struct{} {
return r.ctx.Done()
}

// ResultBuilder is what implementors use to construct results
// Implementors of datastores and their clients must respect the
// Process of the Request:
Expand All @@ -205,6 +211,7 @@ type ResultBuilder struct {

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}

// Results returns a Results to to this builder.
Expand Down Expand Up @@ -232,6 +239,7 @@ func NewResultBuilder(q Query) *ResultBuilder {
}
b.ctx, b.cancel = context.WithCancel(context.Background())
context.AfterFunc(b.ctx, func() {
b.wg.Wait()
close(b.Output)
})
return b
Expand All @@ -240,8 +248,11 @@ func NewResultBuilder(q Query) *ResultBuilder {
// ResultsWithChan returns a Results object from a channel
// of Result entries.
//
// DEPRECATED: This iterator is impossible to cancel correctly. Canceling it
// will leave anything trying to write to the result channel hanging.
// DEPRECATED: This iterator takes sepcial care to cancel correctly. Canceling
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good chance to get rid of this method...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could get rid of it... or leave it now that it is technically fixed because callers can now select over Results.Done when writing to the results input channel.

// it will leave anything trying to write to the result channel hanging, unless
// that write can select the result channel and Results.Done(). This requires
// creating the result channel, calline ResultsWithChan, and then writing to
// the results channel.
func ResultsWithChan(q Query, res <-chan Result) Results {
return ResultsWithContext(q, func(ctx context.Context, out chan<- Result) {
for {
Expand All @@ -263,14 +274,15 @@ func ResultsWithChan(q Query, res <-chan Result) Results {
})
}

// ResultsWithCtxs returns a Results object with the results generated by the
// passed proc function called in a separate goroutine.
// ResultsWithContext returns a Results object with the results generated by
// the passed proc function called in a separate goroutine.
func ResultsWithContext(q Query, proc func(context.Context, chan<- Result)) Results {
b := NewResultBuilder(q)

b.wg.Add(1)
go func() {
defer b.cancel()
proc(b.ctx, b.Output)
b.cancel()
b.wg.Done()
}()

return b.Results()
Expand Down Expand Up @@ -381,6 +393,11 @@ func (r *resultsIter) Query() Query {
return r.query
}

func (r *resultsIter) Done() <-chan struct{} {
r.useLegacyResults()
return r.legacyResults.Done()
}

func (r *resultsIter) useLegacyResults() {
if r.legacyResults != nil {
return
Expand Down